生产者消费者模型详解
一、什么是生产者消费者模型生产者消费者模型的核心逻辑很简单存在两类线程生产者线程、消费者线程通过一个“缓冲区”也叫“消息队列”实现通信生产者线程负责生成数据并放入缓冲区消费者线程负责从缓冲区取出数据并处理。类比生活中的场景餐厅里厨师生产者负责制作菜品生成数据并把菜品放到出餐台缓冲区服务员消费者从出餐台取走菜品获取数据送到顾客手中处理数据。厨师和服务员互不直接通信全靠出餐台协同这就是最直观的生产者消费者模型。1.1 核心角色缺一不可生产者Producer负责生成数据如读取文件、查询数据库、接收请求等将数据放入缓冲区不直接与消费者交互。消费者Consumer负责从缓冲区取出数据对数据进行处理如计算、存储、返回响应等不直接与生产者交互。缓冲区Buffer作为生产者和消费者的“中间媒介”用于存储生产者生成的数据平衡两者的处理速度解耦核心。缓冲区可以是数组、队列等数据结构通常会设置容量上限。1.2 为什么需要这个模型解决了什么痛点很多人会疑惑直接让生产者生成数据后调用消费者的处理方法不就行了为什么要多此一举加个缓冲区其实这个模型的核心价值在于解耦、削峰填谷、平衡并发效率解决了以下3个核心痛点解耦生产者与消费者生产者和消费者无需知道对方的存在只需关注自身的核心逻辑生产/消费缓冲区负责协调两者降低代码耦合度便于后续扩展和维护。平衡处理速度差异生产者和消费者的处理速度往往不同比如生产者每秒生成100条数据消费者每秒只能处理50条缓冲区可以暂时存储多余的数据避免生产者因消费者处理缓慢而阻塞或消费者因无数据可处理而空闲。削峰填谷提升系统稳定性当生产速率突然激增如高峰期请求缓冲区可以缓存大量数据避免消费者被瞬间压垮当生产速率下降时消费者可以从缓冲区继续获取数据保证系统平稳运行。二、核心原理线程间通信的关键的是“协同”生产者消费者模型的核心是“线程间协同”本质是通过线程同步机制实现“生产”与“消费”的有序进行避免出现以下问题缓冲区满时生产者继续生产数据溢出缓冲区空时消费者继续消费无数据可处理多生产者/多消费者并发时数据错乱如多个生产者同时放入数据多个消费者同时取出数据。因此线程间通信的关键的是当缓冲区满时让生产者阻塞当缓冲区空时让消费者阻塞当数据被生产/消费后通知对应的线程继续执行。在Java中实现这种协同的核心机制有两种传统方式synchronized关键字 wait()/notify()/notifyAll() 方法基于对象监视器进阶方式Lock锁 Condition接口更灵活支持精准唤醒简化方式Java提供的阻塞队列BlockingQueue自动实现线程协同无需手动处理同步。三、Java实战4种方式实现生产者消费者模型下面我们从“基础到进阶”依次实现4种常见的生产者消费者模型结合代码示例让你直观理解每种方式的优缺点和适用场景。3.1 方式1synchronized wait()/notifyAll()最基础面试高频这是最经典、最基础的实现方式基于Java的对象监视器机制核心是用synchronized保证缓冲区的线程安全用wait()让线程阻塞用notifyAll()唤醒阻塞的线程。核心逻辑生产者获取缓冲区锁 → 判断缓冲区是否满 → 满则wait()阻塞 → 不满则生产数据 → 通知消费者notifyAll() → 释放锁消费者获取缓冲区锁 → 判断缓冲区是否空 → 空则wait()阻塞 → 不空则消费数据 → 通知生产者notifyAll() → 释放锁。完整代码实现import java.util.LinkedList; import java.util.Queue; // 1. 定义缓冲区消息队列 class Buffer { private final QueueInteger queue; // 存储数据的队列 private final int capacity; // 缓冲区容量上限 public Buffer(int capacity) { this.queue new LinkedList(); this.capacity capacity; } // 生产数据生产者调用 public synchronized void produce(int data) throws InterruptedException { // 循环判断缓冲区满时生产者阻塞避免虚假唤醒 while (queue.size() capacity) { System.out.println(缓冲区已满生产者[ Thread.currentThread().getName() ]阻塞等待); wait(); // 释放锁进入阻塞状态 } // 生产数据放入缓冲区 queue.offer(data); System.out.println(生产者[ Thread.currentThread().getName() ]生产数据 data 当前缓冲区大小 queue.size()); // 唤醒所有阻塞的线程消费者/其他生产者 notifyAll(); } // 消费数据消费者调用 public synchronized void consume() throws InterruptedException { // 循环判断缓冲区空时消费者阻塞避免虚假唤醒 while (queue.isEmpty()) { System.out.println(缓冲区为空消费者[ Thread.currentThread().getName() ]阻塞等待); wait(); // 释放锁进入阻塞状态 } // 消费数据从缓冲区取出 int data queue.poll(); System.out.println(消费者[ Thread.currentThread().getName() ]消费数据 data 当前缓冲区大小 queue.size()); // 唤醒所有阻塞的线程生产者/其他消费者 notifyAll(); } } // 2. 定义生产者线程 class Producer implements Runnable { private final Buffer buffer; public Producer(Buffer buffer) { this.buffer buffer; } Override public void run() { // 模拟生产者持续生产数据 for (int i 1; i 10; i) { try { buffer.produce(i); Thread.sleep(500); // 模拟生产耗时 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } // 3. 定义消费者线程 class Consumer implements Runnable { private final Buffer buffer; public Consumer(Buffer buffer) { this.buffer buffer; } Override public void run() { // 模拟消费者持续消费数据 for (int i 1; i 10; i) { try { buffer.consume(); Thread.sleep(1000); // 模拟消费耗时故意比生产慢测试缓冲区作用 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } // 4. 测试类 public class ProducerConsumerDemo1 { public static void main(String[] args) { Buffer buffer new Buffer(3); // 缓冲区容量设为3 // 启动1个生产者、2个消费者 new Thread(new Producer(buffer), P1).start(); new Thread(new Consumer(buffer), C1).start(); new Thread(new Consumer(buffer), C2).start(); } } }关键注意点必须用while循环判断缓冲区状态满/空而不是if判断——避免“虚假唤醒”线程被唤醒后缓冲区状态可能已发生变化notifyAll()会唤醒所有阻塞的线程包括生产者和消费者可能会有不必要的唤醒开销但实现简单synchronized锁的是缓冲区对象保证生产和消费操作的原子性。3.2 方式2Lock Condition更灵活精准唤醒方式1的notifyAll()会唤醒所有线程效率不高。而Lock锁结合Condition接口可以实现“精准唤醒”——生产者只唤醒消费者消费者只唤醒生产者减少不必要的线程切换提升效率。核心逻辑创建两个Condition对象notFull缓冲区不满用于唤醒生产者、notEmpty缓冲区不空用于唤醒消费者生产者获取锁 → 缓冲区满则await(notFull) → 生产数据 → signal(notEmpty)唤醒消费者 → 释放锁消费者获取锁 → 缓冲区空则await(notEmpty) → 消费数据 → signal(notFull)唤醒生产者 → 释放锁。完整代码实现import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; // 1. 定义缓冲区用LockCondition实现 class Buffer2 { private final QueueInteger queue; private final int capacity; private final Lock lock new ReentrantLock(); // 可重入锁 private final Condition notFull lock.newCondition(); // 缓冲区不满的条件 private final Condition notEmpty lock.newCondition(); // 缓冲区不空的条件 public Buffer2(int capacity) { this.queue new LinkedList(); this.capacity capacity; } // 生产数据 public void produce(int data) throws InterruptedException { lock.lock(); // 获取锁 try { // 缓冲区满生产者阻塞等待notFull信号 while (queue.size() capacity) { System.out.println(缓冲区已满生产者[ Thread.currentThread().getName() ]阻塞等待); notFull.await(); // 释放锁进入阻塞 } queue.offer(data); System.out.println(生产者[ Thread.currentThread().getName() ]生产数据 data 当前缓冲区大小 queue.size()); notEmpty.signal(); // 唤醒等待notEmpty的消费者 } finally { lock.unlock(); // 确保锁释放 } } // 消费数据 public void consume() throws InterruptedException { lock.lock(); // 获取锁 try { // 缓冲区空消费者阻塞等待notEmpty信号 while (queue.isEmpty()) { System.out.println(缓冲区为空消费者[ Thread.currentThread().getName() ]阻塞等待); notEmpty.await(); // 释放锁进入阻塞 } int data queue.poll(); System.out.println(消费者[ Thread.currentThread().getName() ]消费数据 data 当前缓冲区大小 queue.size()); notFull.signal(); // 唤醒等待notFull的生产者 } finally { lock.unlock(); // 确保锁释放 } } } // 2. 生产者和消费者与方式1一致无需修改 class Producer2 implements Runnable { private final Buffer2 buffer; public Producer2(Buffer2 buffer) { this.buffer buffer; } Override public void run() { for (int i 1; i 10; i) { try { buffer.produce(i); Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } class Consumer2 implements Runnable { private final Buffer2 buffer; public Consumer2(Buffer2 buffer) { this.buffer buffer; } Override public void run() { for (int i 1; i 10; i) { try { buffer.consume(); Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } // 3. 测试类 public class ProducerConsumerDemo2 { public static void main(String[] args) { Buffer2 buffer new Buffer2(3); new Thread(new Producer2(buffer), P1).start(); new Thread(new Consumer2(buffer), C1).start(); new Thread(new Consumer2(buffer), C2).start(); } } }优点精准唤醒减少线程切换开销比synchronized更灵活支持中断、超时等待等高级特性适合复杂场景。3.3 方式3BlockingQueue最简洁推荐实战使用Java的java.util.concurrent包中提供了阻塞队列BlockingQueue它已经内置了线程协同机制无需手动处理synchronized、wait/notify或Lock/Condition直接调用队列的put()生产和take()消费方法即可实现生产者消费者模型。核心特性put()当队列满时自动阻塞生产者take()当队列空时自动阻塞消费者内置线程安全无需额外加锁。常用的BlockingQueue实现类ArrayBlockingQueue有界队列固定容量效率高适合已知缓冲区大小的场景LinkedBlockingQueue可界/无界队列默认无界适合生产速率波动较大的场景SynchronousQueue无缓冲队列容量为0生产者生产数据后必须等待消费者消费适合“生产即消费”的场景。完整代码实现用ArrayBlockingQueueimport java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; // 1. 生产者线程直接操作BlockingQueue class Producer3 implements Runnable { private final BlockingQueueInteger blockingQueue; public Producer3(BlockingQueueInteger blockingQueue) { this.blockingQueue blockingQueue; } Override public void run() { for (int i 1; i 10; i) { try { // put()队列满时阻塞 blockingQueue.put(i); System.out.println(生产者[ Thread.currentThread().getName() ]生产数据 i 当前队列大小 blockingQueue.size()); Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } // 2. 消费者线程直接操作BlockingQueue class Consumer3 implements Runnable { private final BlockingQueueInteger blockingQueue; public Consumer3(BlockingQueueInteger blockingQueue) { this.blockingQueue blockingQueue; } Override public void run() { for (int i 1; i 10; i) { try { // take()队列空时阻塞 int data blockingQueue.take(); System.out.println(消费者[ Thread.currentThread().getName() ]消费数据 data 当前队列大小 blockingQueue.size()); Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } // 3. 测试类 public class ProducerConsumerDemo3 { public static void main(String[] args) { // 初始化有界阻塞队列容量为3 BlockingQueueInteger blockingQueue new ArrayBlockingQueue(3); // 启动线程 new Thread(new Producer3(blockingQueue), P1).start(); new Thread(new Consumer3(blockingQueue), C1).start(); new Thread(new Consumer3(blockingQueue), C2).start(); } } }优点代码最简洁无需手动处理同步和线程协同BlockingQueue内部已实现不易出错是实战中最推荐的方式。3.4 方式4线程池 BlockingQueue实战进阶高可用在实际开发中我们很少手动创建单个生产者/消费者线程而是使用线程池管理线程控制线程数量、复用线程、提升效率结合BlockingQueue实现高可用的生产者消费者模型。核心逻辑用线程池管理生产者线程池和消费者线程池缓冲区使用BlockingQueue实现“生产-缓冲-消费”的全流程解耦和高效并发。完整代码实现import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ProducerConsumerDemo4 { // 1. 初始化有界阻塞队列缓冲区 private static final BlockingQueueInteger QUEUE new ArrayBlockingQueue(5); // 2. 初始化线程池2个生产者线程3个消费者线程 private static final ExecutorService PRODUCER_POOL Executors.newFixedThreadPool(2); private static final ExecutorService CONSUMER_POOL Executors.newFixedThreadPool(3); // 3. 生产者任务 private static class ProducerTask implements Runnable { private final int id; // 生产者ID区分不同生产者 public ProducerTask(int id) { this.id id; } Override public void run() { try { // 模拟生产10条数据 for (int i 1; i 10; i) { int data (id - 1) * 10 i; // 区分不同生产者的生产数据 QUEUE.put(data); System.out.println(生产者P id 生产数据 data 当前队列大小 QUEUE.size()); Thread.sleep(300); // 模拟生产耗时 } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { System.out.println(生产者P id 完成所有生产任务); } } } // 4. 消费者任务 private static class ConsumerTask implements Runnable { private final int id; // 消费者ID public ConsumerTask(int id) { this.id id; } Override public void run() { try { // 持续消费数据直到队列空且所有生产者任务完成 while (true) { int data QUEUE.take(); System.out.println(消费者C id 消费数据 data 当前队列大小 QUEUE.size()); Thread.sleep(500); // 模拟消费耗时 // 终止条件队列空 生产者线程池已关闭且无任务 if (QUEUE.isEmpty() PRODUCER_POOL.isTerminated()) { break; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { System.out.println(消费者C id 完成所有消费任务); } } } // 5. 测试入口 public static void main(String[] args) throws InterruptedException { // 提交生产者任务 PRODUCER_POOL.submit(new ProducerTask(1)); PRODUCER_POOL.submit(new ProducerTask(2)); // 提交消费者任务 CONSUMER_POOL.submit(new ConsumerTask(1)); CONSUMER_POOL.submit(new ConsumerTask(2)); CONSUMER_POOL.submit(new ConsumerTask(3)); // 关闭生产者线程池不再接收新任务等待已提交任务完成 PRODUCER_POOL.shutdown(); // 等待生产者线程池所有任务完成 while (!PRODUCER_POOL.isTerminated()) { Thread.sleep(100); } // 关闭消费者线程池所有消费任务完成后 CONSUMER_POOL.shutdown(); } } }实战要点线程池大小根据业务场景调整生产者线程数 ≤ 消费者线程数避免缓冲区溢出需设置合理的终止条件避免消费者线程无限阻塞推荐使用ThreadPoolExecutor自定义线程池而非Executors更灵活控制核心线程数、最大线程数、拒绝策略等。四、4种实现方式对比为了方便大家选择和应对面试整理了4种实现方式的核心对比清晰呈现优缺点和适用场景实现方式核心原理优点缺点适用场景synchronized wait/notifyAll()对象监视器机制线程同步阻塞唤醒实现简单无需额外依赖面试高频唤醒所有线程效率低不支持精准唤醒基础学习、简单并发场景Lock Condition可重入锁条件变量精准唤醒灵活精准唤醒支持中断、超时代码稍复杂需手动释放锁复杂并发场景需要精准唤醒BlockingQueue内置线程协同自动阻塞/唤醒代码最简洁不易出错线程安全灵活性稍低无法自定义唤醒逻辑实战首选大多数并发场景线程池 BlockingQueue线程池管理线程阻塞队列做缓冲高可用、高并发线程复用效率高配置稍复杂需控制线程池参数生产环境、高并发业务如消息队列、请求处理
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2444236.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!