这个版本的线程池可以多次添加 Task,基本可用。
缺陷:
- 没有使用 C++11 的条件变量(std::condition_variable),而是用 POSIX 信号量(sem_t)来通知工作线程
#include <semaphore.h>

#include <cerrno>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
// Counting semaphore used to wake the workers: posted once per queued task
// and, at shutdown, once more per worker thread. It is a process-wide global,
// so only one Threadpool instance should be alive at a time.
::sem_t taskSem;

/**
 * @brief Minimal fixed-size thread pool driven by a POSIX semaphore.
 *
 * Usage: optionally call setThreadNums(), then start(), then addTask() any
 * number of times. Every worker keeps pulling tasks until the pool is
 * destroyed. The destructor lets the workers finish all tasks that are
 * already queued, then stops and joins them.
 *
 * addTask() may be called from several threads; it must not be called
 * concurrently with the destructor.
 */
class Threadpool
{
public:
    /// Creates a pool configured for 10 workers; no thread runs until start().
    Threadpool():
        threadNums(10)
    {
        ::sem_init(&taskSem, 0, 0);
    }

    /// Drains the queue, then stops and joins every worker.
    ~Threadpool()
    {
        // Shutdown protocol: one extra post per worker. Task posts and queue
        // entries are always balanced, so a worker that wakes up and finds the
        // queue empty knows it received one of these shutdown posts.
        for(std::vector<std::thread>::size_type n = 0; n < threadList.size(); ++n)
        {
            ::sem_post(&taskSem);
        }
        for(auto &worker : threadList)
        {
            if(worker.joinable())
            {
                worker.join();
            }
        }
        ::sem_destroy(&taskSem);
    }

    // The pool owns running threads that hold a pointer to it: never copy it.
    Threadpool(const Threadpool&) = delete;
    Threadpool& operator=(const Threadpool&) = delete;

private:
    std::vector<std::thread> threadList;
    using Task = std::function<void()>;
    std::queue<Task> taskQueue;
    std::mutex queueMutex;   // guards taskQueue
    int threadNums;

public:
    /// Sets how many workers start() will create. Has no effect on workers
    /// that are already running.
    void setThreadNums(int nums) { threadNums = nums; }

    /// Spawns threadNums worker threads, each running runTask().
    void start()
    {
        for(int i = 0; i < threadNums; i++)
        {
            // A pointer to member needs the explicit '&'.
            threadList.emplace_back(&Threadpool::runTask, this);
        }
    }

    /**
     * @brief Queues a task and wakes one worker.
     * @param task Callable to run on a worker thread. An empty std::function
     *             is ignored, because invoking it on a worker would throw
     *             std::bad_function_call and terminate the process.
     */
    void addTask(Task task)
    {
        if(!task)
        {
            return;
        }
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            taskQueue.push(std::move(task));
        }
        // Post after releasing the lock so the woken worker does not
        // immediately block on queueMutex.
        ::sem_post(&taskSem);
    }

private:
    /// Worker loop: waits for a post, runs one task, repeats. Returns when it
    /// is woken with an empty queue (shutdown) or the semaphore is unusable.
    void runTask()
    {
        for(;;)
        {
            while(::sem_wait(&taskSem) != 0)
            {
                if(errno != EINTR)
                {
                    return; // semaphore is unusable; nothing left to wait for
                }
                // Interrupted by a signal: wait again.
            }

            Task task;
            {
                std::lock_guard<std::mutex> lock(queueMutex);
                if(taskQueue.empty())
                {
                    return; // shutdown post from the destructor
                }
                task = std::move(taskQueue.front());
                taskQueue.pop();
            }
            // Run outside the lock so tasks execute in parallel and may call
            // addTask() themselves.
            task();
        }
    }
};
// Demo driver: starts a pool with the default worker count and submits three
// identical tasks, each printing "addTask" followed by a newline.
int main()
{
    Threadpool threadpool;
    threadpool.start();

    const auto announce = [](){std::cout << "addTask" << std::endl;};
    for(int submitted = 0; submitted < 3; ++submitted)
    {
        threadpool.addTask(announce);
    }

    return 0;
}
加了锁的线程池
//"Threadpool.h"
#ifndef THREADPOOL
#define THREADPOOL
#pragma once
#include <functional>
#include <vector>
#include <thread>
#include <semaphore.h>
#include <mutex>
// #include <condition_variable>
// Semaphore shared by every Threadpool: addTask() posts it and run() waits on it.
// NOTE(review): this is a definition in a header; including the header from
// more than one translation unit would produce duplicate definitions.
::sem_t taskSem;
// Global mutex locked by addTask() and run() around task-queue access.
std::mutex mutex;
namespace ThreadPool
{
    /**
     * @brief Thread pool whose workers are woken through the global
     *        semaphore `taskSem` and synchronised with the global `mutex`.
     *
     * Call setThreadNums() (optional), then start(), then addTask().
     * The destructor joins the worker threads.
     */
    class Threadpool
    {
    private:
        int threadNums;                         // number of threads start() creates
        using Task = std::function<void()>;
        std::vector<Task> taskQueue;            // pending tasks
        std::vector<std::thread*> threadPool;   // worker threads created by start()

    public:
        Threadpool();
        ~Threadpool();

        // Not copyable: a copy would hold the same std::thread pointers and
        // its destructor would join threads that were already joined.
        Threadpool(const Threadpool&) = delete;
        Threadpool& operator=(const Threadpool&) = delete;

        /// Sets how many worker threads start() will create.
        void setThreadNums(int nums) { threadNums = nums; }

    public:
        /// Creates the worker threads.
        void start();

        /// Queues a task for execution on a worker thread.
        void addTask(Task task);

    private:
        /// Body executed by each worker thread.
        void run();
    };
}
#endif
//"Threadpool.cc"
#include "./Threadpool.h"
#include <iostream>
#include <chrono>
ThreadPool::Threadpool::Threadpool():
threadNums(10)
{
::sem_init(&taskSem, 0, 0);
}
ThreadPool::Threadpool::~Threadpool()
{
for(auto &i : threadPool)
{
i->join();
}
}
void ThreadPool::Threadpool::start()
{
for(int i; i < threadNums; ++i)
{
threadPool.emplace_back(new std::thread(ThreadPool::Threadpool::run, this));
}
}
void ThreadPool::Threadpool::addTask(Task task)
{
{
std::unique_lock<std::mutex> lock(mutex);
taskQueue.emplace_back(task);
::sem_post(&taskSem);
}
}
void ThreadPool::Threadpool::run()
{
{
::sem_wait(&taskSem);//有任务到来,收到通知,开始运行
std::unique_lock<std::mutex> lock(mutex);
Task task = taskQueue.front();
taskQueue.pop_back();
task();
}
}
/**
 * @brief Sleeps for 1000 microseconds, then queues a task that prints
 *        "addTask" followed by @p i.
 * @param threadpool Pool that receives the task.
 * @param i          Number appended to the printed message.
 */
void add(ThreadPool::Threadpool& threadpool, int i)
{
    std::this_thread::sleep_for(std::chrono::microseconds(1000));
    // Capture i by value: it is a parameter of this function, and the queued
    // task may run on a worker thread after add() has returned. A by-reference
    // capture would then read a destroyed object.
    threadpool.addTask(
        [i](){std::cout << "addTask" << i << std::endl;}
    );
}
// Demo driver: starts a pool of 400 threads and submits 400 numbered tasks
// (0..399) through add().
int main()
{
    constexpr int kThreadCount = 400;
    constexpr int kTaskCount = 400;

    ThreadPool::Threadpool threadpool;
    threadpool.setThreadNums(kThreadCount);
    threadpool.start();

    int taskId = 0;
    while(taskId < kTaskCount)
    {
        add(threadpool, taskId);
        ++taskId;
    }

    return 0;
}