pthread(POSIX Threads)本身并不直接支持任务队列,但可以借助互斥锁和条件变量自行实现一个。
以下是一个简单的C语言示例,展示了如何使用pthread和队列实现一个简单的线程池:
/*
 * Minimal fixed-size thread pool built on pthreads and a bounded ring-buffer
 * task queue.
 */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define QUEUE_SIZE 10
#define NUM_THREADS 4

/*
 * A unit of work: a worker calls func(arg).
 * A Task whose func is NULL is reserved as the "stop" sentinel that makes a
 * worker exit its loop (see thread_pool_shutdown).
 */
typedef struct Task {
    void (*func)(void *);
    void *arg;
} Task;

/*
 * Bounded FIFO of tasks stored in a ring buffer.
 * One slot is always left unused so that "full" ((tail + 1) % QUEUE_SIZE ==
 * head) can be told apart from "empty" (tail == head); the usable capacity is
 * therefore QUEUE_SIZE - 1.
 * head and tail are plain ints: every access happens with `lock` held.
 */
typedef struct TaskQueue {
    Task queue[QUEUE_SIZE];
    int head;                 /* index of the next task to pop */
    int tail;                 /* index of the next free slot */
    pthread_mutex_t lock;     /* guards queue, head and tail */
    pthread_cond_t not_empty; /* signalled after a push */
    pthread_cond_t not_full;  /* signalled after a pop */
} TaskQueue;

/* A fixed set of worker threads sharing one task queue. */
typedef struct ThreadPool {
    pthread_t threads[NUM_THREADS];
    TaskQueue task_queue;
} ThreadPool;

/* Print a diagnostic and terminate the process if a pthread call failed. */
static void die_on_error(int rc, const char *what)
{
    if (rc != 0) {
        fprintf(stderr, "%s: %s\n", what, strerror(rc));
        exit(EXIT_FAILURE);
    }
}

/* Initialise an empty queue together with its mutex and condition variables. */
void task_queue_init(TaskQueue *task_queue)
{
    task_queue->head = 0;
    task_queue->tail = 0;
    die_on_error(pthread_mutex_init(&task_queue->lock, NULL),
                 "pthread_mutex_init");
    die_on_error(pthread_cond_init(&task_queue->not_empty, NULL),
                 "pthread_cond_init");
    die_on_error(pthread_cond_init(&task_queue->not_full, NULL),
                 "pthread_cond_init");
}

/* Append a task to the queue, blocking while the queue is full. */
void task_queue_push(TaskQueue *task_queue, Task task)
{
    pthread_mutex_lock(&task_queue->lock);
    /* Loop, not `if`: condition waits may wake spuriously. */
    while ((task_queue->tail + 1) % QUEUE_SIZE == task_queue->head) {
        pthread_cond_wait(&task_queue->not_full, &task_queue->lock);
    }
    task_queue->queue[task_queue->tail] = task;
    task_queue->tail = (task_queue->tail + 1) % QUEUE_SIZE;
    /* Wake a consumer only; producers wait on a separate condition. */
    pthread_cond_signal(&task_queue->not_empty);
    pthread_mutex_unlock(&task_queue->lock);
}

/* Remove and return the oldest task, blocking while the queue is empty. */
Task task_queue_pop(TaskQueue *task_queue)
{
    pthread_mutex_lock(&task_queue->lock);
    while (task_queue->tail == task_queue->head) {
        pthread_cond_wait(&task_queue->not_empty, &task_queue->lock);
    }
    Task task = task_queue->queue[task_queue->head];
    task_queue->head = (task_queue->head + 1) % QUEUE_SIZE;
    /* Wake a producer only; consumers wait on a separate condition. */
    pthread_cond_signal(&task_queue->not_full);
    pthread_mutex_unlock(&task_queue->lock);
    return task;
}

/*
 * Worker thread entry point. arg is the owning ThreadPool.
 * Pops and runs tasks until it receives a stop sentinel (func == NULL).
 * Always returns NULL.
 */
void *thread_pool_worker(void *arg)
{
    ThreadPool *thread_pool = arg;

    for (;;) {
        Task task = task_queue_pop(&thread_pool->task_queue);
        if (task.func == NULL) {
            break;
        }
        task.func(task.arg);
    }
    return NULL;
}

/*
 * Initialise the queue and start NUM_THREADS workers.
 * Terminates the process if a thread cannot be created.
 */
void thread_pool_init(ThreadPool *thread_pool)
{
    task_queue_init(&thread_pool->task_queue);
    for (int i = 0; i < NUM_THREADS; i++) {
        die_on_error(pthread_create(&thread_pool->threads[i], NULL,
                                    thread_pool_worker, thread_pool),
                     "pthread_create");
    }
}

/*
 * Queue func(arg) for execution on a worker; blocks while the queue is full.
 * A NULL func is ignored, because NULL is reserved for the stop sentinel.
 */
void thread_pool_add_task(ThreadPool *thread_pool, void (*func)(void *), void *arg)
{
    if (func == NULL) {
        return;
    }
    Task task = {.func = func, .arg = arg};
    task_queue_push(&thread_pool->task_queue, task);
}

/*
 * Finish all queued work, stop the workers and release the queue's
 * synchronisation objects.
 * The queue is FIFO, so the sentinels are popped only after every task queued
 * before this call; each worker consumes exactly one sentinel and exits.
 * No tasks may be added once this has been called.
 */
void thread_pool_shutdown(ThreadPool *thread_pool)
{
    const Task stop = {.func = NULL, .arg = NULL};

    for (int i = 0; i < NUM_THREADS; i++) {
        task_queue_push(&thread_pool->task_queue, stop);
    }
    for (int i = 0; i < NUM_THREADS; i++) {
        die_on_error(pthread_join(thread_pool->threads[i], NULL),
                     "pthread_join");
    }
    pthread_cond_destroy(&thread_pool->task_queue.not_full);
    pthread_cond_destroy(&thread_pool->task_queue.not_empty);
    pthread_mutex_destroy(&thread_pool->task_queue.lock);
}

/* Sample task: prints the integer id that was packed into arg. */
void my_function(void *arg)
{
    int id = (int)(intptr_t)arg;
    printf("task %d running\n", id);
}

int main(void)
{
    ThreadPool thread_pool;
    thread_pool_init(&thread_pool);

    /* Add tasks to the thread pool. */
    for (int i = 0; i < 20; i++) {
        thread_pool_add_task(&thread_pool, my_function, (void *)(intptr_t)i);
    }

    /* Wait for all tasks to finish, then stop the workers. */
    thread_pool_shutdown(&thread_pool);
    return 0;
}
这个示例中,我们创建了一个线程池,其中包含一个任务队列和四个工作线程。我们可以向线程池添加任务,这些任务将被工作线程从队列中取出并执行。注意,这个示例仅用于演示目的,实际应用中可能需要考虑更多细节,例如关闭线程池、处理错误等。