JUC 学习笔记
本文为笔者对 JUC 的学习记录,主要参考了尚硅谷的 JUC 教程
文章目录
- JUC 学习笔记
- 1. JUC 概述
- 什么是 JUC?
- 线程和进程:
- 进程的状态:
- wait 和 sleep:
- 并发和并行:
- 管程:
- 用户线程和守护线程:
- 2. Synchronized 关键字和 Lock 接口
- Synchronized 关键字:
- 多线程的编程步骤(上):
- Lock 接口:
- 创建线程方法:
- Lock 和 synchronized 关键字的区别:
- 3. 线程间通信
- 多线程的编程步骤(中):
- 虚假唤醒问题:
- Lock 实现:
- 4. 线程间定制化通信
- 线程间定制化通信:
- 5. 集合的线程安全
- 集合的线程不安全演示:
- 解决方案1——Vector:
- 解决方案2——Collections:
- 解决方案3——CopyOnWriteArrayList:
- HashSet 解决方案——CopyOnWriteArraySet:
- HashMap 解决方案——ConcurrentHashMap:
- 6. 多线程锁
- 8锁问题:
- 公平锁和非公平锁:
- 可重入锁:
- 死锁:
- 7. Callable 接口
- Runnable 和 Callable 的区别:
- FutureTask:
- 8. JUC 强大的辅助类
- 减少计数 CountDownLatch:
- 循环栅栏 CyclicBarrier:
- 信号量 Semaphore:
- 9. ReentrantReadWriteLock 读写锁
- MySQL 事物隔离级别:
- ReentrantReadWriteLock:
- 为什么用读写锁:
- 锁降级:
- 10. BlockingQueue 阻塞队列
- 阻塞队列:
- 为什么叫阻塞队列:
- 阻塞队列基本架构:
- 阻塞队列实现类分类:
- 常用方法:
- 11. ThreadPool 线程池
- 线程池优势:
- 线程池实现:
- 线程池使用:
- ThreadPoolExecutor 七种参数:
- 拒绝策略:
- 创建线程池:
- 12. Fork/Join 分支合并框架
- Fork/Join 操作:
- 13. CompletableFuture 异步回调
- 同步和异步:
- CompletableFuture:
1. JUC 概述
什么是 JUC?
java.util.concurrent 工具包的简称,是一个处理线程的工具包,JDK1.5 开始出现的
线程和进程:
- 进程 Process 是系统进行资源分配和调度的基本单位,是线程的容器。进程是程序的实体,是程序在某一数据集合上的一次活动
- 线程 Thread 是操作系统执行的最小单位,是进程中实际运作的单位,是进程中的一个单一顺序控制流。一个进程可以有多个线程,每个线程执行不同的任务
进程的状态:
Thread.State 是一个枚举类,包含6种信息:NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED
WAITING 和 TIMED_WAITING 区别:WAITING(不见不散),TIMED_WAITING(过时不候)
wait 和 sleep:
- sleep 是 Thread 的静态方法,wait 是 Object 的方法,任何实例对象都能调用
- sleep 不释放锁也不占用锁,wait 会释放锁,但调用的前提是当前线程占有锁(也就是代码要在 synchronized 中)
- 都可以被 interrupted 方法中断
并发和并行:
并行对应的是串行。并发是多个线程对应一个点(春运抢票、电商秒杀),并行是多个工作一起执行,后面再汇总(边泡面边烧水边看 b 站)
管程:
管程 Monitor 监视器,所说的锁就是监视器,是一种同步机制,保证同一时间只有一个线程访问被保护的数据或代码。JVM 的同步是基于进入和退出实现的,使用管程对象实现
用户线程和守护线程:
用户线程是平时使用的线程,自定义线程。守护线程是特殊线程,运行在后台,如垃圾回收线程
- 主线程结束了,用户线程还在运行,那么 JVM 还在存活状态
- 没有用户线程了,都是守护线程,那么 JVM 结束。(设置守护线程要在 start 前)
2. Synchronized 关键字和 Lock 接口
Synchronized 关键字:
- 修饰代码块:synchronized { 代码 },里面的代码叫同步语句块,作用于调用这个代码块的对象
- 修饰方法:作用于调用方法的对象
- 修饰静态方法:作用于这个类的所有对象
- 修饰类:作用于这个类的所有对象
多线程的编程步骤(上):
有一套固定套路
- 第一步:创建资源类,在资源类创建属性和操作方法(买一个空调,内容不能更改,有啥用啥)
- 第二步:创建多个线程,调用资源类的操作方法(3个售票员,卖30张票)
package com.bluestragglers.juc.sync;
// 第一步,创建资源类,定义属性和操作方法
class Ticket {
// 票数
private int number = 30;
// 卖票方法
public synchronized void sale() {
// 判断是否有票
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余:" + number);
}
}
}
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
// 卖票
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
}, "AA");
Thread thread2 = new Thread(thread1, "BB");
Thread thread3 = new Thread(thread1, "CC");
thread1.start();
thread2.start();
thread3.start();
}
}
需要注意的是,这里的 synchronized 关键字的作用是避免两个人卖同一张票
Synchronized 是自动上锁和解锁。要想实现手动上锁和解锁,可以用 JUC 的 Lock 接口
Lock 接口:
提供了比 synchronized 功能更强大,更好扩展。Lock 是一个接口,实现类有 ReentrantLock, ReentrantReadWriteLock, ReadLock, ReentrantReadWriteLock, WriteLock 等。可重入锁:公共厕所
package com.bluestragglers.juc.lock;
import java.util.concurrent.locks.ReentrantLock;
class LTicket {
// 票数
private int number = 30;
// 创建可重入锁
private final ReentrantLock lock = new ReentrantLock();
// 卖票方法
public void sale() {
// 上锁
lock.lock();
try {
// 判断是否有票
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余:" + number);
}
} finally {
// 释放锁
lock.unlock();
}
}
}
public class LSaleTicket {
public static void main(String[] args) {
LTicket lTicket = new LTicket();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 40; i++) {
lTicket.sale();
}
}, "AA");
Thread thread2 = new Thread(thread1, "BB");
Thread thread3 = new Thread(thread1, "CC");
thread1.start();
thread2.start();
thread3.start();
}
}
调用 start 后,线程是否会马上生成?不会,因为 start 方法内部调用了 start0() 方法,这是个 native 方法,具体什么时候生成线程,要看操作系统创建
创建线程方法:
- 继承 Thread 类
- 实现 Runnable 接口
- 使用 Callable 接口
- 使用线程池
Lock 和 synchronized 关键字的区别:
- Lock 是一个接口,而 synchronized 是一个关键字,是内置的语言实现
- synchronized 发生异常时,会自动释放线程占有的锁,因此不会发生死锁。Lock 发生异常时如果不调用 unLock() 释放锁,会容易造成死锁。所以 Lock 用 try finally 写并在 finally 中释放锁
- Lock 可以让等待锁的线程响应终端,synchronized 不行,等待的线程会一直等待下去,不能响应中断
- Lock 可以知道有没有成功获取锁,而 synchronized 不行
- Lock 可以提高多个线程进行读操作的效率
- 性能上来说,如果资源竞争不激烈,两者性能差不多。而资源竞争激烈时,Lock 性能远远优于 synchronized
3. 线程间通信
多线程的编程步骤(中):
- 创建资源类
- 在资源类操作方法
- 判断(注意防止虚假唤醒)
- 干活
- 通知
- 创建多个线程,调用资源类的操作方法
样例:两个线程,对同一个初始值为0的变量操作。一个线程+1,一个-1,一直为0
方法:使用 synchronized 关键字。内部用 Object 的 wait() 和 notifyAll() 方法实现等待和通知
package com.bluestragglers.juc.sync;
class Share {
private int num = 0;
public synchronized void incr() throws InterruptedException {
if (num != 0) {
this.wait(); // 等待
}
num++; // 干活
System.out.println(Thread.currentThread().getName() + " " + num);
this.notifyAll(); // 唤醒其他线程
}
public synchronized void decr() throws InterruptedException {
if (num != 1) {
this.wait(); // 等待
}
num--; // 干活
System.out.println(Thread.currentThread().getName() + " " + num);
this.notifyAll(); // 唤醒其他线程
}
}
public class ThreadDemo1 {
public static void main(String[] args) {
Share share = new Share();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
虚假唤醒问题:
上面的例子,假如 AA 和 BB 是 +1,CC 和 DD 是 -1。假如 AA 之后是 BB 执行,那 BB 会 wait(),这时 AA 又执行完一次唤醒了 BB,那么 BB 就会从这个 wait() 这里被唤醒,然后跳过 if 判断执行后续操作,判断失效了。wait() 的特点是哪里睡哪里醒
解决方案:不使用 if,使用 while,避免虚假唤醒
Lock 实现:
package com.bluestragglers.juc.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Share {
private int num = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// +1
public void incr() {
lock.lock();
try {
while (num != 0) { // 判断
condition.await(); // 等待
}
num++; // 干活
System.out.println(Thread.currentThread().getName() + " " + num);
condition.signalAll(); // 唤醒其他线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public void decr() {
lock.lock();
try {
while (num != 1) { // 判断
condition.await(); // 等待
}
num--; // 干活
System.out.println(Thread.currentThread().getName() + " " + num);
condition.signalAll(); // 唤醒其他线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
Share share = new Share();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
share.incr();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
share.decr();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
share.decr();
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
share.incr();
}
}, "D").start();
}
}
4. 线程间定制化通信
线程间定制化通信:
流程:启动三个线程,按照要求:AA 打印 5 次,BB 10 次,CC 15 次,进行 10 轮
方案:设置一个标志位 flag,同时设置三个锁 Condition,分别控制 AA, BB 和 CC 的关系。通过 condition.await() 等待信号,同时通过 condition.signal() 释放信号
package com.bluestragglers.juc.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareResource {
// 定义标志位
private int flag = 1; // 1 AA, 2 BB, 3 CC
// 创建 Lock 锁
private Lock lock = new ReentrantLock();
// 创建三个 condition,相当于三把钥匙
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
// 打印 AA 的方法
public void print5() {
// 上锁
lock.lock();
try {
while (flag != 1) {
// 等待
condition1.await();
}
// 干活
for (int i = 1; i <= 5; ++i) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知
flag = 2; // 修改标志位
condition2.signal(); // 通知 BB 线程
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock();
}
}
// 打印 BB 的方法
public void print10() {
// 上锁
lock.lock();
try {
while (flag != 2) {
// 等待
condition2.await();
}
// 干活
for (int i = 1; i <= 10; ++i) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知
flag = 3; // 修改标志位
condition3.signal(); // 通知 CC 线程
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock();
}
}
// 打印 CC 的方法
public void print15() {
// 上锁
lock.lock();
try {
while (flag != 3) {
// 等待
condition3.await();
}
// 干活
for (int i = 1; i <= 15; ++i) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知
flag = 1; // 修改标志位
condition1.signal(); // 通知 AA 线程
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock();
}
}
}
public class ThreadDemo03 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 1; i <= 10; ++i) {
shareResource.print5();
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 10; ++i) {
shareResource.print10();
}
}, "BB").start();
new Thread(() -> {
for (int i = 1; i <= 10; ++i) {
shareResource.print15();
}
}, "CC").start();
}
}
5. 集合的线程安全
集合的线程不安全演示:
List<String> list = new ArrayList<>();
for (int i = 0; i < 30; ++i) {
new Thread(() -> {
list.add(Thread.currentThread().getName());
System.out.println(list);
}, String.valueOf(i)).start();
}
执行可能会报 ConcurrentModificationException 错(Java 13 之前),原因是进行了并发修改
解决方案1——Vector:
原理:加了 synchronized 关键字
解决方案2——Collections:
原理:Collections 工具类提供了许多方法,其中 Collections.synchronizedList(new ArrayList<>()) 这样就可以同步了
解决方案3——CopyOnWriteArrayList:
这个方法是最常用的!多用这个方法
原理:写时复制技术。先用 lock 给对象上锁,然后复制一个相同内容的集合,独立写,最后将原容器的引用指向新副本
HashSet 解决方案——CopyOnWriteArraySet:
HashSet 是根据 HashMap 构建的,它的内容就是 HashMap 的 key。同样可以使用 CopyOnWriteArraySet 解决这个问题。
HashMap 解决方案——ConcurrentHashMap:
JDK 1.7:volatile + synchronized + segment
- volatile:确保所有线程看到这个变量的值是一致的,同时禁止指令重排
- volatile 保证可见性和有序性,但不保证原子性
- volatile 具体实现过程:强制变量将修改的值写入主存,同时线程 B 修改时会让线程 A 的缓存行无效,这样线程 A 就必须去主存读取
- synchronized:实现原子性
- segment:segment 继承了 ReentrantLock,能配合 synchronized 实现互斥同步
- 优势:对单个 segment[i] 加锁,也就是如果 segment 有 16 个,就支持最多 16 个线程并发
使用 volatile 的场景:
- 状态标记量
- 单例模式 double check
JDK 1.8:CAS + synchronized + 红黑树
- CAS:乐观锁的一种实现。悲观锁是每次执行前都上锁,是阻塞同步。乐观锁是先执行,如果执行时发生了冲突再采取其他策略,是非阻塞同步
- JUC 的很多类采用了 Unsafe 类的 CAS 操作
- synchronized:当 table[i] 不为空,并且没有线程正在扩容的时候,进入 table[i] 并使用 synchronized 上锁,然后判断是链表还是红黑树,然后将内容放进来
6. 多线程锁
8锁问题:
package com.bluestragglers.juc.sync;
import java.util.concurrent.TimeUnit;
class Phone {
public static synchronized void sendSMS() throws Exception {
//停留4秒
TimeUnit.SECONDS.sleep(4);
System.out.println("------sendSMS");
}
public synchronized void sendEmail() throws Exception {
System.out.println("------sendEmail");
}
public void getHello() {
System.out.println("------getHello");
}
}
/**
* @Description: 8锁
*
1 标准访问,先打印短信还是邮件
------sendSMS
------sendEmail
2 停4秒在短信方法内,先打印短信还是邮件
------sendSMS
------sendEmail
3 新增普通的hello方法,是先打短信还是hello
------getHello
------sendSMS
4 现在有两部手机,先打印短信还是邮件
------sendEmail
------sendSMS
5 两个静态同步方法,1部手机,先打印短信还是邮件
------sendSMS
------sendEmail
6 两个静态同步方法,2部手机,先打印短信还是邮件
------sendSMS
------sendEmail
7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
------sendEmail
------sendSMS
8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
------sendEmail
------sendSMS
*/
public class Lock_8 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "AA").start();
Thread.sleep(100);
new Thread(() -> {
try {
// phone.sendEmail();
// phone.getHello();
phone2.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "BB").start();
}
}
问题 1 和 2:使用的 synchronized 方法锁了方法,也就是锁了当前对象,所以按照顺序执行
问题 3:hello 没有使用 synchronized 方法,所以不被锁对象,所以可以先执行
问题 4:两个对象,所以不被锁对象影响,所以不按顺序执行
问题 5 和 6:static synchronized 连用,锁了同一把类锁,所以两个对象按照顺序执行
问题 7 和 8:static synchronized 锁了一把类锁,但是不影响 synchronized 锁对象,所以不按照顺序执行。换句话说,类锁像一个公司大门,只有一把锁,但是公司里的房间的锁 (对象的锁)有很多把,所以互相不影响,两个对象不按顺序执行
公平锁和非公平锁:
ReentrantLock:默认是非公平锁,一个线程把所有活都干了,其他线程都被饿死。如果提供一个 true 参数,就成了公平锁
- 非公平锁:效率高,但是线程饿死
- 公平锁:效率低,但是阳光普照
- 非公平锁实现:直接 lock,执行操作
- 公平锁实现:队列
可重入锁:
synchronized 和 Lock 都是可重入锁。synchronized 是自动的,Lock 需要手动获取和释放锁。synchronized 的锁可以拿着钥匙来回开,像下面的例子就因为可以来回开,所以会报错
package com.bluestragglers.juc.sync;
public class SyncLockDemo {
public synchronized void add() {
add();
}
public static void main(String[] args) {
new Thread(() -> {
new SyncLockDemo().add();
}, "t1").start();
Object o = new Object();
new Thread(() -> {
synchronized (o) {
System.out.println(Thread.currentThread().getName() + "外层");
synchronized (o) {
System.out.println(Thread.currentThread().getName() + "中层");
synchronized (o) {
System.out.println(Thread.currentThread().getName() + "内层");
}
}
}
}, "t1").start();
}
}
同理,Lock 需要手动获取锁和释放锁。如果没有释放,就会出现死锁
死锁:
造成死锁的四个必要条件:
- 互斥
- 不可抢占
- 占有且等待(请求保持)
- 多个线程形成循环等待链(循环等待)
验证是否会发生死锁:
- jps 命令(查看进程号,类似 Linux 的 ps -ef)
- jstack(jvm 自带的堆栈跟踪工具)
首先通过 jps 获取进程号,然后通过 jstack+进程号,分析是否出现死锁
7. Callable 接口
Runnable 和 Callable 的区别:
- Runnable 在线程终止时,也就是 run() 完成时,无法返回线程结果。Callable 则会可以返回结果
- Runnable 需要实现不返回任何内容的 run() 方法,Callable 需要实现(重写)完成时返回结果的 call() 方法
- run() 方法不能引发异常,call() 可以
- Thread 的构造方法只支持 Runnable,不支持 Callable。FutureTask 实现了 Runnable,同时构造方法支持 Callable,所以可以用 FutureTask 构造 Thread
FutureTask:
功能包括:
- 老师上课口渴了,课不能停,新开了一个班长线程买水,买完水后老师线程可以随时 get 水
- 4 个同学计算,第 2 个同学计算量特别大,可以先汇总 1、3、4 的答案,最后等 2 的答案回来统一汇总
- 考试,首先做会做的题目,最后做不会做的题目
同时需要注意,子线程计算完一次后会将返回的结果保存,后面再调用就直接返回
package com.bluestragglers.juc.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
class MyThread1 implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " come in runnable");
}
}
class MyThread2 implements Callable {
@Override
public Integer call() throws Exception {
return 200;
}
}
public class Demo01 {
public static void main(String[] args) {
new Thread(new MyThread1(), "AA").start();
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + " come in callable");
return 1024;
});
new Thread(futureTask, "BB").start();
}
}
FutureTask 只计算一次:
8. JUC 强大的辅助类
减少计数 CountDownLatch:
主要方法:
构造方法设初始值,countDown() 每次减一,await() 阻塞,直到 0 时才允许线程执行
例子:教室六个同学先走,班长最后锁门再走
// 先设置一个 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(6);
// 6个同学离开教室
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 离开教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t 班长关门走人");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
不使用 CountDownLatch,就会出现班长可能先走的情况
使用了 CountDownLatch,就能避免班长先走
循环栅栏 CyclicBarrier:
CyclicBarrier 构造方法设置障碍数。首先跨域障碍数为 0,每次执行完都加一,最后到达障碍数了执行 await() 后的语句。按照官方的说法是到达终点的线程数量达到要求后执行后续动作
提供 Runnable 的构造方法:启动
例子:集齐七颗龙珠即可满足愿望
private static final int DRAGON_BALL_COUNT = 7;
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(DRAGON_BALL_COUNT, () -> {
System.out.println("召唤神龙");
});
for (int i = 1; i <= DRAGON_BALL_COUNT; ++i) {
int finalI = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集到第" + finalI + "颗龙珠");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
信号量 Semaphore:
常用内容:构建方法,设置信号量数量、acquire() 拿一个信号量、release() 释放一个信号量
例子:6 辆汽车抢 3 个车位
// 3 个车位
Semaphore semaphore = new Semaphore(3);
// 6 辆汽车
for (int i = 1; i <= 6; ++i) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t抢到车位");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "\t离开车位");
semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
9. ReentrantReadWriteLock 读写锁
MySQL 事物隔离级别:
读取未提交、读取已提交、可重复读、串行化
脏读(读取了还没有提交的数据)、不可重复读(一个事务范围内相同的两次查询得到了不同数据)、幻读(多次读取返回的结果集不一样,如增加或减少了行数据)
幻读原因:两个线程不会修改正在修改或阅读的记录,但可能会增加和删除几个记录,造成记录条数不一样
ReentrantReadWriteLock:
可以通过 readLock() 和 writeLock() 上读写锁
例子:建一个 map,同时设置 2 个线程,分别读和写 map
没有读写锁时,会出现问题
加上之后就没有问题了。可以发现,写是独占的,读是共享的
为什么用读写锁:
一阶段:无锁,多个线程抢夺资源,造成混乱
二阶段:独占锁 synchronized 和 ReentrantLock,读不能共享
三阶段:读写锁 ReentrantReadWriteLock,读可以共享,写不能共享。但是仍然存在问题,就是锁饥饿问题
锁降级:
写操作肯定要同时保留读锁和写锁,锁降级就是把写锁释放了,只保留读锁,这样其他线程也可以来读了
同时要注意,读锁不能升级为写锁。简单来说就是写的时候可以读,读的时候不能写
reentrantReadWriteLock.readLock().lock();
System.out.println("主线程读取");
reentrantReadWriteLock.writeLock().lock();
System.out.println("主线程写入");
reentrantReadWriteLock.writeLock().lock();
System.out.println("主线程写入");
reentrantReadWriteLock.readLock().lock();
System.out.println("主线程读取");
10. BlockingQueue 阻塞队列
阻塞队列:
当队列为空时,获取元素操作会被阻塞,直到其他线程往空队列中插入了新元素。添加元素也是类似的,队列满的时候被阻塞,直到其他线程移除队列中的元素
为什么叫阻塞队列:
在多线程中,有的时候会将线程阻塞,直到其他线程唤醒。阻塞队列就是这样的,队列满的时候添加线程阻塞,直到其他线程移除元素并唤醒这个添加线程。读取线程也是类似的情况
阻塞队列不需要手动阻塞和唤醒线程,BlockingQueue 都包办了
阻塞队列可以用于生产者消费者情况
阻塞队列基本架构:
阻塞队列 BlockingQueue 是一个接口,实现类有许多 ArrayBlockingQueue …
阻塞队列实现类分类:
ArrayBlockingQueue, LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue:使用优先级队列、带延迟的、无界的阻塞队列
SynchronousQueue:不存储元素的、单个的阻塞队列
常用方法:
可以发现,真正想使用阻塞队列的特色功能,需要用 put(e)、take() 方法,或者 offer(e, time, unit)、poll(time, unit) 方法
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println(blockingQueue.take());
blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
11. ThreadPool 线程池
线程池优势:
- 降低资源消耗
- 提高响应速度
- 提高资源的可管理性
线程池实现:
线程池通过 Executor 框架实现,这个框架使用了 Executor、Executors、ExecutorService、ThreadPoolExecutor 这几个类
线程池使用:
常用方法:Executors 提供了下面的方法,其中 newCachedThreadPool() 可以根据需求创建,可扩容;newFixedThreadPool(int) 可以提供固定数量的线程;newSingleThreadPool() 是单线程池单线程,一个任务一个任务执行。它们都返回一个 ExecutorService 对象,可以通过 execute(Runnable) 方法执行
例子:有 5 个窗口,10 个人办理业务
通过 Executors.newFixedThreadPool 获取 ExecutorService,然后执行 execute(Runnable) 方法调用线程池中的线程执行内容
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
ExecutorService threadPool3 = Executors.newCachedThreadPool();
for (int i = 1; i <= 10; ++i) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务");
});
}
ThreadPoolExecutor 七种参数:
打开 Executors,可以发现其实它们都是通过 ThreadPoolExecutor 实现的功能
corePoolSize:在线程池中保留的线程数量,即使线程是空闲状态也保留,除非设置了 allowCoreThreadTimeOut
maximumPoolSize:线程池中允许的最大线程数量
keepAliveTime:线程数大于 corePoolSize 了,就允许空闲状态线程最多保留的时间,超过时间就终止线程
unit:时间单位
workQueue:等待任务执行的队列。这个队列只保留 execute() 提交的 Runnable 任务
threadFactory:生成新线程的工厂
handler:当线程达到最大数量同时队列容量也达到最大数量时的解决方案
拒绝策略:
四种。抛出异常、退回调用者执行、抛弃最早的任务、丢弃任务
创建线程池:
阿里要求不能用 Executors 创建线程池,因为可能会造成 OOM 异常。所有线程池都要用 ThreadPoolExecutor 自定义创建
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
for (int i = 1; i <= 10; ++i) {
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务");
});
}
threadPoolExecutor.shutdown();
12. Fork/Join 分支合并框架
Fork/Join 操作:
Fork 是把任务拆分,Join 是把任务合并。把复杂任务拆分成小任务,然后并行执行,最后合并结果
上面的例子中,通过 fork() 方法构建分支,然后通过 join() 方法合并
例子:1+2+3+…+100,要求相加的两个数差值不能超过 10
class MyTask extends RecursiveTask<Integer> {
private static final Integer VALUE = 10;
private int begin, end, result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if (this.end - this.begin <= VALUE) {
for (int i = this.begin; i <= this.end; ++i) {
this.result += i;
}
} else {
int middle = (this.begin + this.end) / 2;
MyTask task01 = new MyTask(this.begin, middle);
MyTask task02 = new MyTask(middle + 1, this.end);
task01.fork();
task02.fork();
this.result = task01.join() + task02.join();
}
return this.result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0, 100);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
Integer result = forkJoinTask.get();
System.out.println(result);
forkJoinPool.shutdown();
}
}
需要注意:
- 构建的类需要继承 RecursiveTask 方法
- 需要构建 ForkJoinPool 对象,并通过 ForkJoinPool.submit(Task) 提供任务,最后通过 ForkJoinPool.get() 获取结果
- 分治时使用 Task.fork(),获取结果时使用 Task.join()
13. CompletableFuture 异步回调
同步和异步:
CompletableFuture 是用来做异步回调的,也就是 A 同学不在,在了给我打电话
Future 接口:异步回调。CompletableFuture 实现了这个接口,因此可以做异步回调
CompletableFuture:
常用功能:没有返回值的 runAsync(Runnable) 和有返回值的 supplyAsync(Supplier)
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " CompletableFuture.runAsync");
});
completableFuture.get();
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " CompletableFuture.supplyAsync");
int i = 10 / 0;
return 1024;
});
completableFuture2.whenComplete((t, u) -> {
System.out.println("t: " + t);
System.out.println("u: " + u);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233;
}).get();