1. 引言
在许多需要高并发或频繁执行短任务的场景中,频繁地创建与销毁线程会带来较大的性能开销。线程池模式通过预先创建一定数量的工作线程(Worker Threads),将任务放入共享队列中,等待空闲线程取出执行,从而有效复用线程,降低系统开销。
线程池设计关键点
- 任务队列管理: 用链表(或数组)实现一个任务 FIFO 队列,每个任务包含回调函数和参数。
- 线程同步: 使用互斥锁(mutex)和条件变量(condition variable)保护任务队列,保证多个线程安全访问。
- 线程退出机制: 通过 shutdown 标志通知池中所有线程安全退出,避免资源泄漏。
- 错误检查: 每个系统调用(如 pthread_create、pthread_mutex_lock 等)都要检查返回值,确保线程池能在异常情况下安全清理资源。
2. 设计架构
下图展示了线程池的基本运作流程(使用 ASCII 图形描述):
+----------------+ 提交任务 +---------------+
| | -----------------> | |
用户程序 | 添加任务到队列 | | 任务队列 |
| | <----------------- ---------------- --------------- v --------------- ----------------->
| |
+---------------+
|
V 退出/等待
+---------------+
| |
| 销毁、退出 |
| |
+---------------+
- 任务提交:外部通过 threadpool_add_task 函数将任务入队,并通过条件变量唤醒等待中的线程。
- 工作线程:线程池中的每个线程循环等待队列中出现新任务,获取锁、取出任务,执行该任务后继续等待。
- 线程退出:当线程池需要销毁时(或整个程序退出时),设置 shutdown 标志,并唤醒所有线程等待退出,最后回收所有线程资源与任务队列。
3. 数据结构设计
下面是两个最主要的数据结构:
- 任务节点(Task Node) 每个任务包含一个函数指针和对应的参数,以及一个指向下一个任务的指针。
typedef struct task {
void (*function)(void *); // 任务回调函数
void *argument; // 回调函数参数
struct task *next;
} task_t;
- 线程池 包含工作线程数组、任务队列、互斥锁、条件变量及控制标志。
typedef struct threadpool {
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_t *threads; // 工作线程数组
task_t *task_queue_front; // 队头
task_t *task_queue_rear; // 队尾
int thread_count;
int shutdown; // 0:运行;1:关闭中
} threadpool_t;
4. 主要功能实现
下面我们分模块说明如何实现各个功能:
4.1 工作线程函数
工作线程持续从任务队列中取任务执行。流程如下:
- 获得锁后检查任务队列是否为空。
- 如果为空并且线程池没有关闭,则调用条件变量等待(同时释放锁)。
- 等待到条件变量被唤醒后再次检测状态。
- 如果队列有任务,则取出任务,并在解锁后执行这个任务。
- 如果线程池处于关闭状态且任务队列为空,则安全退出线程。
4.2 添加任务
函数 threadpool_add_task 将新任务放入队列尾部,并通知等待线程。确保在操作共享数据前后加锁,防止竞态条件。
4.3 销毁线程池
销毁时需要:
- 将 shutdown 标志置位。
- 通过条件变量唤醒所有等待线程。
- 等待(join)所有工作线程结束。
- 清理剩余任务以及释放互斥量、条件变量和线程数组内存。
5. 参考代码
下面是一份基于 POSIX 线程的完整参考代码,该代码实现了一个简单的线程池,可以添加任务、执行任务,并在销毁线程池时安全退出:
#include
#include
#include
#include
// 任务节点结构
typedef struct task {
void (*function)(void *);
void *argument;
struct task *next;
} task_t;
// 线程池结构
typedef struct threadpool {
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_t *threads;
task_t *task_queue_front;
task_t *task_queue_rear;
int thread_count;
int shutdown;
} threadpool_t;
// 工作线程函数
void *threadpool_worker(void *arg) {
threadpool_t *pool = (threadpool_t *)arg;
task_t *task;
while (1) {
pthread_mutex_lock(&pool->lock);
// 当任务队列为空且没有关闭,等待新任务
while (pool->task_queue_front == NULL && !pool->shutdown) {
pthread_cond_wait(&pool->cond, &pool->lock);
}
// 如果线程池标记为关闭,则退出
if (pool->shutdown && pool->task_queue_front == NULL) {
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
// 获取队列第一个任务
task = pool->task_queue_front;
if (task != NULL) {
pool->task_queue_front = task->next;
// 队列变为空时重置队尾指针
if (pool->task_queue_front == NULL)
pool->task_queue_rear = NULL;
}
pthread_mutex_unlock(&pool->lock);
if (task != NULL) {
// 执行任务
task->function(task->argument);
free(task);
}
}
return NULL;
}
// 创建线程池
threadpool_t *threadpool_create(int thread_count) {
threadpool_t *pool;
int i;
if ((pool = malloc(sizeof(threadpool_t))) == NULL) {
perror("创建线程池: 内存分配失败");
return NULL;
}
pool->thread_count = thread_count;
pool->shutdown = 0;
pool->task_queue_front = pool->task_queue_rear = NULL;
if (pthread_mutex_init(&pool->lock, NULL) != 0 ||
pthread_cond_init(&pool->cond, NULL) != 0) {
perror("创建线程池: 同步工具初始化失败");
free(pool);
return NULL;
}
pool->threads = malloc(sizeof(pthread_t) * thread_count);
if (pool->threads == NULL) {
perror("创建线程池: 内存分配失败");
free(pool);
return NULL;
}
for (i = 0; i < thread_count i if pthread_createpool->threads[i], NULL, threadpool_worker, (void *)pool) != 0) {
perror("创建线程池: 创建线程失败");
threadpool_destroy(pool);
return NULL;
}
}
return pool;
}
// 添加任务到线程池
int threadpool_add_task(threadpool_t *pool, void (*function)(void *), void *argument) {
task_t *new_task = malloc(sizeof(task_t));
if (new_task == NULL) {
perror("添加任务: 内存分配失败");
return -1;
}
new_task->function = function;
new_task->argument = argument;
new_task->next = NULL;
pthread_mutex_lock(&pool->lock);
// 将任务添加到队尾
if (pool->task_queue_rear == NULL) {
pool->task_queue_front = pool->task_queue_rear = new_task;
} else {
pool->task_queue_rear->next = new_task;
pool->task_queue_rear = new_task;
}
// 通知一个等待线程
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->lock);
return 0;
}
// 销毁线程池
int threadpool_destroy(threadpool_t *pool) {
int i;
if (pool == NULL) return -1;
pthread_mutex_lock(&pool->lock);
pool->shutdown = 1;
// 唤醒所有等待线程
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->lock);
// 回收所有线程
for (i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
// 清理任务队列
while (pool->task_queue_front != NULL) {
task_t *temp = pool->task_queue_front;
pool->task_queue_front = pool->task_queue_front->next;
free(temp);
}
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->cond);
free(pool->threads);
free(pool);
return 0;
}
// 下面是测试任务函数示例
void test_task(void *arg) {
int task_num = *(int *)arg;
printf("线程 %ld 正在处理任务 %d\n", pthread_self(), task_num);
// 模拟任务处理时间
sleep(1);
}
//
// 示例:使用线程池执行多个任务
//
int main() {
const int THREAD_NUM = 4;
const int TASK_NUM = 10;
threadpool_t *pool = threadpool_create(THREAD_NUM);
if (pool == NULL) {
fprintf(stderr, "线程池创建失败\n");
exit(EXIT_FAILURE);
}
// 提交 TASK_NUM 个任务,每个任务传递一个不同的任务编号
for (int i = 0; i < TASK_NUM; i++) {
int *arg = malloc(sizeof(int));
if (arg == NULL) {
perror("主函数: 内存分配失败");
continue;
}
*arg = i;
threadpool_add_task(pool, test_task, arg);
}
// 等待一段时间(确保任务都执行完成)
sleep(5);
threadpool_destroy(pool);
printf("线程池已销毁,主程序退出\n");
return 0;
}
6. 进一步说明
线程安全与同步
- 本示例使用了 pthread_mutex_lock 和 pthread_cond_wait/signal 来保护任务队列。
- 生产者(任务提交)在添加任务完成后调用 pthread_cond_signal 通知等待线程;工作线程在队列为空时进入等待状态。
错误处理
- 每个函数调用都做了简单的错误检查,这在实际开发中非常关键,确保在内存分配或线程创建失败时能立即返回错误并做出相应处理。
扩展与优化
- 你可以扩展任务结构,添加任务优先级。
- 可考虑实现线程池的动态扩缩容,实时根据任务量调整线程数量。
- 对于长时间运行的任务,可以加入任务取消或超时处理机制。
7. 总结
通过上面的设计指南,你可以掌握构建一个简单但功能完备的 C 语言线程池需要注意的各个方面。从任务队列的设计、工作线程的循环机制到线程同步和安全退出,每个环节都至关重要。参考代码展示了一个全流程的实现案例,便于你在实际项目中进行测试与拓展。
希望能为你在 C 语言项目中实现高效的多线程任务调度提供一点帮助!
本文暂时没有评论,来添加一个吧(●'◡'●)