线程池:Java 并发编程的核心武器
线程池Java 并发编程的核心武器线程池是管理和复用线程的高级工具它能显著提高程序性能避免频繁创建和销毁线程的开销。为什么需要线程池没有线程池的问题// 传统方式来一个任务创建一个线程 public class WithoutThreadPool { public static void main(String[] args) { for (int i 0; i 1000; i) { final int taskId i; new Thread(() - { System.out.println(执行任务 taskId); // 模拟任务执行 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } // 问题 // 1. 创建1000个线程开销巨大 // 2. 线程创建/销毁耗时 // 3. 可能耗尽系统资源 } }使用线程池的好处// 使用线程池 public class WithThreadPool { public static void main(String[] args) { // 创建线程池 ExecutorService executor Executors.newFixedThreadPool(10); for (int i 0; i 1000; i) { final int taskId i; executor.execute(() - { System.out.println(线程 Thread.currentThread().getName() 执行任务 taskId); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); // 优势 // 1. 复用10个线程执行1000个任务 // 2. 控制并发数 // 3. 管理任务队列 } }线程池核心参数ThreadPoolExecutor 的7个核心参数public ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 空闲线程存活时间 TimeUnit unit, // 时间单位 BlockingQueueRunnable workQueue, // 工作队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler // 拒绝策略 ) { ... }参数详解// 1. 核心线程数 (corePoolSize) // 线程池中始终存活的线程数量即使空闲 // 默认不会回收除非设置allowCoreThreadTimeOuttrue // 2. 最大线程数 (maximumPoolSize) // 线程池允许的最大线程数量 // 当工作队列满时会创建新线程直到达到此值 // 3. 空闲线程存活时间 (keepAliveTime) // 非核心线程空闲多长时间后被回收 // 4. 工作队列 (workQueue) // 存放等待执行的任务 // 常用实现 // ArrayBlockingQueue 有界队列 // LinkedBlockingQueue 可选有界/无界队列 // SynchronousQueue 不存储元素的队列 // PriorityBlockingQueue 优先级队列 // 5. 线程工厂 (threadFactory) // 用于创建线程可设置线程名、优先级等 // 6. 拒绝策略 (handler) // 当线程池和工作队列都满时如何处理新任务 // AbortPolicy 抛出RejectedExecutionException默认 // CallerRunsPolicy 由调用者线程执行任务 // DiscardPolicy 直接丢弃任务 // DiscardOldestPolicy 丢弃队列最前面的任务然后重试线程池工作流程// 完整流程图 1. 提交任务 ↓ 2. 核心线程是否有空闲 ├── 是 → 分配任务给核心线程 └── 否 → 工作队列是否已满 ├── 未满 → 任务加入队列等待 └── 已满 → 线程数是否达到最大 ├── 未达到 → 创建新线程执行任务 └── 已达到 → 执行拒绝策略可视化示例初始状态线程池有3个核心线程 任务流程 任务1 → 核心线程1执行 任务2 → 核心线程2执行 任务3 → 核心线程3执行 任务4 → 加入工作队列 任务5 → 加入工作队列 任务6 → 队列满创建新线程4执行 任务7 → 队列满创建新线程5执行 任务8 → 线程数达到最大5执行拒绝策略创建线程池的几种方式1. 使用 Executors 工厂方法不推荐生产环境public class ExecutorsDemo { public static void main(String[] args) { // 1. 固定大小线程池 ExecutorService fixedPool Executors.newFixedThreadPool(10); // 核心最大10队列LinkedBlockingQueue无界 // 2. 单线程线程池 ExecutorService singleThread Executors.newSingleThreadExecutor(); // 核心最大1队列LinkedBlockingQueue无界 // 3. 可缓存线程池 ExecutorService cachedPool Executors.newCachedThreadPool(); // 核心0最大Integer.MAX_VALUE队列SynchronousQueue // 适合大量短生命周期的异步任务 // 4. 定时线程池 ScheduledExecutorService scheduledPool Executors.newScheduledThreadPool(5); // 支持定时、周期性任务 // 5. 工作窃取线程池Java 8 ExecutorService workStealingPool Executors.newWorkStealingPool(); // 基于ForkJoinPool适合计算密集型任务 } }2. 手动创建 ThreadPoolExecutor推荐public class CustomThreadPool { public static void main(String[] args) { // 自定义线程池 ThreadPoolExecutor executor new ThreadPoolExecutor( 5, // 核心线程数 10, // 最大线程数 60, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new ArrayBlockingQueue(100), // 有界队列容量100 new ThreadFactory() { // 自定义线程工厂 private AtomicInteger counter new AtomicInteger(1); Override public Thread newThread(Runnable r) { Thread thread new Thread(r); thread.setName(MyThread- counter.getAndIncrement()); thread.setDaemon(false); thread.setPriority(Thread.NORM_PRIORITY); return thread; } }, new RejectedExecutionHandler() { // 自定义拒绝策略 Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 记录日志 System.err.println(任务被拒绝: r); // 可执行其他处理如持久化到数据库 } } ); // 允许核心线程超时回收 executor.allowCoreThreadTimeOut(true); // 预启动所有核心线程 executor.prestartAllCoreThreads(); } }实际应用场景场景1Web服务器请求处理// Tomcat的线程池配置 public class TomcatThreadPool { // server.xml 中的配置 // Executor nametomcatThreadPool // namePrefixcatalina-exec- // maxThreads200 // minSpareThreads10/ // 实际使用类似这样 public ThreadPoolExecutor createTomcatExecutor() { return new ThreadPoolExecutor( 10, // minSpareThreads 200, // maxThreads 60, // keepAliveTime TimeUnit.SECONDS, new LinkedBlockingQueue(Integer.MAX_VALUE), // 无界队列 new ThreadFactory() { private AtomicInteger count new AtomicInteger(1); Override public Thread newThread(Runnable r) { return new Thread(r, catalina-exec- count.getAndIncrement()); } }, new ThreadPoolExecutor.AbortPolicy() ); } }场景2批量数据处理public class BatchDataProcessor { private ThreadPoolExecutor executor; public BatchDataProcessor() { this.executor new ThreadPoolExecutor( 4, // 4核CPU核心线程CPU核心数 8, // 最大线程2*CPU核心数 60, TimeUnit.SECONDS, new LinkedBlockingQueue(1000), new CustomThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() // 避免任务丢失 ); } public void processBatch(ListString dataList) { CountDownLatch latch new CountDownLatch(dataList.size()); for (String data : dataList) { executor.execute(() - { try { processSingle(data); } finally { latch.countDown(); } }); } try { latch.await(); // 等待所有任务完成 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void processSingle(String data) { // 处理单个数据 System.out.println(Thread.currentThread().getName() 处理: data); } }场景3异步任务处理public class AsyncTaskService { // 独立线程池处理不同类型任务 private ThreadPoolExecutor emailExecutor; // 邮件发送 private ThreadPoolExecutor smsExecutor; // 短信发送 private ThreadPoolExecutor fileExecutor; // 文件处理 public AsyncTaskService() { // 邮件线程池 emailExecutor new ThreadPoolExecutor( 2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(100), new NamedThreadFactory(email-pool) ); // 短信线程池 smsExecutor new ThreadPoolExecutor( 2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(100), new NamedThreadFactory(sms-pool) ); // 文件处理线程池 fileExecutor new ThreadPoolExecutor( 4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(200), new NamedThreadFactory(file-pool) ); } public CompletableFutureVoid sendEmailAsync(String to, String content) { return CompletableFuture.runAsync(() - { // 发送邮件逻辑 System.out.println(发送邮件到: to); }, emailExecutor); } public CompletableFutureVoid sendSmsAsync(String phone, String content) { return CompletableFuture.runAsync(() - { // 发送短信逻辑 System.out.println(发送短信到: phone); }, smsExecutor); } }场景4定时/周期性任务public class ScheduledTaskDemo { public static void main(String[] args) { ScheduledExecutorService scheduler Executors.newScheduledThreadPool(3); // 1. 延迟执行 scheduler.schedule(() - { System.out.println(延迟5秒执行); }, 5, TimeUnit.SECONDS); // 2. 固定频率执行不等待上次任务完成 scheduler.scheduleAtFixedRate(() - { System.out.println(每3秒执行一次); }, 0, 3, TimeUnit.SECONDS); // 3. 固定延迟执行等待上次任务完成 scheduler.scheduleWithFixedDelay(() - { try { Thread.sleep(2000); // 模拟任务执行 System.out.println(任务完成2秒后执行下一个); } catch (InterruptedException e) { e.printStackTrace(); } }, 0, 5, TimeUnit.SECONDS); // 优雅关闭 Runtime.getRuntime().addShutdownHook(new Thread(() - { scheduler.shutdown(); try { if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { scheduler.shutdownNow(); } } catch (InterruptedException e) { scheduler.shutdownNow(); } })); } }线程池监控与管理监控线程池状态public class ThreadPoolMonitor { private ThreadPoolExecutor executor; public ThreadPoolMonitor(ThreadPoolExecutor executor) { this.executor executor; // 启动监控线程 new Thread(this::monitor, ThreadPool-Monitor).start(); } private void monitor() { while (!executor.isShutdown()) { try { Thread.sleep(5000); // 每5秒监控一次 printStats(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } private void printStats() { System.out.println( 线程池状态监控 ); System.out.println(核心线程数: executor.getCorePoolSize()); System.out.println(活动线程数: executor.getActiveCount()); System.out.println(最大线程数: executor.getMaximumPoolSize()); System.out.println(线程池大小: executor.getPoolSize()); System.out.println(已完成任务数: executor.getCompletedTaskCount()); System.out.println(总任务数: executor.getTaskCount()); System.out.println(队列大小: executor.getQueue().size()); System.out.println(队列剩余容量: executor.getQueue().remainingCapacity()); System.out.println(\n); } // 动态调整线程池参数 public void adjustThreadPool(int newCoreSize, int newMaxSize) { executor.setCorePoolSize(newCoreSize); executor.setMaximumPoolSize(newMaxSize); System.out.println(线程池参数已调整: core newCoreSize , max newMaxSize); } }优雅关闭线程池public class GracefulShutdown { public void shutdownThreadPool(ThreadPoolExecutor executor) { // 1. 停止接受新任务 executor.shutdown(); try { // 2. 等待现有任务完成 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 3. 强制关闭 executor.shutdownNow(); // 4. 再次等待 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { System.err.println(线程池未能正常关闭); } } } catch (InterruptedException e) { // 重新尝试强制关闭 executor.shutdownNow(); Thread.currentThread().interrupt(); } } public void shutdownWithHook(ThreadPoolExecutor executor) { // 注册JVM关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() - { System.out.println(JVM关闭正在关闭线程池...); shutdownThreadPool(executor); })); } }常见问题与解决方案问题1任务堆积导致内存溢出// ❌ 错误使用无界队列 ExecutorService executor Executors.newFixedThreadPool(10); // 默认使用LinkedBlockingQueue无界任务过多会OOM // ✅ 正确使用有界队列拒绝策略 ThreadPoolExecutor executor new ThreadPoolExecutor( 10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(1000), // 有界队列 new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时由调用线程执行 );问题2线程池参数设置不合理// CPU密集型任务 // 核心线程数 CPU核心数 1 int coreSize Runtime.getRuntime().availableProcessors() 1; // IO密集型任务 // 核心线程数 CPU核心数 * 2 int coreSize Runtime.getRuntime().availableProcessors() * 2; // 混合型任务 // 核心线程数 CPU核心数 * (1 平均等待时间/平均计算时间)问题3线程泄漏public class ThreadLeakDemo { // ❌ 错误任务抛出异常导致线程终止 ExecutorService executor Executors.newFixedThreadPool(1); public void wrongUsage() { executor.execute(() - { // 任务抛出异常线程会终止 throw new RuntimeException(任务异常); // 线程池会创建新线程但可能来不及 }); } // ✅ 正确捕获所有异常 public void correctUsage() { executor.execute(() - { try { // 业务逻辑 throw new RuntimeException(任务异常); } catch (Exception e) { // 记录日志但不要抛出 log.error(任务执行失败, e); } }); } }最佳实践1. 合理设置参数public class ThreadPoolBestPractice { public ThreadPoolExecutor createIdealPool() { int coreSize getIdealCoreSize(); int maxSize coreSize * 2; return new ThreadPoolExecutor( coreSize, maxSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(1000), // 合理队列大小 new NamedThreadFactory(business-pool), new ThreadPoolExecutor.CallerRunsPolicy() // 重要的拒绝策略 ); } private int getIdealCoreSize() { int cpuCores Runtime.getRuntime().availableProcessors(); // 根据任务类型调整 return cpuCores 1; } }2. 使用 CompletableFuturepublic class CompletableFutureDemo { public void processWithCompletableFuture() { ThreadPoolExecutor executor new ThreadPoolExecutor( 4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(100) ); // 并行执行多个任务 CompletableFutureString future1 CompletableFuture .supplyAsync(() - fetchDataFromDB(), executor); CompletableFutureString future2 CompletableFuture .supplyAsync(() - fetchDataFromAPI(), executor); // 组合结果 CompletableFutureVoid allFutures CompletableFuture .allOf(future1, future2); // 处理结果 allFutures.thenRun(() - { try { String result1 future1.get(); String result2 future2.get(); processResults(result1, result2); } catch (Exception e) { e.printStackTrace(); } }); } }3. Spring 中的线程池Configuration EnableAsync public class AsyncConfig { Bean(taskExecutor) public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix(async-task-); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } } Service public class UserService { Async(taskExecutor) public CompletableFutureUser findUserAsync(Long userId) { // 异步查询用户 User user userRepository.findById(userId); return CompletableFuture.completedFuture(user); } }性能调优监控指标public class ThreadPoolMetrics { // 关键监控指标 // 1. 线程池使用率 activeCount / maximumPoolSize // 2. 队列使用率 queueSize / queueCapacity // 3. 任务拒绝率 rejectedCount / totalTaskCount // 4. 平均任务执行时间 // 5. 线程创建/销毁频率 }动态调整public class DynamicThreadPool { private ThreadPoolExecutor executor; private ScheduledExecutorService monitor; public DynamicThreadPool() { executor new ThreadPoolExecutor( 5, 20, 60, TimeUnit.SECONDS, new ResizableCapacityLinkedBlockingQueue(100) ); // 定时监控并调整 monitor Executors.newSingleThreadScheduledExecutor(); monitor.scheduleAtFixedRate(this::adjustPool, 1, 1, TimeUnit.MINUTES); } private void adjustPool() { int queueSize executor.getQueue().size(); int activeCount executor.getActiveCount(); if (queueSize 50 activeCount executor.getMaximumPoolSize()) { // 队列堆积且线程已满增加队列容量 ((ResizableCapacityLinkedBlockingQueue)executor.getQueue()) .setCapacity(queueSize * 2); } } }总结线程池核心价值降低资源消耗复用线程避免频繁创建销毁提高响应速度任务到达时线程已存在提高线程可管理性统一管理、监控、调优提供功能扩展定时执行、周期执行等使用原则不要用Executors快捷方法有OOM风险根据任务类型设置合理参数使用有界队列合适的拒绝策略监控线程池状态动态调整优雅关闭避免任务丢失记住线程池是并发编程的基石合理使用能极大提升系统性能。理解其原理才能用好这个强大的工具。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2454284.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!