线程池(二)

news2024/5/18 4:25:38

线程池MyThreadPoolExecutor的执行流程和工作线程的执行逻辑如下:

线程池执行流程:

  1. 初始化:在创建MyThreadPoolExecutor实例时,会设置核心线程数、最大线程数、空闲存活时间以及工作队列。

  2. 提交任务:通过execute(Runnable command)方法提交一个任务到线程池。

  3. 任务判断:线程池会检查当前状态和任务的有效性。

  4. 核心线程判断:如果工作线程数量小于核心线程数,尝试创建一个新线程来执行任务。

  5. 工作队列判断:如果工作线程数量已达到核心线程数,尝试将任务放入工作队列。

  6. 最大线程判断:如果工作队列已满或无法添加任务,且工作线程数量小于最大线程数,创建一个新线程来执行任务。

  7. 拒绝策略:如果任务既不能放入工作队列,又不能创建新线程,则线程池会采取拒绝策略,通常会抛出一个异常。

工作线程执行逻辑:

  1. 工作线程创建:通过addWorker(Runnable command, boolean core)方法创建一个新的工作线程。

  2. 启动线程:新创建的工作线程启动,开始执行Worker类的run方法。

  3. 获取任务:工作线程通过runWorkers(Worker worker)方法尝试获取任务。如果是核心线程,它会无限期地等待任务;如果是非核心线程,则会有一个超时时间。

  4. 执行任务:一旦获取任务,工作线程执行任务的run方法。

  5. 任务完成:任务执行完成后,工作线程记录已完成的任务数。

  6. 继续工作:工作线程尝试获取下一个任务,继续执行,直到没有任务可执行。

  7. 线程移除:当工作线程没有任务可执行时,它会尝试从工作线程集合中移除自己。

  8. 线程终止:如果线程池处于停止状态,工作线程将终止执行。

package com.threadpool;

import com.sun.media.sound.SF2InstrumentRegion;
import javafx.concurrent.Worker;

import java.util.HashSet;
import java.util.Scanner;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyThreadPoolExecutor {
    private int corePoolCount;
    private int maxPoolCount;
    private int keepLiveTime;

    BlockingDeque<Runnable> workQueue;

    public MyThreadPoolExecutor(int corePoolCount, int maxPoolCount, int keepLiveTime, BlockingDeque<Runnable> workQueue) {
        this.corePoolCount = corePoolCount;
        this.maxPoolCount = maxPoolCount;
        this.keepLiveTime = keepLiveTime;
        this.workQueue = workQueue;
    }

    //当前线程的状态
    private AtomicInteger status = new AtomicInteger();
    private AtomicInteger workCount = new AtomicInteger();//工作线程的数量 都是线程安全的
    private HashSet<Worker> workers = new HashSet<>();//工作线程
    private final Lock lock = new ReentrantLock();//hashset不安全辅助锁
    private final static Integer RUNNING = 0;
    private final static Integer STOP = 1;
    //当前完成的总任务数
    private int completedTaskCount = 0;


    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        if (status.get() == STOP) throw new RuntimeException();
        if (status.get() == RUNNING) {
            if (workCount.get() < corePoolCount) {
                if (addWorker(command, true)) {//增加核心线程数
                    return;
                }
            }
            if (workQueue.offer(command)) {//只是放到任务队列
                return;
            }
            if (workCount.get() < maxPoolCount) {
                if (addWorker(command, false)) {//创建新的线程
                    return;
                }
            }
            throw new RuntimeException("拒绝策略");
        }
    }

    private boolean addWorker(Runnable command, boolean core) {//创建新的线程加入工作者队列
        if (status.get() == STOP) return false;
        retry:
        while (true) {
            if (status.get() == STOP) return false;
            while (true) {
                if (workCount.get() >= (core ? corePoolCount : maxPoolCount)) {
                    return false;//可能其他线程也创建了
                }
                //创建工作线程
                if (!casAddWorkerCount()) {//自增失败
                    continue retry;
                }
                break retry;
            }
        }
        Worker worker = null;
        try {
            lock.lock();
            worker = new Worker(command);
            final Thread thread = worker.thread;
            if (thread != null) {
                if (thread.isAlive()) throw new IllegalArgumentException();
            }
            thread.start();
            workers.add(worker);//向hashset中添加
        } finally {
            lock.unlock();
        }
        return true;
    }

    public void shutdown(){
        lock.lock();
        try {
            setState(STOP);
            interruptIdleWorkers();
        }finally {
            lock.unlock();
        }
    }
    private void setState(Integer stop){
        if (status.get()==stop) return;
        while (true){
            if (status.get() == stop){
                break;
            }
            if (status.compareAndSet(status.get(),stop)){
                break;
            }
        }
    }
    private void interruptIdleWorkers(){
        lock.lock();
        try {
            for (Worker worker : workers) {
                if (!worker.thread.isInterrupted()){
                    worker.thread.interrupt();
                }
                this.completedTaskCount += worker.completedTasks;
            }
        }finally {
            lock.unlock();
        }
    }

    private void runWorkers(Worker worker) {
        if (worker == null) throw new NullPointerException();
        try {
            Runnable task = worker.firstTask;
            Thread wt = worker.thread;
            worker.firstTask = null;
            while (task != null || (task = getTask()) != null) {//不断的取任务
                if (wt.isInterrupted()) {
                    System.out.println("this thread is interrupted");
                }
                if (status.get() == STOP) {
                    System.out.println("this threadPool has already stopped");
                }
                task.run();//调用方法
                task = null;
                worker.completedTasks++;
            }
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            //没任务了
            while (true) {
                if (casDelWorkerCount()) {
                    completedTaskCount += worker.completedTasks;
                    break;
                } else {
                    continue;
                }
            }
            lock.lock();
            try {
                workQueue.remove(worker);
            } finally {
                lock.unlock();
            }
        }
    }
    private Runnable getTask() {
        boolean timeout = false;
        Runnable task = null;
        try {
            while (true) {
                if (timeout){
                    return null;
                }
                if (status.get() == STOP) throw new NullPointerException();
                //常驻工作队列
                if (workCount.get() <= corePoolCount){
                    task = workQueue.take();//如果workQueue是空 那么会阻塞在这里 直到不为空 “保活”线程
                }else{
                    task = workQueue.poll(keepLiveTime, TimeUnit.SECONDS);//如果在一定时间内拿不到直接返回null
                }
                if (task != null){
                    return task;
                }
                timeout = true;
            }
        }catch (InterruptedException exception){
            exception.printStackTrace();
            return null;
        }
    }
    private boolean casAddWorkerCount() {
        workCount.compareAndSet(workCount.get(), workCount.get() + 1);
        return true;
    }

    private boolean casDelWorkerCount() {
        workCount.compareAndSet(workCount.get(), workCount.get() - 1);
        return true;
    }

    private final class Worker implements Runnable {//工作者线程
        final Thread thread;
        Runnable firstTask;
        volatile int completedTasks;

        private Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = new Thread(this);
            this.completedTasks = 0;
        }

        @Override
        public void run() {
            runWorkers(this);//工作
        }
    }
}

关键逻辑点:

  • 线程安全:使用AtomicIntegerReentrantLock来保证线程池中对工作线程数量和状态的修改是线程安全的。
  • 任务队列:使用BlockingDeque作为任务队列,可以阻塞或超时地获取任务。
  • 工作线程的管理:通过HashSetLock来管理工作线程,保证在添加和移除工作线程时的线程安全。
  • 状态管理:使用AtomicInteger status来跟踪线程池的状态,决定是否接受新任务。

注意:

  • 代码中casAddWorkerCount()casDelWorkerCount()方法的实现似乎有误,因为它们没有使用compareAndSet()方法,这可能导致工作线程数量的不一致。
  • shutdown()方法中,应该在中断空闲线程后,也要尝试中断正在执行任务的工作线程,以确保线程池能够顺利关闭。
  • 代码中的setState(Integer stop)方法中,状态设置的逻辑可能存在竞态条件,因为它没有考虑状态在设置过程中可能被其他线程改变的情况。

在实际使用中,需要对这些潜在的问题进行修正和优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1642075.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web端重叠路径可视化

近几年来&#xff0c;由于信息技术的发展&#xff0c;大数据成为了这个时代的代名词之一&#xff0c;“数据可视化”风靡一时。得益于HTML5提供的新标签“canvas”&#xff0c;Web端也能分“数据可视化”一杯羹。 随着越来越多的可视化方案和需求&#xff0c;需要解决问题也越来…

Java中接口的默认方法

为什么要使用默认方法 当我们把一个程序的接口写完后 用其他的类去实现&#xff0c;此时如果程序需要再添加一个抽象方法的时候我们只有两种选择 将抽象方法写在原本的接口中 但是这样写会导致其他所有改接口的实现类都需要实现这个抽象方法比较麻烦 写另一个接口 让需要的实…

Sqli-labs第一关到第四关

目录 一&#xff0c;了解PHP源代码 二&#xff0c;破解第一关 2.1在了解完源码之后&#xff0c;我们重点看一下 2.2破解这道题表中有几列 2.3查看表中哪一列有回显 2.4查询库&#xff0c;表&#xff0c;列信息 三&#xff0c;总结 前提&#xff1a; 之所以把1234关…

2024年5月5日 十二生肖 今日运势

小运播报&#xff1a;2024年5月5日&#xff0c;星期日&#xff0c;农历三月廿七 &#xff08;甲辰年己巳月己巳日&#xff09;&#xff0c;法定节假日。 红榜生肖&#xff1a;牛、猴、鸡 需要注意&#xff1a;鼠、虎、猪 喜神方位&#xff1a;东北方 财神方位&#xff1a;正…

eSIM IoT vs M2M vs Consumer

有任何关于GSMA\IOT\eSIM\RSP\业务应用场景相关的问题&#xff0c;欢迎W: xiangcunge59 一起讨论, 共同进步 (加的时候请注明: 来自CSDN-iot). 截至2023年5月&#xff0c;全球移动通信系统协会&#xff08;GSMA&#xff09;发布了三个关键的eSIM技术规范&#xff0c;这些规…

2000-2020年县域创业活跃度数据

2000-2020年县域创业活跃度数据 1、时间&#xff1a;2000-2020年 2、指标&#xff1a;地区名称、年份、行政区划代码、经度、纬度、所属城市、所属省份、年末总人口万人、户籍人口数万人、当年企业注册数目、县域创业活跃度1、县域创业活跃度2、县域创业活跃3 3、来源&#…

【前端项目——分页器】手写分页器实现(JS / React)

组件介绍 用了两种方式实现&#xff0c;注释详细~ 可能代码写的不够简洁&#xff0c;见谅&#x1f641; 1. 包含内容显示的分页器 网上看了很多实现&#xff0c;很多只有分页器部分&#xff0c;没和内容显示联动。 因此我增加了模拟content的显示&#xff0c;这里模拟了32条数…

JavaEE初阶Day 15:文件IO(1)

目录 Day 15&#xff1a;文件IO&#xff08;1&#xff09;IO文件1. 路径2. 文件的分类3. 使用Java针对文件系统进行操作3.1 属性3.2 构造方法3.3 方法 Day 15&#xff1a;文件IO&#xff08;1&#xff09; IO I&#xff1a;Input输入 O&#xff1a;Output输出 输入输出规则…

使用机器学习确定文本的编程语言

导入必要的库 norman Python 语句&#xff1a;import <span style"color:#000000"><span style"background-color:#fbedbb"><span style"color:#0000ff">import</span> pandas <span style"color:#0000ff&quo…

onedrive下載zip檔案有20G限制,如何解決

一般來說&#xff0c;OneDrive網頁版對文件下載大小的限制如下圖所示&#xff0c;更多資訊&#xff0c;請您參考這篇文章&#xff1a;OneDrive 和 SharePoint 中的限制 - Microsoft Support 因此我們推薦您使用OneDrive同步用戶端來同步到本地電腦&#xff0c;您也可以選擇只同…

【实验】使用docker-compose编排lnmp(dockerfile) 完成Wordpress 部署

环境准备 docker&#xff1a;192.168.67.30 虚拟机&#xff1a;4核4G 关闭防火墙 systemctl stop firewalld systemctl disable firewalld setenforce 0 安装docker 直接点击【复制】粘贴到xshell中即可&#xff0c; 执行过程中若出现睡眠(sleep)通过 kill -9 pid号 &#x…

库存管理系统开源啦

软件介绍 ModernWMS是一个针对小型物流仓储供应链流程的开源库存管理系统。该系统的开发初衷是为了满足中小型企业在有限IT预算下对仓储管理的需求。通过总结多年ERP系统研发经验&#xff0c;项目团队开发了这套适用于中小型企业的系统&#xff0c;以帮助那些有特定需求的用户。…

vector的使用

1.构造函数 void test_vector1() {vector<int> v; //无参的构造函数vector<int> v2(10, 0);//n个value构造&#xff0c;初始化为10个0vector<int> v3(v2.begin(), v2.end());//迭代器区间初始化,可以用其他容器的区间初始化vector<int> v4(v3); //拷贝…

【 书生·浦语大模型实战营】作业(六):Lagent AgentLego 智能体应用搭建

【 书生浦语大模型实战营】作业&#xff08;六&#xff09;&#xff1a;Lagent & AgentLego 智能体应用搭建 &#x1f389;AI学习星球推荐&#xff1a; GoAI的学习社区 知识星球是一个致力于提供《机器学习 | 深度学习 | CV | NLP | 大模型 | 多模态 | AIGC 》各个最新AI方…

jupyter notebook 设置密码报错ModuleNotFoundError: No module named ‘notebook.auth‘

jupyter notebook 设置密码报错ModuleNotFoundError: No module named ‘notebook.auth‘ 原因是notebook新版本没有notebook.auth 直接输入以下命令即可设置密码 jupyter notebook password

链表的带环问题 链表的深度拷贝

1.1. 链表是否带环 代码很简单&#xff0c;最主要就是如何证明 首先判断链表是否带环&#xff0c;可以定义两个指针&#xff0c;一个快指针一个慢指针。快指针走两步&#xff0c;慢指针走一步一定会相遇吗&#xff1f;有没有可能会超过&#xff1f;假设进环的时候fast和slow的…

87、动态规划-最长地址子序列

思路&#xff1a; 使用递归来理解题目&#xff0c;然后在看如何优化&#xff0c;假设我当前使用元素那么最长是多少&#xff0c;如果不使用当前元素最长是多少&#xff0c;然后取最大值。 代码如下&#xff1a; //算出最长递增子序列的长度public static int lengthOfLIS02(…

【机器学习】集成方法---Boosting之AdaBoost

一、Boosting的介绍 1.1 集成学习的概念 1.1.1集成学习的定义 集成学习是一种通过组合多个学习器来完成学习任务的机器学习方法。它通过将多个单一模型&#xff08;也称为“基学习器”或“弱学习器”&#xff09;的输出结果进行集成&#xff0c;以获得比单一模型更好的泛化性…

批量美化图片,轻松实现多张图片描边,让图片瞬间焕发新生!

图片已成为我们日常生活中不可或缺的一部分。无论是社交媒体上的个人分享&#xff0c;还是商业宣传中的产品展示&#xff0c;高质量、精美的图片都扮演着至关重要的角色。然而&#xff0c;对于许多人来说&#xff0c;图片处理仍然是一个令人头疼的问题。现在&#xff0c;我们为…

激动,五四青年节,拿下YashanDB认证YCP

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 作者&#xff1a;IT邦德 中国DBA联盟(ACDU)成员&#xff0c;10余年DBA工作经验&#xff0c; Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主&#xff0c;全网粉丝10万 擅长主流Oracle、My…