菜鸟笔记
提升您的技术认知

C++  并发无锁队列的原理与实现

C++  并发无锁队列的原理与实现

一般无锁队列的情况分为两种,第一种是单个消费者与单个生产者,第二种是多个消费者或者多个生产着的情况。

一.单个消费者与单个生产者的情况

这种情况下可以用环形队列RingBuffer来实现无锁队列,比如dpdk和kfifo的无锁队列就是用环形队列实现的,kfifo里面的入队和出队的处理很巧妙,大家可以去看看。
DPDK:https://www.coonote.com/cplusplus-note/lockless-circular-queue.html
KFIFO:https://www.coonote.com/cplusplus-note/linux-kfifo.html

用数组模拟环形队列举个简单的例子:

#include 
#include 
#include 
#include 
using namespace std;

#define RING_QUEUE_SIZE 10
template 
class RingBuffer {
    public:
        RingBuffer(int size): m_size(size),m_head(0), m_tail(0) {
            m_buf = new T[size];
        }

        ~RingBuffer() {
            delete [] m_buf;
            m_buf = NULL;
        }

        inline bool isEmpty() const {
            return m_head == m_tail;
        }

        inline bool isFull() const {
            return m_tail == (m_head + 1) % m_size;     //取模是为了考虑队列尾的特殊情况
        }

        bool push(const T& value) {                     //以实例的方式传值
            if(isFull()) {
                return false;
            }
            m_buf[m_head] = value;
            m_head = (m_head + 1) % m_size;
            return true;
        }

        bool push(const T* value) {                      //以指针的方式传值
            if(isFull()) {
                return false;
            }
            m_buf[m_head] = *value;
            m_head = (m_head + 1) % m_size;
            return true;
        }

        inline bool pop(T& value)
        {
            if(isEmpty()) {
                return false;
            }
            value = m_buf[m_tail];
            m_tail = (m_tail + 1) % m_size;
            return true;
        }
        inline unsigned int head()const {
            return m_head;
        }
        inline unsigned int tail()const {
            return m_tail;
        }
        inline unsigned int size()const {
            return m_size;
        }
    private:
        int m_size;                                    // 队列大小
        int m_head;                                    // 队列头部索引
        int m_tail;                                    // 队列尾部索引
        T* m_buf;                                      // 队列数据缓冲区
};

typedef struct Node {                                  //任务节点
    int cmd;
    void *value;                                       //可根据情况改变
}taskNode;
#if 1
void produce(RingBuffer* rqueue) {
    int i = 0;
    for(i=0;ipush(node);
    }
    
}

void consume(RingBuffer* rqueue) {
    while(!rqueue->isEmpty()) {
       taskNode node;
       rqueue->pop(node); 
    }
}
#endif
int main() 
{
    RingBuffer * rqueue = new RingBuffer(RING_QUEUE_SIZE);
    std::thread producer(produce,rqueue);
    std::thread consumer(consume,rqueue);
    producer.join();
    consumer.join();
    delete rqueue;
    return 0;
}

二.多个生产者或者多个消费者的情况

这种情况一般都是出现在多线程开发的场合,会有多个对象同时操作队列,此时为了避免这种情况,可以有两种方式,第一种就是最常见的加锁,第二种就是无锁队列,用原子操作实现,无锁数据结构依赖很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,基本的思路就是先与内存中的值进行比较,如果相同就进行set或者swap操作。

常见的gcc内置原子操作有下面这些。

  1. type __sync_fetch_and_add (type *ptr, type value, ...)
    // 将value加到*ptr上,结果更新到*ptr,并返回操作之前*ptr的值
  2. type __sync_fetch_and_sub (type *ptr, type value, ...)
    // 从*ptr减去value,结果更新到*ptr,并返回操作之前*ptr的值
  3. type __sync_fetch_and_or (type *ptr, type value, ...)
    // 将*ptr与value相或,结果更新到*ptr, 并返回操作之前*ptr的值
  4. type __sync_fetch_and_and (type *ptr, type value, ...)
    // 将*ptr与value相与,结果更新到*ptr,并返回操作之前*ptr的值
  5. type __sync_fetch_and_xor (type *ptr, type value, ...)
    // 将*ptr与value异或,结果更新到*ptr,并返回操作之前*ptr的值
  6. type __sync_fetch_and_nand (type *ptr, type value, ...)
    // 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之前*ptr的值
  7. type __sync_add_and_fetch (type *ptr, type value, ...)
    // 将value加到*ptr上,结果更新到*ptr,并返回操作之后新*ptr的值
  8. type __sync_sub_and_fetch (type *ptr, type value, ...)
    // 从*ptr减去value,结果更新到*ptr,并返回操作之后新*ptr的值
  9. type __sync_or_and_fetch (type *ptr, type value, ...)
    // 将*ptr与value相或, 结果更新到*ptr,并返回操作之后新*ptr的值
  10. type __sync_and_and_fetch (type *ptr, type value, ...)
    // 将*ptr与value相与,结果更新到*ptr,并返回操作之后新*ptr的值
  11. type __sync_xor_and_fetch (type *ptr, type value, ...)
    // 将*ptr与value异或,结果更新到*ptr,并返回操作之后新*ptr的值
  12. type __sync_nand_and_fetch (type *ptr, type value, ...)
    // 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之后新*ptr的值
  13. bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
    // 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回true
  14. type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
    // 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回操作之前*ptr的值
  15. __sync_synchronize (...)
    // 发出完整内存栅栏
  16. type __sync_lock_test_and_set (type *ptr, type value, ...)
    // 将value写入*ptr,对*ptr加锁,并返回操作之前*ptr的值。即,try spinlock语义
  17. void __sync_lock_release (type *ptr, ...)
    // 将0写入到*ptr,并对*ptr解锁。即,unlock spinlock语义

下面就用环形数组来实现无锁队列,举个例子,当然用环形链表也可以,但是链表要考虑到一个ABA的问题(这个后面再介绍分析)。

#include 
#include 
#include 
#include 
#include 
using namespace std;

#define RING_QUEUE_SIZE 10
//std::mutex mtx;
template 
class RingBuffer {
    public:
        RingBuffer(int size): m_size(size),m_head(0), m_tail(0) {
            m_buf = new T[size];
        }

        ~RingBuffer() {
            delete [] m_buf;
            m_buf = NULL;
        }

        inline bool isEmpty() const {
            return m_head == m_tail;
        }

        inline bool isFull() const {
            return m_tail == (m_head + 1) % m_size;     //取模是为了考虑队列尾的特殊情况
        }

        /*使用互斥锁实现并发情况共用队列*/
        # if 0
        bool mut_push(const T& value);
        bool mut_push(const T* value); 
        bool mut_pop(T& value);
        #endif

        /*使用原子操作实现并发情况无锁队列*/
        bool cas_push(const T& value);
        bool cas_push(const T* value); 
        bool cas_pop(T& value);
      
        inline unsigned int head()const {
            return m_head;
        }
        inline unsigned int tail()const {
            return m_tail;
        }
        inline unsigned int size()const {
            return m_size;
        }
    private:
        int m_size;                                    // 队列大小
        int m_head;                                    // 队列头部索引
        int m_tail;                                    // 队列尾部索引
        T* m_buf;                                      // 队列数据缓冲区
};
# if 0  //互斥锁实现
template 
bool RingBuffer::mut_push(const T& value) {
    while (mtx.try_lock()) {
        if(isFull()) {
            return false;
        }
        m_buf[m_head] = value;
        m_head = (m_head + 1) % m_size;
        mtx.unlock();
        return true;
    }
}

template 
bool RingBuffer::mut_push(const T* value) {
    while (mtx.try_lock()) {
        if(isFull()) {
            return false;
        }
        m_buf[m_head] = *value;
        m_head = (m_head + 1) % m_size;
        mtx.unlock();
        return true;
    }
}
template 
bool RingBuffer::mut_pop(T& value)
{
    while (mtx.try_lock()) {
        if(isEmpty()) {
             return false;
        }
        value = m_buf[m_tail];
        m_tail = (m_tail + 1) % m_size;
        mtx.unlock();
        return true;
    }
}
#endif
template 
bool RingBuffer::cas_push(const T& value) {
    if(isFull()) {
        return false;
    }
    int oldValue,newValue;
    do{
        oldValue = m_head;
        newValue = (oldValue + 1) % m_size;
    }while(__sync_bool_compare_and_swap(&m_head,oldValue,newValue) != true);
    m_buf[oldValue] = value;
    return true;
}

template 
bool RingBuffer::cas_push(const T* value) {
   if(isFull()) {
        return false;
    }
    int oldValue,newValue;
    do{
        oldValue = m_head;
        newValue = (oldValue + 1) % m_size;
    }while(__sync_bool_compare_and_swap(&m_head,oldValue,newValue) != true);
    m_buf[oldValue] = *value;
    return true;
}

template 
bool RingBuffer::cas_pop(T& value)
{
    if(isEmpty()) {
        return false;
    }
    int oldValue,newValue;
    do{
        oldValue = m_tail;
        newValue = (oldValue + 1) % m_size;
    }while(__sync_bool_compare_and_swap(&m_tail,oldValue,newValue) != true);
    value = m_buf[oldValue];
    return true;
}


typedef struct Node {                              //任务节点
    int cmd;
    void *value;
}taskNode;

void produce(RingBuffer* rqueue) {
    int i = 0;
    for(i=0;icas_push(node);
    }
    
}

void consume(RingBuffer* rqueue) {
    while(!rqueue->isEmpty()) {
       taskNode node;
       rqueue->cas_pop(node); 
    }
}


int main() 
{
    int i = 0;
    RingBuffer * rqueue = new RingBuffer(RING_QUEUE_SIZE);
    std::thread producer[20];
    std::thread consumer[20];
    for(i = 0; i<20; i++) {
        producer[i] = std::thread(produce,rqueue);
        consumer[i] = std::thread(consume,rqueue);
    }
    for (auto &thread : producer)
        thread.join();
    for (auto &thread : consumer)
        thread.join();
    delete rqueue;
    return 0;
}

这段代码中无锁队列实现的关键就是__sync_bool_compare_and_swap(&m_head,oldValue,newValue)函数,它会时刻将m_head和oldValue比较,如果被其它线程抢先改了head的值就会返回失败。

现在说一下前面提到的ABA的问题,基本的情况是下面这样的,多出现在内存复用的时候:

1.假设队列中只有一个节点M,线程A进行出队列操作,取得了当前节点M,节点M的地址是0X123,线程A被线程B抢占打断,还没进行出队列的实际操作

2.线程B同样将节点M出队列了,然后又重新申请了一个节点N,巧的是节点N的地址也是0X123,线程B将N节点入队列

3.线程B又被线程A打断,此时线程A重新开始执行,但是CAS操作的接口比较的是地址,线程A发现节点的地址没有改变,又将N节点出队列了。

这种情况就好比电视据中经常出现的剧情,男主和女主的行李箱发生的交互,但是互不知晓。

通常这种情况会用双重CAS来进行保证,在加一个计数器,使用节点内存引用计数refcnt,来判断值是否是原来的。