菜鸟笔记
提升您的技术认知

Linux下简易线程池

线程池简介

   线程池是可以用来在后台执行多个任务的线程集合。 这使主线程可以自由地异步执行其他任务。线程池通常用于服务器应用程序。 每个传入请求都将分配给线程池中的一个线程,因此可以异步处理请求,而不会占用主线程,也不会延迟后续请求的处理。一旦池中的某个线程完成任务,它将返回到等待线程队列中,等待被再次使用。 这种重用使应用程序可以避免为每个任务创建新线程的开销。线程池通常具有最大线程数限制。 如果所有线程都繁忙,则额外的任务将放入队列中,直到有线程可用时才能够得到处理。

线程池技术如何提高服务器程序的性能

  这里所提到服务器程序是指能够接受客户请求并能处理请求的程序,而不只是指那些接受网络客户请求的网络服务器程序。

  多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:

  假设在一台服务器完成一项任务的时间为T

T1 创建线程的时间
T2 在线程中执行任务的时间,包括线程间同步所需时间
T3 线程销毁的时间

  显然T = T1+T2+T3。注意这是一个极度简化的假设。

  可以看出T1、T3是多线程本身的带来的开销。我们渴望减少T1、T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。

  线程池技术正是关注如何缩短或调整T1、T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1、T3的开销了。

  线程池不仅调整T1、T3产生的时间段,而且它还显著减少了创建线程的数目。在看一个例子:

  假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。我们比较利用线程池技术和不利于线程池技术的服务器处理这些请求时所产生的线程总数。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限(以下简称线程池尺寸),而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池尺寸是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

线程池技术要点

  从内部实现上看,线程池技术可主要划分为如下6个要点实现,如下图:

  • 工作者线程worker:即线程池中可以重复利用起来执行任务的线程,一个worker的生命周期内会不停的处理多个业务job。线程池“复用”的本质就是复用一个worker去处理多个job,“流控“的本质就是通过对worker数量的控制实现并发数的控制。通过设置不同的参数来控制worker的数量可以实现线程池的容量伸缩从而实现复杂的业务需求
  • 待处理工作job的存储队列:工作者线程workers的数量是有限的,同一时间最多只能处理最多workers数量个job。对于来不及处理的job需要保存到等待队列里,空闲的工作者work会不停的读取空闲队列里的job进行处理。基于不同的队列实现,可以扩展出多种功能的线程池,如定制队列出队顺序实现带处理优先级的线程池、定制队列为阻塞有界队列实现可阻塞能力的线程池等。流控一方面通过控制worker数控制并发数和处理能力,一方面可基于队列控制线程池处理能力的上限。
  • 线程池初始化:即线程池参数的设定和多个工作者workers的初始化。通常有一开始就初始化指定数量的workers或者有请求时逐步初始化工作者两种方式。前者线程池启动初期响应会比较快但造成了空载时的少量性能浪费,后者是基于请求量灵活扩容但牺牲了线程池启动初期性能达不到最优。
  • 处理业务job算法:业务给线程池添加任务job时线程池的处理算法。有的线程池基于算法识别直接处理job还是增加工作者数处理job或者放入待处理队列,也有的线程池会直接将job放入待处理队列,等待工作者worker去取出执行。
  • workers的增减算法:业务线程数不是持久不变的,有高低峰期。线程池要有自己的算法根据业务请求频率高低调节自身工作者workers的数量来调节线程池大小,从而实现业务高峰期增加工作者数量提高响应速度,而业务低峰期减少工作者数来节省服务器资源。增加算法通常基于几个维度进行:待处理工作job数、线程池定义的最大最小工作者数、工作者闲置时间。
  • 线程池终止:应用停止时线程池要有自身的停止逻辑,保证所有job都得到执行或者抛弃。

简易线程池实现

  线程池头文件threadpool.h如下:

 1 #ifndef THREADPOOL_H
 2 #define THREADPOOL_H
 3 
 4 #include <stdio.h>
 5 #include <stdlib.h>
 6 #include <unistd.h>
 7 #include <pthread.h>
 8 
 9 /**
10 * 线程体数据结构
11 */
12 typedef struct runner
13 {
14     void(*callback)(void* arg);    // 回调函数指针
15     void* arg;                     // 回调函数的参数
16     struct runner* next;
17 } thread_runner;
18 
19 /**
20 * 线程池数据结构
21 */
22 typedef struct
23 {
24     pthread_mutex_t mutex;             // 互斥量
25     pthread_cond_t cond;               // 条件变量
26     thread_runner* runner_head;        // 线程池中所有等待任务的头指针
27     thread_runner* runner_tail;        // 线程池所有等待任务的尾指针
28     int shutdown;                      // 线程池是否销毁
29     pthread_t* threads;                // 所有线程
30     int max_thread_size;               // 线程池中允许的活动线程数目
31 } thread_pool;
32 
33 /**
34 * 线程体
35 */
36 void run(void *arg);
37 
38 /**
39 *   初始化线程池
40 *   参数:
41 *   pool:指向线程池结构有效地址的动态指针
42 *   max_thread_size:最大的线程数
43 */
44 void threadpool_init(thread_pool* pool, int max_thread_size);
45 
46 /**
47 *   向线程池加入任务
48 *   参数:
49 *   pool:指向线程池结构有效地址的动态指针
50 *   callback:线程回调函数
51 *   arg:回调函数参数
52 */
53 void threadpool_add_runner(thread_pool* pool, void(*callback)(void *arg), void *arg);
54 
55 /**
56 *   销毁线程池
57 *   参数:
58 *   ppool:指向线程池结构有效地址的动态指针地址(二级指针),销毁后释放内存,该指针为NULL
59 */
60 void threadpool_destroy(thread_pool** ppool);
61 
62 #endif

  线程池实现文件threadpool.c如下:

  1 #include "threadpool.h"
  2 
  3 #define DEBUG 1
  4 
  5 /**
  6 *   初始化线程池
  7 *   参数:
  8 *   pool:指向线程池结构有效地址的动态指针
  9 *   max_thread_size:最大的线程数
 10 */
 11 void threadpool_init(thread_pool* pool, int max_thread_size)
 12 {
 13     // 初始化互斥量
 14     pthread_mutex_init(&(pool->mutex), NULL);
 15     // 初始化条件变量
 16     pthread_cond_init(&(pool->cond), NULL);
 17     pool->runner_head = NULL;
 18     pool->runner_tail = NULL;
 19     pool->max_thread_size = max_thread_size;
 20     pool->shutdown = 0;
 21     
 22     // 创建所有分离态线程(即创建线程池)
 23     pool->threads = (pthread_t *)malloc(max_thread_size * sizeof(pthread_t));
 24     int i = 0;
 25     for (i = 0; i < max_thread_size; i++)
 26     {
 27         pthread_attr_t attr;
 28         pthread_attr_init(&attr);
 29         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 30         pthread_create(&(pool->threads[i]), &attr, (void*)run, (void*)pool);
 31     }
 32 #ifdef DEBUG
 33     printf("threadpool_init-> create %d detached thread\n", max_thread_size);
 34 #endif
 35 }
 36 
 37 /**
 38 * 线程体
 39 */
 40 void run(void *arg)
 41 {
 42     thread_pool* pool = (thread_pool*)arg;
 43     while (1)
 44     {
 45         // 加锁
 46         pthread_mutex_lock(&(pool->mutex));
 47 #ifdef DEBUG
 48         printf("run-> locked\n");
 49 #endif
 50         // 如果等待队列为0并且线程池未销毁,则处于阻塞状态(即等待任务的唤醒)
 51         while (pool->runner_head == NULL && !pool->shutdown)
 52         {
 53             pthread_cond_wait(&(pool->cond), &(pool->mutex));
 54         }
 55         //如果线程池已经销毁
 56         if (pool->shutdown)
 57         {
 58             // 解锁
 59             pthread_mutex_unlock(&(pool->mutex));
 60 #ifdef DEBUG
 61             printf("run-> unlocked and thread exit\n");
 62 #endif
 63             pthread_exit(NULL);
 64         }
 65         // 取出链表中的头元素
 66         thread_runner *runner = pool->runner_head;
 67         pool->runner_head = runner->next;
 68         // 解锁
 69         pthread_mutex_unlock(&(pool->mutex));
 70 #ifdef DEBUG
 71         printf("run-> unlocked\n");
 72 #endif
 73         // 调用回调函数,执行任务
 74         (runner->callback)(runner->arg);
 75         free(runner);
 76         runner = NULL;
 77 #ifdef DEBUG
 78         printf("run-> runned and free runner\n");
 79 #endif
 80     }
 81     pthread_exit(NULL);
 82 }
 83 
 84 /**
 85 *   向线程池加入任务
 86 *   参数:
 87 *   pool:指向线程池结构有效地址的动态指针
 88 *   callback:线程回调函数
 89 *   arg:回调函数参数
 90 */
 91 void threadpool_add_runner(thread_pool* pool, void(*callback)(void *arg), void *arg)
 92 {
 93     // 构造一个新任务
 94     thread_runner *newrunner = (thread_runner *)malloc(sizeof(thread_runner));
 95     newrunner->callback = callback;
 96     newrunner->arg = arg;
 97     newrunner->next = NULL;
 98     // 加锁
 99     pthread_mutex_lock(&(pool->mutex));
100 #ifdef DEBUG
101     printf("threadpool_add_runner-> locked\n");
102 #endif
103     // 将任务加入到等待队列中
104     if (pool->runner_head != NULL)
105     {
106         pool->runner_tail->next = newrunner;
107         // 尾指针指到最后一个任务
108         pool->runner_tail = newrunner;
109     }
110     else
111     {
112         pool->runner_head = newrunner;
113         pool->runner_tail = newrunner;
114     }
115     // 解锁
116     pthread_mutex_unlock(&(pool->mutex));
117 #ifdef DEBUG
118     printf("threadpool_add_runner-> unlocked\n");
119 #endif
120     // 唤醒一个等待线程
121     pthread_cond_signal(&(pool->cond));
122 #ifdef DEBUG
123     printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n");
124 #endif
125 }
126 
127 /**
128 *   销毁线程池
129 *   参数:
130 *   ppool:指向线程池结构有效地址的动态指针地址(二级指针)
131 */
132 void threadpool_destroy(thread_pool** ppool)
133 {
134     thread_pool *pool = *ppool;
135     // 防止2次销毁
136     if (!pool->shutdown)
137     {
138         pool->shutdown = 1;
139         // 唤醒所有等待线程,线程池要销毁了
140         pthread_cond_broadcast(&(pool->cond));
141         // 等待所有线程中止
142         sleep(1);
143 #ifdef DEBUG
144         printf("threadpool_destroy-> wakeup all waiting threads\n");
145 #endif
146         // 回收空间
147         free(pool->threads);
148         // 销毁等待队列
149         thread_runner *head = NULL;
150         while (pool->runner_head != NULL)
151         {
152             head = pool->runner_head;
153             pool->runner_head = pool->runner_head->next;
154             free(head);
155         }
156 
157 #ifdef DEBUG
158         printf("threadpool_destroy-> all runners freed\n");
159 #endif
160         // 条件变量和互斥量也别忘了销毁
161         pthread_mutex_destroy(&(pool->mutex));
162         pthread_cond_destroy(&(pool->cond));
163 
164 #ifdef DEBUG
165         printf("threadpool_destroy-> mutex and cond destoryed\n");
166 #endif
167         free(pool);
168         (*ppool) = NULL;
169 
170 #ifdef DEBUG
171         printf("threadpool_destroy-> pool freed\n");
172 #endif
173     }
174 }

  测试文件如下:

 1 #include "threadpool.h"
 2 
 3 void threadrun(void* arg)
 4 {
 5     int *i = (int *)arg;
 6     printf("%d\n", *i);
 7 }
 8 
 9 int main(void)
10 {
11     thread_pool *pool = malloc(sizeof(thread_pool));
12     threadpool_init(pool, 2);
13     
14     int i;
15     int tmp[3];
16     for (i = 0; i < 3; i++)
17     {
18         tmp[i] = i;
19         threadpool_add_runner(pool, threadrun, &tmp[i]);
20     }
21     
22     sleep(1);
23     threadpool_destroy(&pool);
24     printf("main-> %p\n", pool);
25     printf("main-> test over\n");
26     
27     return 0;
28 }

  程序运行结果如下:

  

参考资料

  线程池(C# 和 Visual Basic)

  几种线程池的实现算法分析

  线程池的介绍及简单实现

  Linux线程池(C语言描述) - 互斥量+条件变量同步