SpringBatch学习

news2026/5/15 19:27:32
/** * 示例一Tasklet 方式 */ Configuration EnableBatchProcessing public class TaskletBatchConfig { private static final Logger logger LoggerFactory.getLogger(TaskletBatchConfig.class); Autowired private JobBuilderFactory jobBuilderFactory; Autowired private StepBuilderFactory stepBuilderFactory; Bean public Job taskletJob() { return jobBuilderFactory.get(taskletJob) .start(taskletStep()) .build(); } Bean public Step taskletStep() { return stepBuilderFactory.get(taskletStep) .tasklet(helloWorldTasklet()) .build(); } Bean public Tasklet helloWorldTasklet() { return (contribution, chunkContext) - { int total 10; logger.info( Tasklet 开始执行 ); for (int i 1; i total; i) { logger.info(Hello World! index {}, i); // 可以在这里写任何业务逻辑 if (i % 3 0) { logger.info(第 {} 条数据处理完成特殊处理, i); } } logger.info( Tasklet 执行完成共处理 {} 条 , total); return RepeatStatus.FINISHED; }; } } /** * 示例二Chunk 方式推荐 */ Configuration EnableBatchProcessing public class ChunkBatchConfig { private static final Logger logger LoggerFactory.getLogger(ChunkBatchConfig.class); Autowired private JobBuilderFactory jobBuilderFactory; Autowired private StepBuilderFactory stepBuilderFactory; // Reader Bean public ItemReaderString nameReader() { ListString names Arrays.asList(张三, 李四, 王五, 赵六, 田七, 孙八, 周九, 吴十); return new ListItemReader(names); } // Processor Bean public ItemProcessorString, User userProcessor() { return name - { User user new User(); user.setName(name); user.setAge(20 (int) (Math.random() * 30)); user.setDesc(由 Spring Batch Chunk 处理); logger.info(【Processor】处理数据: {} → {}, name, user); return user; }; } // Writer Bean public ItemWriterUser userWriter() { return users - { logger.info(【Writer】本次批次写入 {} 条数据, users.size()); for (User user : users) { logger.info(【写入成功】 {}, user); // 实际项目中这里可以写入数据库、文件等 } }; } // Step Bean public Step chunkStep() { return stepBuilderFactory.get(chunkStep) .String, Userchunk(3) // 每3条数据为一个批次Chunk .reader(nameReader()) .processor(userProcessor()) .writer(userWriter()) .faultTolerant() // 开启容错 .skipLimit(5) // 最多跳过5条错误数据 .skip(Exception.class) // 跳过异常 .build(); } // Job Bean public Job chunkJob() { return jobBuilderFactory.get(chunkJob) .start(chunkStep()) .build(); } } Component public class CustomUserWriter implements ItemWriterUser { private static final Logger log LoggerFactory.getLogger(CustomUserWriter.class); Override public void write(List? extends User items) throws Exception { log.info(【Writer】开始批量写入 {} 条数据, items.size()); for (User user : items) { log.info(【写入数据库】 {}, user); // extUserMapper.batchInsert 或 update ... } } } Component public class CustomUserReader implements ItemReaderUser { private final ListUser users; private int index 0; public CustomUserReader() { this.users Arrays.asList( new User(张三, 25, 原始数据), new User(李四, 30, 原始数据), new User(王五, 28, 原始数据), new User(赵六, 35, 原始数据) ); } Override public User read() { if (index users.size()) { User user users.get(index); System.out.println(【Reader】读取到: {} user.getName()); return user; } return null; // 返回null表示读取结束 } } Component public class CustomUserProcessor implements ItemProcessorUser, User { private final Logger log LoggerFactory.getLogger(CustomUserProcessor.class); Override public User process(User item) throws Exception { // 业务处理 item.setAge(item.getAge() 1); item.setDesc(已加工 - LocalDateTime.now()); log.info(【Processor】加工完成: {}, item.getName()); return item; } } Configuration EnableBatchProcessing RequiredArgsConstructor public class CustomComponentBatchConfig { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final CustomUserReader customUserReader; private final CustomUserProcessor customUserProcessor; private final CustomUserWriter customUserWriter; Bean public Step customComponentStep() { return stepBuilderFactory.get(customComponentStep) .User, Userchunk(2) .reader(customUserReader) .processor(customUserProcessor) .writer(customUserWriter) .faultTolerant() .skipLimit(5) .skip(Exception.class) .build(); } Bean public Job customComponentJob() { return jobBuilderFactory.get(customComponentJob) .start(customComponentStep()) .build(); } } 使用监听器 Tasklet 主要使用 StepExecutionListener。 Component public class TaskletStepListener implements StepExecutionListener { private static final Logger log LoggerFactory.getLogger(TaskletStepListener.class); Override public void beforeStep(StepExecution stepExecution) { log.info(【Tasklet Step 开始】 Step名称: {}, stepExecution.getStepName()); log.info(Job参数: {}, stepExecution.getJobParameters()); } Override public ExitStatus afterStep(StepExecution stepExecution) { log.info(【Tasklet Step 结束】 读: {} 条, 写: {} 条, 跳过: {} 条, stepExecution.getReadCount(), stepExecution.getWriteCount(), stepExecution.getSkipCount()); log.info(最终状态: {}, stepExecution.getStatus()); return stepExecution.getExitStatus(); } } 在 Step 中使用 Bean public Step taskletStep(TaskletStepListener taskletStepListener) { return stepBuilderFactory.get(taskletStep) .tasklet(simpleTasklet()) .listener(taskletStepListener) // 添加监听器 .build(); } Chunk 方式可以使用的监听器更多StepExecutionListener ChunkListener Item*Listener ChunkListener示例 Component public class ChunkStepListener implements ChunkListener { private static final Logger log LoggerFactory.getLogger(ChunkStepListener.class); Override public void beforeChunk(ChunkContext context) { log.info(【Chunk 开始】 本批次即将处理数据...); } Override public void afterChunk(ChunkContext context) { log.info(【Chunk 完成】 本批次处理结束); } Override public void afterChunkError(ChunkContext context) { log.error(【Chunk 异常】 本批次处理失败); } } 使用 Bean public Step chunkInlineStep( ChunkStepListener chunkListener, StepExecutionListener stepListener) { // 可同时注入多个 return stepBuilderFactory.get(chunkInlineStep) .String, Userchunk(3) .reader(inlineReader()) .processor(inlineProcessor()) .writer(inlineWriter()) .listener(chunkListener) // Chunk 监听器 .listener(stepListener) // Step 监听器 // 也可以加 Item 级别监听器 // .listener(itemReadListener()) // .listener(itemProcessListener()) // .listener(itemWriteListener()) .faultTolerant() .build(); } 自定义监听器 Component public class UserStepListener implements StepExecutionListener { private static final Logger log LoggerFactory.getLogger(UserStepListener.class); Override public void beforeStep(StepExecution stepExecution) { log.info( User Step 开始执行 ); } Override public ExitStatus afterStep(StepExecution stepExecution) { log.info( User Step 执行结束 ); log.info(读: {}, 写: {}, 跳过: {}, 提交次数: {}, stepExecution.getReadCount(), stepExecution.getWriteCount(), stepExecution.getSkipCount(), stepExecution.getCommitCount()); return stepExecution.getExitStatus(); } } Component public class UserChunkListener implements ChunkListener { Override public void beforeChunk(ChunkContext context) { log.info(开始新 Chunk当前已提交: {} 次, context.getStepContext().getStepExecution().getCommitCount()); } Override public void afterChunk(ChunkContext context) { log.info(Chunk 执行完成); } } 配置使用 Bean public Step customComponentStep( UserStepListener stepListener, UserChunkListener chunkListener) { return stepBuilderFactory.get(customComponentStep) .User, Userchunk(10) .reader(customUserReader) .processor(customUserProcessor) .writer(customUserWriter) .listener(stepListener) // Step 监听 .listener(chunkListener) // Chunk 监听 .faultTolerant() .skipLimit(10) .skip(Exception.class) .build(); } 监听器类型 JobExecutionListener作业级监听器 监听器类型​ JobExecutionListener 触发时机​ 在整个 Job 开始执行之前、Job 执行完成之后无论成功还是失败触发 主要作用​ 用于监控和控制整个批处理任务的生命周期适合做一些全局性的初始化和收尾工作 实际业务用途非常重要​ 在 Job 开始前进行初始化操作例如清理历史临时表数据、对任务加分布式锁防止重复执行 在 Job 结束后汇总本次任务的执行结果例如统计成功处理的数据量、失败数量 发送执行完成通知如通过邮件、企业微信、钉钉等方式通知相关人员 将 Job 的执行日志、起止时间、最终状态等信息持久化到数据库便于审计和追溯 当 Job 执行失败时进行统一告警触发监控系统的异常提醒 ---------------------------------------------------------------- StepExecutionListener步骤级监听器 监听器类型​ StepExecutionListener 触发时机​ 在每一个 Step 开始之前、Step 执行完成之后触发 主要作用​ 用于监控单个 Step 的执行情况关注某个具体步骤的输入、处理和输出状态 实际业务用途非常重要​ 在 Step 开始前打印或记录当前 Step 的执行参数便于问题定位 在 Step 结束后统计该步骤中读取、处理、写入的数据量以及跳过的记录条数 记录每个 Step 的执行耗时、成功或失败状态用于性能分析和运行报表 当 Step 执行异常时记录详细的错误信息辅助后续排查和重试策略制定 ---------------------------------------------------------------- ChunkListener块级监听器 监听器类型​ ChunkListener 触发时机​ 在每个 Chunk 开始处理时、Chunk 处理完成后以及 Chunk 发生异常时触发 主要作用​ 用于监控批次级别的处理过程关注一批数据在处理过程中的状态和性能 实际业务用途非常重要​ 监控每一批次的处理进度例如打印“正在处理第 N 批数据” 统计每个 Chunk 的处理耗时识别是否存在性能瓶颈 在 Chunk 处理失败时记录错误信息和当前批次的关键数据方便后续重试或补偿 在批量处理过程中做阶段性打点用于监控大盘或日志分析 ---------------------------------------------------------------- ItemReadListener读监听器 监听器类型​ ItemReadListener 触发时机​ 在读取每一条数据之前、读取完成之后以及读取发生异常时触发 主要作用​ 用于监控和跟踪数据读取阶段的行为重点关注 Reader 的输出情况 实际业务用途非常重要​ 在调试或开发阶段打印读取到的原始数据验证数据源是否正确 记录读取失败的异常信息帮助定位数据格式或数据源问题 统计读取总数、空读次数辅助判断 Reader 是否正常工作 在某些特殊场景下对读取到的数据进行轻量级校验或标记 ---------------------------------------------------------------- ItemProcessListener处理监听器 监听器类型​ ItemProcessListener 触发时机​ 在处理每一条数据之前、处理完成之后以及处理过程出现异常时触发 主要作用​ 用于监控业务逻辑处理阶段关注 Processor 的执行情况和业务结果 实际业务用途非常重要​ 记录关键业务对象的处理日志例如订单、用户信息在业务规则处理后的变化 在处理前后埋点用于统计业务处理成功率、异常率等指标 捕获 Processor 中的业务异常记录异常原因便于后续补偿或人工干预 在复杂业务处理链路中跟踪单条数据的流转和处理结果 ---------------------------------------------------------------- ItemWriteListener写监听器 监听器类型​ ItemWriteListener 触发时机​ 在每一批数据Chunk写入之前、写入完成之后以及写入发生异常时触发 主要作用​ 用于监控数据写入阶段确保写出数据的准确性和可追溯性 实际业务用途非常重要​ 在写入前对即将落库或输出的数据进行最后校验或日志打印 在写入成功后记录成功写入的数据条数和关键字段用于对账和统计 在写入失败时记录错误详情并触发写入失败预警机制 结合事务回滚或重试策略在监听器中做补偿、告警或状态更新 ----------------------------------------------------------------- SkipListener 当 Spring Batch 在执行过程中跳过Skip某条数据时会触发这个监听器。 public interface SkipListenerT, S { // 当某条数据被跳过时触发最常用 void onSkipInRead(Throwable t); // Reader 读取时跳过 void onSkipInProcess(T item, Throwable t); // Processor 处理时跳过最常用 void onSkipInWrite(S item, Throwable t); // Writer 写入时跳过 } Component public class MySkipListener implements SkipListenerChannelVerifyBean, TlPayTransLog { private static final Logger log LoggerFactory.getLogger(MySkipListener.class); Override public void onSkipInRead(Throwable t) { log.warn(【Skip】读取数据时跳过一条记录, t); } // 最常用Processor 处理失败时跳过 Override public void onSkipInProcess(ChannelVerifyBean item, Throwable t) { log.error(【Skip】处理数据失败已跳过数据内容: {}, item, t); // 可以在这里记录到错误日志表方便后续人工处理 // errorLogService.saveSkipLog(item, t.getMessage()); } Override public void onSkipInWrite(TlPayTransLog item, Throwable t) { log.error(【Skip】写入数据失败已跳过数据: {}, item, t); } } Bean public Step myStep(MySkipListener skipListener) { return stepBuilderFactory.get(myStep) .ChannelVerifyBean, TlPayTransLogchunk(100) .reader(reader()) .processor(processor()) .writer(writer()) .faultTolerant() // 必须开启容错 .skipLimit(50) // 最多允许跳过50条 .skip(Exception.class) // 跳过所有异常可指定具体异常 // .skip(MyBusinessException.class) // 推荐指定具体异常 .listener(skipListener) // 绑定 SkipListener .build(); } ItemReadListener Component public class UserReadListener implements ItemReadListenerString { // 泛型为 Reader 读取的类型 private static final Logger log LoggerFactory.getLogger(UserReadListener.class); Override public void beforeRead() { // 每次读取一条数据之前触发 log.debug(【Read】准备读取下一条数据...); } Override public void afterRead(String item) { // 读取成功后触发 log.info(【Read】成功读取到数据: {}, item); } Override public void onReadError(Exception ex) { // 读取发生异常时触发 log.error(【Read Error】读取数据失败, ex); } } ItemProcessListener Component public class UserProcessListener implements ItemProcessListenerString, User { private static final Logger log LoggerFactory.getLogger(UserProcessListener.class); Override public void beforeProcess(String item) { // 处理之前 log.debug(【Process】开始处理数据: {}, item); } Override public void afterProcess(String item, User result) { // 处理成功后 if (result ! null) { log.info(【Process】处理完成: {} → {}, item, result); } else { log.info(【Process】数据被过滤: {}, item); } } Override public void onProcessError(String item, Exception e) { // 处理发生异常时 log.error(【Process Error】处理数据失败原始数据: {}, item, e); // 可以在这里记录失败数据到数据库 // skipLogService.save(item, e.getMessage()); } } ItemWriteListener Component public class UserWriteListener implements ItemWriteListenerUser { private static final Logger log LoggerFactory.getLogger(UserWriteListener.class); Override public void beforeWrite(List? extends User items) { // 写入之前 log.info(【Write】准备写入 {} 条数据, items.size()); items.forEach(user - log.debug(即将写入: {}, user)); } Override public void afterWrite(List? extends User items) { // 写入成功后 log.info(【Write】成功写入 {} 条数据, items.size()); items.forEach(user - log.info(写入成功: {}, user)); } Override public void onWriteError(Exception exception, List? extends User items) { // 写入发生异常时 log.error(【Write Error】写入 {} 条数据失败, items.size(), exception); items.forEach(user - log.error(失败数据: {}, user)); } } 使用监听器 Bean public Step userProcessStep( UserReadListener readListener, UserProcessListener processListener, UserWriteListener writeListener) { return stepBuilderFactory.get(userProcessStep) .String, Userchunk(10) .reader(userReader()) .processor(userProcessor()) .writer(userWriter()) // 绑定三个 Item 监听器 .listener(readListener) .listener(processListener) .listener(writeListener) .faultTolerant() .skipLimit(20) .skip(Exception.class) .build(); } 执行顺序 ItemReadListener.beforeRead() Reader.read() ItemReadListener.afterRead() ItemProcessListener.beforeProcess() Processor.process() ItemProcessListener.afterProcess() ItemWriteListener.beforeWrite() Writer.write() ItemWriteListener.afterWrite() 如果任何一步出错则会触发对应的 onXXXError() 方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2615841.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…