ReadWriteLock
代码示例:
package com.yw.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//只做写入操作
for (int i = 1;i<=5;i++){
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
//只做读取操作
for (int i = 1;i<=5;i++){
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
//自定义缓存
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
//读写锁:更加细粒度的操作
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
//存,写
public void put(String key,Object value){
reentrantReadWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入OK" );
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
reentrantReadWriteLock.writeLock().unlock();
}
}
//取,读
public void get(String key){
reentrantReadWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取" + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + "读取OK" );
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
reentrantReadWriteLock.readLock().unlock();
}
}
}
原理
核心概念
ReentrantReadWriteLock
的核心是区分读锁和写锁:
- 读锁(
ReadLock
):允许多个读线程同时访问共享资源。 - 写锁(
WriteLock
):写线程访问共享资源时,会独占资源,不允许其他读线程或写线程访问。
这种锁的设计基于以下原则:
- 读读共享:多个读线程可以同时读取共享资源。
- 读写互斥:读线程和写线程不能同时访问共享资源。
- 写写互斥:写线程之间也不能同时访问共享资源。
主要方法
ReentrantReadWriteLock
提供了读锁和写锁的接口,分别通过 readLock()
和 writeLock()
获取。
ReentrantReadWriteLock()
:构造方法,创建一个默认的读写锁。ReentrantReadWriteLock(boolean fair)
:构造方法,指定是否为公平锁。如果为true
,则按照线程请求锁的顺序分配锁。Lock readLock()
:获取读锁。Lock writeLock()
:获取写锁。
读锁和写锁都实现了 Lock
接口,因此可以使用 Lock
接口提供的方法,例如:
void lock()
:获取锁。void unlock()
:释放锁。boolean tryLock()
:尝试获取锁,如果当前没有锁,则立即返回true
;否则返回false
。boolean tryLock(long timeout, TimeUnit unit)
:尝试获取锁,直到超时。
底层实现
ReentrantReadWriteLock
的底层实现也是基于 AQS(AbstractQueuedSynchronizer)。AQS 提供了一个共享锁的框架,ReentrantReadWriteLock
利用这个框架实现了读写锁的机制。
- 状态表示:AQS 的状态(
state
)被用来表示读锁和写锁的持有情况。状态的高 16 位表示读锁的持有数量,低 16 位表示写锁的持有数量。 - 锁的获取与释放:
- 读锁:当一个线程尝试获取读锁时,会检查写锁是否被其他线程持有。如果没有写锁被持有,或者写锁被当前线程持有(可重入),则允许读锁的获取。
- 写锁:当一个线程尝试获取写锁时,会检查是否有其他线程持有读锁或写锁。如果没有其他线程持有锁,或者写锁已经被当前线程持有(可重入),则允许写锁的获取。
工作流程
- 初始化:创建
ReentrantReadWriteLock
对象。 - 获取读锁:读线程调用
readLock().lock()
,尝试获取读锁。 - 获取写锁:写线程调用
writeLock().lock()
,尝试获取写锁。 - 释放锁:线程完成操作后,调用
unlock()
方法释放锁。
阻塞队列
可见它与list,set处于同一层级
由此可见
BlockingQueue 不是新的东西
什么时候我们会用到阻塞队列?多线程并发处理,线程池!
BlockingQueue的四组常用API
方式 | 抛出异常 | 不会抛出异常,有返回值 | 阻塞 等待 | 超时等待 |
添加 | add | offer | put | offer(..) |
移除 | remove | poll | take | poll(..) |
判断队列首 | element | peek |
抛出异常
package com.yw.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Demo1 {
public static void main(String[] args) {
//这里的参数3表示队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("d"));
System.out.println("=======================");
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
}
}
我们的队列大小为3,此时如果再添加一个元素,就会抛出异常
如果队列为空,如果继续抛出,依旧会抛出异常
package com.yw.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Demo1 {
public static void main(String[] args) {
//这里的参数3表示队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.add("d"));
System.out.println("=======================");
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}
}
有返回值,不抛出异常
package com.yw.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Demo1 {
public static void main(String[] args) {
//这里的参数3表示队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));
System.out.println("=======================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
}
等待,阻塞
package com.yw.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
//这里的参数3表示队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d");
System.out.println("=======================");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
}
超时等待
package com.yw.bq;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
//这里的参数3表示队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
//设置超时时间和单位
blockingQueue.offer("d", 2,TimeUnit.SECONDS);
System.out.println("=======================");
blockingQueue.poll();
blockingQueue.poll();
blockingQueue.poll();
//设置超时时间和单位
blockingQueue.poll(2,TimeUnit.SECONDS);
}
}
原理
BlockingQueue
是 Java 并发包(java.util.concurrent
)中提供的一种线程安全的队列接口,用于在多线程环境中协调线程之间的通信和同步。它支持在队列为空时阻塞插入操作,或者在队列满时阻塞移除操作,从而简化了线程间的数据共享和通信。
核心概念
BlockingQueue
的核心是一个线程安全的队列,支持以下两种主要操作:
- 插入操作:将元素放入队列。如果队列已满,插入操作会被阻塞,直到有空间可用。
- 移除操作:从队列中取出元素。如果队列为空,移除操作会被阻塞,直到有元素可用。
此外,BlockingQueue
还提供了非阻塞操作(如 offer()
和 poll()
)和带超时的阻塞操作(如 offer(E e, long timeout, TimeUnit unit)
和 poll(long timeout, TimeUnit unit)
)。
线程安全机制
BlockingQueue
的实现类通过以下方式保证线程安全:
- 锁机制:使用内部锁(如 ReentrantLock)来保护队列的插入和移除操作。
- 条件变量:使用条件变量(如
Condition
)来实现阻塞和唤醒机制。例如,当队列为空时,移除操作会等待“非空”条件;当队列满时,插入操作会等待“非满”条件。
阻塞机制
BlockingQueue
的阻塞机制基于条件变量和锁:
插入阻塞:
- 当队列满时,调用
put()
方法的线程会被阻塞。 - 线程会被放入“非满”条件的等待队列中,直到有空间可用。
- 当有线程调用
take()
或poll()
从队列中移除元素后,会唤醒“非满”条件上的一个或多个线程。
移除阻塞
- 当队列为空时,调用
take()
方法的线程会被阻塞。 - 线程会被放入“非空”条件的等待队列中,直到有元素可用。
- 当有线程调用
put()
向队列中插入元素后,会唤醒“非空”条件上的一个或多个线程。
工作流程
初始化
- 创建
BlockingQueue
实例时,可以指定容量(对于有界队列)。 - 初始化锁和条件变量。
插入操作
- 生产者线程调用
put()
方法。 - 如果队列未满,插入元素并唤醒等待在
notEmpty
条件上的线程。 - 如果队列已满,当前线程被阻塞,直到有空间可用。
移除操作
- 消费者线程调用
take()
方法。 - 如果队列非空,移除元素并唤醒等待在
notFull
条件上的线程。 - 如果队列为空,当前线程被阻塞,直到有元素可用。
关键点
锁分离
LinkedBlockingQueue
使用两个锁(putLock
和 takeLock
)分别保护插入和移除操作,减少了锁竞争,提高了并发性能。
条件变量
使用条件变量实现阻塞和唤醒机制,确保线程在合适的时间被唤醒。
线程安全
所有操作都是线程安全的,无需额外的同步措施。
阻塞与超时
提供了阻塞操作(如 put()
和 take()
)和带超时的阻塞操作(如 offer(E e, long timeout, TimeUnit unit)
和 poll(long timeout, TimeUnit unit)
),方便在不同场景下使用。
总结
BlockingQueue
的原理基于锁和条件变量,通过阻塞和唤醒机制实现线程间的同步。它的实现类(如 LinkedBlockingQueue
)通过分离锁和条件变量,减少了锁竞争,提高了并发性能。这种设计使得 BlockingQueue
非常适合用于生产者-消费者模型和其他需要线程间同步的场景。
SynchronousQueue 同步队列
没用容量
进去一个元素,必须等取出来之后,才能继续放下一个元素
代码演示:
package com.yw.bq;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "put 1");
queue.put("1");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "put 2");
queue.put("2");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "put 3");
queue.put("3");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "get " + queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "get " + queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "get " + queue.take());
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"T2").start();
}
}
原理
SynchronousQueue
是 Java 并发包(java.util.concurrent
)中提供的一种特殊的阻塞队列,它的独特之处在于它不存储元素。也就是说,它不能像其他阻塞队列(如 ArrayBlockingQueue
或 LinkedBlockingQueue
)那样在内部缓存数据。相反,SynchronousQueue
的作用更像是一个“直接传递”的通道,生产者线程必须等待消费者线程准备好接收数据,反之亦然。
核心概念
SynchronousQueue
的核心在于:
- 直接传递:生产者线程将数据传递给消费者线程时,必须直接将数据交给消费者,而不能将数据存储在队列中。
- 一对一匹配:每个插入操作(
put
)必须等待一个移除操作(take
),反之亦然。生产者和消费者必须“同步”操作。
主要方法
SynchronousQueue
实现了 BlockingQueue
接口,因此提供了以下常用方法:
void put(E e)
:将一个元素放入队列。如果当前没有消费者线程等待接收数据,则阻塞。E take()
:从队列中移除并返回一个元素。如果当前没有生产者线程等待传递数据,则阻塞。boolean offer(E e)
:尝试将一个元素放入队列,如果当前有消费者线程等待,则成功传递并返回true
;否则返回false
。E poll()
:尝试从队列中移除并返回一个元素,如果当前有生产者线程等待,则成功接收并返回数据;否则返回null
。
底层实现
SynchronousQueue
的底层实现基于 AQS(AbstractQueuedSynchronizer),它通过共享锁的方式实现线程间的同步。
- 锁机制:
SynchronousQueue
使用一个共享锁来协调生产者和消费者线程。 - 队列结构:虽然它不存储元素,但它维护了两个队列:
- 匹配机制:
-
当一个生产者线程调用
put()
方法时,它会被放入生产者队列,并等待消费者线程调用take()
。 -
当一个消费者线程调用
take()
方法时,它会被放入消费者队列,并等待生产者线程调用put()
。 -
一旦生产者和消费者匹配成功,数据就会直接从生产者传递给消费者,然后双方线程都会被唤醒并继续执行。
工作流程
1、生产者线程调用 put()
:
- 如果当前有消费者线程在等待(调用了
take()
),则直接将数据传递给消费者,双方线程都被唤醒。 - 如果没有消费者线程等待,则生产者线程被阻塞,放入生产者队列。
2、消费者线程调用 take()
:
- 如果当前有生产者线程在等待(调用了
put()
),则直接从生产者接收数据,双方线程都被唤醒。 - 如果没有生产者线程等待,则消费者线程被阻塞,放入消费者队列。
3、匹配成功:
- 当生产者和消费者线程匹配成功后,数据直接从生产者传递给消费者,双方线程继续执行。
总结
SynchronousQueue
是一种特殊的阻塞队列,它通过直接传递数据的方式实现生产者和消费者之间的同步。它的设计目标是减少数据在队列中的停留时间,提高数据传递的效率。虽然它不适合需要缓存数据的场景,但在直接传递数据或对延迟要求较高的场景中非常有用。