【手写系列】手写线程池

news2025/6/4 8:33:35

PS:本文的线程池为演示 Demo,皆在理解线程池的工作原理,并没有解决线程安全问题

最简单一版的线程池

public class MyThreadPool {
    // 存放线程,复用已创建的线程
    List<Thread> threadList = new ArrayList<>();
    
    public void execute(Runnable command){
        Thread thread = new Thread(command);
        threadList.add(thread);
        thread.start();
    }
}

现在有个问题:创建出来的线程,是可复用的吗?

  • 答案是否定的。
  • 因为线程在执行完 runnable() 就结束,被销毁了。

思考两个问题:

  1. 线程什么时候创建
  2. 线程的 runnable() 方法是什么?是参数的 command 吗?
  • 线程的 runnable() 是一个死循环,不断从 commandList 取任务执行。
  • 参数的 command ,是 commandList(任务队列) 中的任务。

只有一个线程的线程池

简化一下问题:假设线程池现在只有一个线程,该如何设计呢?

  • List 中不在存放线程,存放提交的任务。
  • Thread 的 runnable() 不断去判断 taskList 中是否有任务。
public class MyThreadPool {
    // 存放线程池中的任务
    List<Runnable> commandList = new ArrayList<>();

    Thread thread = new Thread(() ->{
        while (true){
            if (!commandList.isEmpty()){
                Runnable task = commandList.remove(0);
                command.run();
            }
        }
    });

    public void execute(Runnable command){
        commandList.add(command);
    }
}

看似完美的单线程线程池,还有其他问题: while(true){} 循环中,CPU 一直在空转,浪费资源。

有没有一种容器,可以在容器中没有元素的时候阻塞,有元素的时候在获取?

  • 有的。 阻塞队列。
public class MyThreadPool {

    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);

    Thread thread = new Thread(() ->{
        while (true){
            try {
                // 一直阻塞,直到取出元素
                Runnable command = blockingQueue.take();
                command.run();
            } catch (InterruptedException e) {
                // 线程在阻塞、休眠时 被打断,都会抛出 InterruptedException 异常
                throw new RuntimeException(e);
            }
        }
    },"唯一线程");

    {
        thread.start();
    }

    public void execute(Runnable command){
        boolean offered = blockingQueue.offer(command);
    }

}

线程池

单个线程的线程池,多数情况下是满足不了我们的需求的,需要多个线程共同来完成任务。

public class MyThreadPool {
    // 核心线程数
    private int corePoolSize = 10;

    // 核心线程集合
    List<Thread> coreList = new ArrayList<>();

    // 存放任务
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);

    private final Runnable task = () ->{
        while (true){
            try {
                Runnable command = blockingQueue.take();
                command.run();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    };

    public void execute(Runnable command){
        // 小于核心线程数
        if (coreList.size() < corePoolSize){
            // 创建核心线程
            Thread thread = new Thread(task);
            coreList.add(thread);
            thread.start();
        }
        
        // 任务添加失败,说明阻塞队列满了,核心线程处理不过来
        if (!blockingQueue.offer(command)) {
            Thread thread = new Thread(task);
            coreList.add(thread);
            thread.start();
            return;
        }
    }
}

问题:阻塞队列满了,核心线程处理不过来了 该如何做呢?

  • 可以添加一些辅助线程,帮助核心线程处理任务。
public class MyThreadPool {

    private int corePoolSize = 10;

    private int maxSize = 16;

    List<Thread> coreList = new ArrayList<>();

    List<Thread> supportList = new ArrayList<>();

    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);

    private final Runnable task = () ->{
        while (true){
            try {
                Runnable command = blockingQueue.take();
                command.run();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    };

    public void execute(Runnable command){
        // 小于核心线程数
        if (coreList.size() < corePoolSize){
            // 创建核心线程
            Thread thread = new Thread(task);
            coreList.add(thread);
            thread.start();
        }
        
        // 任务添加成功
        if (blockingQueue.offer(command)) {
            return;
        }
        
        // 已创建的线程数 < 最大线程数
        if (coreList.size() + supportList.size() < maxSize){
            // 创建辅助线程
            Thread thread = new Thread(task);
            supportList.add(thread);
            thread.start();
        }
        
        // 任务添加失败
        if (!blockingQueue.offer(command)) {
            throw new RuntimeException("阻塞队列满了");
        }
    }
}

问题:一个线程如何在空闲的时候结束呢?

  • blockingQueue.take() ,阻塞一定时间我就认为空闲了,因为 在规定时间内取不到元素,说明阻塞队列不满。就应该结束辅助线程。
  • blockingQueue.poll(timeout,timeUnit) ,阻塞 指定的时间。
public class MyThreadPool {
    
    private int corePoolSize;
    private int maxSize;
    private int timeout;
    private TimeUnit timeUnit;
    private BlockingQueue<Runnable> blockingQueue;

    // 线程池参数交给使用者决定
    public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit,BlockingQueue<Runnable> blockingQueue) {
        this.corePoolSize = corePoolSize;
        this.maxSize = maxSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.blockingQueue = blockingQueue;
    }
    
    List<Thread> coreList = new ArrayList<>();
    List<Thread> supportList = new ArrayList<>();

    // 核心线程的任务
    private final Runnable coreTask = () ->{
        while (true){
            try {
                Runnable command = blockingQueue.take();
                command.run();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    };

    // 辅助线程的任务
    private final Runnable supportTask = () ->{
        while (true){
            try {
                Runnable command = blockingQueue.poll(timeout, timeUnit);
                if (command == null){
                    break;
                }
                command.run();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        System.out.println("辅助线程结束了!");
    };

    public void execute(Runnable command){
        if (coreList.size() < corePoolSize){
            Thread thread = new Thread(coreTask);
            coreList.add(thread);
            thread.start();
        }
        if (blockingQueue.offer(command)) {
            return;
        }
        if (coreList.size() + supportList.size() < maxSize){
            Thread thread = new Thread(supportTask);
            supportList.add(thread);
            thread.start();
        }
        if (!blockingQueue.offer(command)) {
            throw new RuntimeException("阻塞队列满了");
        }
    }
}

coreTask、supportTask 封装为 CoreThread、SupportThread

public class MyThreadPool {

    private int corePoolSize;
    private int maxSize;
    private int timeout;
    private TimeUnit timeUnit;
    private BlockingQueue<Runnable> blockingQueue;

    public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit,BlockingQueue<Runnable> blockingQueue) {
        this.corePoolSize = corePoolSize;
        this.maxSize = maxSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.blockingQueue = blockingQueue;
    }

    List<Thread> coreList = new ArrayList<>();
    List<Thread> supportList = new ArrayList<>();

    public void execute(Runnable command){
        if (coreList.size() < corePoolSize){
            // 核心线程
            Thread thread = new CoreThread();
            coreList.add(thread);
            thread.start();
        }
        if (blockingQueue.offer(command)) {
            return;
        }
        if (coreList.size() + supportList.size() < maxSize){
            // 辅助线程
            Thread thread = new SupportThread();
            supportList.add(thread);
            thread.start();
        }
        if (!blockingQueue.offer(command)) {
            throw new RuntimeException("阻塞队列满了");
        }
    }

    class CoreThread extends Thread{
        @Override
        public void run() {
            while (true){
                try {
                    Runnable command = blockingQueue.take();
                    command.run();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    class SupportThread extends Thread{
        @Override
        public void run() {
            while (true){
                try {
                    Runnable command = blockingQueue.poll(timeout, timeUnit);
                    if (command == null){
                        break;
                    }
                    command.run();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            System.out.println("辅助线程结束了!");
        }
    }
}

问题:上述 command 第二次添加到阻塞队列失败了,除了抛异常,还有什么其他处理方式?

  • 定义一个 拒绝策略,由创建线程池的开发者决定。

最终版线程池

@FunctionalInterface
public interface RejectHandle {

    /**
     * 拒绝策略
     * @param rejectCommand 被拒绝的任务
     * @param threadPool 拒绝任务的线程池
     */
    void reject(Runnable rejectCommand, MyThreadPool threadPool);

}

public class MyThreadPool {
    private final int corePoolSize;
    private final int maxSize;
    private final int timeout;
    private final TimeUnit timeUnit;
    private final BlockingQueue<Runnable> blockingQueue;
    private final RejectHandle rejectHandle;

    public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, RejectHandle rejectHandler) {
        this.corePoolSize = corePoolSize;
        this.maxSize = maxSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.blockingQueue = blockingQueue;
        this.rejectHandle = rejectHandler;
    }

    List<Thread> coreList = new ArrayList<>();
    List<Thread> supportList = new ArrayList<>();

    public void execute(Runnable command){
        if (coreList.size() < corePoolSize){
            Thread thread = new CoreThread();
            coreList.add(thread);
            thread.start();
        }
        if (blockingQueue.offer(command)) {
            return;
        }
        if (coreList.size() + supportList.size() < maxSize){
            Thread thread = new SupportThread();
            supportList.add(thread);
            thread.start();
        }
        if (!blockingQueue.offer(command)) {
            // 执行拒绝策略
            rejectHandle.reject(command,this);
        }
    }

    class CoreThread extends Thread{
        @Override
        public void run() {
            while (true){
                try {
                    Runnable command = blockingQueue.take();
                    command.run();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    class SupportThread extends Thread{
        @Override
        public void run() {
            while (true){
                try {
                    Runnable command = blockingQueue.poll(timeout, timeUnit);
                    if (command == null){
                        break;
                    }
                    command.run();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            System.out.println("辅助线程结束了!");
        }
    }
}

public class Test {
    public static void main(String[] args) {
        MyThreadPool threadPool = 
        new MyThreadPool(2,6,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),
             ((rejectCommand, Pool) -> {
                // 执行拒绝策略: 抛异常;丢弃阻塞队列中的第一个任务;丢弃当前任务 。。。
            } ));

        Runnable task = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
        };
        
        for (int i = 0;i < 5;i ++){
            threadPool.execute(task);
        }
        System.out.println("主线程没有被阻塞!");
    }
}

思考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2396504.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Live Helper Chat 安装部署

Live Helper Chat(LHC)是一款开源的实时客服聊天系统,适用于网站和应用,帮助企业与访问者即时沟通。它功能丰富、灵活、可自托管,常被用于在线客户支持、销售咨询以及技术支持场景。 🧰 系统要求 安装要求 您提供的链接指向 Live Helper Chat 的官方安装指南页面,详细…

ARXML解析与可视化工具

随着汽车电子行业的快速发展,AUTOSAR标准在车辆软件架构中发挥着越来越重要的作用。然而,传统的ARXML文件处理工具往往存在高昂的许可费用、封闭的数据格式和复杂的使用门槛等问题。本文介绍一种基于TXT格式输出的ARXML解析方案,为开发团队提供了一个高效的替代解决方案。 …

PnP(Perspective-n-Point)算法 | 用于求解已知n个3D点及其对应2D投影点的相机位姿

什么是PnP算法&#xff1f; PnP 全称是 Perspective-n-Point&#xff0c;中文叫“n点透视问题”。它的目标是&#xff1a; 已知一些空间中已知3D点的位置&#xff08;世界坐标&#xff09;和它们对应的2D图像像素坐标&#xff0c;求解摄像机的姿态&#xff08;位置和平移&…

在日常管理服务器中如何防止SQL注入与XSS攻击?

在日常管理服务器时&#xff0c;防止SQL注入&#xff08;Structured Query Language Injection&#xff09;和XSS&#xff08;Cross-Site Scripting&#xff09;攻击是至关重要的&#xff0c;这些攻击可能会导致数据泄露、系统崩溃和信息泄露。以下是一份技术文章&#xff0c;介…

Wkhtmltopdf使用

Wkhtmltopdf使用 1.windows本地使用2.golangwindows环境使用3.golangdocker容器中使用 1.windows本地使用 官网地址 https://wkhtmltopdf.org/&#xff0c;直接去里面下载自己想要的版本&#xff0c;这里以windows版本为例2.golangwindows环境使用 1.安装扩展go get -u githu…

ArcGIS Pro 创建渔网格网过大,只有几个格网的解决方案

之前用ArcGIS Pro创建渔网的时候&#xff0c;发现创建出来格网过大&#xff0c;只有几个格网。 后来查阅资料&#xff0c;发现是坐标不对&#xff0c;导致设置格网大小时单位为度&#xff0c;而不是米&#xff0c;因此需要进行坐标系转换&#xff0c;网上有很多资料讲了ArcGIS …

重学计算机网络之以太网

一&#xff1a;历史发展进程 DIX EtherNet V2 战胜IEEE802.3成为主流版本。总线型交换机拓扑机构代替集线器星型拓扑机构 1990年IEEE制定出星形以太网10BASE-T的标准**802.3i**。“10”代表10 Mbit/s 的数据率&#xff0c;BASE表示连接线上的信号是基带信号&#xff0c;T代表…

《深度解构现代云原生微服务架构的七大支柱》

☁️《深度解构现代云原生微服务架构的七大支柱》 一线架构师实战总结&#xff0c;系统性拆解现代微服务架构中最核心的 7 大支柱模块&#xff0c;涵盖通信协议、容器编排、服务网格、弹性伸缩、安全治理、可观测性、CI/CD 等。文内附架构图、实操路径与真实案例&#xff0c;适…

使用SCSS实现随机大小的方块在页面滚动

目录 一、scss中的插值语法 二、方块在界面上滚动的动画 一、scss中的插值语法 插值语法 #{}‌ 是一种动态注入变量或表达式到选择器、属性名、属性值等位置的机制 .类名:nth-child(n) 表示需同时满足为父元素的第n个元素且类名为给定条件 效果图&#xff1a; <div class…

AI 眼镜新纪元:贴片式TF卡与 SOC 芯片的黄金组合破局智能穿戴

目录 一、SD NAND&#xff1a;智能眼镜的“记忆中枢”突破空间限制的存储革命性能与可靠性的双重保障 二、SOC芯片&#xff1a;AI眼镜的“智慧大脑”从性能到能效的全面跃升多模态交互的底层支撑 三、SD NANDSOC&#xff1a;11&#xff1e;2的协同效应数据流水线的高效协同成本…

论文阅读(六)Open Set Video HOI detection from Action-centric Chain-of-Look Prompting

论文来源&#xff1a;ICCV&#xff08;2023&#xff09; 项目地址&#xff1a;https://github.com/southnx/ACoLP 1.研究背景与问题 开放集场景下的泛化性&#xff1a;传统 HOI 检测假设训练集包含所有测试类别&#xff0c;但现实中存在大量未见过的 HOI 类别&#xff08;如…

算法学习--持续更新

算法 2025年5月24日 完成&#xff1a;快速排序、快速排序基数优化、尾递归优化 快排 public class QuickSort {public void sort(int[] nums, int left, int right) {if(left>right){return;}int partiton quickSort(nums,left,right);sort(nums,left,partiton-1);sort(nu…

Postman 发送 SOAP 请求步骤 归档

0.来源 https://apifox.com/apiskills/sending-soap-requests-with-postman/?utm_sourceopr&utm_mediuma2bobzhang&utm_contentpostman 再加上自己一点实践经验 1. 创建一个新的POST请求 postman 创建一个post请求, 请求url 怎么来的可以看第三步 2. post请求设…

Python Day39 学习(复习日志Day4)

复习Day4日志内容 浙大疏锦行 补充: 关于“类”和“类的实例”的通俗易懂的例子 补充&#xff1a;如何判断是用“众数”还是“中位数”填补空缺值&#xff1f; 今日复习了日志Day4的内容&#xff0c;感觉还是得在纸上写一写印象更深刻&#xff0c;接下来几日都采取“纸质化复…

[Python] Python自动化:PyAutoGUI的基本操作

初次学习&#xff0c;如有错误还请指正 目录 PyAutoGUI介绍 PyAutoGUI安装 鼠标相关操作 鼠标移动 鼠标偏移 获取屏幕分辨率 获取鼠标位置 案例&#xff1a;实时获取鼠标位置 鼠标点击 左键单击 点击次数 多次有时间间隔的点击 右键/中键点击 移动时间 总结 鼠…

应急响应靶机-web2-知攻善防实验室

题目&#xff1a; 前景需要&#xff1a;小李在某单位驻场值守&#xff0c;深夜12点&#xff0c;甲方已经回家了&#xff0c;小李刚偷偷摸鱼后&#xff0c;发现安全设备有告警&#xff0c;于是立刻停掉了机器开始排查。 这是他的服务器系统&#xff0c;请你找出以下内容&#…

comfyui利用 SkyReels-V2直接生成长视频本地部署问题总结 1

在通过桌面版comfyUI 安装ComfyUI-WanVideoWrapper 进行SkyReels-V2 生成长视频的过程中&#xff0c;出现了&#xff0c;很多错误。 总结一下&#xff0c;让大家少走点弯路 下面是基于搜索结果的 ComfyUI 本地部署 SkyReels-V2 实现长视频生成的完整指南&#xff0c;涵盖环境配…

YOLOv8 实战指南:如何实现视频区域内的目标统计与计数

文章目录 YOLOv8改进 | 进阶实战篇&#xff1a;利用YOLOv8进行视频划定区域目标统计计数1. 引言2. YOLOv8基础回顾2.1 YOLOv8架构概述2.2 YOLOv8的安装与基本使用 3. 视频划定区域目标统计的实现3.1 核心思路3.2 完整实现代码 4. 代码深度解析4.1 关键组件分析4.2 性能优化技巧…

matlab实现VMD去噪、SVD去噪,源代码详解

为了更好的利用MATLAB自带的vmd、svd函数&#xff0c;本期作者将详细讲解一下MATLAB自带的这两个分解函数如何使用&#xff0c;以及如何画漂亮的模态分解图。 VMD函数用法详解 首先给出官方vmd函数的调用格式。 [imf,residual,info] vmd(x) 函数的输入&#xff1a; 这里的x是待…

SQLite软件架构与实现源代码浅析

概述 SQLite 是一个用 C 语言编写的库&#xff0c;它成功打造出了一款小型、快速、独立、具备高可靠性且功能完备的 SQL 数据库引擎。本文档将为您简要介绍其架构、关键组件及其协同运作模式。 SQLite 显著特点之一是无服务器架构。不同于常规数据库&#xff0c;它并非以单独进…