C++ 并发无锁队列的原理与实现
一般无锁队列的情况分为两种,第一种是单个消费者与单个生产者,第二种是多个消费者或者多个生产着的情况。
一.单个消费者与单个生产者的情况
这种情况下可以用环形队列RingBuffer来实现无锁队列,比如dpdk和kfifo的无锁队列就是用环形队列实现的,kfifo里面的入队和出队的处理很巧妙,大家可以去看看。
DPDK:https://www.coonote.com/cplusplus-note/lockless-circular-queue.html
KFIFO:https://www.coonote.com/cplusplus-note/linux-kfifo.html
用数组模拟环形队列举个简单的例子:
#include
#include
#include
#include
using namespace std;
#define RING_QUEUE_SIZE 10
template
class RingBuffer {
public:
RingBuffer(int size): m_size(size),m_head(0), m_tail(0) {
m_buf = new T[size];
}
~RingBuffer() {
delete [] m_buf;
m_buf = NULL;
}
inline bool isEmpty() const {
return m_head == m_tail;
}
inline bool isFull() const {
return m_tail == (m_head + 1) % m_size; //取模是为了考虑队列尾的特殊情况
}
bool push(const T& value) { //以实例的方式传值
if(isFull()) {
return false;
}
m_buf[m_head] = value;
m_head = (m_head + 1) % m_size;
return true;
}
bool push(const T* value) { //以指针的方式传值
if(isFull()) {
return false;
}
m_buf[m_head] = *value;
m_head = (m_head + 1) % m_size;
return true;
}
inline bool pop(T& value)
{
if(isEmpty()) {
return false;
}
value = m_buf[m_tail];
m_tail = (m_tail + 1) % m_size;
return true;
}
inline unsigned int head()const {
return m_head;
}
inline unsigned int tail()const {
return m_tail;
}
inline unsigned int size()const {
return m_size;
}
private:
int m_size; // 队列大小
int m_head; // 队列头部索引
int m_tail; // 队列尾部索引
T* m_buf; // 队列数据缓冲区
};
typedef struct Node { //任务节点
int cmd;
void *value; //可根据情况改变
}taskNode;
#if 1
void produce(RingBuffer* rqueue) {
int i = 0;
for(i=0;ipush(node);
}
}
void consume(RingBuffer* rqueue) {
while(!rqueue->isEmpty()) {
taskNode node;
rqueue->pop(node);
}
}
#endif
int main()
{
RingBuffer * rqueue = new RingBuffer(RING_QUEUE_SIZE);
std::thread producer(produce,rqueue);
std::thread consumer(consume,rqueue);
producer.join();
consumer.join();
delete rqueue;
return 0;
}
二.多个生产者或者多个消费者的情况
这种情况一般都是出现在多线程开发的场合,会有多个对象同时操作队列,此时为了避免这种情况,可以有两种方式,第一种就是最常见的加锁,第二种就是无锁队列,用原子操作实现,无锁数据结构依赖很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,基本的思路就是先与内存中的值进行比较,如果相同就进行set或者swap操作。
常见的gcc内置原子操作有下面这些。
- type __sync_fetch_and_add (type *ptr, type value, ...)
// 将value加到*ptr上,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_fetch_and_sub (type *ptr, type value, ...)
// 从*ptr减去value,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_fetch_and_or (type *ptr, type value, ...)
// 将*ptr与value相或,结果更新到*ptr, 并返回操作之前*ptr的值 - type __sync_fetch_and_and (type *ptr, type value, ...)
// 将*ptr与value相与,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_fetch_and_xor (type *ptr, type value, ...)
// 将*ptr与value异或,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_fetch_and_nand (type *ptr, type value, ...)
// 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_add_and_fetch (type *ptr, type value, ...)
// 将value加到*ptr上,结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_sub_and_fetch (type *ptr, type value, ...)
// 从*ptr减去value,结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_or_and_fetch (type *ptr, type value, ...)
// 将*ptr与value相或, 结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_and_and_fetch (type *ptr, type value, ...)
// 将*ptr与value相与,结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_xor_and_fetch (type *ptr, type value, ...)
// 将*ptr与value异或,结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_nand_and_fetch (type *ptr, type value, ...)
// 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之后新*ptr的值 - bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
// 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回true - type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
// 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回操作之前*ptr的值 - __sync_synchronize (...)
// 发出完整内存栅栏 - type __sync_lock_test_and_set (type *ptr, type value, ...)
// 将value写入*ptr,对*ptr加锁,并返回操作之前*ptr的值。即,try spinlock语义 - void __sync_lock_release (type *ptr, ...)
// 将0写入到*ptr,并对*ptr解锁。即,unlock spinlock语义
下面就用环形数组来实现无锁队列,举个例子,当然用环形链表也可以,但是链表要考虑到一个ABA的问题(这个后面再介绍分析)。
#include
#include
#include
#include
#include
using namespace std;
#define RING_QUEUE_SIZE 10
//std::mutex mtx;
template
class RingBuffer {
public:
RingBuffer(int size): m_size(size),m_head(0), m_tail(0) {
m_buf = new T[size];
}
~RingBuffer() {
delete [] m_buf;
m_buf = NULL;
}
inline bool isEmpty() const {
return m_head == m_tail;
}
inline bool isFull() const {
return m_tail == (m_head + 1) % m_size; //取模是为了考虑队列尾的特殊情况
}
/*使用互斥锁实现并发情况共用队列*/
# if 0
bool mut_push(const T& value);
bool mut_push(const T* value);
bool mut_pop(T& value);
#endif
/*使用原子操作实现并发情况无锁队列*/
bool cas_push(const T& value);
bool cas_push(const T* value);
bool cas_pop(T& value);
inline unsigned int head()const {
return m_head;
}
inline unsigned int tail()const {
return m_tail;
}
inline unsigned int size()const {
return m_size;
}
private:
int m_size; // 队列大小
int m_head; // 队列头部索引
int m_tail; // 队列尾部索引
T* m_buf; // 队列数据缓冲区
};
# if 0 //互斥锁实现
template
bool RingBuffer::mut_push(const T& value) {
while (mtx.try_lock()) {
if(isFull()) {
return false;
}
m_buf[m_head] = value;
m_head = (m_head + 1) % m_size;
mtx.unlock();
return true;
}
}
template
bool RingBuffer::mut_push(const T* value) {
while (mtx.try_lock()) {
if(isFull()) {
return false;
}
m_buf[m_head] = *value;
m_head = (m_head + 1) % m_size;
mtx.unlock();
return true;
}
}
template
bool RingBuffer::mut_pop(T& value)
{
while (mtx.try_lock()) {
if(isEmpty()) {
return false;
}
value = m_buf[m_tail];
m_tail = (m_tail + 1) % m_size;
mtx.unlock();
return true;
}
}
#endif
template
bool RingBuffer::cas_push(const T& value) {
if(isFull()) {
return false;
}
int oldValue,newValue;
do{
oldValue = m_head;
newValue = (oldValue + 1) % m_size;
}while(__sync_bool_compare_and_swap(&m_head,oldValue,newValue) != true);
m_buf[oldValue] = value;
return true;
}
template
bool RingBuffer::cas_push(const T* value) {
if(isFull()) {
return false;
}
int oldValue,newValue;
do{
oldValue = m_head;
newValue = (oldValue + 1) % m_size;
}while(__sync_bool_compare_and_swap(&m_head,oldValue,newValue) != true);
m_buf[oldValue] = *value;
return true;
}
template
bool RingBuffer::cas_pop(T& value)
{
if(isEmpty()) {
return false;
}
int oldValue,newValue;
do{
oldValue = m_tail;
newValue = (oldValue + 1) % m_size;
}while(__sync_bool_compare_and_swap(&m_tail,oldValue,newValue) != true);
value = m_buf[oldValue];
return true;
}
typedef struct Node { //任务节点
int cmd;
void *value;
}taskNode;
void produce(RingBuffer* rqueue) {
int i = 0;
for(i=0;icas_push(node);
}
}
void consume(RingBuffer* rqueue) {
while(!rqueue->isEmpty()) {
taskNode node;
rqueue->cas_pop(node);
}
}
int main()
{
int i = 0;
RingBuffer * rqueue = new RingBuffer(RING_QUEUE_SIZE);
std::thread producer[20];
std::thread consumer[20];
for(i = 0; i<20; i++) {
producer[i] = std::thread(produce,rqueue);
consumer[i] = std::thread(consume,rqueue);
}
for (auto &thread : producer)
thread.join();
for (auto &thread : consumer)
thread.join();
delete rqueue;
return 0;
}
这段代码中无锁队列实现的关键就是__sync_bool_compare_and_swap(&m_head,oldValue,newValue)函数,它会时刻将m_head和oldValue比较,如果被其它线程抢先改了head的值就会返回失败。
现在说一下前面提到的ABA的问题,基本的情况是下面这样的,多出现在内存复用的时候:
1.假设队列中只有一个节点M,线程A进行出队列操作,取得了当前节点M,节点M的地址是0X123,线程A被线程B抢占打断,还没进行出队列的实际操作
2.线程B同样将节点M出队列了,然后又重新申请了一个节点N,巧的是节点N的地址也是0X123,线程B将N节点入队列
3.线程B又被线程A打断,此时线程A重新开始执行,但是CAS操作的接口比较的是地址,线程A发现节点的地址没有改变,又将N节点出队列了。
这种情况就好比电视据中经常出现的剧情,男主和女主的行李箱发生的交互,但是互不知晓。
通常这种情况会用双重CAS来进行保证,在加一个计数器,使用节点内存引用计数refcnt,来判断值是否是原来的。