Spring Batch 大数据量处理实战:从入门到精通
别叫我大神,叫我 Alex 就好。处理百万级数据不用愁,Spring Batch 让批处理变得优雅而高效。

## 一、Spring Batch 基础架构

### 1.1 核心配置

```java
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    // Asynchronous launcher so job submission does not block the caller.
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    // Read-only access to the batch metadata tables (job/step history).
    @Bean
    public JobExplorer jobExplorer(DataSource dataSource) throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    // Auto-registers every Job bean in the registry so it can be launched by name.
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry());
        return postProcessor;
    }
}
```

### 1.2 数据库表结构

```sql
-- Spring Batch metadata tables:
-- BATCH_JOB_INSTANCE:          job instances
-- BATCH_JOB_EXECUTION:         job executions
-- BATCH_JOB_EXECUTION_PARAMS:  job parameters
-- BATCH_STEP_EXECUTION:        step executions
-- BATCH_JOB_EXECUTION_CONTEXT: job execution context
-- BATCH_STEP_EXECUTION_CONTEXT: step execution context

-- Custom monitoring table
CREATE TABLE batch_job_monitoring (
    id               BIGINT PRIMARY KEY AUTO_INCREMENT,
    job_name         VARCHAR(100) NOT NULL,
    job_instance_id  BIGINT,
    job_execution_id BIGINT,
    start_time       TIMESTAMP,
    end_time         TIMESTAMP,
    status           VARCHAR(20),
    read_count       BIGINT DEFAULT 0,
    write_count      BIGINT DEFAULT 0,
    skip_count       BIGINT DEFAULT 0,
    error_message    TEXT,
    created_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

## 二、简单任务示例

### 2.1 CSV 导入数据库

```java
@Configuration
public class CsvToDatabaseJobConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Bean
    public Job csvToDatabaseJob() {
        return new JobBuilder("csvToDatabaseJob", jobRepository)
                .start(csvToDatabaseStep())
                .listener(jobExecutionListener())
                .build();
    }

    // Chunk-oriented step: read 1000 CSV records per transaction, with
    // skip/retry fault tolerance for validation and transient DB errors.
    @Bean
    public Step csvToDatabaseStep() {
        return new StepBuilder("csvToDatabaseStep", jobRepository)
                .<ProductInput, Product>chunk(1000, transactionManager)
                .reader(csvItemReader())
                .processor(productItemProcessor())
                .writer(databaseItemWriter())
                .faultTolerant()
                .skipLimit(10)
                .skip(ValidationException.class)
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .listener(stepExecutionListener())
                .build();
    }

    @Bean
    public FlatFileItemReader<ProductInput> csvItemReader() {
        return new FlatFileItemReaderBuilder<ProductInput>()
                .name("csvItemReader")
                .resource(new FileSystemResource("input/products.csv"))
                .delimited()
                .names("id", "name", "description", "price", "category")
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                    setTargetType(ProductInput.class);
                }})
                .linesToSkip(1) // skip the header row
                .build();
    }

    // Maps the raw CSV record onto the Product entity.
    @Bean
    public ItemProcessor<ProductInput, Product> productItemProcessor() {
        return input -> {
            Product product = new Product();
            product.setId(input.getId());
            product.setName(input.getName().trim());
            product.setDescription(input.getDescription());
            product.setPrice(new BigDecimal(input.getPrice()));
            product.setCategory(input.getCategory());
            product.setCreatedAt(LocalDateTime.now());
            return product;
        };
    }

    // Batched upsert (MySQL ON DUPLICATE KEY UPDATE); parameters are bound
    // from Product bean properties by name.
    @Bean
    public JdbcBatchItemWriter<Product> databaseItemWriter() {
        return new JdbcBatchItemWriterBuilder<Product>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO products (id, name, description, price, category, created_at) "
                        + "VALUES (:id, :name, :description, :price, :category, :createdAt) "
                        + "ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), "
                        + "price = VALUES(price), category = VALUES(category)")
                .dataSource(dataSource)
                .build();
    }
}
```

### 2.2 数据库导出到文件

```java
@Configuration
public class DatabaseToFileJobConfig {

    @Bean
    public Job exportOrdersJob() {
        return new JobBuilder("exportOrdersJob", jobRepository)
                .start(exportOrdersStep())
                .build();
    }

    @Bean
    public Step exportOrdersStep() {
        return new StepBuilder("exportOrdersStep", jobRepository)
                .<Order, OrderOutput>chunk(500, transactionManager)
                .reader(orderItemReader())
                .processor(orderItemProcessor())
                .writer(orderItemWriter())
                .build();
    }

    // Paging reader: keyset/offset pagination keeps memory flat on large tables.
    // NOTE(review): the anonymous PagingQueryProvider below is simplified article
    // pseudo-code — the real interface has more methods (generateFirstPageQuery,
    // etc.); in practice use SqlPagingQueryProviderFactoryBean.
    @Bean
    public JdbcPagingItemReader<Order> orderItemReader() {
        return new JdbcPagingItemReaderBuilder<Order>()
                .name("orderItemReader")
                .dataSource(dataSource)
                .queryProvider(new PagingQueryProvider() {
                    @Override
                    public void init(DataSource dataSource) {}

                    @Override
                    public String getSortKey() {
                        return "id";
                    }

                    @Override
                    public String getSelectClause() {
                        return "SELECT id, user_id, total_amount, status, created_at";
                    }

                    @Override
                    public String getFromClause() {
                        return "FROM orders";
                    }

                    @Override
                    public String getWhereClause() {
                        // assumes an inclusive date-range filter — the operators were
                        // lost in the original text; confirm >= / <= against intent
                        return "WHERE created_at >= :startDate AND created_at <= :endDate";
                    }
                })
                .parameterValues(Map.of("startDate", startDate, "endDate", endDate))
                .pageSize(1000)
                .rowMapper(new OrderRowMapper())
                .build();
    }

    @Bean
    public FlatFileItemWriter<OrderOutput> orderItemWriter() {
        return new FlatFileItemWriterBuilder<OrderOutput>()
                .name("orderItemWriter")
                .resource(new FileSystemResource("output/orders.csv"))
                .delimited()
                .delimiter(",")
                .names("orderId", "userId", "amount", "status", "createdDate")
                .headerCallback(writer -> writer.write("OrderID,UserID,Amount,Status,CreatedDate"))
                .footerCallback(writer -> writer.write("Total records exported"))
                .build();
    }
}
```

## 三、复杂任务处理

### 3.1 多步骤任务

```java
@Configuration
public class ComplexBatchJobConfig {

    // Linear pipeline with conditional transitions: any FAILED exit routes to
    // the error-handling step; a successful notification step always (*) runs cleanup.
    @Bean
    public Job orderProcessingJob() {
        return new JobBuilder("orderProcessingJob", jobRepository)
                .start(validateOrderStep())
                .next(processPaymentStep())
                .next(updateInventoryStep())
                .next(sendNotificationStep())
                .on("FAILED").to(errorHandlingStep())
                .from(sendNotificationStep()).on("*").to(cleanupStep())
                .end()
                .build();
    }

    @Bean
    public Step validateOrderStep() {
        return new StepBuilder("validateOrderStep", jobRepository)
                .<Order, ValidatedOrder>chunk(100, transactionManager)
                .reader(pendingOrderReader())
                .processor(orderValidator())
                .writer(validatedOrderWriter())
                .build();
    }

    // Single-shot tasklet step (no chunking) for the payment call.
    @Bean
    public Step processPaymentStep() {
        return new StepBuilder("processPaymentStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    // payment processing logic
                    JobParameters params = chunkContext.getStepContext().getJobParameters();
                    String batchId = params.getString("batchId");
                    paymentService.processBatchPayments(batchId);
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }

    @Bean
    public Step updateInventoryStep() {
        return new StepBuilder("updateInventoryStep", jobRepository)
                .<OrderItem, InventoryUpdate>chunk(200, transactionManager)
                .reader(orderItemReader())
                .processor(inventoryProcessor())
                .writer(inventoryWriter())
                .build();
    }

    // Runs flow1/flow2/flow3 in parallel on the supplied task executor.
    @Bean
    public Flow splitFlow() {
        return new FlowBuilder<SimpleFlow>("splitFlow")
                .split(taskExecutor())
                .add(flow1(), flow2(), flow3())
                .build();
    }

    @Bean
    public Flow flow1() {
        return new FlowBuilder<Simple
        // NOTE(review): the source article is truncated here, mid-definition of
        // flow1(); the remainder of this class was not recoverable.
```
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2466361.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!