文章目录
- 线程池实现原理
- addWorker(Runnable firstTask, boolean core)
- 1. 状态检查:校验线程池是否允许添加线程
- 2. 工作线程数调整:CAS保证并发安全
- 3. 初始化变量
- 4. 创建 `Worker` 对象并获取线程
- 5. 加锁保证线程安全
- 6. 启动工作线程
- 7. 异常处理
- 核心作用
线程池实现原理
接下来,我们进入 addWork 方法, 在创建线程前会获取全局锁:
addWorker(Runnable firstTask, boolean core)
/
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
/ 若状态为STOP/TIDYING/TERMINATED(rs >= SHUTDOWN),直接拒绝新增线程。
例外情况:当状态为SHUTDOWN且满足以下条件时允许添加 非核心线程 处理残留任务:
1. firstTask == null(线程不携带新任务)
2. workQueue非空(队列中仍有待处理任务)*/
return false;
for (;;) {
int wc = workerCountOf(c); // 工作线程数(低29位)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 容量校验
return false;
if (compareAndIncrementWorkerCount(c))
// CAS递增workerCount
break retry; // 成功则退出整个retry循环
c = ctl.get(); // 重新读取当前ctl
if (runStateOf(c) != rs)
continue retry; // 状态变化则重试外层循环
// 若仅workerCount变化,继续内层循环重试CAS
}
}
boolean workerStarted = false; //标记工作线程是否成功启动
boolean workerAdded = false;//标记线程是否被成功添加到线程池
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);//通过线程工厂创建Worker对象(后面讲)
final Thread t = w.thread;//获取其绑定的线程
if (t != null) {
mainLock.lock(); // 获取全局锁
try {
//重新检查线程池状态
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN || //线程池处于RUNNING状态时,允许添加新线程
(rs == SHUTDOWN && firstTask == null)) {
//SHUTDOWN状态下仅允许添加无初始任务的线程(处理队列中的剩余任务)
if (t.isAlive()) // 防止重复启动
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//若成功添加Worker,则启动其线程(真正开始消费任务)
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//失败回滚:若未成功启动(如线程池已关闭),调用addWorkerFailed回滚资源
addWorkerFailed(w);
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); //移除Worker
decrementWorkerCount();//减少计数
tryTerminate(); //尝试终止线程池
} finally {
mainLock.unlock();
}
}
1. 状态检查:校验线程池是否允许添加线程
int c = ctl.get();
int rs = runStateOf(c); // 运行状态(高3位)
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
-
关键条件:
-
若状态为
STOP/TIDYING/TERMINATED
(rs >= SHUTDOWN
),**直接拒绝新增线程**。 -
例外情况:当状态为
SHUTDOWN
且满足以下条件时允许添加**非核心线程**处理残留任务:-
firstTask == null
(线程不携带新任务) -
workQueue
非空(队列中仍有待处理任务),确保线程关闭时仍能处理队列残留任务
-
-
2. 工作线程数调整:CAS保证并发安全
for (;;) {
int wc = workerCountOf(c); // 工作线程数(低29位)
// 容量校验
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS递增workerCount
if (compareAndIncrementWorkerCount(c))
break retry; // 成功则退出整个retry循环
// CAS失败后处理
c = ctl.get(); // 重新读取当前ctl
if (runStateOf(c) != rs)
continue retry; // 状态变化则重试外层循环
// 若仅workerCount变化,继续内层循环重试CAS
}
-
双重校验逻辑:
-
容量控制:通过参数
core
决定上限为核心/最大线程数,避免资源溢出。 -
CAS操作:通过
compareAndIncrementWorkerCount
原子性增加线程数,防止多线程竞争导致计数错误。 -
失败处理:若检测到状态变化(如线程池关闭),重新进行外层状态检查。若仅线程数变化,则仅重试内层循环。
-
双重循环:外层处理状态变化,内层处理线程数变更,分离关注点,提升并发效率。
-
3. 初始化变量
初始化 workerStarted
和 workerAdded
为 false
,表示工作线程未启动且未成功添加到工作线程集合。Worker
对象 w
的初始值为 null
。
4. 创建 Worker
对象并获取线程
w = new Worker(firstTask);
final Thread t = w.thread;
通过线程工厂创建 Worker
对象,并获取其绑定的线程 t
。若线程工厂创建失败(如返回 null
),后续逻辑直接跳转至第7步异常处理。
5. 加锁保证线程安全
获取 mainLock
(全局锁),确保对线程池状态的检查和修改、workers
集合的操作是原子的:
6. 启动工作线程
if (workerAdded) {
t.start(); // 启动线程
workerStarted = true;
}
如果 Worker
成功添加到集合,则启动其绑定的线程。
7. 异常处理
finally {
if (!workerStarted)
addWorkerFailed(w); // 回滚失败的 Worker 操作
}
- 若线程未启动(如锁获取后线程池已关闭),调用
addWorkerFailed
移除Worker
并更新线程池状态。
核心作用
这段代码是线程池 ThreadPoolExecutor
中添加工作线程的核心逻辑,它会创建Worker,Worker内置了一个线程(由线程工厂创建),这个线程会在Worker创建后启动,专门用于执行Worker的run()方法。另外还实现了:
-
线程安全的
Worker
管理:通过ReentrantLock
保证对共享变量(如workers
集合)的原子操作。 -
状态双重检查:防止在加锁期间线程池状态发生变化。
- 异常回滚机制:确保部分失败的操作能被正确处理,保证线程池的稳定性。