菜鸟笔记
提升您的技术认知

bthread(五) 无锁队列rq的代码实现

阅读 : 92

基本介绍

初始化时b=t=1,代表队列为空,load是取值,store是赋值,push是从队尾添加(b+1),pop是从对首弹出(正常是b-1,有竞争了t+1),steal是从对首偷(t+1)。所以,b=t时队列为空,b=t+1时队列有1个元素。

竞争情况

由于对于此队列的push和pop操作发生在一次流程的两个阶段,所以不会产生竞争,push和steal不需要对一个元素竞争那么竞争就发生在pop和steal以及steal和steal中。比如:当前只有一个bth在队列里,tg1想从rq里pop一个,tg2想从rq里steal一个。

// WorkStealingQueue类成员
std::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top;   // 队列头部,用于worker直接放入bthread
std::atomic<size_t> _bottom;                          // 队列尾部,用于其他worker去steal或者自己pop出来用
size_t _capacity;                                   // 队列容量
T *_buffer;                                         // 实际存放bthread的内存数组(连续)
 
bool push(const T& x) {
    const size_t b = _bottom.load(base::memory_order_relaxed);
    // 这里之所以使用acq是保证了这里读到的t一定是别的线程修改后或者修改前的值
    const size_t t = _top.load(base::memory_order_acquire);
    if (b >= t + _capacity) { // Full queue.
        return false;
    }
    // 这里用了一个非常巧妙的 & 操作对下标做处理使得取元素时下标一直小于capacity(所以_capacity要求是偶数)
    _buffer[b & (_capacity - 1)] = x;
    // 这里rel保证了写_buffer之后这个b是精确+1的,即对应了steal的load acq
    _bottom.store(b + 1, base::memory_order_release);
    return true;
}
 
// pop操作同样只和steal并发,核心逻辑是b-1之后锁定一个元素防止被steal取走,在只有一个元素的时候通过CAS语义和steal竞争
bool pop(T *val)
{
    const size_t b = _bottom.load(std::memory_order_relaxed);
    size_t t = _top.load(std::memory_order_relaxed);
    if (t >= b)
    {
        return false;
    }
    // 先让b-1,意思是假设我已经取了一个元素,那么我希望各bthread周知
    const size_t new_b = b - 1;
    _bottom.store(new_b, std::memory_order_relaxed);
    // 这里的作用是atomic_thread_fence之前的所有操作会全局可见(从cpu的storebuffer至少刷新到L3 cache使多核可见)
    std::atomic_thread_fence(std::memory_order_seq_cst);
    // 保证当前t读的是最新值?如果不新,可能存在有线程已经steal走了但是我还取的旧值的情况
    t = _top.load(std::memory_order_relaxed);
    // 再次判断,此时说明队列为空,b回到原来的值
    if (t > new_b)
    {
        _bottom.store(b, std::memory_order_relaxed);
        return false;
    }
    *val = _buffer[new_b & (_capacity - 1)];
    if (t != new_b) // 说明队列中还有元素,这时相当于已经用new_b锁住这个元素了,所以可以退出
    {
        // 那就是t < new_b,队列中元素不止一个,直接取出用就行
        return true;
    }
    // 走到这里证明t = new_b,证明只剩一个元素了,需要和steal竞争
    // 若_top等于t,则当前取线程取成功了,修改_top为t+1,返回true
    // 若_top不等于t,则证明该元素被别人抢走了,将t改为_top,返回false
    const bool popped = _top.compare_exchange_strong(
        t, t + 1, base::memory_order_seq_cst, base::memory_order_relaxed);
    _bottom.store(b, base::memory_order_relaxed);
    return popped;
}
 
// steal one item from the queue
bool steal(T *val)
{
    size_t t = _top.load(std::memory_order_acquire);
    size_t b = _bottom.load(std::memory_order_acquire);
    // 队列为空直接返回false
    if (t >= b)
    {
        return false;
    }
    do {
        // 猜测:保证b读到的是最新的b
        std::atomic_thread_fence(std::memory_order_seq_cst);
        b = _bottom.load(std::memory_order_acquire);
        if (t >= b)
        {
            // 空队列返回false
            return false;
        }
        // 取元素
        *val = _buffer[t & (_capacity - 1)];
    } while (!_top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed));
    // 上面这个while的语义是:
    // 若_top等于t,则当前线程取元素成功了,修改_top为t+1,返回true,退出循环走下来
    // 若_top不等于t,则证明该元素被别人抢走了,将t改为_top,返回false,继续进入while
    // 其他steal线程同理,谁拿到谁就t+1,然后告诉所有线程我取走这个元素了
    return true;
}