程序员的知识教程库

网站首页 > 教程分享 正文

C语言线程池设计指南(c语言编写线程)

henian88 2025-04-10 22:53:03 教程分享 15 ℃ 0 评论

1. 引言

在许多需要高并发或频繁执行短任务的场景中,频繁地创建与销毁线程会带来较大的性能开销。线程池模式通过预先创建一定数量的工作线程(Worker Threads),将任务放入共享队列中,等待空闲线程取出执行,从而有效复用线程,降低系统开销。

线程池设计关键点

  • 任务队列管理: 用链表(或数组)实现一个任务 FIFO 队列,每个任务包含回调函数和参数。
  • 线程同步: 使用互斥锁(mutex)和条件变量(condition variable)保护任务队列,保证多个线程安全访问。
  • 线程退出机制: 通过 shutdown 标志通知池中所有线程安全退出,避免资源泄漏。
  • 错误检查: 每个系统调用(如 pthread_create、pthread_mutex_lock 等)都要检查返回值,确保线程池能在异常情况下安全清理资源。

2. 设计架构

下图展示了线程池的基本运作流程(使用 ASCII 图形描述):

                +----------------+      提交任务      +---------------+
                |                | -----------------> |               |
      用户程序  |  添加任务到队列 |                    |   任务队列    |
                |                | <----------------- ---------------- --------------- v --------------- ----------------->
                                      |               |
                                      +---------------+
                                              |
                                              V     退出/等待
                                      +---------------+
                                      |               |
                                      | 销毁、退出    |
                                      |               |
                                      +---------------+
  • 任务提交:外部通过 threadpool_add_task 函数将任务入队,并通过条件变量唤醒等待中的线程。
  • 工作线程:线程池中的每个线程循环等待队列中出现新任务,获取锁、取出任务,执行该任务后继续等待。
  • 线程退出:当线程池需要销毁时(或整个程序退出时),设置 shutdown 标志,并唤醒所有线程等待退出,最后回收所有线程资源与任务队列。

3. 数据结构设计

下面是两个最主要的数据结构:

  1. 任务节点(Task Node) 每个任务包含一个函数指针和对应的参数,以及一个指向下一个任务的指针。

typedef struct task {
void (*function)(void *); // 任务回调函数
void *argument; // 回调函数参数
struct task *next;
} task_t;

  1. 线程池 包含工作线程数组、任务队列、互斥锁、条件变量及控制标志。

typedef struct threadpool {
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_t *threads; // 工作线程数组
task_t *task_queue_front; // 队头
task_t *task_queue_rear; // 队尾
int thread_count;
int shutdown; // 0:运行;1:关闭中
} threadpool_t;


4. 主要功能实现

下面我们分模块说明如何实现各个功能:

4.1 工作线程函数

工作线程持续从任务队列中取任务执行。流程如下:

  1. 获得锁后检查任务队列是否为空。
  2. 如果为空并且线程池没有关闭,则调用条件变量等待(同时释放锁)。
  3. 等待到条件变量被唤醒后再次检测状态。
  4. 如果队列有任务,则取出任务,并在解锁后执行这个任务。
  5. 如果线程池处于关闭状态且任务队列为空,则安全退出线程。

4.2 添加任务

函数 threadpool_add_task 将新任务放入队列尾部,并通知等待线程。确保在操作共享数据前后加锁,防止竞态条件。

4.3 销毁线程池

销毁时需要:

  • 将 shutdown 标志置位。
  • 通过条件变量唤醒所有等待线程。
  • 等待(join)所有工作线程结束。
  • 清理剩余任务以及释放互斥量、条件变量和线程数组内存。

5. 参考代码

下面是一份基于 POSIX 线程的完整参考代码,该代码实现了一个简单的线程池,可以添加任务、执行任务,并在销毁线程池时安全退出:

 #include 
 #include 
 #include 
 #include 
 
 // 任务节点结构
 typedef struct task {
     void (*function)(void *);
     void *argument;
     struct task *next;
 } task_t;
 
 // 线程池结构
 typedef struct threadpool {
     pthread_mutex_t lock;
     pthread_cond_t cond;
     pthread_t *threads;
     task_t *task_queue_front;
     task_t *task_queue_rear;
     int thread_count;
     int shutdown;
 } threadpool_t;
 
 // 工作线程函数
 void *threadpool_worker(void *arg) {
     threadpool_t *pool = (threadpool_t *)arg;
     task_t *task;
 
     while (1) {
         pthread_mutex_lock(&pool->lock);
         // 当任务队列为空且没有关闭,等待新任务
         while (pool->task_queue_front == NULL && !pool->shutdown) {
             pthread_cond_wait(&pool->cond, &pool->lock);
         }
         // 如果线程池标记为关闭,则退出
         if (pool->shutdown && pool->task_queue_front == NULL) {
             pthread_mutex_unlock(&pool->lock);
             pthread_exit(NULL);
         }
         // 获取队列第一个任务
         task = pool->task_queue_front;
         if (task != NULL) {
             pool->task_queue_front = task->next;
             // 队列变为空时重置队尾指针
             if (pool->task_queue_front == NULL)
                 pool->task_queue_rear = NULL;
         }
         pthread_mutex_unlock(&pool->lock);
 
         if (task != NULL) {
             // 执行任务
             task->function(task->argument);
             free(task);
         }
     }
     return NULL;
 }
 
 // 创建线程池
 threadpool_t *threadpool_create(int thread_count) {
     threadpool_t *pool;
     int i;
 
     if ((pool = malloc(sizeof(threadpool_t))) == NULL) {
         perror("创建线程池: 内存分配失败");
         return NULL;
     }
     pool->thread_count = thread_count;
     pool->shutdown = 0;
     pool->task_queue_front = pool->task_queue_rear = NULL;
 
     if (pthread_mutex_init(&pool->lock, NULL) != 0 ||
         pthread_cond_init(&pool->cond, NULL) != 0) {
         perror("创建线程池: 同步工具初始化失败");
         free(pool);
         return NULL;
     }
 
     pool->threads = malloc(sizeof(pthread_t) * thread_count);
     if (pool->threads == NULL) {
         perror("创建线程池: 内存分配失败");
         free(pool);
         return NULL;
     }
 
     for (i = 0; i < thread_count i if pthread_createpool->threads[i], NULL, threadpool_worker, (void *)pool) != 0) {
             perror("创建线程池: 创建线程失败");
             threadpool_destroy(pool);
             return NULL;
         }
     }
     return pool;
 }
 
 // 添加任务到线程池
 int threadpool_add_task(threadpool_t *pool, void (*function)(void *), void *argument) {
     task_t *new_task = malloc(sizeof(task_t));
     if (new_task == NULL) {
         perror("添加任务: 内存分配失败");
         return -1;
     }
     new_task->function = function;
     new_task->argument = argument;
     new_task->next = NULL;
 
     pthread_mutex_lock(&pool->lock);
     // 将任务添加到队尾
     if (pool->task_queue_rear == NULL) {
         pool->task_queue_front = pool->task_queue_rear = new_task;
     } else {
         pool->task_queue_rear->next = new_task;
         pool->task_queue_rear = new_task;
     }
     // 通知一个等待线程
     pthread_cond_signal(&pool->cond);
     pthread_mutex_unlock(&pool->lock);
     return 0;
 }
 
 // 销毁线程池
 int threadpool_destroy(threadpool_t *pool) {
     int i;
 
     if (pool == NULL) return -1;
 
     pthread_mutex_lock(&pool->lock);
     pool->shutdown = 1;
     // 唤醒所有等待线程
     pthread_cond_broadcast(&pool->cond);
     pthread_mutex_unlock(&pool->lock);
 
     // 回收所有线程
     for (i = 0; i < pool->thread_count; i++) {
         pthread_join(pool->threads[i], NULL);
     }
 
     // 清理任务队列
     while (pool->task_queue_front != NULL) {
         task_t *temp = pool->task_queue_front;
         pool->task_queue_front = pool->task_queue_front->next;
         free(temp);
     }
 
     pthread_mutex_destroy(&pool->lock);
     pthread_cond_destroy(&pool->cond);
     free(pool->threads);
     free(pool);
     return 0;
 }
 
 // 下面是测试任务函数示例
 void test_task(void *arg) {
     int task_num = *(int *)arg;
     printf("线程 %ld 正在处理任务 %d\n", pthread_self(), task_num);
     // 模拟任务处理时间
     sleep(1);
 }
 
 //
 // 示例:使用线程池执行多个任务
 //
 int main() {
     const int THREAD_NUM = 4;
     const int TASK_NUM = 10;
     threadpool_t *pool = threadpool_create(THREAD_NUM);
     if (pool == NULL) {
         fprintf(stderr, "线程池创建失败\n");
         exit(EXIT_FAILURE);
     }
 
     // 提交 TASK_NUM 个任务,每个任务传递一个不同的任务编号
     for (int i = 0; i < TASK_NUM; i++) {
         int *arg = malloc(sizeof(int));
         if (arg == NULL) {
             perror("主函数: 内存分配失败");
             continue;
         }
         *arg = i;
         threadpool_add_task(pool, test_task, arg);
     }
 
     // 等待一段时间(确保任务都执行完成)
     sleep(5);
 
     threadpool_destroy(pool);
     printf("线程池已销毁,主程序退出\n");
     return 0;
 }

6. 进一步说明

线程安全与同步

  1. 本示例使用了 pthread_mutex_lockpthread_cond_wait/signal 来保护任务队列。
  2. 生产者(任务提交)在添加任务完成后调用 pthread_cond_signal 通知等待线程;工作线程在队列为空时进入等待状态。

错误处理

  1. 每个函数调用都做了简单的错误检查,这在实际开发中非常关键,确保在内存分配或线程创建失败时能立即返回错误并做出相应处理。

扩展与优化

  1. 你可以扩展任务结构,添加任务优先级。
  2. 可考虑实现线程池的动态扩缩容,实时根据任务量调整线程数量。
  3. 对于长时间运行的任务,可以加入任务取消或超时处理机制。

7. 总结

通过上面的设计指南,你可以掌握构建一个简单但功能完备的 C 语言线程池需要注意的各个方面。从任务队列的设计、工作线程的循环机制到线程同步和安全退出,每个环节都至关重要。参考代码展示了一个全流程的实现案例,便于你在实际项目中进行测试与拓展。

希望能为你在 C 语言项目中实现高效的多线程任务调度提供一点帮助!

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表