菜鸟笔记
提升您的技术认知

brpc的精华bthread源码剖析

阅读 : 86

首先我们先明确几个概念。何谓brpc,bthread,协程

  • brpc
    百度内最常使用的工业级RPC框架,RPC把网络交互类比为“client端访问server端上的函数”:client向server发送request后开始等待,知道server收到请求后,处理、回复client,client又再度恢复并根据response做出反应。
  • bthread
    bthread是brpc开发的一套协程,可以通俗的理解为轻量级的线程
  • 协程
    协程是单线程下的并发,又称微线程,纤程。它是实现多任务的另一种方式,只不过是比线程更小的执行单元。因为它自带CPU的上下文,这样只要在合适的时机,我们可以把一个协程切换到另一个协程。

正如标题所说,brpc的精华全部都在bthread上,而bthread就是我们brpc开发的一套“协程”。而进程,线程,和bthread的关系是什么样的呢?一个进程里面可以开辟多个线程,而线程和协程的关系呢。在微信开源的libco上,线程 :协程 = 1 :N。而在bthread上 线程 :协程 = M :N,而bthread实现的关键就是工作窃取算法。后续会展开描述。
bthread有三大件:

  • TaskControl(进程内唯一)
  • TaskGroup(线程内唯一)
  • TaskMeta(bthread上下文)

由于函数量巨大,所以我绘制了一张函数总体思路总结表,供大家方便理解。

我们从创建函数开始讲起,并且以展开的方式依次对一些我能理解的地方进行剖析。首先我们介绍的是bthread创建函数。

bthread创建函数

好的 下面我一一讲解这两个bthread创建函数。

首先我们会去尝试获取到bthread的tls_task_group,而我们查看tls_task_group的定义处可以看到
__thread TaskGroup* tls_task_group = NULL; 那就证明如果没有tls_task_group的话则需要执行到return中的语句start_from_non_worker(),接下来我们查看到这个函数的定义。

//其一
int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),
                             void* __restrict arg) {
  
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
  
        // start from worker
        return g->start_background<false>(tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

//其二
int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) {
  
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
  
        // start from worker
        return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

start_from_non_worker

这个函数一开始,我们会进行对TaskControl的获取或者新建TaskControl,也就是函数get_or_new_task_control()。

start_from_non_worker(bthread_t* __restrict tid,
                      const bthread_attr_t* __restrict attr,
                      void * (*fn)(void*),
                      void* __restrict arg) {
  
    TaskControl* c = get_or_new_task_control();
    if (NULL == c) {
  
        return ENOMEM;
    }
    if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
  
        // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
        // 1. NOSIGNAL is often for creating many bthreads in batch,
        //    inserting into the same TaskGroup maximizes the batch.
        // 2. bthread_flush() needs to know which TaskGroup to flush.
        TaskGroup* g = tls_task_group_nosignal;
        if (NULL == g) {
  
            g = c->choose_one_group();
            tls_task_group_nosignal = g;
        }
        return g->start_background<true>(tid, attr, fn, arg);
    }
    return c->choose_one_group()->start_background<true>(
        tid, attr, fn, arg);
}

然后我们查看这个函数的定义:

get_or_new_task_control()

这个函数一开始先对全局变量TC(g_task_control)初始化原子变量,然后通过原子变量进行load,取出TC指针,如果不为空,直接返回。然后对竞争加上自旋锁,重复上一操作。如果加自旋锁也没有获取到TC指针,则说明TC指针确实为NULL,所以我们new一个TC指针,并且用并发度concurrency来初始化全局TC,最后蒋全局TC存入原子变量并返回。

inline TaskControl* get_or_new_task_control() {
  
    butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
    TaskControl* c = p->load(butil::memory_order_consume);
    if (c != NULL) {
  
        return c;
    }
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
    c = p->load(butil::memory_order_consume);
    if (c != NULL) {
  
        return c;
    }
    c = new (std::nothrow) TaskControl;
    if (NULL == c) {
  
        return NULL;
    }
    int concurrency = FLAGS_bthread_min_concurrency > 0 ?
        FLAGS_bthread_min_concurrency :
        FLAGS_bthread_concurrency;
    if (c->init(concurrency) != 0) {
  
        LOG(ERROR) << "Fail to init g_task_control";
        delete c;
        return NULL;
    }
    p->store(c, butil::memory_order_release);
    return c;
}

现在有同学就有疑问了,并发度指的是什么?我们先看TaskControl::init()的源码。

TaskControl::init

我们抛开不重要的东西,我们只看到for循环那里,我们可以看到TC的初始化,就是调用了pthread_create()去创建了concurrency个线程,而concurrency就是我们之前提到的并发度,而我之前提到的线程:协程 = N:M在这里就开始展露头角了。

int TaskControl::init(int concurrency) {
  
    if (_concurrency != 0) {
  
        LOG(ERROR) << "Already initialized";
        return -1;
    }
    if (concurrency <= 0) {
  
        LOG(ERROR) << "Invalid concurrency=" << concurrency;
        return -1;
    }
    _concurrency = concurrency;

    // Make sure TimerThread is ready.
    if (get_or_create_global_timer_thread() == NULL) {
  
        LOG(ERROR) << "Fail to get global_timer_thread";
        return -1;
    }
    
    _workers.resize(_concurrency);   
    for (int i = 0; i < _concurrency; ++i) {
  
        const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
        if (rc) {
  
            LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
            return -1;
        }
    }
    _worker_usage_second.expose("bthread_worker_usage");
    _switch_per_second.expose("bthread_switch_second");
    _signal_per_second.expose("bthread_signal_second");
    _status.expose("bthread_group_status");

    // Wait for at least one group is added so that choose_one_group()
    // never returns NULL.
    // TODO: Handle the case that worker quits before add_group
    while (_ngroup == 0) {
  
        usleep(100);  // TODO: Elaborate
    }
    return 0;
}

在这里我们可以看到每个线程有一个回调函数worker_thread,下面我们看到这个回调函数的源码。

TaskControl::worker_thread

首先我们先获取一个TC的指针,然后用create_group创建一个TG,然后初始化tls_task_group,并且把_nworkers加一,然后运行主任务run_main_task,注意了,这里是一个死循环,如果推出了一些死循环,就输出一些日志,并且进行清理工作。

void* TaskControl::worker_thread(void* arg) {
  
    run_worker_startfn();    
#ifdef BAIDU_INTERNAL
    logging::ComlogInitializer comlog_initializer;
#endif
    
    TaskControl* c = static_cast<TaskControl*>(arg);
    TaskGroup* g = c->create_group();
    TaskStatistics stat;
    if (NULL == g) {
  
        LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
        return NULL;
    }
    BT_VLOG << "Created worker=" << pthread_self()
            << " bthread=" << g->main_tid();

    tls_task_group = g;
    c->_nworkers << 1;
    g->run_main_task();

    stat = g->main_stat();
    BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
            << g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
            << "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
    tls_task_group = NULL;
    g->destroy_self();
    c->_nworkers << -1;
    return NULL;
}

好的,这里我们先对create_group和run_main_task的源码进行分析一下。

TaskControl::create_group()

这里我们先创建了一个TG,然后用TaskGroup::init进行初始化大小。然后用_add_group判断非工作线程,如果是则delete,这里我们先看到TaskGroup::init函数。

TaskGroup* TaskControl::create_group() {
  
    TaskGroup* g = new (std::nothrow) TaskGroup(this);
    if (NULL == g) {
  
        LOG(FATAL) << "Fail to new TaskGroup";
        return NULL;
    }
    if (g->init(FLAGS_task_group_runqueue_capacity) != 0) {
  
        LOG(ERROR) << "Fail to init TaskGroup";
        delete g;
        return NULL;
    }
    if (_add_group(g) != 0) {
  
        delete g;
        return NULL;
    }
    return g;
}

TaskGroup::init

这里我们可以看到这个函数用runqueue_capacity来控制这rq和remote_rq队列的大小。而这个数值的默认值为4096,然后进行get_stack()操作。进行完get_stack()操作之后,然后建立资源池slot,然后从资源池中获取TM。

int TaskGroup::init(size_t runqueue_capacity) {
  
    if (_rq.init(runqueue_capacity) != 0) {
  
        LOG(FATAL) << "Fail to init _rq";
        return -1;
    }
    if (_remote_rq.init(runqueue_capacity / 2) != 0) {
  
        LOG(FATAL) << "Fail to init _remote_rq";
        return -1;
    }
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    if (NULL == stk) {
  
        LOG(FATAL) << "Fail to get main stack container";
        return -1;
    }
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
    if (NULL == m) {
  
        LOG(FATAL) << "Fail to get TaskMeta";
        return -1;
    }
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = NULL;
    m->arg = NULL;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
    m->stat = EMPTY_STAT;
    m->attr = BTHREAD_ATTR_TASKGROUP;
    m->tid = make_tid(*m->version_butex, slot);
    m->set_stack(stk);

    _cur_meta = m;
    _main_tid = m->tid;
    _main_stack = stk;
    _last_run_ns = butil::cpuwide_time_ns();
    return 0;
}

get_stack

这是get_stack()的源码,这个函数里面只有一个switch操作,有四个栈类型选项。分别是:

栈名 栈大小
STACK_TYPE_SMALL 32k(32768)
STACK_TYPE_NORMAL 1M(1048576)
STACK_TYPE_LARGE 8M(8388608)

还有一个是STACK_TYPE_MAIN

前三个都归为一类,是StackFactory的通用模板,而最后一个另起一类,是其的特例化模板。

inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
  
    switch (type) {
  
    case STACK_TYPE_PTHREAD:
        return NULL;
    case STACK_TYPE_SMALL:
        return StackFactory<SmallStackClass>::get_stack(entry);
    case STACK_TYPE_NORMAL:
        return StackFactory<NormalStackClass>::get_stack(entry);
    case STACK_TYPE_LARGE:
        return StackFactory<LargeStackClass>::get_stack(entry);
    case STACK_TYPE_MAIN:
        return StackFactory<MainStackClass>::get_stack(entry);
    }
    return NULL;
}

首先我们看到通用模板的定义:

template <typename StackClass> struct StackFactory {
  
    struct Wrapper : public ContextualStack {
  
        explicit Wrapper(void (*entry)(intptr_t)) {
  
            if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
                                       FLAGS_guard_page_size) != 0) {
  
                storage.zeroize();
                context = NULL;
                return;
            }
            context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
            stacktype = (StackType)StackClass::stacktype;
        }
        ~Wrapper() {
  
            if (context) {
  
                context = NULL;
                deallocate_stack_storage(&storage);
                storage.zeroize();
            }
        }
    };
    
    static ContextualStack* get_stack(void (*entry)(intptr_t)) {
  
        return butil::get_object<Wrapper>(entry);
    }
    
    static void return_stack(ContextualStack* sc) {
  
        butil::return_object(static_cast<Wrapper*>(sc));
    }
};

这是StackFactory的通用模板,我们可以发现两个函数和一个内部类Wapper。这两个函数一个是获取栈,一个是归还栈,这两个操作是互逆的。这个内部类的bthread_make_fcontext函数会对其三个变量进行初始化,而这三个变量都是ContextualStack的成员。

struct ContextualStack {
  
    bthread_fcontext_t context;
    StackType stacktype;
    StackStorage storage;
};

说完了通用模板,我们来说一下特例化模板。

template <> struct StackFactory<MainStackClass> {
  
    static ContextualStack* get_stack(void (*)(intptr_t)) {
  
        ContextualStack* s = new (std::nothrow) ContextualStack;
        if (NULL == s) {
  
            return NULL;
        }
        s->context = NULL;
        s->stacktype = STACK_TYPE_MAIN;
        s->storage.zeroize();
        return s;
    }
    
    static void return_stack(ContextualStack* s) {
  
        delete s;
    }
};

特例化模板的代码量比通用模板的代码量要少,那么说明特例化模板的思路更加简单啦。因为特例化没有Wapper,也就没有调用那个分配上下文的函数bthread_make_fcontext()。

TaskGroup::run_main_task()

void TaskGroup::run_main_task() {
  
    bvar::PassiveStatus<double> cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
  
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
  
            TaskGroup::task_runner(1/*skip remained*/);
        }
        if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
  
            char name[32];
#if defined(OS_MACOSX)
            snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64,
                     pthread_numeric_id());
#else
            snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
                     (long)syscall(SYS_gettid));
#endif
            usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> >
                             (name, &cumulated_cputime, 1));
        }
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}

run_main_task的三大关键函数。

  • wait_task(等)
    wait_task就是等待找到任务,而这里就会涉及到工作窃取。下面我们先去看一下wait_task的源代码,看看究竟是怎么做到工作窃取的。

run_main_task

bool TaskGroup::wait_task(bthread_t* tid) {
  
    do {
  
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        if (_last_pl_state.stopped()) {
  
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
  
            return true;
        }
    } while (true);
}

这里我们先看到_last_pl_state和stopped()两个新颖的东西,我们先看到一下stopped的源码。

class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
  
public:
    class State {
  
    public:
        State(): val(0) {
  }
        bool stopped() const {
   return val & 1; }
    private:
    friend class ParkingLot;
        State(int val) : val(val) {
  }
        int val;
    };

    ParkingLot() : _pending_signal(0) {
  }

    // Wake up at most `num_task' workers.
    // Returns #workers woken up.
    int signal(int num_task) {
  
        _pending_signal.fetch_add((num_task << 1), butil::memory_order_release);
        return futex_wake_private(&_pending_signal, num_task);
    }

    // Get a state for later wait().
    State get_state() {
  
        return _pending_signal.load(butil::memory_order_acquire);
    }

    // Wait for tasks.
    // If the `expected_state' does not match, wait() may finish directly.
    void wait(const State& expected_state) {
  
        futex_wait_private(&_pending_signal, expected_state.val, NULL);
    }

    // Wakeup suspended wait() and make them unwaitable ever. 
    void stop() {
  
        _pending_signal.fetch_or(1);
        futex_wake_private(&_pending_signal, 10000);
    }
private:
    // higher 31 bits for signalling, LSB for stopping.
    butil::atomic<int> _pending_signal;
};

好的,看stopped的源码直接给我揪出了一个类,那我们就不避免的去了解一下这个类了。这个类里面有一个内部类State,Parkinglot是它的友元。我们先看一下它的成员函数signal,唤醒最多num_task<<1个worker,这个位操作世纪就是把num_task乘了2。接着调用了futex_wake_private。而这个函数的源码为:

inline int futex_wait_private(
    void* addr1, int expected, const timespec* timeout) {
  
    return syscall(SYS_futex, addr1, (FUTEX_WAIT | FUTEX_PRIVATE_FLAG),
                   expected, timeout, NULL, 0);
}

这个函数只有一条语句,就是return语句。之所以要这样做,其实就是对于系统调用SYS_futex的封装。
然后我们回到wait_task函数,wait函数就是阻塞等待通知,被通知后执行steal_task()函数。

steal_task

首先是_remote_rq队列中的任务出队,如果没有则全局TC来窃取任务。

bool steal_task(bthread_t* tid) {
  
        if (_remote_rq.pop(tid)) {
  
            return true;
        }
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        _last_pl_state = _pl->get_state();
#endif
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }

调用到的TaskControl::steal_task()源码如下:

TaskControl::steal_task

先随机找一个TG,先从它的rq队列窃取任务,如果失败则从remote_rq队列窃取任务。所以说,rq比remote_rq的优先级更高。这里疑问就来了,为啥是这么个顺序?这里是为了避免资源竞态,避免多个TG等待任务的时候,当前TG从rq中取任务,与其他TG过来自己这边窃取任务造成竞态。

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
  
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
  
        return false;
    }

    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
  
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
  
            if (g->_rq.steal(tid)) {
  
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
  
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}

TaskGroup::sched_to

  • sched_to(切换栈)
    首先先通过传入的参数next_tid找到TM,next_meta,和对应的ContextualStack信息:stk。然后给next_meta设置栈stk。然后调用重载的sched_to函数。
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
  
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
  
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
  
            next_meta->set_stack(stk);
        } else {
  
            // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
            // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
            // This basically means that if we can't allocate stack, run
            // the task in pthread directly.
            next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    // Update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}

TaskGroup::sched_to

显示记录一些数据,判断下一个TM和当前TM是否相等,如果不相等,则去切换栈。tls_bls是当前TM的局部存储,先做还原,然后赋值成下一个TM的局部存储,然后jump_stack()去切换栈。至于jump_stack()涉及到汇编知识,而我不太擅长,就不展开讲了。

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
  
    TaskGroup* g = *pg;
#ifndef NDEBUG
    if ((++g->_sched_recursive_guard) > 1) {
  
        LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1
                   << ") call sched_to(" << g << ")";
    }
#endif
    // Save errno so that errno is bthread-specific.
    const int saved_errno = errno;
    void* saved_unique_user_ptr = tls_unique_user_ptr;

    TaskMeta* const cur_meta = g->_cur_meta;
    const int64_t now = butil::cpuwide_time_ns();
    const int64_t elp_ns = now - g->_last_run_ns;
    g->_last_run_ns = now;
    cur_meta->stat.cputime_ns += elp_ns;
    if (cur_meta->tid != g->main_tid()) {
  
        g->_cumulated_cputime_ns += elp_ns;
    }
    ++cur_meta->stat.nswitch;
    ++ g->_nswitch;
    // Switch to the task
    if (__builtin_expect(next_meta != cur_meta, 1)) {
  
        g->_cur_meta = next_meta;
        // Switch tls_bls
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;

        // Logging must be done after switching the local storage, since the logging lib
        // use bthread local storage internally, or will cause memory leak.
        if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
            (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
  
            LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
                      << next_meta->tid;
        }

        if (cur_meta->stack != NULL) {
  
            if (next_meta->stack != cur_meta->stack) {
  
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                g = tls_task_group;
            }
#ifndef NDEBUG
            else {
  
                // else pthread_task is switching to another pthread_task, sc
                // can only equal when they're both _main_stack
                CHECK(cur_meta->stack == g->_main_stack);
            }
#endif
        }
        // else because of ending_sched(including pthread_task->pthread_task)
    } else {
  
        LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
    }

    while (g->_last_context_remained) {
  
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        fn(g->_last_context_remained_arg);
        g = tls_task_group;
    }

    // Restore errno
    errno = saved_errno;
    tls_unique_user_ptr = saved_unique_user_ptr;

#ifndef NDEBUG
    --g->_sched_recursive_guard;
#endif
    *pg = g;
}

  • task_runner(执行)
    下面我们来看一下task_runner的源码

TaskGroup::task_runner

源码很长,由于task_runner的输入参数为1,则最上面的if逻辑语句会跳过。然后下面就是一个do-while循环,先执行回调函数,然后清理线程局部变量,累加版本号,唤醒join,_nbthreads减一。然后调用ending_sched()查找下一个任务。当tls_task_group ==main_tid时终止循环。

void TaskGroup::task_runner(intptr_t skip_remained) {
  
    // NOTE: tls_task_group is volatile since tasks are moved around
    //       different groups.
    TaskGroup* g = tls_task_group;

    if (!skip_remained) {
  
        while (g->_last_context_remained) {
  
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }

#ifndef NDEBUG
        --g->_sched_recursive_guard;
#endif
    }

    do {
  
        // A task can be stopped before it gets running, in which case
        // we may skip user function, but that may confuse user:
        // Most tasks have variables to remember running result of the task,
        // which is often initialized to values indicating success. If an
        // user function is never called, the variables will be unchanged
        // however they'd better reflect failures because the task is stopped
        // abnormally.

        // Meta and identifier of the task is persistent in this run.
        TaskMeta* const m = g->_cur_meta;

        if (FLAGS_show_bthread_creation_in_vars) {
  
            // NOTE: the thread triggering exposure of pending time may spend
            // considerable time because a single bvar::LatencyRecorder
            // contains many bvar.
            g->_control->exposed_pending_time() <<
                (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
        }

        // Not catch exceptions except ExitException which is for implementing
        // bthread_exit(). User code is intended to crash when an exception is
        // not caught explicitly. This is consistent with other threading
        // libraries.
        void* thread_return;
        try {
  
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
  
            thread_return = e.value();
        }

        // Group is probably changed
        g = tls_task_group;

        // TODO: Save thread_return
        (void)thread_return;

        // Logging must be done before returning the keytable, since the logging lib
        // use bthread local storage internally, or will cause memory leak.
        // FIXME: the time from quiting fn to here is not counted into cputime
        if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
  
            LOG(INFO) << "Finished bthread " << m->tid << ", cputime="
                      << m->stat.cputime_ns / 1000000.0 << "ms";
        }

        // Clean tls variables, must be done before changing version_butex
        // otherwise another thread just joined this thread may not see side
        // effects of destructing tls variables.
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
  
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL; // optional
        }

        // Increase the version and wake up all joiners, if resulting version
        // is 0, change it to 1 to make bthread_t never be 0. Any access
        // or join to the bthread after changing version will be rejected.
        // The spinlock is for visibility of TaskGroup::get_attr.
        {
  
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
  
                ++*m->version_butex;
            }
        }
        butex_wake_except(m->version_butex, 0);

        g->_control->_nbthreads << -1;
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid);

    // Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
    // tasks to run, quit for more tasks.
}

好的,现在我们讲解完了get_or_new_task_control()函数,我们继续往上一级看。然后会调用start_background< true >函数。由于在bthread_start_background中if(g)也使用了start_background函数,只是后面是true和false的差别。我们一起来看一看这个函数的源码。

TaskGroup::start_background

这个函数的前文就是对一些变量的初始化。而前面的true和false的判断就体现在最后的if语句中。如果是真,则运行的是ready_to_run_remote()

template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
  
    if (__builtin_expect(!fn, 0)) {
  
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
  
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
  
        LOG(INFO) << "Started bthread " << m->tid;
    }
    _control->_nbthreads << 1;
    if (REMOTE) {
  
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
  
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}

TaskGroup::ready_to_run_remote

先给当前TG的remote_rq加互斥锁。然后入队,然后一个while循环,入队失败则休眠1ms,再继续入队。入队失败的原因只有一个,那就是队列容量已满。nosigner一般多为false,所以选择到一个else这里,最后调用了一个signal_task。

void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
  
    _remote_rq._mutex.lock();
    while (!_remote_rq.push_locked(tid)) {
  
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
        LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                                << _remote_rq.capacity();
        ::usleep(1000);
        _remote_rq._mutex.lock();
    }
    if (nosignal) {
  
        ++_remote_num_nosignal;
        _remote_rq._mutex.unlock();
    } else {
  
        const int additional_signal = _remote_num_nosignal;
        _remote_num_nosignal = 0;
        _remote_nsignaled += 1 + additional_signal;
        _remote_rq._mutex.unlock();
        _control->signal_task(1 + additional_signal);
    }
}

TaskControl::signal_task

num_task如果大于2,则重新赋值为2。因为如果num_task越大,消费得也就越快,但是如果一直消费,函数调用是阻塞的,这样bthread_start_background就不好生产任务,在这里也是达到了一种平衡。

void TaskControl::signal_task(int num_task) {
  
    if (num_task <= 0) {
  
        return;
    }
    // TODO(gejun): Current algorithm does not guarantee enough threads will
    // be created to match caller's requests. But in another side, there's also
    // many useless signalings according to current impl. Capping the concurrency
    // is a good balance between performance and timeliness of scheduling.
    if (num_task > 2) {
  
        num_task = 2;
    }
    int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
    num_task -= _pl[start_index].signal(1);
    if (num_task > 0) {
  
        for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
  
            if (++start_index >= PARKING_LOT_NUM) {
  
                start_index = 0;
            }
            num_task -= _pl[start_index].signal(1);
        }
    }
    if (num_task > 0 &&
        FLAGS_bthread_min_concurrency > 0 &&    // test min_concurrency for performance
        _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
  
        // TODO: Reduce this lock
        BAIDU_SCOPED_LOCK(g_task_control_mutex);
        if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
  
            add_workers(1);
        }
    }
}

而当为false时,则调用ready_ro_run。

TaskGroup::ready_to_run

而ready_to_run相较于ready_to_remote对比就简洁很多了。而这两个的区别就是ready_to_run()就是把任务入队到TG的 rq,ready_to_run_remote()是在当前线程不是brpc的worker()的时候(在worker外创建的 bthread任务),把任务通过TC入队到某个TG的 remote_rq。

void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
  
    push_rq(tid);
    if (nosignal) {
  
        ++_num_nosignal;
    } else {
  
        const int additional_signal = _num_nosignal;
        _num_nosignal = 0;
        _nsignaled += 1 + additional_signal;
        _control->signal_task(1 + additional_signal);
    }
}

说了这么多了,回归到最初的两个新建函数,还一个新建函数是bthread_start_urgent。

bthread_start_urgent

大致是相同的,但我们会发现在存在TG的情况下,调用的函数会稍微有些区别。这里调用的是TaskGroup::start_foreground。

int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) {
  
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
  
        // start from worker
        return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

TaskGroup::start_foreground

我们可以通过源码观察到,其实这个新建bthread函数就是把运行一起承包了,当然,耗时也会有差别。我们在测试运行上可以看出来差距。

int TaskGroup::start_foreground(TaskGroup** pg,
                                bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
  
    if (__builtin_expect(!fn, 0)) {
  
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
  
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
  
        LOG(INFO) << "Started bthread " << m->tid;
    }

    TaskGroup* g = *pg;
    g->_control->_nbthreads << 1;
    if (g->is_current_pthread_task()) {
  
        // never create foreground task in pthread.
        g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
  
        // NOSIGNAL affects current task, not the new task.
        RemainedFn fn = NULL;
        if (g->current_task()->about_to_quit) {
  
            fn = ready_to_run_in_worker_ignoresignal;
        } else {
  
            fn = ready_to_run_in_worker;
        }
        ReadyToRunArgs args = {
  
            g->current_tid(),
            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
        };
        g->set_remained(fn, &args);
        TaskGroup::sched_to(pg, m->tid);
    }
    return 0;
}

在这里我们进行的测试是测试利用bthread_start_urgent和bthread_start_background的创建耗时对比。创建10000个样例然后进行除法测出两种方法创建bthread所需时间。

TEST_F(BthreadTest, start_latency_when_high_idle) {
  
    bool warmup = true;
    long elp1 = 0;
    long elp2 = 0;
    int REP = 0;
    for (int i = 0; i < 10000; ++i) {
  
        butil::Timer tm;
        tm.start();
        bthread_t th;
        bthread_start_urgent(&th, NULL, log_start_latency, &tm);
        bthread_join(th, NULL);
        bthread_t th2;
        butil::Timer tm2;
        tm2.start();
        bthread_start_background(&th2, NULL, log_start_latency, &tm2);
        bthread_join(th2, NULL);
        if (!warmup) {
  
            ++REP;
            elp1 += tm.n_elapsed();
            elp2 += tm2.n_elapsed();
        } else if (i == 100) {
  
            warmup = false;
        }
    }
    LOG(INFO) << "start_urgent=" << elp1 / REP << "ns start_background="
              << elp2 / REP << "ns";
}

这是一段测试代码,我们通过make然后脚本运行后观察数据。

发现bthread_start_background要比bthread_start_urgent耗时虽说差距不大,但是bthread_start_background还时要快一点。

以上就是我对bthread的理解了,有问题欢迎指出哦!