SpringBatch学习
/** * 示例一Tasklet 方式 */ Configuration EnableBatchProcessing public class TaskletBatchConfig { private static final Logger logger LoggerFactory.getLogger(TaskletBatchConfig.class); Autowired private JobBuilderFactory jobBuilderFactory; Autowired private StepBuilderFactory stepBuilderFactory; Bean public Job taskletJob() { return jobBuilderFactory.get(taskletJob) .start(taskletStep()) .build(); } Bean public Step taskletStep() { return stepBuilderFactory.get(taskletStep) .tasklet(helloWorldTasklet()) .build(); } Bean public Tasklet helloWorldTasklet() { return (contribution, chunkContext) - { int total 10; logger.info( Tasklet 开始执行 ); for (int i 1; i total; i) { logger.info(Hello World! index {}, i); // 可以在这里写任何业务逻辑 if (i % 3 0) { logger.info(第 {} 条数据处理完成特殊处理, i); } } logger.info( Tasklet 执行完成共处理 {} 条 , total); return RepeatStatus.FINISHED; }; } } /** * 示例二Chunk 方式推荐 */ Configuration EnableBatchProcessing public class ChunkBatchConfig { private static final Logger logger LoggerFactory.getLogger(ChunkBatchConfig.class); Autowired private JobBuilderFactory jobBuilderFactory; Autowired private StepBuilderFactory stepBuilderFactory; // Reader Bean public ItemReaderString nameReader() { ListString names Arrays.asList(张三, 李四, 王五, 赵六, 田七, 孙八, 周九, 吴十); return new ListItemReader(names); } // Processor Bean public ItemProcessorString, User userProcessor() { return name - { User user new User(); user.setName(name); user.setAge(20 (int) (Math.random() * 30)); user.setDesc(由 Spring Batch Chunk 处理); logger.info(【Processor】处理数据: {} → {}, name, user); return user; }; } // Writer Bean public ItemWriterUser userWriter() { return users - { logger.info(【Writer】本次批次写入 {} 条数据, users.size()); for (User user : users) { logger.info(【写入成功】 {}, user); // 实际项目中这里可以写入数据库、文件等 } }; } // Step Bean public Step chunkStep() { return stepBuilderFactory.get(chunkStep) .String, Userchunk(3) // 每3条数据为一个批次Chunk .reader(nameReader()) .processor(userProcessor()) .writer(userWriter()) .faultTolerant() // 开启容错 .skipLimit(5) // 最多跳过5条错误数据 .skip(Exception.class) // 跳过异常 .build(); } // Job Bean public Job chunkJob() { return jobBuilderFactory.get(chunkJob) .start(chunkStep()) .build(); } } Component public class CustomUserWriter implements ItemWriterUser { private static final Logger log LoggerFactory.getLogger(CustomUserWriter.class); Override public void write(List? extends User items) throws Exception { log.info(【Writer】开始批量写入 {} 条数据, items.size()); for (User user : items) { log.info(【写入数据库】 {}, user); // extUserMapper.batchInsert 或 update ... } } } Component public class CustomUserReader implements ItemReaderUser { private final ListUser users; private int index 0; public CustomUserReader() { this.users Arrays.asList( new User(张三, 25, 原始数据), new User(李四, 30, 原始数据), new User(王五, 28, 原始数据), new User(赵六, 35, 原始数据) ); } Override public User read() { if (index users.size()) { User user users.get(index); System.out.println(【Reader】读取到: {} user.getName()); return user; } return null; // 返回null表示读取结束 } } Component public class CustomUserProcessor implements ItemProcessorUser, User { private final Logger log LoggerFactory.getLogger(CustomUserProcessor.class); Override public User process(User item) throws Exception { // 业务处理 item.setAge(item.getAge() 1); item.setDesc(已加工 - LocalDateTime.now()); log.info(【Processor】加工完成: {}, item.getName()); return item; } } Configuration EnableBatchProcessing RequiredArgsConstructor public class CustomComponentBatchConfig { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final CustomUserReader customUserReader; private final CustomUserProcessor customUserProcessor; private final CustomUserWriter customUserWriter; Bean public Step customComponentStep() { return stepBuilderFactory.get(customComponentStep) .User, Userchunk(2) .reader(customUserReader) .processor(customUserProcessor) .writer(customUserWriter) .faultTolerant() .skipLimit(5) .skip(Exception.class) .build(); } Bean public Job customComponentJob() { return jobBuilderFactory.get(customComponentJob) .start(customComponentStep()) .build(); } } 使用监听器 Tasklet 主要使用 StepExecutionListener。 Component public class TaskletStepListener implements StepExecutionListener { private static final Logger log LoggerFactory.getLogger(TaskletStepListener.class); Override public void beforeStep(StepExecution stepExecution) { log.info(【Tasklet Step 开始】 Step名称: {}, stepExecution.getStepName()); log.info(Job参数: {}, stepExecution.getJobParameters()); } Override public ExitStatus afterStep(StepExecution stepExecution) { log.info(【Tasklet Step 结束】 读: {} 条, 写: {} 条, 跳过: {} 条, stepExecution.getReadCount(), stepExecution.getWriteCount(), stepExecution.getSkipCount()); log.info(最终状态: {}, stepExecution.getStatus()); return stepExecution.getExitStatus(); } } 在 Step 中使用 Bean public Step taskletStep(TaskletStepListener taskletStepListener) { return stepBuilderFactory.get(taskletStep) .tasklet(simpleTasklet()) .listener(taskletStepListener) // 添加监听器 .build(); } Chunk 方式可以使用的监听器更多StepExecutionListener ChunkListener Item*Listener ChunkListener示例 Component public class ChunkStepListener implements ChunkListener { private static final Logger log LoggerFactory.getLogger(ChunkStepListener.class); Override public void beforeChunk(ChunkContext context) { log.info(【Chunk 开始】 本批次即将处理数据...); } Override public void afterChunk(ChunkContext context) { log.info(【Chunk 完成】 本批次处理结束); } Override public void afterChunkError(ChunkContext context) { log.error(【Chunk 异常】 本批次处理失败); } } 使用 Bean public Step chunkInlineStep( ChunkStepListener chunkListener, StepExecutionListener stepListener) { // 可同时注入多个 return stepBuilderFactory.get(chunkInlineStep) .String, Userchunk(3) .reader(inlineReader()) .processor(inlineProcessor()) .writer(inlineWriter()) .listener(chunkListener) // Chunk 监听器 .listener(stepListener) // Step 监听器 // 也可以加 Item 级别监听器 // .listener(itemReadListener()) // .listener(itemProcessListener()) // .listener(itemWriteListener()) .faultTolerant() .build(); } 自定义监听器 Component public class UserStepListener implements StepExecutionListener { private static final Logger log LoggerFactory.getLogger(UserStepListener.class); Override public void beforeStep(StepExecution stepExecution) { log.info( User Step 开始执行 ); } Override public ExitStatus afterStep(StepExecution stepExecution) { log.info( User Step 执行结束 ); log.info(读: {}, 写: {}, 跳过: {}, 提交次数: {}, stepExecution.getReadCount(), stepExecution.getWriteCount(), stepExecution.getSkipCount(), stepExecution.getCommitCount()); return stepExecution.getExitStatus(); } } Component public class UserChunkListener implements ChunkListener { Override public void beforeChunk(ChunkContext context) { log.info(开始新 Chunk当前已提交: {} 次, context.getStepContext().getStepExecution().getCommitCount()); } Override public void afterChunk(ChunkContext context) { log.info(Chunk 执行完成); } } 配置使用 Bean public Step customComponentStep( UserStepListener stepListener, UserChunkListener chunkListener) { return stepBuilderFactory.get(customComponentStep) .User, Userchunk(10) .reader(customUserReader) .processor(customUserProcessor) .writer(customUserWriter) .listener(stepListener) // Step 监听 .listener(chunkListener) // Chunk 监听 .faultTolerant() .skipLimit(10) .skip(Exception.class) .build(); } 监听器类型 JobExecutionListener作业级监听器 监听器类型 JobExecutionListener 触发时机 在整个 Job 开始执行之前、Job 执行完成之后无论成功还是失败触发 主要作用 用于监控和控制整个批处理任务的生命周期适合做一些全局性的初始化和收尾工作 实际业务用途非常重要 在 Job 开始前进行初始化操作例如清理历史临时表数据、对任务加分布式锁防止重复执行 在 Job 结束后汇总本次任务的执行结果例如统计成功处理的数据量、失败数量 发送执行完成通知如通过邮件、企业微信、钉钉等方式通知相关人员 将 Job 的执行日志、起止时间、最终状态等信息持久化到数据库便于审计和追溯 当 Job 执行失败时进行统一告警触发监控系统的异常提醒 ---------------------------------------------------------------- StepExecutionListener步骤级监听器 监听器类型 StepExecutionListener 触发时机 在每一个 Step 开始之前、Step 执行完成之后触发 主要作用 用于监控单个 Step 的执行情况关注某个具体步骤的输入、处理和输出状态 实际业务用途非常重要 在 Step 开始前打印或记录当前 Step 的执行参数便于问题定位 在 Step 结束后统计该步骤中读取、处理、写入的数据量以及跳过的记录条数 记录每个 Step 的执行耗时、成功或失败状态用于性能分析和运行报表 当 Step 执行异常时记录详细的错误信息辅助后续排查和重试策略制定 ---------------------------------------------------------------- ChunkListener块级监听器 监听器类型 ChunkListener 触发时机 在每个 Chunk 开始处理时、Chunk 处理完成后以及 Chunk 发生异常时触发 主要作用 用于监控批次级别的处理过程关注一批数据在处理过程中的状态和性能 实际业务用途非常重要 监控每一批次的处理进度例如打印“正在处理第 N 批数据” 统计每个 Chunk 的处理耗时识别是否存在性能瓶颈 在 Chunk 处理失败时记录错误信息和当前批次的关键数据方便后续重试或补偿 在批量处理过程中做阶段性打点用于监控大盘或日志分析 ---------------------------------------------------------------- ItemReadListener读监听器 监听器类型 ItemReadListener 触发时机 在读取每一条数据之前、读取完成之后以及读取发生异常时触发 主要作用 用于监控和跟踪数据读取阶段的行为重点关注 Reader 的输出情况 实际业务用途非常重要 在调试或开发阶段打印读取到的原始数据验证数据源是否正确 记录读取失败的异常信息帮助定位数据格式或数据源问题 统计读取总数、空读次数辅助判断 Reader 是否正常工作 在某些特殊场景下对读取到的数据进行轻量级校验或标记 ---------------------------------------------------------------- ItemProcessListener处理监听器 监听器类型 ItemProcessListener 触发时机 在处理每一条数据之前、处理完成之后以及处理过程出现异常时触发 主要作用 用于监控业务逻辑处理阶段关注 Processor 的执行情况和业务结果 实际业务用途非常重要 记录关键业务对象的处理日志例如订单、用户信息在业务规则处理后的变化 在处理前后埋点用于统计业务处理成功率、异常率等指标 捕获 Processor 中的业务异常记录异常原因便于后续补偿或人工干预 在复杂业务处理链路中跟踪单条数据的流转和处理结果 ---------------------------------------------------------------- ItemWriteListener写监听器 监听器类型 ItemWriteListener 触发时机 在每一批数据Chunk写入之前、写入完成之后以及写入发生异常时触发 主要作用 用于监控数据写入阶段确保写出数据的准确性和可追溯性 实际业务用途非常重要 在写入前对即将落库或输出的数据进行最后校验或日志打印 在写入成功后记录成功写入的数据条数和关键字段用于对账和统计 在写入失败时记录错误详情并触发写入失败预警机制 结合事务回滚或重试策略在监听器中做补偿、告警或状态更新 ----------------------------------------------------------------- SkipListener 当 Spring Batch 在执行过程中跳过Skip某条数据时会触发这个监听器。 public interface SkipListenerT, S { // 当某条数据被跳过时触发最常用 void onSkipInRead(Throwable t); // Reader 读取时跳过 void onSkipInProcess(T item, Throwable t); // Processor 处理时跳过最常用 void onSkipInWrite(S item, Throwable t); // Writer 写入时跳过 } Component public class MySkipListener implements SkipListenerChannelVerifyBean, TlPayTransLog { private static final Logger log LoggerFactory.getLogger(MySkipListener.class); Override public void onSkipInRead(Throwable t) { log.warn(【Skip】读取数据时跳过一条记录, t); } // 最常用Processor 处理失败时跳过 Override public void onSkipInProcess(ChannelVerifyBean item, Throwable t) { log.error(【Skip】处理数据失败已跳过数据内容: {}, item, t); // 可以在这里记录到错误日志表方便后续人工处理 // errorLogService.saveSkipLog(item, t.getMessage()); } Override public void onSkipInWrite(TlPayTransLog item, Throwable t) { log.error(【Skip】写入数据失败已跳过数据: {}, item, t); } } Bean public Step myStep(MySkipListener skipListener) { return stepBuilderFactory.get(myStep) .ChannelVerifyBean, TlPayTransLogchunk(100) .reader(reader()) .processor(processor()) .writer(writer()) .faultTolerant() // 必须开启容错 .skipLimit(50) // 最多允许跳过50条 .skip(Exception.class) // 跳过所有异常可指定具体异常 // .skip(MyBusinessException.class) // 推荐指定具体异常 .listener(skipListener) // 绑定 SkipListener .build(); } ItemReadListener Component public class UserReadListener implements ItemReadListenerString { // 泛型为 Reader 读取的类型 private static final Logger log LoggerFactory.getLogger(UserReadListener.class); Override public void beforeRead() { // 每次读取一条数据之前触发 log.debug(【Read】准备读取下一条数据...); } Override public void afterRead(String item) { // 读取成功后触发 log.info(【Read】成功读取到数据: {}, item); } Override public void onReadError(Exception ex) { // 读取发生异常时触发 log.error(【Read Error】读取数据失败, ex); } } ItemProcessListener Component public class UserProcessListener implements ItemProcessListenerString, User { private static final Logger log LoggerFactory.getLogger(UserProcessListener.class); Override public void beforeProcess(String item) { // 处理之前 log.debug(【Process】开始处理数据: {}, item); } Override public void afterProcess(String item, User result) { // 处理成功后 if (result ! null) { log.info(【Process】处理完成: {} → {}, item, result); } else { log.info(【Process】数据被过滤: {}, item); } } Override public void onProcessError(String item, Exception e) { // 处理发生异常时 log.error(【Process Error】处理数据失败原始数据: {}, item, e); // 可以在这里记录失败数据到数据库 // skipLogService.save(item, e.getMessage()); } } ItemWriteListener Component public class UserWriteListener implements ItemWriteListenerUser { private static final Logger log LoggerFactory.getLogger(UserWriteListener.class); Override public void beforeWrite(List? extends User items) { // 写入之前 log.info(【Write】准备写入 {} 条数据, items.size()); items.forEach(user - log.debug(即将写入: {}, user)); } Override public void afterWrite(List? extends User items) { // 写入成功后 log.info(【Write】成功写入 {} 条数据, items.size()); items.forEach(user - log.info(写入成功: {}, user)); } Override public void onWriteError(Exception exception, List? extends User items) { // 写入发生异常时 log.error(【Write Error】写入 {} 条数据失败, items.size(), exception); items.forEach(user - log.error(失败数据: {}, user)); } } 使用监听器 Bean public Step userProcessStep( UserReadListener readListener, UserProcessListener processListener, UserWriteListener writeListener) { return stepBuilderFactory.get(userProcessStep) .String, Userchunk(10) .reader(userReader()) .processor(userProcessor()) .writer(userWriter()) // 绑定三个 Item 监听器 .listener(readListener) .listener(processListener) .listener(writeListener) .faultTolerant() .skipLimit(20) .skip(Exception.class) .build(); } 执行顺序 ItemReadListener.beforeRead() Reader.read() ItemReadListener.afterRead() ItemProcessListener.beforeProcess() Processor.process() ItemProcessListener.afterProcess() ItemWriteListener.beforeWrite() Writer.write() ItemWriteListener.afterWrite() 如果任何一步出错则会触发对应的 onXXXError() 方法。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2615841.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!