菜鸟笔记
提升您的技术认知

C++11线程池

 这个版本的线程池可以多次添加任务（Task），基本可用。

缺陷:

  • 没有使用C++11的条件变量 
#include <thread>
#include <functional>
#include <mutex>
#include <utility>

#include <vector>
#include <queue>

#include <semaphore.h>
#include <iostream>

// Counting semaphore that wakes the workers of the Threadpool below.
// addTask() posts it once per queued task and the destructor posts it once per
// worker to request shutdown.
// NOTE: this is one global object that the Threadpool constructor initialises,
// so only a single Threadpool instance should be alive at any time.
::sem_t taskSem;

/**
 * Fixed-size thread pool driven by a POSIX counting semaphore.
 *
 * Usage: optionally call setThreadNums(), then start(), then addTask() any
 * number of times. The destructor lets the workers finish every task that is
 * still queued, joins them and destroys the semaphore.
 *
 * addTask() may be called from several threads; it must not be called once
 * destruction has begun.
 */
class Threadpool
{
public:
    /** Creates a pool that will spawn 10 workers unless setThreadNums() is called. */
    Threadpool():
    threadNums(10)
    {
        // pshared = 0: semaphore is private to this process; value = 0: no work yet.
        ::sem_init(&taskSem, 0, 0);
    }

    /** Drains the queue, stops every worker and releases the semaphore. */
    ~Threadpool()
    {
        // One extra post per worker acts as a shutdown token: a worker that
        // wakes up and finds the queue empty knows it has to exit. Tokens are
        // only ever consumed with an empty queue after all real tasks have
        // been taken, so no queued task is dropped.
        for(auto remaining = threadList.size(); remaining > 0; --remaining)
        {
            ::sem_post(&taskSem);
        }

        for(auto &worker : threadList)
        {
            worker.join();
        }

        ::sem_destroy(&taskSem);
    }

private:
    std::vector<std::thread> threadList;

    using Task = std::function<void()>;
    std::queue<Task> taskQueue;
    // Guards taskQueue, which is touched by addTask() callers and all workers.
    std::mutex queueMutex;

    int threadNums;
public:
    /** Sets how many workers the next start() call will spawn. */
    void setThreadNums(int nums) { threadNums = nums; }

    /** Spawns threadNums worker threads, each running runTask() on this pool. */
    void start()
    {
        for(int i = 0; i < threadNums; i++)
        {
            // A pointer to a non-static member function must be formed with '&'.
            threadList.emplace_back(&Threadpool::runTask, this);
        }
    }

    /**
     * Queues a task and wakes one worker.
     * @param task callable to execute on a worker thread.
     */
    void addTask(Task task)
    {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            taskQueue.push(std::move(task));
        }

        // Post after the push so a woken worker always finds its task.
        ::sem_post(&taskSem);
    }
private:
    /** Worker loop: runs queued tasks until a shutdown token is received. */
    void runTask()
    {
        for(;;)
        {
            ::sem_wait(&taskSem);

            Task task;
            {
                std::lock_guard<std::mutex> lock(queueMutex);
                if(taskQueue.empty())
                {
                    // Woken without work: this was a shutdown token from the destructor.
                    return;
                }
                task = std::move(taskQueue.front());
                taskQueue.pop();
            }

            // Run outside the lock so other workers can dequeue concurrently.
            task();
        }
    }
};

// Demo driver: starts a default-sized pool and submits the same print job
// three times.
int main()
{
    Threadpool threadpool;
    threadpool.start();

    // Every submission prints the same line, so one closure is reused.
    const auto printJob = [](){std::cout << "addTask" << std::endl;};

    for(int submitted = 0; submitted < 3; ++submitted)
    {
        threadpool.addTask(printJob);
    }

    return 0;
}

加了锁的线程池

//"Threadpool.h"
#ifndef THREADPOOL
#define THREADPOOL

#pragma once

#include <functional>
#include <vector>
#include <thread>

#include <semaphore.h>

#include <mutex>

// #include <condition_variable>

// Semaphore through which the pool signals pending tasks: the Threadpool
// constructor initialises it, addTask() posts it and run() waits on it.
::sem_t taskSem;
// Mutex locked by addTask() and run() around their accesses to the task queue.
// NOTE(review): both objects are defined (not merely declared) in this header,
// so including it from more than one translation unit would yield duplicate
// definitions at link time — confirm it is only ever included once.
std::mutex mutex;
namespace ThreadPool
{

/**
 * Thread pool interface; the member functions other than setThreadNums() are
 * defined out of line.
 */
class Threadpool
{
private:
    // Callable type accepted by addTask() and stored in the queue.
    using Task = std::function<void()>;

public:
    Threadpool();
    ~Threadpool();

    // Stores the requested number of worker threads.
    void setThreadNums(int nums) { threadNums = nums; }

    void start();
    void addTask(Task task);

private:
    void run();

    // Requested number of worker threads.
    int threadNums;
    // Tasks that have been added and not yet removed.
    std::vector<Task> taskQueue;
    // Worker threads, held through raw pointers.
    std::vector<std::thread*> threadPool;
};

}

#endif

 

//"Threadpool.cc"
#include "./Threadpool.h"

#include <iostream>
#include <chrono>

ThreadPool::Threadpool::Threadpool():
    threadNums(10)
{
    ::sem_init(&taskSem, 0, 0);
}

// Waits for every worker thread to finish, then releases the resources the
// pool owns: the heap-allocated std::thread objects and the task semaphore.
// Note that join() blocks until the worker returns from run().
ThreadPool::Threadpool::~Threadpool()
{
    for(auto &i : threadPool)
    {
        i->join();
        // The thread objects are created with `new`, so they must be deleted
        // here or they leak.
        delete i;
        i = nullptr;
    }
    threadPool.clear();

    // All workers have been joined, so nothing can still be waiting on the semaphore.
    ::sem_destroy(&taskSem);
}

// Spawns threadNums worker threads, each executing run() on this pool, and
// records them in threadPool.
void ThreadPool::Threadpool::start()
{
    // The counter has to start at 0; reading an uninitialized int is undefined
    // behaviour and made the number of spawned threads unpredictable.
    for(int i = 0; i < threadNums; ++i)
    {
        // A pointer to a non-static member function must be formed with '&'.
        threadPool.emplace_back(new std::thread(&ThreadPool::Threadpool::run, this));
    }
}

// Appends a copy of `task` to the queue and posts the semaphore once, both
// while holding the global mutex.
void ThreadPool::Threadpool::addTask(Task task)
{
    std::lock_guard<std::mutex> guard(mutex);

    taskQueue.push_back(task);
    ::sem_post(&taskSem);
}

// Worker entry point: waits for one task notification, removes the oldest
// queued task and executes it. The function returns after that single task.
void ThreadPool::Threadpool::run()
{
    // Block until a task has been announced through the semaphore.
    ::sem_wait(&taskSem);

    Task task;
    {
        std::unique_lock<std::mutex> lock(mutex);
        if(taskQueue.empty())
        {
            // Defensive: the wait returned without a queued task (for example
            // because sem_wait failed), so there is nothing to run.
            return;
        }

        // Take and remove the SAME element. The previous code executed
        // front() but discarded back(), so tasks were run twice or lost.
        task.swap(taskQueue.front());
        taskQueue.erase(taskQueue.begin());
    }

    // Execute outside the lock so a long task does not stall addTask() or
    // the other workers.
    task();
}

// Sleeps for 1000 microseconds, then queues a task on `threadpool` that prints
// "addTask" followed by `i`.
void add(ThreadPool::Threadpool& threadpool, int i)
{
    std::this_thread::sleep_for(std::chrono::microseconds(1000));

    // Capture `i` by value: the task runs on a worker thread after add() has
    // returned, so a reference to this by-value parameter would dangle.
    threadpool.addTask(
        [i](){std::cout << "addTask" << i << std::endl;}
    );
}

// Demo driver: starts a pool with 400 workers and submits 400 numbered jobs
// through add().
int main()
{
    const int kJobCount = 400;

    ThreadPool::Threadpool threadpool;
    threadpool.setThreadNums(kJobCount);
    threadpool.start();

    int submitted = 0;
    while(submitted < kJobCount)
    {
        add(threadpool, submitted);
        ++submitted;
    }

    return 0;
}