一. 线程池的核心参数(线程池的执行原理)
线程池核心参数主要参考ThreadPoolExecutor这个类的7个参数的构造函数
-
corePoolSize 核心线程数目
-
maximumPoolSize 最大线程数目 = (核心线程+救急线程的最大数目)
-
keepAliveTime 生存时间 - 救急线程的生存时间,生存时间内没有新任务,此线程资源会释放
-
unit 时间单位 - 救急线程的生存时间单位,如秒、毫秒等
-
workQueue - 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
-
threadFactory 线程工厂 - 可以定制线程对象的创建,例如设置线程名字、是否是守护线程等
-
handler 拒绝策略 - 当所有线程都在繁忙,workQueue 也放满时,会触发拒绝策略
1.1 工作流程
拒绝策略:
1.AbortPolicy:直接抛出异常,默认策略;
2.CallerRunsPolicy:用调用者所在的线程来执行任务;
3.DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4.DiscardPolicy:直接丢弃任务;
public class TestThreadPoolExecutor {
static class MyTask implements Runnable {
private final String name;
private final long duration;
public MyTask(String name) {
this(name, 0);
}
public MyTask(String name, long duration) {
this.name = name;
this.duration = duration;
}
@Override
public void run() {
try {
LoggerUtils.get("myThread").debug("running..." + this);
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "MyTask(" + name + ")";
}
}
public static void main(String[] args) throws InterruptedException {
AtomicInteger c = new AtomicInteger(1);
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
3,
0,
TimeUnit.MILLISECONDS,
queue,
r -> new Thread(r, "myThread" + c.getAndIncrement()),
new ThreadPoolExecutor.AbortPolicy());
showState(queue, threadPool);
threadPool.submit(new MyTask("1", 3600000));
showState(queue, threadPool);
threadPool.submit(new MyTask("2", 3600000));
showState(queue, threadPool);
threadPool.submit(new MyTask("3"));
showState(queue, threadPool);
threadPool.submit(new MyTask("4"));
showState(queue, threadPool);
threadPool.submit(new MyTask("5",3600000));
showState(queue, threadPool);
threadPool.submit(new MyTask("6"));
showState(queue, threadPool);
}
private static void showState(ArrayBlockingQueue<Runnable> queue, ThreadPoolExecutor threadPool) {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<Object> tasks = new ArrayList<>();
for (Runnable runnable : queue) {
try {
Field callable = FutureTask.class.getDeclaredField("callable");
callable.setAccessible(true);
Object adapter = callable.get(runnable);
Class<?> clazz = Class.forName("java.util.concurrent.Executors$RunnableAdapter");
Field task = clazz.getDeclaredField("task");
task.setAccessible(true);
Object o = task.get(adapter);
tasks.add(o);
} catch (Exception e) {
e.printStackTrace();
}
}
LoggerUtils.main.debug("pool size: {}, queue: {}", threadPool.getPoolSize(), tasks);
}
}
总结
在线程池中一共有7个核心参数:
corePoolSize 核心线程数目 - 池中会保留的最多线程数
maximumPoolSize 最大线程数目 - 核心线程+救急线程的最大数目
keepAliveTime 生存时间 - 救急线程的生存时间,生存时间内没有新任务,此线程资源会释放
unit 时间单位 - 救急线程的生存时间单位,如秒、毫秒等
workQueue - 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
threadFactory 线程工厂 - 可以定制线程对象的创建,例如设置线程名字、是否是守护线程等
handler 拒绝策略 - 当所有线程都在繁忙,workQueue 也放满时,会触发拒绝策略
拒绝策略有4种,当线程数过多以后,第一种是抛异常、第二种是由调用者执行任务、第三是丢弃当前的任务,第四是丢弃最早排队任务。默认是直接抛异常。
二. 线程池中常见的阻塞队列
workQueue - 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
比较常见的有4个,用的最多是ArrayBlockingQueue和LinkedBlockingQueue
1.ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
2.LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
3.DelayedWorkQueue :是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的
4.SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
区别
左边是LinkedBlockingQueue加锁的方式,右边是ArrayBlockingQueue加锁的方式
-
LinkedBlockingQueue读和写各有一把锁,性能相对较好
-
ArrayBlockingQueue只有一把锁,读和写公用,性能相对于LinkedBlockingQueue差一些
总结
Jdk中提供了很多阻塞队列,开发中常见的有两个:
ArrayBlockingQueue
和LinkedBlockingQueue
ArrayBlockingQueue
和LinkedBlockingQueue
是Java中两种常见的阻塞队列,它们在实现和使用上有一些关键的区别。首先,
ArrayBlockingQueue
是一个有界队列,它在创建时必须指定容量,并且这个容量不能改变。而LinkedBlockingQueue
默认是无界的,但也可以在创建时指定最大容量,使其变为有界队列。其次,它们在内部数据结构上也有所不同。
ArrayBlockingQueue
是基于数组实现的,而LinkedBlockingQueue
则是基于链表实现的。这意味着ArrayBlockingQueue
在访问元素时可能会更快,因为它可以直接通过索引访问数组中的元素。而LinkedBlockingQueue
则在添加和删除元素时可能更快,因为它不需要移动其他元素来填充空间。另外,它们在加锁机制上也有所不同。
ArrayBlockingQueue
使用一把锁来控制对队列的访问,这意味着读写操作都是互斥的。而LinkedBlockingQueue
则使用两把锁,一把用于控制读操作,另一把用于控制写操作,这样可以提高并发性能。
三. 核心线程数
执行线程池执行任务的类型
-
IO密集型任务
一般来说:文件读写、DB读写、网络请求等
推荐:核心线程数大小设置为2N+1 (N为计算机的CPU核数)
-
CPU密集型任务
一般来说:计算型代码、Bitmap转换、Gson转换等
推荐:核心线程数大小设置为N+1 (N为计算机的CPU核数)
java代码查看CPU核数
总结
面试官:如何确定核心线程数?
候选人:
1. 高并发、任务执行时间短 -->( CPU核数+1 ),减少线程上下文的切换
2. 并发不高、任务执行时间长
IO密集型的任务 --> (CPU核数 * 2 + 1)
计算密集型任务 --> ( CPU核数+1 )
3. 并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设置,设置参考(2)(IO密集型的任务 /计算密集型任务)
四. 线程池的种类有哪些
在java.util.concurrent.Executors类中提供了大量创建连接池的静态方法,常见就有四种
-
创建使用固定线程数的线程池
-
核心线程数与最大线程数一样,没有救急线程
-
阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE
-
适用场景:适用于任务量已知,相对耗时的任务
public class FixedThreadPoolCase {
static class FixedThreadDemo implements Runnable{
@Override
public void run() {
String name = Thread.currentThread().getName();
for (int i = 0; i < 2; i++) {
System.out.println(name + ":" + i);
}
}
}
public static void main(String[] args) throws InterruptedException {
//创建一个固定大小的线程池,核心线程数和最大线程数都是3
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.submit(new FixedThreadDemo());
Thread.sleep(10);
}
executorService.shutdown();
}
}
2.单线程化的线程池,它只会用唯一的工作线程来执行任 务,保证所有任务按照指定顺序(FIFO)执行
适用于按照顺序执行的任务
public class NewSingleThreadCase {
static int count = 0;
static class Demo implements Runnable {
@Override
public void run() {
count++;
System.out.println(Thread.currentThread().getName() + ":" + count);
}
}
public static void main(String[] args) throws InterruptedException {
//单个线程池,核心线程数和最大线程数都是1
ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
exec.execute(new Demo());
Thread.sleep(5);
}
exec.shutdown();
}
}
3.可缓存线程池
4. 提供了“延迟”和“周期执行”功能的ThreadPoolExecutor
适用于 有定时和延迟执行的任务
总结
面试官:线程池的种类有哪些?
候选人:
在jdk中默认提供了4中方式创建线程池
第一个是:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回 收空闲线程,若无可回收,则新建线程。
第二个是:newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列 中等待。
第三个是:newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
第四个是:newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任 务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
五. 为什么不建议用Executors创建线程池?
参考阿里开发手册《Java开发手册-嵩山版》
总结
面试官:为什么不建议用Executors创建线程池?
候选人:
其实这个事情在阿里提供的最新开发手册《Java开发手册-嵩山版》中也提到了
主要原因是如果使用Executors创建线程池的话,它允许的请求队列默认长度是Integer.MAX_VALUE,这样的话,有可能导致堆积大量的请求,从而导致OOM(内存溢出)。
所以,我们一般推荐使用ThreadPoolExecutor来创建线程池,这样可以明确规定线程池的参数,避免资源的耗尽。
六. 线程使用场景问题
6.1 CountDownLatch
CountDownLatch(闭锁/倒计时锁)用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)
-
其中构造参数用来初始化等待计数值
-
await() 用来等待计数归零
-
countDown() 用来让计数减一
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//初始化了一个倒计时锁 参数为 3
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"-begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//count--
latch.countDown();
System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"-begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//count--
latch.countDown();
System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"-begin...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//count--
latch.countDown();
System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
}).start();
String name = Thread.currentThread().getName();
System.out.println(name + "-waiting...");
//等待其他线程完成
latch.await();
System.out.println(name + "-wait end...");
}
}
6.2 案例一 (es数据批量导入)
在我们项目上线之前,我们需要把数据库中的数据一次性的同步到es索引库中,但是当时的数据好像是1000万左右,一次性读取数据肯定不行(oom异常),当时我就想到可以使用线程池的方式导入,利用CountDownLatch来控制,就能避免一次性加载过多,防止内存溢出
整体流程就是通过CountDownLatch+线程池配合去执行
流程图
6.3 案例二(数据汇总)
在一个电商网站中,用户下单之后,需要查询数据,数据包含了三部分:订单信息、包含的商品、物流信息;这三块信息都在不同的微服务中进行实现的,我们如何完成这个业务呢?
6.4 案例三(异步调用)
在进行搜索的时候,需要保存用户的搜索记录,而搜索记录不能影响用户的正常搜索,我们通常会开启一个线程去执行历史记录的保存,在新开启的线程在执行的过程中,可以利用线程提交任务
代码:
总结
嗯~~,我想一下当时的场景[根据自己简历上的模块设计多线程场景]
- 参考场景一:
es数据批量导入
在我们项目上线之前,我们需要把数据量的数据一次性的同步到es索引库中,但是当时的数据好像是1000万左右,一次性读取数据肯定不行(oom异常),如果分批执行的话,耗时也太久了。所以,当时我就想到可以使用线程池的方式导入,利用CountDownLatch+Future来控制,就能大大提升导入的时间。
- 参考场景二:
在我做那个xx电商网站的时候,里面有一个数据汇总的功能,在用户下单之后需要查询订单信息,也需要获得订单中的商品详细信息(可能是多个),还需要查看物流发货信息。因为它们三个对应的分别三个微服务,如果一个一个的操作的话,互相等待的时间比较长。所以,我当时就想到可以使用线程池,让多个线程同时处理,最终再汇总结果就可以了,当然里面需要用到Future来获取每个线程执行之后的结果才行
- 参考场景三:
《黑马头条》项目中使用的
我当时做了一个文章搜索的功能,用户输入关键字要搜索文章,同时需要保存用户的搜索记录(搜索历史),这块我设计的时候,为了不影响用户的正常搜索,我们采用的异步的方式进行保存的,为了提升性能,我们加入了线程池,也就说在调用异步方法的时候,直接从线程池中获取线程使用
七. 控制某个方法允许并发访问线程的数量
Semaphore [ˈsɛməˌfɔr] 信号量,是JUC包下的一个工具类,我们可以通过其限制执行的线程数量,达到限流的效果
当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。
Semaphore两个重要的方法
lsemaphore.acquire(): 请求一个信号量,这时候的信号量个数-1(一旦没有可使用的信号量,也即信号量个数变为负数时,再次请求的时候就会阻塞,直到其他线程释放了信号量)
lsemaphore.release():释放一个信号量,此时信号量个数+1
总结
嗯~~,我想一下
在jdk中提供了一个Semaphore[seməfɔːr]类(信号量)
它提供了两个方法,semaphore.acquire() 请求信号量,可以限制线程的个数,是一个正数,如果信号量是-1,就代表已经用完了信号量,其他线程需要阻塞了
第二个方法是semaphore.release(),代表是释放一个信号量,此时信号量的个数+1
八. 谈谈你对ThreadLocal的理解
ThreadLocal是多线程中对于解决线程安全的一个操作类,它会为每个线程都分配一个独立的线程副本从而解决了变量并发访问冲突的问题。ThreadLocal 同时实现了线程内的资源共享
案例:使用JDBC操作数据库时,会将每一个线程的Connection放入各自的ThreadLocal中,从而保证每个线程都在各自的 Connection 上进行数据库的操作,避免A线程关闭了B线程的连接。
8.1 ThreadLocal基本使用
三个主要方法:
-
set(value) 设置值
-
get() 获取值
-
remove() 清除值
public class ThreadLocalTest {
static ThreadLocal<String> threadLocal = new ThreadLocal<>();
public static void main(String[] args) {
new Thread(() -> {
String name = Thread.currentThread().getName();
threadLocal.set("itcast");
print(name);
System.out.println(name + "-after remove : " + threadLocal.get());
}, "t1").start();
new Thread(() -> {
String name = Thread.currentThread().getName();
threadLocal.set("itheima");
print(name);
System.out.println(name + "-after remove : " + threadLocal.get());
}, "t2").start();
}
static void print(String str) {
//打印当前线程中本地内存中本地变量的值
System.out.println(str + " :" + threadLocal.get());
//清除本地内存中的本地变量
threadLocal.remove();
}
}
8.2 ThreadLocal的实现原理&源码解析
ThreadLocal本质来说就是一个线程内部存储类,从而让多个线程只操作自己内部的值,从而实现线程数据隔离
在ThreadLocal中有一个内部类叫做ThreadLocalMap,类似于HashMap
ThreadLocalMap中有一个属性table数组,这个是真正存储数据的位置
8.3 ThreadLocal-内存泄露问题
Java对象中的四种引用类型:强引用、软引用、弱引用、虚引用
-
强引用:最为普通的引用方式,表示一个对象处于有用且必须的状态,如果一个对象具有强引用,则GC并不会回收它。即便堆中内存不足了,宁可出现OOM,也不会对其进行回收
-
弱引用:表示一个对象处于可能有用且非必须的状态。在GC线程扫描内存区域时,一旦发现弱引用,就会回收到弱引用相关联的对象。对于弱引用的回收,无关内存区域是否足够,一旦发现则会被回收
每一个Thread维护一个ThreadLocalMap,在ThreadLocalMap中的Entry对象继承了WeakReference。其中key为使用弱引用的ThreadLocal实例,value为线程变量的副本
总结
面试官:谈谈你对ThreadLocal的理解
候选人: 嗯,是这样的~~
ThreadLocal 主要功能有两个,第一个是可以实现资源对象的线程隔离,让每个线程各用各的资源对象,避免争用引发的线程安全问题,第二个是实现了线程内的资源共享
面试官:好的,那你知道ThreadLocal的底层原理实现吗?
候选人:嗯,知道一些~
在ThreadLocal内部维护了一个一个 ThreadLocalMap 类型的成员变量,用来存储资源对象
当我们调用 set 方法,就是以 ThreadLocal 自己作为 key,资源对象作为 value,放入当前线程的 ThreadLocalMap 集合中
当调用 get 方法,就是以 ThreadLocal 自己作为 key,到当前线程中查找关联的资源值
当调用 remove 方法,就是以 ThreadLocal 自己作为 key,移除当前线程关联的资源值
面试官:好的,那关于ThreadLocal会导致内存溢出这个事情,了解吗?
候选人:嗯,我之前看过源码,我想一下~~
是因为ThreadLocalMap 中的 key 被设计为弱引用,它是被动的被GC调用释放key,不过关键的是只有key可以得到内存释放,而value不会,因为value是一个强引用。
在使用ThreadLocal 时都把它作为静态变量(即强引用),因此无法被动依靠 GC 回收,建议主动的remove 释放 key,这样就能避免内存溢出。