PS:本文的线程池为演示 Demo,皆在理解线程池的工作原理,并没有解决线程安全问题。
最简单一版的线程池
public class MyThreadPool {
// 存放线程,复用已创建的线程
List<Thread> threadList = new ArrayList<>();
public void execute(Runnable command){
Thread thread = new Thread(command);
threadList.add(thread);
thread.start();
}
}
现在有个问题:创建出来的线程,是可复用的吗?
- 答案是否定的。
- 因为线程在执行完
runnable()
就结束,被销毁了。
思考两个问题:
- 线程什么时候创建
- 线程的 runnable() 方法是什么?是参数的 command 吗?
- 线程的
runnable()
是一个死循环,不断从 commandList 取任务执行。 - 参数的 command ,是 commandList(任务队列) 中的任务。
只有一个线程的线程池
简化一下问题:假设线程池现在只有一个线程,该如何设计呢?
- List 中不在存放线程,存放提交的任务。
- Thread 的
runnable()
不断去判断 taskList 中是否有任务。
public class MyThreadPool {
// 存放线程池中的任务
List<Runnable> commandList = new ArrayList<>();
Thread thread = new Thread(() ->{
while (true){
if (!commandList.isEmpty()){
Runnable task = commandList.remove(0);
command.run();
}
}
});
public void execute(Runnable command){
commandList.add(command);
}
}
看似完美的单线程线程池,还有其他问题: while(true){}
循环中,CPU 一直在空转,浪费资源。
有没有一种容器,可以在容器中没有元素的时候阻塞,有元素的时候在获取?
- 有的。 阻塞队列。
public class MyThreadPool {
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);
Thread thread = new Thread(() ->{
while (true){
try {
// 一直阻塞,直到取出元素
Runnable command = blockingQueue.take();
command.run();
} catch (InterruptedException e) {
// 线程在阻塞、休眠时 被打断,都会抛出 InterruptedException 异常
throw new RuntimeException(e);
}
}
},"唯一线程");
{
thread.start();
}
public void execute(Runnable command){
boolean offered = blockingQueue.offer(command);
}
}
线程池
单个线程的线程池,多数情况下是满足不了我们的需求的,需要多个线程共同来完成任务。
public class MyThreadPool {
// 核心线程数
private int corePoolSize = 10;
// 核心线程集合
List<Thread> coreList = new ArrayList<>();
// 存放任务
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);
private final Runnable task = () ->{
while (true){
try {
Runnable command = blockingQueue.take();
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
public void execute(Runnable command){
// 小于核心线程数
if (coreList.size() < corePoolSize){
// 创建核心线程
Thread thread = new Thread(task);
coreList.add(thread);
thread.start();
}
// 任务添加失败,说明阻塞队列满了,核心线程处理不过来
if (!blockingQueue.offer(command)) {
Thread thread = new Thread(task);
coreList.add(thread);
thread.start();
return;
}
}
}
问题:阻塞队列满了,核心线程处理不过来了 该如何做呢?
- 可以添加一些辅助线程,帮助核心线程处理任务。
public class MyThreadPool {
private int corePoolSize = 10;
private int maxSize = 16;
List<Thread> coreList = new ArrayList<>();
List<Thread> supportList = new ArrayList<>();
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);
private final Runnable task = () ->{
while (true){
try {
Runnable command = blockingQueue.take();
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
public void execute(Runnable command){
// 小于核心线程数
if (coreList.size() < corePoolSize){
// 创建核心线程
Thread thread = new Thread(task);
coreList.add(thread);
thread.start();
}
// 任务添加成功
if (blockingQueue.offer(command)) {
return;
}
// 已创建的线程数 < 最大线程数
if (coreList.size() + supportList.size() < maxSize){
// 创建辅助线程
Thread thread = new Thread(task);
supportList.add(thread);
thread.start();
}
// 任务添加失败
if (!blockingQueue.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}
}
问题:一个线程如何在空闲的时候结束呢?
blockingQueue.take()
,阻塞一定时间我就认为空闲了,因为 在规定时间内取不到元素,说明阻塞队列不满。就应该结束辅助线程。blockingQueue.poll(timeout,timeUnit)
,阻塞 指定的时间。
public class MyThreadPool {
private int corePoolSize;
private int maxSize;
private int timeout;
private TimeUnit timeUnit;
private BlockingQueue<Runnable> blockingQueue;
// 线程池参数交给使用者决定
public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit,BlockingQueue<Runnable> blockingQueue) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.blockingQueue = blockingQueue;
}
List<Thread> coreList = new ArrayList<>();
List<Thread> supportList = new ArrayList<>();
// 核心线程的任务
private final Runnable coreTask = () ->{
while (true){
try {
Runnable command = blockingQueue.take();
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
// 辅助线程的任务
private final Runnable supportTask = () ->{
while (true){
try {
Runnable command = blockingQueue.poll(timeout, timeUnit);
if (command == null){
break;
}
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("辅助线程结束了!");
};
public void execute(Runnable command){
if (coreList.size() < corePoolSize){
Thread thread = new Thread(coreTask);
coreList.add(thread);
thread.start();
}
if (blockingQueue.offer(command)) {
return;
}
if (coreList.size() + supportList.size() < maxSize){
Thread thread = new Thread(supportTask);
supportList.add(thread);
thread.start();
}
if (!blockingQueue.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}
}
coreTask、supportTask
封装为 CoreThread、SupportThread
public class MyThreadPool {
private int corePoolSize;
private int maxSize;
private int timeout;
private TimeUnit timeUnit;
private BlockingQueue<Runnable> blockingQueue;
public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit,BlockingQueue<Runnable> blockingQueue) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.blockingQueue = blockingQueue;
}
List<Thread> coreList = new ArrayList<>();
List<Thread> supportList = new ArrayList<>();
public void execute(Runnable command){
if (coreList.size() < corePoolSize){
// 核心线程
Thread thread = new CoreThread();
coreList.add(thread);
thread.start();
}
if (blockingQueue.offer(command)) {
return;
}
if (coreList.size() + supportList.size() < maxSize){
// 辅助线程
Thread thread = new SupportThread();
supportList.add(thread);
thread.start();
}
if (!blockingQueue.offer(command)) {
throw new RuntimeException("阻塞队列满了");
}
}
class CoreThread extends Thread{
@Override
public void run() {
while (true){
try {
Runnable command = blockingQueue.take();
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
class SupportThread extends Thread{
@Override
public void run() {
while (true){
try {
Runnable command = blockingQueue.poll(timeout, timeUnit);
if (command == null){
break;
}
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("辅助线程结束了!");
}
}
}
问题:上述 command 第二次添加到阻塞队列失败了,除了抛异常,还有什么其他处理方式?
- 定义一个 拒绝策略,由创建线程池的开发者决定。
最终版线程池
@FunctionalInterface
public interface RejectHandle {
/**
* 拒绝策略
* @param rejectCommand 被拒绝的任务
* @param threadPool 拒绝任务的线程池
*/
void reject(Runnable rejectCommand, MyThreadPool threadPool);
}
public class MyThreadPool {
private final int corePoolSize;
private final int maxSize;
private final int timeout;
private final TimeUnit timeUnit;
private final BlockingQueue<Runnable> blockingQueue;
private final RejectHandle rejectHandle;
public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, RejectHandle rejectHandler) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.blockingQueue = blockingQueue;
this.rejectHandle = rejectHandler;
}
List<Thread> coreList = new ArrayList<>();
List<Thread> supportList = new ArrayList<>();
public void execute(Runnable command){
if (coreList.size() < corePoolSize){
Thread thread = new CoreThread();
coreList.add(thread);
thread.start();
}
if (blockingQueue.offer(command)) {
return;
}
if (coreList.size() + supportList.size() < maxSize){
Thread thread = new SupportThread();
supportList.add(thread);
thread.start();
}
if (!blockingQueue.offer(command)) {
// 执行拒绝策略
rejectHandle.reject(command,this);
}
}
class CoreThread extends Thread{
@Override
public void run() {
while (true){
try {
Runnable command = blockingQueue.take();
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
class SupportThread extends Thread{
@Override
public void run() {
while (true){
try {
Runnable command = blockingQueue.poll(timeout, timeUnit);
if (command == null){
break;
}
command.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("辅助线程结束了!");
}
}
}
public class Test {
public static void main(String[] args) {
MyThreadPool threadPool =
new MyThreadPool(2,6,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),
((rejectCommand, Pool) -> {
// 执行拒绝策略: 抛异常;丢弃阻塞队列中的第一个任务;丢弃当前任务 。。。
} ));
Runnable task = () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
};
for (int i = 0;i < 5;i ++){
threadPool.execute(task);
}
System.out.println("主线程没有被阻塞!");
}
}