C++ 笔记 高级线程同步原语与线程池实现
在std::thread基础上C11 还提供了std::condition_variable条件变量和std::atomic原子变量两大高级同步原语分别解决 “线程间协作通知” 和 “无锁数据竞争” 问题而线程池则是对std::thread的高层封装通过预创建线程池避免频繁创建 / 销毁线程的开销是多线程开发的核心工具。本文将从底层原理、代码示例到完整实现系统拆解这三大知识点。一、std::condition_variable线程间的条件同步1.1 核心作用与原理std::condition_variable用于线程间的协作通知一个线程等待某个条件成立而阻塞另一个线程在条件成立时通知阻塞线程继续执行。它必须与std::unique_lockstd::mutex配合使用 ——mutex 保护共享条件条件变量负责线程的阻塞与唤醒二者结合解决 “忙等待Busy Wait” 问题避免线程空转浪费 CPU。1.2 关键函数表格函数作用wait(lock)阻塞当前线程直到被通知自动释放锁被唤醒后重新获取锁wait(lock, pred)带谓词的wait阻塞直到被通知且pred()为true自动处理虚假唤醒notify_one()唤醒一个等待的线程notify_all()唤醒所有等待的线程1.3 经典示例生产者 - 消费者模型这是条件变量最典型的应用场景生产者线程生成数据放入队列消费者线程从队列取数据处理队列满时生产者阻塞队列空时消费者阻塞。#include iostream #include thread #include mutex #include condition_variable #include queue #include chrono std::queueint data_queue; // 共享数据队列 std::mutex mtx; // 保护队列的互斥锁 std::condition_variable cv; // 条件变量 const int MAX_QUEUE_SIZE 5; // 队列最大容量 // 生产者线程生成数据放入队列 void producer() { for (int i 1; i 10; i) { { std::unique_lockstd::mutex lock(mtx); // 队列满时阻塞等待消费者取数据谓词防止虚假唤醒 cv.wait(lock, []() { return data_queue.size() MAX_QUEUE_SIZE; }); data_queue.push(i); std::cout [生产者] 放入数据: i 队列大小: data_queue.size() \n; } cv.notify_one(); // 通知一个消费者线程 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时 } } // 消费者线程从队列取数据处理 void consumer(int id) { while (true) { int data; { std::unique_lockstd::mutex lock(mtx); // 队列空时阻塞等待生产者放数据同时检查是否生产结束 cv.wait(lock, []() { return !data_queue.empty(); }); data data_queue.front(); data_queue.pop(); std::cout [消费者 id ] 取出数据: data 队列大小: data_queue.size() \n; } cv.notify_one(); // 通知一个生产者线程 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟消费耗时 if (data 10) break; // 生产结束退出循环 } } int main() { std::thread prod(producer); std::thread cons1(consumer, 1); std::thread cons2(consumer, 2); prod.join(); cons1.join(); cons2.join(); return 0; }1.4 关键细节虚假唤醒wait()可能在没有被notify_one()/notify_all()调用时被唤醒称为 “虚假唤醒”因此必须用带谓词的wait谓词用于验证条件是否真正成立确保线程安全。二、std::atomic无锁原子操作2.1 核心作用与原理std::atomic提供无锁的原子操作底层依赖硬件的原子指令如 x86 的LOCK前缀比std::mutex更轻量适合简单的共享数据如计数器、标志位避免了锁的开销和死锁风险。2.2 基本用法#include iostream #include thread #include atomic #include vector // 1. 原子计数器无锁实现多线程安全 std::atomicint atomic_count(0); // 对比非原子计数器会有数据竞争 int unsafe_count 0; void atomic_increment() { for (int i 0; i 10000; i) { atomic_count; // 原子自增操作底层是硬件原子指令 } } void unsafe_increment() { for (int i 0; i 10000; i) { unsafe_count; // 非原子操作多线程下会出错 } } int main() { // 测试原子计数器 std::vectorstd::thread threads1; for (int i 0; i 10; i) { threads1.emplace_back(atomic_increment); } for (auto t : threads1) t.join(); std::cout [原子计数器] 最终结果: atomic_count 期望 100000实际一致\n; // 测试非原子计数器 std::vectorstd::thread threads2; for (int i 0; i 10; i) { threads2.emplace_back(unsafe_increment); } for (auto t : threads2) t.join(); std::cout [非原子计数器] 最终结果: unsafe_count 期望 100000实际可能更小\n; // 2. 原子标志位用于线程间的停止信号 std::atomicbool stop_flag(false); std::thread worker([stop_flag]() { while (!stop_flag) { std::cout [工作线程] 运行中...\n; std::this_thread::sleep_for(std::chrono::milliseconds(500)); } std::cout [工作线程] 收到停止信号退出\n; }); std::this_thread::sleep_for(std::chrono::seconds(2)); stop_flag true; // 原子设置标志位通知工作线程停止 worker.join(); return 0; }2.3 内存序简介进阶std::atomic支持不同的内存序Memory Order用于控制多线程下的指令重排和可见性默认是std::memory_order_seq_cst顺序一致性最安全但性能稍低其他内存序如memory_order_relaxed、memory_order_acquire、memory_order_release可用于优化性能但需谨慎使用容易出错。三、线程池高效的线程管理3.1 为什么需要线程池直接使用std::thread有两个核心问题频繁创建 / 销毁线程开销大线程创建需要分配栈空间、内核态切换销毁也需要回收资源短任务场景下开销甚至超过任务本身。线程数量不可控无限制创建线程会导致系统资源耗尽、CPU 调度开销过大。线程池的核心思想是预创建一组线程固定数量或动态调整将任务提交到任务队列线程池中的线程循环从队列取任务执行避免了频繁创建 / 销毁线程的开销同时控制了线程数量。3.2 线程池的核心组成一个完整的线程池包含以下部分线程数组预创建的工作线程集合。任务队列存储待执行任务的队列需用 mutex 和条件变量保护。同步机制std::mutex保护任务队列std::condition_variable通知工作线程有新任务。任务提交接口支持提交任意可调用对象函数、lambda、成员函数等并返回std::future获取任务结果。3.3 完整实现一个可提交任务、获取结果的线程池#include iostream #include thread #include mutex #include condition_variable #include queue #include vector #include functional #include future #include memory #include stdexcept class ThreadPool { public: // 构造函数创建指定数量的工作线程 ThreadPool(size_t num_threads) : stop(false) { for (size_t i 0; i num_threads; i) { // 每个工作线程循环执行从任务队列取任务并执行 workers.emplace_back([this]() { while (true) { std::functionvoid() task; { std::unique_lockstd::mutex lock(this-queue_mutex); // 阻塞直到线程池停止 或 有新任务 this-condition.wait(lock, [this]() { return this-stop || !this-tasks.empty(); }); // 线程池停止且任务队列为空退出线程 if (this-stop this-tasks.empty()) return; // 从队列取任务 task std::move(this-tasks.front()); this-tasks.pop(); } // 执行任务 task(); } }); } } // 析构函数停止所有线程 ~ThreadPool() { { std::unique_lockstd::mutex lock(queue_mutex); stop true; // 设置停止标志 } condition.notify_all(); // 唤醒所有工作线程 for (std::thread worker : workers) { worker.join(); // 等待所有线程退出 } } // 禁止拷贝和移动 ThreadPool(const ThreadPool) delete; ThreadPool operator(const ThreadPool) delete; // 核心接口提交任务返回 std::future 获取结果 templateclass F, class... Args auto submit(F f, Args... args) - std::futuretypename std::result_ofF(Args...)::type { using return_type typename std::result_ofF(Args...)::type; // 将任务包装为 std::packaged_task支持获取返回值 auto task std::make_sharedstd::packaged_taskreturn_type()( std::bind(std::forwardF(f), std::forwardArgs(args)...) ); std::futurereturn_type res task-get_future(); { std::unique_lockstd::mutex lock(queue_mutex); // 线程池停止后禁止提交任务 if (stop) throw std::runtime_error(submit on stopped ThreadPool); // 将任务放入队列包装为 std::functionvoid() tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); // 通知一个工作线程有新任务 return res; } private: std::vectorstd::thread workers; // 工作线程数组 std::queuestd::functionvoid() tasks; // 任务队列 std::mutex queue_mutex; // 保护任务队列的互斥锁 std::condition_variable condition; // 条件变量通知工作线程 bool stop; // 停止标志 }; // ------------------------------ 线程池使用示例 ------------------------------ // 示例1简单的无返回值任务 void print_task(int id) { std::cout [线程池] 执行任务 id 线程ID: std::this_thread::get_id() \n; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } // 示例2有返回值的任务 int add_task(int a, int b) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); return a b; } int main() { // 创建线程池4个工作线程 ThreadPool pool(4); std::cout [主线程] 线程池已启动工作线程数量: 4\n; // 提交10个无返回值任务 std::cout \n[主线程] 提交10个无返回值任务...\n; for (int i 0; i 10; i) { pool.submit(print_task, i 1); } // 提交3个有返回值任务用 std::future 获取结果 std::cout \n[主线程] 提交3个有返回值任务...\n; std::vectorstd::futureint futures; futures.push_back(pool.submit(add_task, 10, 20)); futures.push_back(pool.submit(add_task, 30, 40)); futures.push_back(pool.submit(add_task, 50, 60)); // 获取并打印有返回值任务的结果 std::cout \n[主线程] 等待有返回值任务完成...\n; for (size_t i 0; i futures.size(); i) { std::cout [主线程] 任务 i 1 结果: futures[i].get() \n; } // 主线程等待一段时间让无返回值任务执行完毕 std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout \n[主线程] 所有任务执行完毕线程池将自动析构\n; return 0; } // 线程池析构自动停止所有工作线程3.4 关键细节任务包装用std::packaged_task和std::bind将任意可调用对象包装为无参数的std::functionvoid()同时支持通过std::future获取返回值。线程安全任务队列的访问必须用std::mutex保护条件变量用于通知工作线程有新任务或线程池停止。析构安全析构函数中先设置stop标志再唤醒所有线程最后join()等待线程退出确保线程池安全销毁。四、总结与最佳实践std::condition_variable用于线程间协作通知必须与std::unique_lock配合优先用带谓词的wait()处理虚假唤醒典型场景是生产者 - 消费者模型。std::atomic用于无锁原子操作比mutex轻量适合简单共享数据计数器、标志位默认内存序memory_order_seq_cst最安全进阶可按需调整。线程池是多线程开发的核心工具通过预创建线程避免频繁创建 / 销毁开销核心是 “任务队列 工作线程”支持提交任意任务并获取结果适合大量短任务场景。掌握这三大知识点能让你高效编写高性能、线程安全的 C 多线程程序。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2538354.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!