菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— Bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于ParkingLot的bthread调度brpc源码解析(八)—— 基础类EventDispatcher详解brpc源码解析(九)—— 基础类WorkStealingQueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— Reducer类和Adder类解析brpc源码解析(十二)—— 核心组件bvar详解 AgentGroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— MPSC队列ExecutionQueue详解brpc源码解析(十九)—— 双buffer数据结构DoublyBufferedData详解brpc源码解析(二十)—— 用于访问下游的Channel类详解

bthread(六) 小结

阅读 : 560

brpc内如何用的bthread

以epoll为例举例,epoll线程跑在bthread里,io bthread是按需起的(只要有读、写事件来就起一个,对于一个fd只起一个读线程和线程),用户回调也是跑在bthread

【Q1】如果在callback里阻塞整个worker,其他worker会偷过来运行,但是万一所有worker都被阻塞住,那就gg了。

【Q2】如果在callback里发起brpc,只会阻塞当前bthread,底层的worker不受影响,他发现后就移出rq,这时,这个callback可能一会儿在worker1里跑、一会儿在worker2里跑,所以如果用了pthread级别的变量,就会有逻辑错误。在bthread中,尽量不要使用pthread local数据,如果一定要使用,需要通过pthread_key_create和pthread_getspecific(mempool.h)。

【Q3】如果在callback里加锁后发起下游rpc请求,那么情况是线程1抢到butex发起rpc请求,它需要等brpc返回后才能解锁,但是处理请求的返回是需要资源的,若资源都被占了就会死锁

疑惑和猜想

1、bthread为啥会涉及rq remote_rq两个队列,一个不可以吗?

如果只有一个,非worker添加的bthread也要入rq,那么就变成多生产者了,当前的WorkStealingQueue是无法满足的,如果要支持多生产者势必会增加开销。

2、为啥wait_task唤醒后,要先去remote_rq里取tid执行呢,而不先从rq里取?

全局的steal优先steal rq,如果本地唤醒后不优先去_remote_rq里取的话remote task可能会长时间得不到调度。

3、bthread在执行过程中需要创建另一个bthread时,会调用TaskGroup::start_foreground(),在start_foreground()内完成bthread 2的TaskMeta对象的创建,并调用sched_to()让worker去执行bthread 2的任务函数,worker在真正执行bthread 2的任务函数前会将bthread 1的tid重新压入TaskGroup的rq队尾,bthread 1不久之后会再次被调度执行,但不一定在此worker内了。

4、多线程相关:若加锁,可能出现一个抢到锁执行到一半时间片轮转了,然后整个系统就不lock-free了。

线程1:futex_wait_private(value, expect),若value与expect相等,当前线程就睡眠过去被kernel调度走(整条语句都是原子的),不相等就失效

线程2:扮演唤醒的角色,我要实现的需求是线程2改变这个value后线程1一定会看到这个改变,change(value)而后futex_wake_private唤醒所有等待在value处的线程

当两bth并发执行时,若th1执行在th2 change之前,因为value是等于expect的,th1睡眠,th2执行到后面唤醒th1,符合预期;若th1执行在th2 change之后,value不等于expect了,th1就不会卡住了。

5、butex:butex几乎等于futex,用butex只会阻塞bthread,不会阻塞pthread

三个成员:原子变量int(value)、waiter queue(所有等在butex上的bth队列)、mutex(保护waiter queue)

  • wait:原子比较value是否等于传入的expect,相等证明要做一次调度,加定时器,把自己放在waiter queue,调度到下一个bthread
  • wake:拿到butex后,从waiter queue中pop出来(一个或多个)挨个放在runq里,用户要按照业务需求自己在wake前原子的改变原子变量值

支持:pthread里也可以调butex,相当于调futex,futex当然会阻塞pthread。

6、ParkingLot:所有worker_thread(TaskGourp)如果没有事情就会等在parkinglot 扮演thread1的作用,只要有1个bthread创建出来,就会唤醒这个睡眠的TaskGroup去偷。那么如何避免全局竞争呢?做法是把全局竞争分散到局部竞争,worker按parkinglot分组(线上一共分8个左右),比如每n个worker放在一个parkinglot里,偷的过程也是无锁的(之前介绍过),这样一次唤醒其实是唤醒本pl里的worker,避免大范围的惊群。

意外收获

1、关于work stealing

不是协程的专利,更不是Go语言的专利。work stealing是一种通用的实现负载均衡的算法。这里的负载均衡指的不是像Nginx那种对于外部网络请求做负载均衡,此处指的是每个CPU处理任务时,每个核的负载均衡。不止协程,其实线程池也可以做work stealing。

2、关于atomic

【例】无锁设计一个并发栈实现push操作

void push(T* node) {
    T* head = _head.load(base::memory_order_relaxed);
    node->next = head;
    while (!_head.compare_exchange_weak(
                head, node, base::memory_order_relaxed)) {
        node->next = head;
    }
}

3、关于_builtin_expect()

_builtin_expect()是gcc提供的一个用于优化分支预测的函数。

if (__builtin_expect(NULL == g, 0)) {
    return -1;
}
// 观察下面两种逻辑的区别
if (NULL == g) {
    fun1();
} else {
    fun2();
}
if (NULL != g) {
    fun2();
} else {
    fun1();
}

如果g绝大概率为NULL,那么第一种性能高,否则是第二种。

CPU中有流水线,具体而言是CPU有指令预取功能:CPU预先取出下一条指令,可以减少CPU等待取指令的耗时,从而可以提供CPU的效率。

但若存在跳转指令(jmp,比如if else的时候), CPU在执行当前指令时,从内存中预取出了当前指令的下一条指令。执行完当前指令后,CPU发现不是要执行预取的指令,则需要重新取指令,所以效率会低。

但是程序员不可能如此费心编排if else,所以可以用__builtin_expect(),其作用为“允许程序员将最有可能执行的分支告诉编译器”。其用法为:

__builtin_expect(EXP, N)    // EXP==N的概率很大。从而让编译器自己生产出高效的代码。
// 所以bthread代码中经常出现下面的代码就是在告诉编译器:NULL!=g的概率很大,以提高效率
if (__builtin_expect(NULL == g, 0)) {
    return -1;
}