线程池源码解析 2.工作原理与内部结构

news2025/7/11 18:44:56

线程池源码解析—工作原理与内部结构

工作原理

概述

image-20221113110625730

  • 线程池是线程的池子,本质上是通过单个线程执行多个并发任务,使得尽量少的创建线程,减少开销。
  • 在线程池内部,是没有区分核心线程和非核心线程的,是通过 Set 集合的大小来进行区分的。

线程池状态

  • 线程池有以下几种状态,在不同的状态之间,有着不同的处理逻辑,在代码中,有着大量的判断逻辑:

image-20221017155420528

拒绝策略

image-20221017155839747

image-20221017155924290

  • 以上是 JDK 提供的一些拒绝策略,这四个用的比较多的是第一种 AbortPolicy,也是默认拒绝策略。

  • 而我们在业务开发过程中,往往会自定义线程池拒绝策略进行处理。

  • 线程池拒绝策略接口

    public interface RejectedExecutionHandler {
        
    	void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    

    image-20221018204515142

ThreadPoolExecutor 内部结构

核心属性之 ctl

    /*
     * 线程池核心属性之一 ctl
     * 高3位表示当前线程池运行状态,低29位表示当前线程池中所拥有的线程数量
     * 是一个原子类 AtomicInteger
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

	/*
	 * 表示在ctl中,低COUNT_BITS位 是用于存放当前线程数量的位
	 * Integer.SIZE = 32 => 32 - 3 = 29 表示低29位用来存放当前线程数量的位
	 */
    private static final int COUNT_BITS = Integer.SIZE - 3;

	/*
	 * 表示低29位能表示的最大的线程数 就是 1 << 29 - 1 (大概是5亿多)
	 * CAPACITY = 000 11111111111111111111111111111
	 */
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
	
	/*
	 * 下面的表示线程池的5种状态
	 * 状态从上到下依次递增
	 */
    // 111 00000000000000000000000000000(二进制) 转换成10进制是一个负数
    private static final int RUNNING    = -1 << COUNT_BITS;

	// 000 00000000000000000000000000000 
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

	// 001 00000000000000000000000000000 
    private static final int STOP       =  1 << COUNT_BITS;

	// 010 00000000000000000000000000000 
    private static final int TIDYING    =  2 << COUNT_BITS;

	// 011 00000000000000000000000000000 
    private static final int TERMINATED =  3 << COUNT_BITS;
	
	/*
	 * 获取当前线程池的运行状态
	 * CAPACITY = 000 11111111111111111111111111111 取反后 => ~CAPACITY = 111 00000000000000000000000000000
	 * 因为要进行一个&运算,而~CAPACITY的值是固定的,根据这个值并且我们知道ctl的高三位
	 * 表示线程池的运行状态,所以进行&运算后就能获取到ctl的高三位的状态,即线程池的状态
	 * c = ctl = 111 00000000000000000000000000111(表示当前线程池RUNNING状态 并且有7个线程)
	 *                            &
	 *           111 00000000000000000000000000000
	 *                            =
	 *           111 00000000000000000000000000000(最终只会保留高三位 即线程池的状态)
	 */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
	/*
	 * 获取当前线程池的线程数量
	 * CAPACITY = 000 11111111111111111111111111111
	 * 这个值跟ctl进行&运算,取出ctl的低29位的值,即表示获取线程池中的线程数量
	 */
	private static int workerCountOf(int c)  { return c & CAPACITY; }

	/* 
	 * 用在重置当前线程池ctl值时会用到 
	 * rs 表示线程池状态, wc表示当前线程池中worker(线程)数量
	 * |表示的就是不进位加法 表示的就是通过rs 和 wc重新构建一个ctl
	 * 111 000000000000000000
     * 000 000000000000000111
     * 111 000000000000000111
	 */
    private static int ctlOf(int rs, int wc) { return rs | wc; }
	
	/*
	 * 表示当前线程池ctl所表示的状态是否小于某个状态s
	 * RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
	 */
	private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
	
	/*
	 * 表示当前线程池ctl所表示的状态是否大于等于某个状态s
	 */
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
	
	/*
	 * 判断线程池是否处于RUNNING状态
	 * 小于SHUTDOWN的状态一定是RUNNING状态 
	 */
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

常用简单方法

  	/*  
  	 * 使用CAS的方式让 ctl值+1,成功返回true 失败返回false
  	 * 即尝试添加一个线程(Worker实际上就是工作者线程)
  	 */
	private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /*
     * 使用CAS的方式让 ctl值-1,成功返回true 失败返回false
  	 * 即尝试干掉一个线程(Worker实际上就是工作者线程)
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /*
	 * 将ctl的值-1, 这个方法一定成功,使用的是 自旋 + CAS 的方式保证
     */
    private void decrementWorkerCount() {
        do {} while (!compareAndDecrementWorkerCount(ctl.get()));
    }

核心成员属性

    /*
     * 线程池全局锁
     * 增加worker(线程) 减少worker 时需要持有mainLock,修改线程池运行状态时也需要
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    // 线程池中真正存到 worker(Thread)的地方 工作者集合
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /*
     * 条件队列
     * 当外部线程调用awaitTermination()方法时,外部线程会阻塞等待当前线程池状态为Termination为止
     * 底层类似AQS的原理,等待就是将当前线程封装成一个Node,然后进入Condition的等待队列中(线程被park)。当线程池状态变为termination时,
     * 会通过调用termination.signalAll()将这些线程全部唤醒,进入到阻塞队列中(AQS),继续去争抢锁(每次只有头节点可以获得锁)
     * 抢占到的线程,会继续执行awaitTermination() 后面程序。这些线程最后,都会正常执行。
     * 简单理解:termination.await() 会将线程阻塞在这
     *          termination.signalAll() 会将阻塞在这的线程依次唤醒
     */
    private final Condition termination = mainLock.newCondition();

    // 记录线程池生命周期内,线程数的最大值
    private int largestPoolSize;

    // 记录线程池所完成的任务总数,当一个worker退出时,会将worker完成的任务累加到这个属性中
    private long completedTaskCount;
	
	/* 
 	 *  线程池7大核心参数之一:任务队列:BlockingQueue(阻塞队列)是一个接口
  	 *  当线程池中的正在工作的线程达到核心线程数时,这时再提交的任务会直接放到workQueue中
  	 *  常用的实现类有基于数组的阻塞队列  ArrayBlockingQueue
  	 *  		     基于链表的阻塞队列  LinkedBlockingQueue 
 	 */
    private final BlockingQueue<Runnable> workQueue;

    /*
     * 线程池的7大参数之一,线程的创建工厂,创建线程时会使用,是一个接口
     * 当我们使用 Executors.newFix...  newCache... 创建线程池时,使用的是 DefaultThreadFactory
     * 一般不推荐使用默认的实现类DefaultThreadFactory,推荐自己实现ThreadFactory
     */
    private volatile ThreadFactory threadFactory;

    /*
     * 线程池7大核心参数之一,拒绝策略,是一个接口,有四种实现,默认是直接丢弃并抛出异常
	 * DiscardOldestPolicy   ---> 丢弃队列中最老(最先入队)的任务
	 * AbortPolicy           ---> 直接丢弃新来的任务 抛出异常 (默认的)
	 * CallerRunsPolicy      ---> 直接调用run方法,相当于同步方法
	 * DiscardPolicy         ---> 直接丢弃新来的任务 不抛出异常
     */
    private volatile RejectedExecutionHandler handler;

	/*
	 * 线程池7大核心参数之一:空闲线程存活时间
	 * 当allowCoreThreadTimeOut为false时,只有当非核心线程空闲时间达到指定时间时才会被回收
	 * 当allowCoreThreadTimeOut为true时,线程池内所有的线程到达指定的时间均会被回收
	 * 此参数常常和 TimeUnit一起使用,指定超时时间的单位(也是线程池的7大核心参数之一) 
	 */
    private volatile long keepAliveTime;

	// 控制线程池内核心线程空闲时间达到指定时间时能否被回收 true 可以 false不可以
    private volatile boolean allowCoreThreadTimeOut;
	
	/*
	 * 线程池7大核心参数之一:核心线程数
	 */
    private volatile int corePoolSize;

    /*
     * 线程池7大核心参数之一:最大线程数
     */
    private volatile int maximumPoolSize;

    // 缺省拒绝策略,采用的是AbortPolicy 抛出异常的方式
    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();

核心内部类 Worker

	/*
     * Worker采用了AQS的 独占 模式
     * 独占模式:两个重要属性 state 和 ExclusiveOwnerThread
     * state:0时表示表示未被占用,> 0时表示被占用,< 0时表示初始状态,这种情况下不能被抢锁,只有等于0时才能尝试去抢锁
     * ExclusiveOwnerThread表示独占(抢到)锁的线程
     */		
	private final class Worker
        extends AbstractQueuedSynchronizer // 是AQS的子类
        implements Runnable // 实现了Runnable接口
    {

        private static final long serialVersionUID = 6138294804551838833L;

        // worker内部封装的工作线程
        final Thread thread;
        
    	// 假设firstTask不为空,那么当worker启动后(内部的线程启动)会优先执行firstTask,当执行完firstTask后,会到队列中去获取下一个任务	
        Runnable firstTask;

        // 记录当前worker所完成的任务数量
        volatile long completedTasks;

        /*
         * 构造器 传来的Runnable任务可以为null,firstTask为null的线程启动后会去队列中获取任务
         */
        Worker(Runnable firstTask) {
            // 设置AQS独占模式为初始化中状态,这个时候不能被抢占锁
            setState(-1); 
            // 为内部的firstTask赋值
            this.firstTask = firstTask;         
            /*
             * 使用线程工厂创建了一个线程,并且将当前worker指定为Runnable,也就是说当thread启动的时候会议worker.run为入口
             */
            this.thread = getThreadFactory().newThread(this);
        }

        /*
         * 当worker启动时,会执行run()方法 当前的这个Worker就是一个任务(Runnable)
         * 底层调用runWorker()直接将this传入了
         */
        public void run() {
            // 直接将当前对象传入进行执行
            // ThreadPoolExecutor->runWorker() 这个是核心方法,等后面分析worker启动后逻辑时会以这里切入
            runWorker(this);
        }

		/*
		 * 判断当前worker的独占锁是否被占用
		 * state为0 表示为被占用
		 * state为1 表示被占用
		 */
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
		
    	// 尝试去占用worker的独占锁 
        // 返回值 表示是否抢占成功
        protected boolean tryAcquire(int unused) {
            // CAS的方式,将state设置为1,尝试抢占锁
            if (compareAndSetState(0, 1)) {
                // CAS成功,成功抢到锁,则将exclusiveOwnerThread设置为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
		
    	/*  
    	 * 尝试释放锁 
         * 外部不会直接调用这个方法,这个方法是 AQS内调用的
         * 外部调用unlock时,unlock -> AQS.release -> tryRelease (模板方法模式)
         */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
		
    	// 加锁,加锁失败时会阻塞当前线程(类似ReentarntLock),直到获取到锁
        public void lock()        { acquire(1); }
    
    	/*
         * 尝试去加锁,如果锁是未被持有状态,那么加锁成功后会返回true
    	 * 否则 不会阻塞当前线程 直接返回false
    	 */ 
        public boolean tryLock()  { return tryAcquire(1); }
    
        /*
         * 释放锁
    	 * 一般情况下,咱们调用unlock 要保证 当前线程是持有锁的
         * 特殊情况,当worker的 state == -1 时,调用unlock 表示初始化state 设置state == 0
         * 启动worker之前会先调用unlock()这个方法 会强制刷新ExclusiveOwnerThread == null state==0 之后看源码就明白了。
         */
        public void unlock()      { release(1); }
    
    	// 返回当前worker的lock是否被占用
        public boolean isLocked() { return isHeldExclusively(); }
		
        // 回头再说
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

构造方法

  • 其余的 3 个构造方法都是套娃下面的这个构造方法,所以我们直接看这个最核心的即可。
	/*
	 * 7个参数的构造方法 传入7大核心参数,为内部属性赋值
	 */  	
	public ThreadPoolExecutor(int corePoolSize, // 核心线程数
                              int maximumPoolSize, // 最大线程数
                              long keepAliveTime, // 空闲线程存活时间
                              TimeUnit unit, // 时间单位 seconds nano..
                              BlockingQueue<Runnable> workQueue, // 阻塞(任务)队列
                              ThreadFactory threadFactory, // 线程工厂
                              RejectedExecutionHandler handler) { // 拒绝策略
      	// 判断参数是否合法
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
       
        // 工作队列 和 线程工厂 和 拒绝策略 都不能为空
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
      	// 为属性赋值
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参考

  • 视频参考
    • b站_小刘讲源码付费课
  • 文章参考
    • shstart7_线程池源码解析2.工作原理与内部结构
    • 肆华_线程池阅读理解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/8771.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

拒绝内卷,阿里架构师整理的这份Java核心手册,堪称最强

2022年注定是不寻常的一年&#xff0c;在今年因为疫情以及各大大厂纷纷传来裁员的消息&#xff0c;引得整个互联网圈动荡不堪。腾讯裁员30%、京东、百度、字节等大厂都在纷纷裁员&#xff0c;引的这些中厂和小厂也跟风裁员。 这个时候外部的各种变化愈发证明一个重要的一点&am…

2022.11.7-11.13 AI行业周刊(第123期):技术人员的职业发展在哪里?

篇章一&#xff1a;技术人员的职业发展 上周和大学时的舍友聊天&#xff0c;交流当前大家的生活状态。 我们已经本科毕业将近10年了&#xff0c;他目前也有两个孩子&#xff0c;在湖北的一个地级市中&#xff0c;从事的是通信行业。 不过随着工作的时间越久&#xff0c;他发…

软件测试面试真题 | 黑盒测试和白盒测试的基本概念是什么?

在软件测试的面试中&#xff0c;什么是黑盒测试&#xff0c;什么是白盒测试是特别容易被问到的一个问题。 面试官问出这个问题&#xff0c;其实考察的是大家对于软件测试基础理论的掌握程度。下面来梳理一下这个问题的回答思路。 黑盒测试 黑盒测试会把被测的软件看作是一个…

只会加班的项目经理,迟早被淘汰

早上好&#xff0c;我是老原。 最近看到一个文章的标题「废掉一个人最好的方式&#xff0c;就是让他忙到没时间学习」&#xff0c;具体内容是什么我还没有细读&#xff0c;只看完标题&#xff0c;有一丝心酸和自豪&#xff1a; 有那么一群人&#xff0c;在玻璃渣里找糖吃&…

皮带跑偏检测系统

皮带跑偏检测系统对皮带运行状态进行全天候实时监测&#xff0c;一旦皮带跑偏检测系统监测到现场皮带跑偏、撕裂、堆煤、异物等异常情况时&#xff0c;系统马上开展警报&#xff0c;通知后台&#xff0c;并提醒相关人员及时处置。皮带跑偏检测系统并把警报截屏和视频储存到数据…

附参考文献丨艾美捷Cholesterol胆固醇说明书

Cholesterol胆固醇以固体形式提供。可以通过将胆固醇溶解在所选择的溶剂中来制备储备溶液&#xff0c;该溶剂应使用惰性气体吹扫。胆固醇以约30mg/ml的浓度溶于有机溶剂氯-仿中。 艾美捷Cholesterol胆固醇参数&#xff1a; CAS号&#xff1a;57-88-5 正式名称&#xff1a;&am…

自动驾驶入门:预测

目录 概念 预测方式 障碍物预测 递归神经网络在预测中的应用 轨迹生成 概念 无人车是在许多物体间穿梭行驶&#xff0c;其中许多物体本身就是一直在移动的&#xff0c;比如像其他汽车、自行车、行人。无人车需要预测这些物体的行为&#xff0c;这样才能确保做出最佳决策。…

工作中对InheritableThreadLocal使用的思考

最近在工作中结合线程池使用 InheritableThreadLocal 出现了获取线程变量“错误”的问题&#xff0c;看了相关的文档和源码后在此记录。 1. 先说结论 InheritableThreadLocal 只有在父线程创建子线程时&#xff0c;在子线程中才能获取到父线程中的线程变量&#xff1b;当配合…

coding持续集成

先看看官网的一些操作提示 1、创建SSH密钥对 2、创建制品仓库 看完官网的介绍&#xff0c;持续集成需要提前准备好SSH凭证和制品仓库&#xff0c;下面将让我们动手开始吧 一、创建SSH密钥对 登录服务器控制台&#xff0c;创建 SSH 密钥对。获取私钥对后将其录入至 CODING 中…

Netty源码阅读(2)之——服务端源码梗概

上文我们把客户端源码梗概大致了解了一下&#xff0c;这样再了解服务端源码就轻松一点&#xff0c;我们将从服务端和客户端的区别着手去解析。 目录 区别 ④ ③ ① ⑤ 区别 ④ 客户端&#xff1a;.option(ChannelOption.TCP_NODELAY, true) 在TCP/IP协议中&#xff0c;无论…

贪心算法小结

A-金银岛 某天KID利用飞行器飞到了一个金银岛上&#xff0c;上面有许多珍贵的金属&#xff0c;KID虽然更喜欢各种宝石的艺术品&#xff0c;可是也不拒绝这样珍贵的金属。但是他只带着一个口袋&#xff0c;口袋至多只能装重量为w的物品。岛上金属有s个种类, 每种金属重量不同&am…

ffmpeg视频编解码 demo初探(一)(包含下载指定windows版本ffmpeg)分离视频文件中的视频流每一帧YUV图片

参考文章1&#xff1a;YUV数据流编码成H264 参考文章2&#xff1a;【FFmpeg编码实战】&#xff08;1&#xff09;将YUV420P图片集编码成H264视频文件 文章目录第一个项目&#xff1a;分离视频文件中的视频流每一张图片弯路步入正轨下载官方编译的ffmpeg4.3&#xff08;win64-g…

SpringFramework:SpringBean的生命周期

SpringFramework&#xff1a;SpringBean的生命周期 文章目录SpringFramework&#xff1a;SpringBean的生命周期一、SpringBean的生命周期1. 实例化 Bean2. 填充属性&#xff08;DI&#xff09;3. 初始化4. 销毁二、BeanDefinition1. 基本概念2. 大致结构3. Spring 构建它的优势…

深度学习必备Python基础知识充电2

一、python中的类 1.1 python中是有内置的数据类型的 intstr 1.2 创建新的数据类型 自定义类来实现这样的功能 二、年轻人的第一个python类 2.1 来尝试一下 # 年轻人的第一个自定义python类class Man:def __init__(self, name):self.name nameprint(initialized Succes…

【优雅的参数验证@Validated】@Validated参数校验的使用及注解详解——你还在用if做条件验证?

Validated参数校验的使用及注解详解你还在用if做条件验证吗&#xff1f;一、优雅的参数验证Validated1.Valid和Validated的用法(区别)2.引入并使用Validated参数验证二、javax.validation.constraints下参数条件注解详解三、自定义条件注解你还在用if做条件验证吗&#xff1f; …

【云原生之K8s】 Pod控制器

文章目录一、Pod控制器及其功用二、控制器的类型1.Deployment2.StatefulSet2.1 StatefulSet的组成2.2 常规service和无头服务区别2.3 示例小结3.DaemonSet4.Job5.CronJob一、Pod控制器及其功用 Pod控制器&#xff0c;又称之为工作负载&#xff08;workload&#xff09;&#x…

【毕业设计】机器视觉火车票识别系统 - python 深度学习

文章目录0 前言1 课题意义1.1 课题难点&#xff1a;2 实现方法2.1 图像预处理2.2 字符分割2.3 字符识别2.3.1 部分实现代码3 实现效果4 最后0 前言 &#x1f525; Hi&#xff0c;大家好&#xff0c;这里是丹成学长的毕设系列文章&#xff01; &#x1f525; 对毕设有任何疑问…

疑似大厂泄露!阿里内部Redis教程笔记,细节点满/效率翻倍

Redis是一个key-value存储系统&#xff0c;是当下互联网公司广泛采用的NoSQL数据库之一&#xff0c;也是Java程序员应知应会的必备技术。 这套笔记教程采用Redis 6.2.1版本&#xff0c;内容由浅入深&#xff0c;循序渐进&#xff0c;从Redis的基本概念开启讲解&#xff0c;内容…

React核心技术浅析

1. JSX与虚拟DOM 我们从React官方文档开头最基本的一段Hello World代码入手: ReactDOM.render(<h1>Hello, world!</h1>,document.getElementById(root) );这段代码的意思是通过 ReactDOM.render() 方法将 h1 包裹的JSX元素渲染到id为“root”的HTML元素上. 除了在…

NVIDIA Grace Hopper架构深度解析

NVIDIA Grace Hopper架构深度解析 NVIDIA Grace Hopper Superchip 架构是第一个真正的异构加速平台&#xff0c;适用于高性能计算 (HPC) 和 AI 工作负载。 它利用 GPU 和 CPU 的优势加速应用程序&#xff0c;同时提供迄今为止最简单、最高效的分布式异构编程模型。 科学家和工程…