菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— Bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于ParkingLot的bthread调度brpc源码解析(八)—— 基础类EventDispatcher详解brpc源码解析(九)—— 基础类WorkStealingQueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— Reducer类和Adder类解析brpc源码解析(十二)—— 核心组件bvar详解 AgentGroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— MPSC队列ExecutionQueue详解brpc源码解析(十九)—— 双buffer数据结构DoublyBufferedData详解brpc源码解析(二十)—— 用于访问下游的Channel类详解

brpc源码解析(十七)—— bthread上的类futex同步组件butex详解

阅读 : 669

我们知道在linux 下,锁和其他一些同步机制都会用到futex,futex诞生后扮演着非常重要的角色,可以说futex是linux底层最重要的同步手段之一,无论是pthread_mutex还是semaphore都用到了futex,bthread作为基于pthread实现的一套线程库,自然也需要类似的线程同步机制,用于在bhtread上进行阻塞同步,因此就有了类futex的butex,这篇文章就来聊聊butex的实现。

一、futex简介

在介绍butex之前我们首先来简单看下futex,网上关于futex的介绍有很多,这里只是简单介绍下用于引出本文的主题butex,想更深入地学习可以看看2篇参考文章,写得很不错。

futex是一个高效的同步组件,futex由一个内核态的队列和一个用户态的integer构成,有竞争时会放到等待队列供后面唤醒,整个操作主要用到了自旋锁来保护临界区。基于futex构造锁到时候一个典型的模式是先通过对一个原子变量进行cas操作尝试直接获取锁,如果没竞争直接返回,有竞争调用futex_wait。

简单来说,futex主要包括等待和唤醒两个方法:futex_wait和futex_wake,简化后的定义如下:

//uaddr指向一个地址,val代表这个地址期待的值,当*uaddr==val时,才会进行wait
int futex_wait(int *uaddr, int val);

//唤醒n个在uaddr指向的锁变量上挂起等待的进程
int futex_wake(int *uaddr, int n);

这里贴一下参考文章对两个流程的总结:

(1)futex_wait:

  1. 加自旋锁
  2. 检测*uaddr是否等于val,如果不相等则会立即返回
  3. 将进程状态设置为TASK_INTERRUPTIBLE
  4. 将当期进程插入到等待队列中
  5. 释放自旋锁
  6. 创建定时任务:当超过一定时间还没被唤醒时,将进程唤醒
  7. 挂起当前进程

(1)futex_wake:

  1. 找到uaddr对应的futex_hash_bucket
  2. 对其加自旋锁
  3. 遍历f链表,找到uaddr对应的节点
  4. 调用wake_futex唤起等待的进程
  5. 释放自旋锁

二、butex源码解析

既然是类futex,实现机制自然基本类似,只是更为上层,下面就从源码层面来看下butex的实现。

2.1 butex相关数据结构

因为需要维护等待队列,因此需要push到等待队列的数据结构,bthread的核心的api都是兼容pthread的,所以butex也要支持pthread。也就有了ButexBthreadWaiter和ButexPthreadWaiter两种用于阻塞后唤醒的waiter。

另一个重要的数据结构就是butex本身,里面的value扮演的也就是我们futex wait所用到的u32的val的角色,waiters则是当前butex对应的等待队列,waiter_lock则是操作队列用到的互斥锁,在futex里,操作队列用的是内核的自旋锁,butex本身的逻辑是在用户态的,也就是不阻塞pthread仅切换bthread,因此可以直接使用pthread互斥锁,这里的FastPthreadMutex按官方文档的说明是比原生的pthread_mutex性能略好。

2.2 butex主要机制

下面就以bthread中等待fd的fd_wait函数为例来看butex的整体机制,这个函数用于等待fd就绪,比如ssl相关的逻辑就用到了这个来等待ssl握手完毕。


在初始状态下,需要新建butex对象,在这个场景下用的是cas来建立单例,butex对象的类型是EpollButex,也就是一个原子int,首先需要调用butex_create_checked来创建,函数内容如下:

之所以叫create_checked,是因为有一个类型大小的check,我们知道futex依赖的就是一个u32的整形,但是butex为了方便使用是直接支持所有大小为32位的类型的,因此会先做一个sizeof(T) == sizeof(int)的check。随后才是从objec pool里获得真正的butex对象,类型就是上面提到的Butex,用户侧的butex value是被封装在这个结构里面的、butex_create_checked返回的是static_cast之后的类型,之所以可以这么转是因为butex的第一个成员变量就是一个原子int。

得到butex变量后,先把butex值保存起来,随后添加epoll时间后调用butex_wait,也就是butex机制的核心函数之一。

2.2.1 butex_wait



和futex_wait类似,butex_wait里,首先也是判断值是否匹配,如果不匹配就直接返回不阻塞,注意这里对butex变量的使用,传入的arg变量是用户侧实际使用的32位长度的任意类型的butex变量,根据前面获取的过程我们可以知道,这个变量的地址其实也就是背后隐含的Butex struct的起始地址,因此可以直接通过这个地址来拿到整个Butex对象,也就是container_of宏定义做的事情,不得不感叹各种神奇的玩法是真的多。这么绕一圈的好处就是对于用户来说,只需要关心自己的butex变量。

不等于expected_value的时候,直接返回。否则进入wait相关操作,如果是pthread,就走pthread_wait的逻辑,原理类似,这里不一一介绍了。否则创建bthread 的waiter并进行相关变量的赋值,包括超时时间的设定。说到超时,这里提一下,是通过一个全局的timer thread实现的,它的schedule函数会新增一项调度,也就是abstime到了之后用bbw作为参数去调用erase_from_butex_and_wakeup,实现超时唤醒。

waiter的container是用来保存butex的指针的,在唤醒从队列移除的时候需要用它来找到锁,这里首先设置为null,在下面的wait_for_butex函数里会被置成b,这里为NULL也代表了没有在队列里的状态。

然后就是把当前bthread挂到队列里并让出worker。具体到实现,首先是把当前waiter保存到当前任务信息里,然后是将wait_for_butex通过set_remained交给下一个获得worker的bthread来运行从而将waiter挂载进队列。

wait_for_butex函数如下:

首先会对waiter_list的互斥锁加锁,在真正放进队列之前再check一次value是否发生了变化,如果发生了变化,直接unsleep并且调度原来的bthread,否则,append到waiter队列。如前面所说,这里将butex的指针保存到了这个waiter的container里,这是为了从双端队列里删除的时候对butex加锁。

2.2.2 butex_wake

前面说了butex wait的过程,wait之后该如何唤醒呢,回到上面那个场景,前面的入口fd_wait就是需要等待fd就绪,就绪后的唤醒由一个epoll thread来负责,一旦发生了相应的epoll out事件就会调用butex_wake_all来唤醒睡在对应butex上的bthread,如下:

butex_wake_all是唤醒等在butex上的所有线程,还有一个butex_wake只会唤醒一个,原理类似,这里只介绍wak_all。首先是对butex值原子+1,原子+1保证了后面的butex_wait会直接返回。随后调用butex_wake_all唤醒所有等在当前butex上的bthread。butex_wake_all的主要逻辑如下:

开头仍然是通过宏定义拿到butex对象。然后遍历butex的waiter队列,分别将其中的bth和pth放到不同的临时list里,因为对双端队列进行删除操作非线程安全,所以会加锁。这里有个对container置NULL的操作,表明已经从队列移除了,一个典型的场景是对于设置了超时时间的wait,在唤醒后最初定的超市任务到了看到container为NULL就不需要进行移除操作了。

通过wakeup_pthread函数唤醒所有的pthread。

通过bthread函数唤醒所有的bthread,除了移出队列进行调度以外,还调用了unsleep_if_necessary用于取消超时的任务。

除此还有包括butex_requeue和butex_wake_except在内的其他一些函数,原理都差不多,这里不再赘述。

参考:

  1. 关于同步的一点思考-上
  2. linux内核级同步机制–futex