1.基础概念
线程池:一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性。而线程池维护着多个线程,等待监督管理者分配可并行执行的任务。这样避免了在短时间内创建和销毁线程的代价。线程池不仅能够内核的充分利用,还能防止过分调度。可用的线程数据取决于可用的并发处理器,处理内核,内存,网络sockets等数量。
2.线程池的组成
2.1 线程池管理器
创建一定数量的线程,启动线程,调配任务,管理着线程池。
线程池目前只需要Start()启动方法,Stop()方法,AddTask()方法。
Start():创建一定数量的线程,进入线程循环。
Stop():停止线程循环,回收所有的线程。
AddTask():添加任务。
2.2 工作者线程
线程池中线程,在线程池中等待并执行任务。
该文使用条件变量condition_variable实现等待和唤醒进制。
2.3 任务接口
添加任务接口,以供工作线程的调度任务和执行。
2.4 任务队列
用于存放没有处理的任务,提供一种缓存机制。
3.线程池工作的四种状态
假设目前我们的线程池大小为3,任务队列的大小我们不做限制。
3.1 主线程当前没有任务要执行,线程池中任务队列为空。
下面所有的情况工作线程处于空闲状态,任务缓冲队列为空。3.2 我们在3.1 的基础上添加小于等于线程池中线程数量的的任务数。
基于3.1的情况,所有的工作线程处于等待状态,我们给主线程添加3个任务,然后通知(notify())线程池中的线程开始取(Take())任务开始执行,此时的任务缓冲队列还是空。
3.3 主线程添加任务数量大于当前线程池中线程的数量。
基于3.2的情况,线程池中所有的工作线程都处于工作状态,主线程开始添加第四个任务,发现线程池中没有空闲线程,于是将任务存入缓冲队列。工作线程空闲后,主动从任务队列中获取任务执行。
3.4 主线程添加任务数量大于当前线程池中线程的数量,且任务队列已满。
主线程添加第N个任务,添加后发现线程池中的线程已经用完,并且任务队列已满。于是主线程进入等待状态,等待任务队列腾空通知,这种情况会阻塞主线程。
4.线程池的C++实现:
由上可知,线程池由3部分组成:
(1) 任务队列(Task Queue):存储需要处理的任务,由工作者线程来处理这些任务。
①通过线程池提供的API函数,将任务添加到任务队列或者从任务队列中删除任务。
②已处理的任务会从任务队列中删除。
③线程池的使用者,也就是往任务队列中添加任务的线程就是生产者线程。
(2) 工作者线程(任务队列任务的消费者,N个)
①线程池中维护了一定数量的工作者线程,他们的任务是不断的读取任务队列中的任务,并且取出执行。
②工作的线程相当于任务的消费者角色。
③如果任务队列为空,工作者线程会被阻塞。(使用条件变量/信号量阻塞)
④一旦任务队列有任务了,由生产者将任务队列解除,工作者线程开始工作。
(3) 管理者线程
①它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作者线程个数进行检测。
②当任务数量过多时,可以适当的创建一些新的工作线程。
②当任务过少时,可以适当的销毁一些工作的线程。
5.下面来看具体的代码
1.任务队列类
ThreadPool.h
#include <thread>
#include <mutex>
#include <queue>
#include <vector>
#include <condition_variable>
using namespace std;
using namespace std::literals::chrono_literals;
using callback = void(*)(void*);
// 任务队列类
// 成员函数介绍:参数1:任务执行函数;参数2:任务执行函数的参数
class Task
{
public:
callback function;
void* arg;
public:
Task(callback f, void* arg)
{
function = f;
this->arg = arg;
}
};
// 线程池类
class ThreadPool
{
public:
ThreadPool(int min, int max);
// 添加任务
void Add(callback f, void* arg);
void Add(Task task);
// 忙线程个数
int Busynum();
// 存活线程个数
int Alivenum();
~ThreadPool();
private:
// 任务队列
queue<Task> taskQ;
thread managerID; //管理者线程ID
vector<thread> threadIDs; //
int minNum; //最小线程数
int maxNum; //最大线程数
int busyNum; //忙的线程数
int liveNum; //存活的线程数
int exitNum; //要销毁的线程数
mutex mutexPool; //整个线程池的锁
condition_variable cond; //任务队列是否为空,阻塞工作者线程
bool shutdown; //是否销毁线程池,销毁为1,不销毁为0
static void manager(void* arg); //管理者线程
static void worker(void* arg); //工作线程
};
线程池类具体实现
ThreadPool.cpp
#include "threadpool.h"
#include <stdlib.h>
#include <iostream>
#include <string.h>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
const int NUMBER = 2;
ThreadPool::ThreadPool(int min, int max)
{
do
{
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min;
exitNum = 0;
shutdown = false;
// this:传递给线程入口函数的参数,即线程池
managerID = thread(manager, this);
threadIDs.resize(max);
for (int i = 0; i < min; ++i)
{
threadIDs[i] = thread(worker, this);
}
return;
} while (0);
}
ThreadPool::~ThreadPool()
{
shutdown = true;
//阻塞回收管理者线程
if (managerID.joinable()) managerID.join();
//唤醒阻塞的消费者线程
cond.notify_all();
for (int i = 0; i < maxNum; ++i)
{
if (threadIDs[i].joinable()) threadIDs[i].join();
}
}
void ThreadPool::Add(Task t)
{
unique_lock<mutex> lk(mutexPool);
if (shutdown)
{
return;
}
//添加任务
taskQ.push(t);
cond.notify_all();
}
void ThreadPool::Add(callback f, void* a)
{
unique_lock<mutex> lk(mutexPool);
if (shutdown)
{
return;
}
//添加任务
taskQ.push(Task(f, a));
cond.notify_all();
}
int ThreadPool::Busynum()
{
mutexPool.lock();
int busy = busyNum;
mutexPool.unlock();
return busy;
}
int ThreadPool::Alivenum()
{
mutexPool.lock();
int alive = liveNum;
mutexPool.unlock();
return alive;
}
void ThreadPool::worker(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 工作者线程需要不停的获取线程池任务队列,所以使用while
while (true)
{
// 每一个线程都需要对线程池进任务队列行操作,因此线程池是共享资源,需要加锁
unique_lock<mutex> lk(pool->mutexPool);
// 当前任务队列是否为空
while (pool->taskQ.empty() && !pool->shutdown)
{
// 如果任务队列中任务为0,并且线程池没有被关闭,则阻当前工作线程
pool->cond.wait(lk);
// 判断是否要销毁线程,管理者让该工作者线程自杀
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
cout << "threadid: " << std::this_thread::get_id() << " exit......" << endl;
// 当前线程拥有互斥锁,所以需要解锁,不然会死锁
lk.unlock();
return;
}
}
}
// 判断线程池是否关闭了
if (pool->shutdown)
{
cout << "threadid: " << std::this_thread::get_id() << "exit......" << endl;
return;
}
// 从任务队列中去除一个任务
Task task = pool->taskQ.front();
pool->taskQ.pop();
pool->busyNum++;
// 当访问完线程池队列时,线程池解锁
lk.unlock();
// 取出Task任务后,就可以在当前线程中执行该任务了
cout << "thread: " << std::this_thread::get_id() << " start working..." << endl;
task.function(task.arg);
//(*task.function)(task.arg);
free(task.arg);
task.arg = nullptr;
// 任务执行完毕,忙线程解锁
cout << "thread: " << std::this_thread::get_id() << " end working..." << endl;
lk.lock();
pool->busyNum--;
lk.unlock();
}
}
// 检测是否需要添加线程还是销毁线程
void ThreadPool::manager(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 管理者线程也需要不停的监视线程池队列和工作者线程
while (!pool->shutdown)
{
//每隔3秒检测一次
//sleep(3);
std::this_thread::sleep_for(std::chrono::seconds(3));
// 取出线程池中任务的数量和当前线程的数量,别的线程有可能在写数据,所以我们需要加锁
// 目的是添加或者销毁线程
unique_lock<mutex> lk(pool->mutexPool);
int queuesize = pool->taskQ.size();
int livenum = pool->liveNum;
int busynum = pool->busyNum;
lk.unlock();
//添加线程
//任务的个数>存活的线程个数 && 存活的线程数 < 最大线程数
if (queuesize > livenum && livenum < pool->maxNum)
{
// 因为在for循环中操作了线程池变量,所以需要加锁
lk.lock();
// 用于计数,添加的线程个数
int count = 0;
// 添加线程
for (int i = 0; i < pool->maxNum && count < NUMBER && pool->liveNum < pool->maxNum; ++i)
{
// 判断当前线程ID,用来存储创建的线程ID
if (pool->threadIDs[i].get_id() == thread::id())
{
cout << "Create a new thread..." << endl;
pool->threadIDs[i] = thread(worker, pool);
// 线程创建完毕
count++;
pool->liveNum++;
}
}
lk.unlock();
}
//销毁线程:当前存活的线程太多了,工作的线程太少了
//忙的线程*2 < 存活的线程数 && 存活的线程数 > 最小的线程数
if (busynum * 2 < livenum && livenum > pool->minNum)
{
// 访问了线程池,需要加锁
lk.lock();
// 一次性销毁两个
pool->exitNum = NUMBER;
lk.unlock();
// 让工作的线程自杀,无法做到直接杀死空闲线程,只能通知空闲线程让它自杀
for (int i = 0; i < NUMBER; ++i)
pool->cond.notify_all(); // 工作线程阻塞在条件变量cond上
}
}
}
- 测试代码
#include "threadpool.h"
#include <iostream>
#include <stdlib.h>
using namespace std;
void taskFunc(void* arg)
{
int nNum = *(int*)arg;
cout << "thread: " << std::this_thread::get_id() << ", number=" << nNum << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
// 设置线程池最小5个线程,最大10个线程
ThreadPool pool(5, 10);
int i;
// 往任务队列中添加100个任务
for (i = 0; i < 100; ++i)
{
int* pNum = new int(i + 100);
pool.Add(taskFunc, (void*)pNum);
}
for (; i < 200; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
int* pNum = new int(i + 100);
pool.Add(taskFunc, (void*)pNum);
}
return 0;
}
4.测试结果如下图: