Java多线程实战:ReentrantLock与信号量Semaphore的5个高频使用场景解析
Java多线程实战ReentrantLock与信号量Semaphore的5个高频使用场景解析在Java并发编程领域ReentrantLock和Semaphore是两个至关重要的同步工具。它们虽然都属于JUCjava.util.concurrent包中的并发控制机制但设计理念和应用场景却存在显著差异。本文将深入剖析这两种工具在真实项目中的典型应用场景帮助开发者做出更精准的技术选型。1. 并发资源池管理资源池如数据库连接池、线程池是多线程环境中的典型共享资源。当多个线程需要从有限池中获取资源时Semaphore的计数特性使其成为天然解决方案。public class ConnectionPool { private final Semaphore semaphore; private final LinkedListConnection pool new LinkedList(); public ConnectionPool(int poolSize) { this.semaphore new Semaphore(poolSize); for(int i0; ipoolSize; i) { pool.addLast(createConnection()); } } public Connection getConnection() throws InterruptedException { semaphore.acquire(); // 获取许可 synchronized(pool) { return pool.removeFirst(); } } public void releaseConnection(Connection conn) { synchronized(pool) { pool.addLast(conn); } semaphore.release(); // 释放许可 } private Connection createConnection() { // 创建真实连接 return new MockConnection(); } }关键对比Semaphore适合控制资源访问总量ReentrantLock更适合保护资源访问的互斥性提示当资源获取需要支持超时机制时可使用tryAcquire(long timeout, TimeUnit unit)方法避免线程无限期阻塞。2. 生产者-消费者模型实现生产者-消费者问题是并发编程中的经典案例。ReentrantLock配合Condition可以实现更精细的线程间协调public class MessageQueueT { private final ReentrantLock lock new ReentrantLock(); private final Condition notFull lock.newCondition(); private final Condition notEmpty lock.newCondition(); private final QueueT queue new LinkedList(); private final int capacity; public MessageQueue(int capacity) { this.capacity capacity; } public void put(T message) throws InterruptedException { lock.lock(); try { while(queue.size() capacity) { notFull.await(); // 队列满时等待 } queue.add(message); notEmpty.signal(); // 唤醒消费者 } finally { lock.unlock(); } } public T take() throws InterruptedException { lock.lock(); try { while(queue.isEmpty()) { notEmpty.await(); // 队列空时等待 } T message queue.remove(); notFull.signal(); // 唤醒生产者 return message; } finally { lock.unlock(); } } }优势分析相比synchronizedCondition提供了更灵活的等待/通知机制单个ReentrantLock可创建多个Condition实现不同条件的精确控制支持公平锁策略避免线程饥饿问题3. 流量控制与限流在高并发系统中Semaphore常被用作简单的限流器。以下是一个基于信号量的API限流实现public class RateLimiter { private final Semaphore semaphore; private final int maxPermits; private final TimeUnit timeUnit; private ScheduledExecutorService scheduler; public RateLimiter(int maxPermits, long period, TimeUnit timeUnit) { this.semaphore new Semaphore(maxPermits); this.maxPermits maxPermits; this.timeUnit timeUnit; this.scheduler Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() - { int current semaphore.availablePermits(); if(current maxPermits) { semaphore.release(maxPermits - current); } }, 0, period, timeUnit); } public boolean tryAcquire() { return semaphore.tryAcquire(); } public void shutdown() { scheduler.shutdown(); } }参数配置示例参数名示例值说明maxPermits100每秒最大请求数period1时间窗口长度timeUnitTimeUnit.SECONDS时间单位这种实现相比ReentrantLock的优势在于天然支持并发许可数的概念无需手动维护计数器可与定时任务无缝集成4. 分布式锁的本地模拟在无法使用Redis等分布式锁的场景下ReentrantLock可以作为本地替代方案。以下是一个支持重入的锁服务实现public class ReentrantLockService { private final ReentrantLock lock new ReentrantLock(true); // 公平锁 private final MapString, Integer lockCounts new HashMap(); private final MapString, Thread lockOwners new HashMap(); public boolean tryLock(String resourceId, long timeout) throws InterruptedException { Thread current Thread.currentThread(); if(lockOwners.get(resourceId) current) { lockCounts.put(resourceId, lockCounts.get(resourceId) 1); return true; } if(lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { try { if(!lockCounts.containsKey(resourceId)) { lockOwners.put(resourceId, current); lockCounts.put(resourceId, 1); return true; } } finally { lock.unlock(); } } return false; } public void unlock(String resourceId) { Thread current Thread.currentThread(); if(lockOwners.get(resourceId) ! current) { throw new IllegalMonitorStateException(); } int count lockCounts.get(resourceId) - 1; if(count 0) { lockCounts.remove(resourceId); lockOwners.remove(resourceId); lock.unlock(); } else { lockCounts.put(resourceId, count); } } }特性对比表特性ReentrantLockSemaphore重入性支持不支持公平性可配置可配置条件等待支持不支持资源计数不支持支持中断响应支持支持5. 多阶段任务协同复杂任务往往需要多个线程分阶段协作完成。以下示例展示如何组合使用ReentrantLock和Semaphore实现阶段控制public class PhaseController { private final ReentrantLock phaseLock new ReentrantLock(); private final Condition phaseCondition phaseLock.newCondition(); private final Semaphore workerSemaphore; private volatile int currentPhase 0; private final int maxWorkers; public PhaseController(int maxWorkers) { this.maxWorkers maxWorkers; this.workerSemaphore new Semaphore(maxWorkers); } public void startPhase(int phase) throws InterruptedException { phaseLock.lock(); try { while(phase ! currentPhase) { phaseCondition.await(); } workerSemaphore.acquire(maxWorkers); // 获取所有许可 } finally { phaseLock.unlock(); } } public void endPhase() { phaseLock.lock(); try { currentPhase; workerSemaphore.release(maxWorkers); // 释放所有许可 phaseCondition.signalAll(); } finally { phaseLock.unlock(); } } public void doWork(Runnable task) throws InterruptedException { workerSemaphore.acquire(); try { task.run(); } finally { workerSemaphore.release(); } } }典型工作流程主线程调用startPhase(0)进入第一阶段工作线程通过doWork()执行任务主线程调用endPhase()结束当前阶段主线程调用startPhase(1)进入下一阶段这种模式特别适用于需要严格阶段控制的批处理场景如ETL数据处理流水线。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2473799.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!