一次订单同步任务的多线程改造实践

news2026/4/16 6:24:16
背景最近我在维护一个订单同步任务每天需要从第三方系统同步订单数据到本地数据库。原来的代码是串行执行的按天循环一天一天地去调用 API 同步数据。java// 原来的代码串行 String date sinceDate; while (!date.equals(nowDate)) { String url buildUrl(date); batchSyncOrder(url); // 同步这一天 date nextDate; }一次同步需要跑 30 天假设每天平均耗时 2-3 秒总耗时就要 1-2 分钟。这显然有很大的优化空间。于是我决定对这两个定时任务进行多线程改造。一、为什么可以并行分析后发现每天的数据是天然隔离的每天调用独立的 API 接口每天使用独立的数据库事务SqlSession天与天之间没有任何数据依赖这就像工厂里的流水线每个工位处理不同的产品互不干扰。串行是等第 1 天做完再做第 2 天并行是 30 天同时开工。二、改造前的代码分析2.1 两个定时任务任务同步天数执行频率syncMonthOrder()最近 30 天每天凌晨 2 点syncAllOrder()最近 365 天每周日凌晨 3 点2.2 原有代码的一个隐藏 Bug改造过程中发现doGet()方法上有个Async注解javaAsync public String doGet(String url) { // 发起 HTTP 请求 return result; }由于Async生效调用方拿到的结果是null真正的 HTTP 请求在另一个线程中执行。这意味着原来的代码其实一直有问题每天的数据根本没有被正确同步我把Async去掉让doGet()变成同步方法先修复了这个 Bug。三、ThreadPoolExecutor 详解3.1 什么是 ThreadPoolExecutorThreadPoolExecutor是 Java 线程池的核心实现类位于java.util.concurrent包中。它管理着一组工作线程负责执行提交的任务。3.2 为什么要用线程池方式问题每次 new Thread()创建销毁开销大线程数量不可控容易 OOM使用线程池复用线程控制并发数管理任务队列3.3 核心构造参数javapublic ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 空闲线程存活时间 TimeUnit unit, // 时间单位 BlockingQueueRunnable workQueue, // 任务队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler // 拒绝策略 )参数一corePoolSize核心线程数核心线程会一直存活即使空闲也不会被回收除非设置allowCoreThreadTimeOut(true)。参数二maximumPoolSize最大线程数线程池允许创建的最大线程数量。当任务队列满了且当前线程数小于最大线程数时会创建新线程来处理任务。参数三keepAliveTime unit空闲存活时间非核心线程空闲超过这个时间就会被回收。参数四workQueue任务队列用于存放等待执行的任务。常见的有队列类型特点LinkedBlockingQueue链表结构的有界/无界队列ArrayBlockingQueue数组结构的有界队列SynchronousQueue不存储任务直接交给线程执行参数五threadFactory线程工厂用于创建线程可以给线程起有意义的名称javaThreadFactory threadFactory new ThreadFactory() { private final AtomicInteger threadNumber new AtomicInteger(1); Override public Thread newThread(Runnable r) { Thread t new Thread(r, order-sync-pool- threadNumber.getAndIncrement()); t.setUncaughtExceptionHandler((thread, throwable) - { log.error(线程 {} 发生未捕获异常, thread.getName(), throwable); }); return t; } };参数六handler拒绝策略当线程池和任务队列都满了时触发拒绝策略策略行为AbortPolicy默认抛出 RejectedExecutionExceptionCallerRunsPolicy让提交任务的线程自己执行DiscardPolicy静默丢弃任务DiscardOldestPolicy丢弃队列头部的任务重新提交3.4 线程池工作原理重点当一个任务被提交到线程池时执行流程如下text步骤1当前线程数 corePoolSize └─ 是 → 创建新线程执行任务流程结束 └─ 否 → 进入步骤2 步骤2任务队列未满 └─ 是 → 将任务放入队列等待 └─ 否 → 进入步骤3 步骤3当前线程数 maximumPoolSize └─ 是 → 创建新线程执行任务 └─ 否 → 触发拒绝策略图解说明text提交任务 ↓ ┌─────────────────────────────────────┐ │ 当前线程数 核心线程数 │ │ 例如当前5个线程核心线程数10 │ └─────────────────────────────────────┘ ↓ 否已经有10个核心线程在工作 ┌─────────────────────────────────────┐ │ 任务队列是否已满 │ │ 队列容量500当前有500个任务等待 │ └─────────────────────────────────────┘ ↓ 是队列满了 ┌─────────────────────────────────────┐ │ 当前线程数 最大线程数 │ │ 当前10个线程最大线程数50 │ └─────────────────────────────────────┘ ↓ 是 ┌─────────────────────────────────────┐ │ 创建新的非核心线程执行任务 │ └─────────────────────────────────────┘ ↓ 否已经有50个线程了 ┌─────────────────────────────────────┐ │ 触发拒绝策略 │ └─────────────────────────────────────┘3.5 线程回收机制核心线程默认一直存活空闲也不回收非核心线程空闲超过keepAliveTime后被回收可以通过allowCoreThreadTimeOut(true)让核心线程也能被回收3.6 如何合理设置线程数这是一个经典问题取决于任务类型任务类型特点建议线程数CPU密集型大量计算CPU 一直是 100%CPU核心数 1IO密集型网络调用、数据库操作大部分时间在等待CPU核心数 × (1 IO耗时/CPU耗时)我的任务是调用第三方 API 和操作数据库属于典型的IO 密集型所以线程数可以设置得大一些月同步30天10-50 个线程全量同步365天20-100 个线程3.7 我的线程池创建代码javaprivate ExecutorService createExecutorForMonthSync(int dateCount) { int cpuCores Runtime.getRuntime().availableProcessors(); int corePoolSize Math.min(dateCount, Math.max(10, cpuCores * 2)); int maximumPoolSize Math.min(dateCount, 50); long keepAliveTime 60L; TimeUnit unit TimeUnit.SECONDS; BlockingQueueRunnable workQueue new LinkedBlockingQueue(200); ThreadFactory threadFactory new ThreadFactory() { private final AtomicInteger threadNumber new AtomicInteger(1); Override public Thread newThread(Runnable r) { Thread t new Thread(r, month-sync-pool- threadNumber.getAndIncrement()); t.setDaemon(false); return t; } }; RejectedExecutionHandler handler new ThreadPoolExecutor.CallerRunsPolicy(); return new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler ); }四、CompletableFuture 详解4.1 什么是 CompletableFutureCompletableFuture是 Java 8 引入的异步编程工具可以看作Future的增强版。它解决了传统Future的几个痛点痛点FutureCompletableFuture手动完成结果❌✅ complete()回调机制❌✅ thenApply/thenAccept组合多个任务❌✅ allOf/anyOf异常处理抛出✅ exceptionally超时控制需自己实现✅ get(timeout)4.2 CompletableFuture 的核心数据结构javapublic class CompletableFutureT { // 存储结果或异常volatile保证可见性 volatile Object result; // 等待栈链表结构存储依赖的回调任务 volatile Completion stack; }关键设计result任务完成后的结果未完成时为nullstack一个栈结构存储所有等待该 Future 完成的回调任务当 Future 完成时会遍历这个栈依次执行每个回调。4.3 任务的创建方式java// 无返回值 CompletableFutureVoid future1 CompletableFuture.runAsync(() - { System.out.println(执行任务); }, executor); // 有返回值 CompletableFutureString future2 CompletableFuture.supplyAsync(() - { return result; }, executor);注意如果不指定executor会使用默认的ForkJoinPool.commonPool()。4.4 核心方法详解runAsyncjavapublic static CompletableFutureVoid runAsync(Runnable runnable, Executor executor)提交一个没有返回值的任务返回CompletableFutureVoid。supplyAsyncjavapublic static U CompletableFutureU supplyAsync(SupplierU supplier, Executor executor)提交一个有返回值的任务。allOfjavapublic static CompletableFutureVoid allOf(CompletableFuture?... cfs)这是我最常用的方法。它创建一个新的 CompletableFuture当传入的所有Future 都完成时这个新 Future 才会完成。javaCompletableFutureVoid f1 CompletableFuture.runAsync(() - task1()); CompletableFutureVoid f2 CompletableFuture.runAsync(() - task2()); CompletableFutureVoid f3 CompletableFuture.runAsync(() - task3()); // 等待 f1、f2、f3 全部完成 CompletableFuture.allOf(f1, f2, f3).get();底层原理allOf()内部维护了一个计数器每有一个 Future 完成计数器减 1当计数器为 0 时结果 Future 被标记为完成。get(timeout, unit)javapublic T get(long timeout, TimeUnit unit)阻塞等待结果最多等待指定时间。超时会抛出TimeoutException。exceptionallyjavapublic CompletableFutureT exceptionally(FunctionThrowable, ? extends T fn)处理异常返回一个默认值。4.5 方法链示例javaCompletableFuture.supplyAsync(() - hello) .thenApply(s - s.toUpperCase()) // HELLO .thenApply(s - s WORLD) // HELLO WORLD .thenAccept(System.out::println) // 打印 .exceptionally(e - { log.error(处理失败, e); return null; });4.6 我在改造中的使用javaListCompletableFutureVoid futures new ArrayList(); for (String date : dateList) { CompletableFutureVoid future CompletableFuture.runAsync(() - { try { syncOneDay(date); successCount.incrementAndGet(); } catch (Exception e) { failCount.incrementAndGet(); failedDates.add(date); } }, executor); futures.add(future); } // 等待所有任务完成最多30分钟 try { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .get(30, TimeUnit.MINUTES); } catch (TimeoutException e) { log.error(同步任务整体超时); futures.forEach(f - f.cancel(true)); }五、失败兜底策略多线程环境下失败处理是非常重要的环节。我设计了以下几层兜底策略5.1 单天失败不影响其他天某一天同步失败比如网络超时、API 返回错误不会影响其他天的同步。失败的日期会被记录下来最后统一告警。javaAtomicInteger successCount new AtomicInteger(0); AtomicInteger failCount new AtomicInteger(0); ListString failedDates Collections.synchronizedList(new ArrayList()); for (String date : dateList) { CompletableFutureVoid future CompletableFuture.runAsync(() - { try { syncOneDay(date); successCount.incrementAndGet(); } catch (Exception e) { failCount.incrementAndGet(); failedDates.add(date); log.error(同步失败: {}, date, e); } }, executor); futures.add(future); } // 最后输出失败的日期 if (!failedDates.isEmpty()) { log.error(失败日期列表: {}, failedDates); }5.2 整体超时控制设置最大等待时间月同步 30 分钟全量同步 60 分钟超时后主动取消未完成的任务javatry { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .get(30, TimeUnit.MINUTES); } catch (TimeoutException e) { log.error(同步任务整体超时部分任务可能未完成); futures.forEach(f - f.cancel(true)); // 取消未完成的任务 }5.3 线程池优雅关闭任务完成后需要优雅地关闭线程池确保已提交的任务能执行完成javaprivate void shutdownExecutor(ExecutorService executor) { executor.shutdown(); // 禁止提交新任务 try { // 等待30秒让现有任务完成 if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { log.warn(线程池未能在30秒内完成尝试强制关闭); executor.shutdownNow(); // 强制中断 if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { log.error(线程池强制关闭后仍未完全终止); } } } catch (InterruptedException e) { executor.shutdownNow(); Thread.currentThread().interrupt(); } }5.4 线程安全的统计多线程环境下普通的int和ArrayList不是线程安全的。需要使用原子类或同步容器java// 计数器 AtomicInteger successCount new AtomicInteger(0); successCount.incrementAndGet(); // 原子操作 // 列表 ListString failedDates Collections.synchronizedList(new ArrayList());5.5 线程未捕获异常处理通过ThreadFactory设置全局异常处理器javaThreadFactory threadFactory new ThreadFactory() { Override public Thread newThread(Runnable r) { Thread t new Thread(r, sync-pool- counter.getAndIncrement()); t.setUncaughtExceptionHandler((thread, throwable) - { log.error(线程 {} 发生未捕获异常, thread.getName(), throwable); }); return t; } };六、踩坑记录6.1 数据库连接池不够用多线程同时操作数据库需要确保连接池大小足够。现象部分任务卡住日志显示获取连接超时。解决调整 HikariCP 连接池配置最大连接数从 20 调到 50。yamlspring: datasource: hikari: maximum-pool-size: 506.2 API 限流多个线程同时请求第三方 API触发了限流。解决用Semaphore控制并发数javaprivate final Semaphore semaphore new Semaphore(10); private void syncOneDay(String date) { semaphore.acquire(); try { // 调用 API } finally { semaphore.release(); } }七、改造效果指标改造前改造后30 天同步耗时约 1-2 分钟26 秒代码改动量-约 150 行失败影响范围某天失败会导致后续全部停止只影响当天其他继续日志输出示例text 开始同步订单最近30天 需要同步的总天数: 30 创建月同步线程池: corePoolSize20, maximumPoolSize50 ... 各线程并行执行 ... 月同步完成统计 总天数: 30 成功天数: 30 失败天数: 0 总耗时: 26秒 Update task overdue success.八、总结这次改造让我对多线程有了更深的理解知识点实践应用核心线程数 vs 最大线程数核心常驻最大应对突发有界队列防止任务积压导致 OOM拒绝策略CallerRunsPolicy保证任务不丢失CompletableFuture.runAsync()异步执行无返回值的任务CompletableFuture.allOf()等待多个异步任务全部完成get(timeout)整体超时控制AtomicInteger线程安全的计数synchronizedList线程安全的列表shutdown()awaitTermination()优雅关闭线程池核心原则任务独立性任务之间没有依赖关系才适合并行失败隔离一个任务失败不能影响其他任务超时控制避免无限等待设置合理超时资源管理连接池、线程池都要合理配置线程安全共享变量必须用线程安全的类多线程改造并不复杂关键是理解业务场景选择合适的方案。希望这篇文章对你有帮助

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2522418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…