并发编程学习篇并发线程池底层原理详解与源码分析

news2025/7/27 5:02:07

在这里插入图片描述

一、线程池与线程对比

package bat.ke.qq.com.threadpool;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/***
 * 使用线程的方式去执行程序
 */
public class ThreadTest {

    public static void main(String[] args) throws InterruptedException {
        Long start = System.currentTimeMillis();
        final Random random = new Random();
        final List<Integer> list = new ArrayList<Integer>();
        for (int i = 0; i < 100000; i++) {
            Thread thread = new Thread() {
                @Override
                public void run() {
                    list.add(random.nextInt());
                }
            };
            thread.start();
            thread.join();  // 等待调用join方法的线程结束之后,程序再继续执行
        }
        System.out.println("时间:" + (System.currentTimeMillis() - start));
        System.out.println("大小:" + list.size());
}

执行结果:
时间:6422
大小:100000

线程池

package bat.ke.qq.com.threadpool;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/***
 * 线程池执行
 */
public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {
        Long start = System.currentTimeMillis();
        final Random random = new Random();
        final List<Integer> list = new ArrayList<Integer>();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100000; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    list.add(random.nextInt());
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("时间:"+(System.currentTimeMillis() - start));
        System.out.println("大小:"+list.size());
}

执行结果:
时间:33
大小:100000

二、为什么要用线程池?

线程池的优点:

  1. 降低资源消耗:通过 线程复用 来降低线程创建和销毁造成的消耗,创建线程涉及到了用户态到内核态的切换,非常影响性能;
  2. 提高响应速度:当任务到达时,任务可以不需要的等待,而直接被线程池中的空闲线程执行;
  3. 提高线程的可管理性:线程是稀缺资源,通过线程池可以进行统一的分配,调优和监控。

线程池的缺点:

  1. 增加了系统的复杂性:线程池的配置项很多,如果不恰当,可能会造成性能问题;
  2. 每一个任务的执行时间变长:线程池的任务是共享线程,不同任务之间互相影响,当线程池中线程数量很多时,每个任务执行时间可能会变长。

三、为什么不推荐使用Executors默认创建的线程池?

3.1 Executors.newFixedThreadPool(10)

在这里插入图片描述

LinkedBlockingQueue队列的容量默认为Integer.MAX_VALUE,大批量的任务提交到队列中 阻塞等待 时,可能导致 OOM

3.2 Executors.newCachedThreadPool()

在这里插入图片描述

非核心线程设置为Integer.MAX_VALUE,且 SynchronousQueue 同步队列,生产者消费者的模型,只要有一个任务被提交到队列中,就会 马上创建一个线程执行任务 ,这样可能创建非常多的非核心线程,所以可能导致 CPU100%

3.3 Executors.newSingleThreadExecutor()

在这里插入图片描述

与newFixedThreadPool()一样使用了 LinkedBlockingQueue 队列的容量默认为Integer.MAX_VALUE,大批量的任务提交到队列中阻塞等待时,可能导致 OOM

四、自定义线程池原理分析

image.png

4.1 自定义线程池使用方式:

public static void main(String[] args) {
    // 线程池的核心线程数如何设置
    // 任务可以分为两种:CPU密集,IO密集。
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            2,
            1L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    // ...
                    return t;
                }
            },
            new ThreadPoolExecutor.AbortPolicy()
    );

    executor.execute(任务);
    executor.submit(有返回结果的任务);
}

4.2 线程池属性认知

// AtomicInteger,就是一个int,写操作用CAS实现,保证了原子性
// ctl维护这线程池的2个核心内容:
// 1:线程池状态(高3位,维护着线程池状态)
// 2:工作线程数量(核心线程+非核心线程,低29位,维护着工作线程个数)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS= 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工作线程的最大个数
// 00100000 00000000 00000000 00000000(左移29位) - 1
// 000111111111111111111111111111111  
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 拿到线程池状态  ~CAPACITY 000111111111111111111111111111111  取反 111xxx....
// 011... 
// 111...
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 拿到工作线程个数
// ...0000000111111
// ...1111111111111
private static int workerCountOf(int c)  { return c & CAPACITY; }

4.3 线程池状态

image.png
execute方法原理

在这里插入图片描述

4.4 execute方法源码解析

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1、判断当前的线程数是否小于corePoolSiz,
         * 如果是,使用入参任务通过addWord方法创建一个新的线程
         * 如果能完成新线程创建exexute方法结束,成功提交任务;
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /**
        * 2、如果状态为运行并且能成功加入任务到工作队列后,再进行一次check
        * 如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),将这个任务从队列中移除,执行指定的拒绝策略(reject);
        * 否则判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
        */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /**
        * 3、可能工作队列中的任务存满了,没添加进去,到这就要添加非核心线程去处理任务
        * 如果失败,则是线程池已经shutdown或者线程池已经达到饱和状态,执行reject;
        */
        else if (!addWorker(command, false))
            reject(command);
    }

从上面新增任务的execute方法也可以看出,拒绝策略不仅仅是在饱和状态下使用, 在线程池进入到关闭阶段同样需要使用到;

4.5 addWorker方法源码解析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 判断线程池状态
        if (rs >= SHUTDOWN &&
              // 判断如果线程池的状态为SHUTDOWN,还要处理工作队列中的任务
              // 如果你添加工作线程的方式,是任务的非核心线程,并且工作队列还有任务
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        // 判断工作线程个数
        for (;;) {
            // 阿巴阿巴……
            int wc = workerCountOf(c);
            // 判断1:工作线程是否已经 == 工作线程最大个数
            // 判断2-true判断:判断是核心线程么?如果是判断是否超过核心线程个数
            // 判断2-false判断:如果是非核心线程,查看是否超过设置的最大线程数
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 对工作线程进行 + 1操作
            if (compareAndIncrementWorkerCount(c))
                // +1成功,跳出外层循环,执行添加工作线程的业务
                // 以CAS方式,对ctl+1,多线程并发操作,只有会有一个成功
                break xxx;
            // 重新拿ctl,
            c = ctl.get();
            // 判断线程池状态是否有变化
            if (runStateOf(c) != rs)
                continue xxx;
        }
    }

    // 添加工作线程的业务  
    // 工作线程启动了吗?
    boolean workerStarted = false;
	// 工作线程添加了吗?
    boolean workerAdded = false;
    // Worker就是工作线程
    Worker w = null;
    try {
        // 创建工作线程,将任务传到Worker中
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 只有你写的线程工厂返回的是null,这里才会为null
        if (t != null) {
            // 获取锁资源
            final ReentrantLock mainLock = this.mainLock;
            // 加锁。  因为我要在启动这个工作线程时,避免线程池状态发生变化,加锁。
            mainLock.lock();
            try {
                // 重新获取ctl,拿到线程池状态
                int rs = runStateOf(ctl.get());
                // DCL i think you know~~~
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                   	// 判断Worker中的thread是否已经启动了,一般不会启动,除非你在线程工厂把他启动了
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 将工作线程存储到hashSet中
                    workers.add(w);
                    // 获取工作线程个数,判断是否需要修改最大工作线程数记录。
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 工作线程添加成功     0
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果添加成功
            if (workerAdded) {
                // 启动工作线程
                t.start();
                // 设置标识为true
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作线程启动失败
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 如果添加工作线程失败,执行
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 说明worker可能存放到了workers的hashSet中。
        if (w != null)
            // 移除!
            workers.remove(w);
        // 减掉workerCount的数值 -1
        decrementWorkerCount();
        // 尝试干掉自己
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

4.6 runwoker方法

final void runWorker(Worker w) {
    // 拿到当前线程对象
    Thread wt = Thread.currentThread();
    // 拿到worker中存放的Runnable
    Runnable task = w.firstTask;
    // 将worker中的任务清空
    w.firstTask = null;
    // 揍是一个标识
    boolean completedAbruptly = true;
    try {
        // 如果Worker自身携带任务,直接执行
        // 如果Worker携带的是null,通过getTask去工作队列获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 判断线程池状态是否大于等于STOP,如果是要中断当前线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 // 中断当前线程(DCL)
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
  
            try {
                // 前置钩子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 后置钩子
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 当前工作执行完一个任务,就++
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

4.7 getTask工作线程排队拿任务

private Runnable getTask() {
    // 超时-false
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        // 阿巴
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池状态判断
        // 如果线程池状态为SHUTDOWN && 工作队列为空
        // 如果线程池状态为STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 对工作线程个数--
            decrementWorkerCount();
            return null;
        }

        // 对数量的判断。
        int wc = workerCountOf(c);

        // 判断核心线程是否允许超时?
        // 工作线程个数是否大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 判断工作线程是否超过了最大线程数 && 工作队列为null
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            // 工作线程数有问题,必须-1,干掉当前工作线程
            // 工作线程是否超过了核心线程,如果超时,就干掉当前线程
            // 对工作线程个数--
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果是非核心,走poll,拉取工作队列任务,
            // 如果是核心线程,走take一直阻塞,拉取工作队列任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            	// 当工作队列没有任务时,这时就会被Condition通过await阻塞线程
            	// 当有任务添加到工作线程后,这是添加完任务后,就会用过Condition.signal唤醒阻塞的线程
                workQueue.take();
            if (r != null)
                return r;
            // 执行的poll方法,并且在指定时间没拿到任务,
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

4.8 processWorkerExit工作线程告辞~

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果是不正常操作,需要先对工作线程数-- (如果正常情况,getTask就--了)
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将当前工作线程完整的任务个数赋值给整个线程池中的任务数
        completedTaskCount += w.completedTasks;
        // 干掉当前工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 线程池是否可以中止,线程池状态是否发生变化。
    tryTerminate();

  
    int c = ctl.get();
    //如果当前线程池状态小于STOP
    if (runStateLessThan(c, STOP)) {
        // 判断线程池中的工作队列是否还有任务,并且工作线程是否还在。
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 添加非核心空任务的线程处理工作队列中的任务
        addWorker(null, false);
    }
}

五、4个拒绝策略

CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务;

AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务;

DiscardPolicy:直接抛弃任务,不做任何处理;

DiscardOldestPolicy:去除任务队列中的第一个任务(最旧的),重新提交;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/368247.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为Webpack5项目引入Buffer Polyfill

前言 最近在公司的一个项目中使用到了Webpack5&#xff0c; 然而在使用某个npm包的时候&#xff0c;出现了Buffer is not defined 这个问题&#xff0c;原因很明显了&#xff0c;因为浏览器运行时没有Buffer这个API&#xff0c;所以需要为浏览器引入Buffer Polyfill. Webpack5…

如何制定达人营销策略

如今&#xff0c;达人营销不再是一个新兴趋势&#xff0c;而是公司整个数字营销战略的一部分。虽然十年前&#xff0c;达人还不存在&#xff0c;但随着公司对数字化营销依赖度地提升&#xff0c;各个领域的大V群体逐渐壮大&#xff0c;越来越多的公司已经采用了达人营销策略。如…

JavaScript 库

文章目录JavaScript 库JavaScript 框架&#xff08;库&#xff09;jQueryPrototypeMooTools其他框架CDN -内容分发网络引用 jQuery使用框架JavaScript 库 JavaScript 库 - jQuery、Prototype、MooTools。 JavaScript 框架&#xff08;库&#xff09; JavaScript 高级程序设计…

界面控件DevExtreme的Data Grid组件——让业务信息管理更轻松!

DevExtreme拥有高性能的HTML5 / JavaScript小部件集合&#xff0c;使您可以利用现代Web开发堆栈&#xff08;包括React&#xff0c;Angular&#xff0c;ASP.NET Core&#xff0c;jQuery&#xff0c;Knockout等&#xff09;构建交互式的Web应用程序&#xff0c;该套件附带功能齐…

【阿旭机器学习实战】【35】员工离职率预测---决策树与随机森林预测

【阿旭机器学习实战】系列文章主要介绍机器学习的各种算法模型及其实战案例&#xff0c;欢迎点赞&#xff0c;关注共同学习交流。 本文的主要任务是通过决策树与随机森林模型预测一个员工离职的可能性并帮助人事部门理解员工为何离职。 目录1.获取数据2.数据预处理3.分析数据3.…

Python之正则表达式细讲

文章目录前言一、行定位符二、元字符三、限定符四、字符类五、排除字符六、选择字符七、转义字符八、分组九、正则表达式语法总结前言 在处理字符串时&#xff0c;经常会有查找符合某些复杂规则的字符串的需求。比如用爬虫批量抓取某网站图片&#xff0c;要从抓下来的网页信息中…

【AI写作】 机器人流程自动化 介绍 - Robotic Process Automation (RPA) Introduction

写一篇文章介绍RPA技术,未来的发展。使用markdown格式,有3级索引,超过3000字。 某位大师说过的: 任何行业、任何部门都有大量的场景,涉及重复、有规则逻辑的工作,都可以用 RPA 开发一个软件机器人帮助完成。 文章目录 机器人过程自动化(RPA)简介RPA的定义RPA的好处Robo…

国内“谁”能实现chatgpt,短期穷出的类ChatGPT简评(算法侧角度为主),以及对MOSS、ChatYuan给出简评,一文带你深入了解宏观技术路线。

1.ChatGPT简介【核心技术、技术局限】 ChatGPT&#xff08;全名&#xff1a;Chat Generative Pre-trained Transformer&#xff09;&#xff0c;美国OpenAI 研发的聊天机器人程序 &#xff0c;于2022年11月30日发布 。ChatGPT是人工智能技术驱动的自然语言处理工具&#xff0c…

这周末,StarRocks 邀请开发者们一起来上海 GAIDC 开源集市,各种任务等你来挑战!

2 月 25 日-26 日 &#xff08;本周末&#xff09;2023 全球人工智能开发者先锋大会&#xff08;GAIDC&#xff09;将于上海临港中心举行&#xff01;StarRocks 社区受邀参与开源集市&#xff0c;展示开源魅力、分享社区成果。欢迎上海的同学们到“摊位”上与工作人员互动&…

经典设计模式MVC理解

MVC是模型(Model)、视图(View)、控制器(Controller)的简写&#xff0c;将业务逻辑、数据、显示分离的方法来组织代码。今天简单回顾一下。 mvc释义理解 M代表模型(Model)&#xff0c;表示业务规则封装。在MVC的三个部件中&#xff0c;模型拥有最多的处理任务。被模型返回的数据…

Kibana与Elasticsearch

下载与安装Kibanahttps://www.elastic.co/cn/downloads/kibanaKibana的版本与Elasticsearch的版本是一致的&#xff0c;使用方法也和Elasticsearch一致。由于我的英文不是特别好&#xff0c;我们找到config/kibana.yml末尾添加i18n.locale: "zh-CN" &#xff0c;汉化…

15 款面向内容创作者的 AI 工具

获得 AI 帮助&#xff0c;让您的营销流程更高效、超越竞争对手并扩大您的受众 &#x1f916;作为内容创作者&#xff0c;您知道引人入胜且有趣的内容是吸引观众注意力的关键。但是&#xff0c;如果有工具可以帮助您比以往更快、更轻松地创建更具吸引力的内容呢&#xff1f;这就…

自动驾驶仿真:ECU TEST自动化测试常用API调用

文章目录一、 API调用运行环境二、ET API帮助文档三、如何导入ET API四、 ET常用接口1、 创建用于添加测试步骤的Package2、 在Package的TestStep中添加precondition块3、 在Package的TestStep中添加Block块4、在Package的TestStep中添加PostconditionBlock块5、 在Package的Te…

JAVA 8 新特性 Lamdba表达式

Java8 新特性&#xff1a; 1、Lamdba表达式 2、函数式接口 3、方法引用和构造引用 4、Stream API 5、接口中的默认方法和静态方法 6、新时间日期API 7、Optional 8、其他特性 Java8 优势&#xff1a;速度快、代码更少&#xff08;增加了新的语法 Lambda 表达式&#xff09;、强…

Centos安装Python、PyCharm

安装Python 1、打开终端(Terminal) 2、输入以下命令更新系统&#xff1a; sudo yum update 3、安装Python&#xff1a; sudo yum install python3 4、安装完成后&#xff0c;可以使用以下命令检查Python版本&#xff1a; python3 --version 安装PyCharm 1、下载PyCharm的安…

SQL server设置用户只能访问特定数据库、访问特定表或视图

在实际业务场景我们可能需要开放单独用户给第三方使用&#xff0c;并且不想让第三方看到与业务不相关的表或视图&#xff0c;我们需要在数据库中设置一切权限来实现此功能&#xff1a; 1.设置用户只能查看数据库中特定的视图或表 1.创建用户名 选择默认数据库 服务器角色默认…

什么是项目管理?项目经理应该如何进行管理?

项目管理&#xff1a; 一是指一种管理活动&#xff0c;一种有意识地按照项目的特点和规律&#xff0c;对项目进行组织管理的活动&#xff1b; 二是指一种管理学科&#xff0c;以项目管理活动为研究对象的一门学科&#xff0c;它是探求项目活动科学组织管理的理论与方法。 就…

90%的程序员还不知道ChatGPT能这么用

本内容来自公众号“布博士”------&#xff08;擎创科技资深产品专家&#xff09;当下&#xff0c;越来越多的企业已经开始应用机器学习和自然语言处理等技术来辅助告警故障分析。在这个领域中&#xff0c;CHATGPT这样的人工智能模型可以扮演非常重要的角色&#xff0c;通过对历…

压榨配置写出颠覆认知的Java线程池

Java线程池参数调优前言线程池常见误解■ 必须用线程池 不能直接new线程■ 7参数的生效顺序结论Java线程池参数调优■ 网上流传的线程数计算公式■ 调整生产消费平衡疑问① 能否直接设置大量线程数疑问② 这个核心数最好吗■ 经典案例-业务请求第三方接口■ 反思过去配置的线程…

CSS 基础【快速掌握知识点】

目录 一、什么是CSS 二、CSS发展史 三、CSS基本语法结构 1、语法 2、例如 四、style标签 五、HTML中引入CSS样式 1、行内样式 2、内部样式表 3、外部样式表 六、CSS基本选择器 1、标签选择器 2、类选择器 3、ID选择器 4、总结 5、基本选择器的优先级 七、CSS的…