菜鸟笔记
提升您的技术认知

bthread(四) bthread用户接口和代码执行路径

阅读 : 105

bthread用户接口

bthread_t(类似pthread_t):64位int,前32位版本号防止ABA问题(释放后又重新被分配了,歧义),后32位为资源池(无全局竞争可O(1)访问的数组)中下标,可以很快地找到TaskMeta(里面有个变量就是这个bth的栈)

起bth基本函数

bthread_start_urgent & bthread_start_background 里面都是先判断g是否为空(是否运行在worker里),是的话运行start_xxx,不是运行start_from_non_worker(直接调用start_background<true>)

  • start_foreground:如果当前在bthread内,就在当前worker内原地启动(调ready_to_run或ready_to_run_remote)保证locality,把当前bth加到_rq队尾,再去跑新bthread,实现:set_remained(ready_to_run(current_bth)) + sched_to(new_bth)
  • start_background:后台起,但是不希望立刻跑这个bthread,将来有时间跑就行,直接调ready_to_run把新bth加到_rq队尾,如果是在start_from_non_worker里起就加到_remote_rq,实现:ready_to_run[_remote](new_bth)
// 让出当前worker立即执行新bthread,当前bthread随后调度
int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) __THROW {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
         // 从worker里起的bthread,调ready_to_run,再主动调一遍sched_to去执行这个tid
        return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
    }
    // 一些初始化的工作,最后会创建1个TC、4个PL、9个pthread、9个TG
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}
 
// 放到队列里,不急着切线程(sched_to)
int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),
                             void* __restrict arg) __THROW {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
        // 从worker里起的bthread,调ready_to_run
        return g->start_background<false>(tid, attr, fn, arg);
    }
    // 同上
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}
 
start_from_non_worker(bthread_t* __restrict tid,
                      const bthread_attr_t* __restrict attr,
                      void * (*fn)(void*),
                      void* __restrict arg) {
    // 核心函数:创建TaskGroup,见下
    TaskControl* c = get_or_new_task_control();
    ...
    // 选择一个TaskGroup执行start_background<true>,调ready_to_run_remote
    return c->choose_one_group()->start_background<true>(
        tid, attr, fn, arg);
}
 
inline TaskControl* get_or_new_task_control() {
    // 全局变量TC(g_task_control)初始化原子变量
    butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
    // 通过原子变量进行load,取出TC指针,如果不为空,直接返回
    TaskControl* c = p->load(butil::memory_order_consume);
    if (c != NULL) {
        return c;
    }
    ...   
    // 走到这,说明TC确实为NULL,开始new一个
    c = new (std::nothrow) TaskControl;
    if (NULL == c) {
        return NULL;
    }
    // 用并发度concurrency来初始化全局TC
    int concurrency = FLAGS_bthread_min_concurrency > 0 ?
        FLAGS_bthread_min_concurrency :
        FLAGS_bthread_concurrency;
 
 
    // init函数核心见下
    if (c->init(concurrency) != 0) {
        LOG(ERROR) << "Fail to init g_task_control";
        delete c;
        return NULL;
    }
 
    // 4. 将全局TC存入原子变量中
    p->store(c, butil::memory_order_release);
    return c;
}
 
// TaskControl的init函数
for (int i = 0; i < _concurrency; ++i) {
    const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
    if (rc) {
        LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
        return -1;
    }
}
// worker_thread之前有过说明

【代码说明】若TaskGroup为空,证明当前是跑在pthread上,调用start_from_non_worker检查是否已经创建了TackControl单例,已经创建就无所谓了,没有就new TaskControl去创建,new后会执行TackControl的init,核心就是用pthread启动指定数量的worker。得到TackControl单例后,用TackControl选取一个TackGroup,新建bthread进行调度;TaskGroup不为空,表明此bthread就是在worker内被创建的,在对应TackGroup(worker)执行新bthread的启动即可。

两个常见的阻塞操作

  • yield:把当前的bth加到_rq队尾,执行调度函数选下一个,实现:set_remained(ready_to_run(current_bth)) + sched
  • usleep:涉及超时管理,先从当前队里摘掉,把当前bth加到_remote_rq里,当定时器到了,加到_rq里运行,实现:add timer(ready_to_run_remote(current_bth)) + sched

基本流程

TC创建后会pthread_create 9个线程去初始化TG,避免全局变量加锁处理,效率低,也要避免惊群。一旦出现某一个bthread可以被偷了会唤醒很多worker,会发生惊群,要处理。

void* TaskControl::worker_thread(void* arg) {
    // 获取TC指针
    TaskControl* c = static_cast<TaskControl*>(arg);
    // 创建一个task_group
    TaskGroup* g = c->create_group();
    TaskStatistics stat;
    if (NULL == g) {
        LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
        return NULL;
    }
  
    // 重要变量,定义BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
    // tls_task_group是一个thread_local变量,且只有由TaskControl启动的worker线程所拥有的task_group tls_task_group非null
    tls_task_group = g;
    // 计数+1
    c->_nworkers << 1;
    // 当前线程主要运行的任务,即死循环中等待唤醒
    g->run_main_task();
    // ...
    // 销毁
    tls_task_group = NULL;
    g->destroy_self();
    c->_nworkers << -1;
    return NULL;
}

每个TG创建后会创建后当前线程的各种变量,然后去调 run_main_task 去循环等待是否有可处理的bthread。

void TaskGroup::run_main_task() {
    // ......
    TaskGroup* dummy = this;
    bthread_t tid;
    // 这个while就是卡住等bthread id,这个id可能是从队列里取出来的,也可能是偷到的
    while (wait_task(&tid)) {   // 1
        // 这里的唤醒是用futex实现的,也就是利用一个原子变量判断是否需要去尝试加锁访问,然后在实际去加锁访问
        // 走到这里就说明已经获取到一个可执行的bthread,那么调sched_to去切线程
        TaskGroup::sched_to(&dummy, tid);   // 2
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            // 只有当前线程不是调度bthread线程的时候才会去执行用户func
            TaskGroup::task_runner(1/*skip remained*/); // 3
        }
        // ......
}
 
// 1 每个worker调度线程卡在的位置
bool TaskGroup::wait_task(bthread_t* tid) {
    do {
        // ...
        _pl->wait(_last_pl_state);  
        // wait:内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞
        // 当阻塞结束之后,执行steal_task()来work stealing,如果窃取成功则返回,不成功就继续wait
        if (steal_task(tid)) {
            // 顺序为先看本地_remote_rq再去其他worker偷
            // 偷的顺序为先偷别_rq的再偷_remote_rq的
            return true;
        }
    } while (true);
}
 
// 2 用内核调用实现上下文的切换
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
    // 根据tid找TaskMeta
    TaskMeta* next_meta = address_meta(next_tid);
    // 给stk用next_meta赋一些值
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        // ...
    }
    // 重载的sched_to会判断next_meta和cur_meta是否为同一个不是同一个需要调jump_stack去切栈
    // 这其中直接嵌入了汇编代码模拟pthread切栈操作
    sched_to(pg, next_meta);
}
 
// 3 执行用户实际的func并找下一个
void TaskGroup::task_runner() {
    TaskMeta* const m = g->_cur_meta;
    // 执行应用程序设置的任务函数,在任务函数中可能yield让出cpu,也可能产生新的bthread
    m->fn(m->arg);
    // 任务函数执行完成后,需要唤起等待该任务函数执行结束的pthread/bthread
    butex_wake_except(m->version_butex, 0);
    // ending_sched函数本意:将pthread线程执行流转入下一个可执行的bthread(普通bthread或“调度bthread”)
    // 在ending_sched()内会尝试从本地TaskGroup的rq队列中找出下一个bthread
    // 或者调steal_task从其他pthread的TaskGroup上steal一个bthread(当然也是先看_remote_rq,再偷)
    // 如果没有bthread可用则下一个被执行的就是pthread的“调度bthread”
    // 然后通过sched_to()将pthread的执行流转入下一个bthread的任务函数
    ending_sched(&g);
}

【场景】以proxy为例首先在mixer-framework里已经调用过了bthread_start_background初始化一些必要的东西(TC、TG等),然后在send_minibs_request_impl函数中用bthread_start_background则是相当于在bthread内启动了一个新的bthread,想要执行函数func,由于这个新bth不是在pthread内起的,所以肯定会被放在该TaskGroup的_remote_rq中,然后,这个阻塞在wait_task上的TaskGroup会被唤醒,进入以下步骤取出tid来:

  • 首先从本TaskGroup的_remote_rq队列中pop出一个tid(_remote_rq.pop(tid)),pop成功就直接返回
  • 如果pop失败了,调用steal_task从其它TaskGroup中去偷一个tid,偷的顺序在上面

无论如何拿到tid后,进入TaskGroup::sched_to去切栈,具体栈跳跃步骤为:

1、调用jump_stack进入新的bthread栈,由于这时新的bthread还没有分配栈,会首先为它创建一个栈new_stack

2、新的bthread创建完后,利用bthread_jump_fcontext修改栈顶指针、各寄存器进入新栈new_stack,并记录跳转前的旧栈stack_main的相关内容

3、新栈的执行函数fn为TaskGroup::task_runner(注意这个就是用户的func,层层函数指针传入),这个task_runner主要会干3个事儿

  • 执行用户实际的回调函数func(用户想让bthread干的事)
  • 唤起等待该任务函数执行结束的pthread/bthread
  • ending_sched:尝试再pop出一个bthread来,如果现在没有新的bthread,就把_main_meta作为下一次的跳转对象(相当于返回去继续执行原来的代码),再次调用jump_stack由new_stack跳转入stack_main,相当于又拿到了一个tid继续运行

4、都运行完了继续下一次wait_task中的do while死循环,重新阻塞在_pl→wait(...)上