Spring Batch 高级篇-并行步骤

news2025/7/28 12:15:05

目录

引言

概念

案例

转视频版


引言

接着上篇:Spring Batch 高级篇-多线程步骤,了解Spring Batch多线程步骤后,接下来一起学习一下Spring Batch 高级功能-并行步骤

概念

并行步骤,指的是某2个或者多个步骤同时执行。比如下图

图中,流程从步骤1执行,然后执行步骤2, 步骤3,当步骤2/3执行结束之后,在执行步骤4.

设想一种场景,当读取2个或者多个互不关联的文件时,可以多个文件同时读取,这个就是并行步骤。

案例

需求:现有user-parallel.txt, user-parallel.json 2个文件将它们中数据读入内存  

1>编写user-parallel.txt, user-parallel.json

6#zhangsan#14
7#lisi#13
8#wangwu#12
9#zhaoliu#11
10#qianqi#10
[
  {"id":1, "name":"dafei", "age":18},
  {"id":2, "name":"xiaofei", "age":17},
  {"id":3, "name":"zhongfei", "age":16},
  {"id":4, "name":"laofei", "age":15},
  {"id":5, "name":"feifei", "age":14}
]

2>编写实体对象

@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}

3>代码实现

package com.langfeiyes.batch._36_step_parallel;


import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.util.List;

@SpringBootApplication
@EnableBatchProcessing
public class ParallelStepJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    @Bean
    public JsonItemReader<User> jsonItemReader(){
        ObjectMapper objectMapper = new ObjectMapper();
        JacksonJsonObjectReader<User> jsonObjectReader = new JacksonJsonObjectReader<>(User.class);
        jsonObjectReader.setMapper(objectMapper);

        return new JsonItemReaderBuilder<User>()
                .name("userJsonItemReader")
                .jsonObjectReader(jsonObjectReader)
                .resource(new ClassPathResource("user-parallel.json"))
                .build();
    }

    @Bean
    public FlatFileItemReader<User> flatItemReader(){
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(new ClassPathResource("user-parallel.txt"))
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();
    }
    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }

    @Bean
    public Step jsonStep(){
        return stepBuilderFactory.get("jsonStep")
                .<User, User>chunk(2)
                .reader(jsonItemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Step flatStep(){
        return stepBuilderFactory.get("step2")
                .<User, User>chunk(2)
                .reader(flatItemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job parallelJob(){

        //线程1-读user-parallel.txt
        Flow parallelFlow1 = new FlowBuilder<Flow>("parallelFlow1")
                .start(flatStep())
                .build();

        //线程2-读user-parallel.json
        Flow parallelFlow2 = new FlowBuilder<Flow>("parallelFlow2")
                .start(jsonStep())
                .split(new SimpleAsyncTaskExecutor())
                .add(parallelFlow1)
                .build();


        return jobBuilderFactory.get("parallel-step-job")
                .start(parallelFlow2)
                .end()
                .build();
    }
    public static void main(String[] args) {
        SpringApplication.run(ParallelStepJob.class, args);
    }
}

结果

User(id=6, name=zhangsan, age=14)
User(id=7, name=lisi, age=13)
User(id=8, name=wangwu, age=12)
User(id=9, name=zhaoliu, age=11)
User(id=1, name=dafei, age=18)
User(id=2, name=xiaofei, age=17)
User(id=10, name=qianqi, age=10)
User(id=3, name=zhongfei, age=16)
User(id=4, name=laofei, age=15)
User(id=5, name=feifei, age=14)

解析:

1:jsonItemReader() flatItemReader() 定义2个读入操作,分别读json格式跟普通文本格式

2:parallelJob() 配置job,需要指定并行的flow步骤,先是parallelFlow1然后是parallelFlow2 , 2个步骤间使用.split(new SimpleAsyncTaskExecutor()) 隔开,表示线程池开启2个线程,分别处理parallelFlow1, parallelFlow2 2个步骤。

到这,本篇就结束了,欲知后事如何,请听下回分解~

转视频版

看文字不过瘾可以切换视频版:Spring Batch高效批处理框架实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/368006.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ask林曦|来回答,30个你关心的日常问题(二)

在林曦老师的线上书法直播课上&#xff0c;上课前后的聊天时间里&#xff0c;时常有同学向林曦老师提问&#xff0c;这些问题涵盖了日常生活的诸多方面&#xff0c;从身体的保养&#xff0c;到快乐的法门&#xff0c;皆是大家感兴趣的&#xff0c;也都共同关切的。   暄桐教室…

python+Vue学生作业系统 django课程在线学习网站系统

系统分为学生&#xff0c;教师&#xff0c;管理员三个角色&#xff1a; 学生功能&#xff1a; 1.学生注册登录系统 2.学生查看个人信息&#xff0c;修改个人信息 3.学生查看主页综合评价&#xff0c;查看今日值班信息 4.学生在线申请请假信息&#xff0c;查看请假的审核结果和请…

180、【动态规划】leetcode ——583. 两个字符串的删除操作:两种动态规划思路(C++版本)

题目描述 原题链接&#xff1a;583. 两个字符串的删除操作 解题思路 &#xff08;1&#xff09;基于求最长公共子序列思路 本题与 1143. 最长公共子序列 的区别在于&#xff0c;1143中求的是两个序列中的最长公共子序列&#xff0c;而本题是要找到最少删除多少个元素后可以得…

PHP程序员适合创业吗?

创业是一件自然而然的事&#xff0c;不需要人为选择。 只要你是一个努力能干主动的人&#xff0c;当你在一个行业深耕5年之后&#xff0c;就会发现人生发展的下一步就是创业。当然如果行业合适的话。 什么叫行业合适呢&#xff1f; 就是创业的成本并不那么高&#xff0c;不需…

怎么在LinkedIn领英安全添加到3万个好友?

根据领英最新公布的数据&#xff1a;领英全球用户数已经达到8.3亿&#xff0c;超5800万个公司主页&#xff0c;可以说是世界上最-大的business database。 这就不难理解为什么越来越多的外贸人&#xff0c;开始认真尝试和重视在领英开发客户&#xff0c;因为领英确实是外贸人&a…

鸿蒙3.0 APP混合开发闪退问题笔记

APP采用cordova混合开发&#xff0c; 鸿蒙2.0以及安卓操作系统正常使用&#xff0c;但是在鸿蒙3.0中出现APP闪退&#xff0c;对APP进行真机调试发现&#xff0c;鸿蒙3.0系统对crosswork插件存在兼容问题&#xff0c;这些问题会导致APP页面加载失败&#xff0c;进而导致App闪退测…

扬帆优配|“涨停敢死队”慌了?监管“盯紧”异常交易

日前&#xff0c;沪深买卖所发布《主板股票反常买卖实时监控细则》&#xff0c;对反常买卖行为的类型和标准作出规则。其间&#xff0c;针对“打板”“封板”等反常行为的监控遭到商场重视&#xff0c;有商场传闻称&#xff0c;新规或导致高频买卖毁灭&#xff0c;“量价型股票…

什么蓝牙耳机打游戏好?打游戏好用的无线蓝牙耳机

午休或是周末约上好友玩两局游戏&#xff0c;是忙里偷闲的快乐时刻&#xff0c;对于普通游戏玩家&#xff0c;其实耳机够用就行&#xff0c;下面就分享几款打游戏好用的蓝牙耳机。 一、南卡小音舱蓝牙耳机 蓝牙版本&#xff1a;5.3 推荐系数&#xff1a;五颗星 南卡小音舱li…

【代码随想录二刷】Day24-回溯-C++

代码随想录二刷Day24 今日任务 理论基础 77.组合 语言&#xff1a;C 理论基础 解决的问题 ① 组合问题&#xff1a;不考虑顺序 ② 切割问题 ③ 子集问题 ④ 排列问题&#xff1a;考虑顺序 ⑤ 棋盘问题&#xff1a;N皇后&#xff0c;解数独回溯法三部曲 ① 回溯函数模板返回…

ChatGPT来了,软件测试工程师距离失业还远吗?

小伙伴们前一段是不是都看到过ChatGPT的相关视频&#xff0c;那它到底是什么&#xff1f;对软件测试行业会有什么影响&#xff1f; 今天汇智妹就用一篇文章来给大家讲清楚。 一、ChatGPT是什么&#xff1f; 简单来说&#xff0c;ChatGPT是一款人工智能聊天机器人&#xff0c;…

【Spring中@Autowired和@Resource注解的区别?】

一.背景 Spring中Autowired和Resource注解的区别&#xff1f; Spring框架想必大家都知道吧&#xff0c;那么Spring中Autowired和Resource注解的区别你知道吗&#xff1f;如果不知道也不要紧&#xff0c;我们就一起来学习一起吧。 二.Autowired和Resource注解的区别&#xff1f…

【人工智能 AI 】您可以使用机器人流程自动化 (RPA) 实现自动化的 10 个业务流程:Robotic Process Automation (RPA)

摘:人类劳动正在被机器(例如在工业中)或计算机程序(适用于所有行业)所取代。 目录 10 processes you can robotise in your company您可以在公司中实现自动化的 10 个流程 Human employees or robotic workers?人类员工还是机器人工人? Robots take over headhunting…

【蓝桥杯每日一题】二分算法

&#x1f34e; 博客主页&#xff1a;&#x1f319;披星戴月的贾维斯 &#x1f34e; 欢迎关注&#xff1a;&#x1f44d;点赞&#x1f343;收藏&#x1f525;留言 &#x1f347;系列专栏&#xff1a;&#x1f319; 蓝桥杯 &#x1f319;我与杀戮之中绽放&#xff0c;亦如黎明的花…

乘上算力发展的东风,联想这次能否变革突起?

“逆水行舟&#xff0c;不进则退”笔者认为这句话也同样适用到现在的联想集团身上&#xff0c;近3年受到疫情的影响全球电子领域普遍不突出&#xff0c;智能手机出货量上涨乏力&#xff0c;个人电脑&#xff08;PC&#xff09;的销量也波动频繁&#xff0c;联想集团在这种不乐观…

外包整整干了一年,废了。。。

先说一下自己的个人情况&#xff0c;大专生&#xff0c;18年通过校招进入湖南某软件公司&#xff0c;干了接近3年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落&#xff01;而我已经在一个企业干了…

蓝牙标签操作指南

一、APP安装指南 1.APP权限问题 电子标签APP安装之后&#xff0c;会提示一些权限的申请&#xff0c;点击允许。否则某些会影响APP的正常运行。安装后&#xff0c;搜索不到蓝牙标签&#xff0c;可以关闭App&#xff0c;重新打开。 2.手机功能 运行APP时候&#xff0c;需要打开…

汽车零部件行业mes系统具体功能介绍

众所周知&#xff0c;汽车零部件的组装是汽车制造的关键环节&#xff0c;而汽车零部件江湖变革以精益为终极目标。即汽车零部件制造企业转型升级向精益生产和精益管理方向前进&#xff0c;而车间信息化管理是精益化生产的基础。 汽车零部件行业现状 随着全球汽车产业不断升级…

ICASSP2023录用率有可靠度还不错的消息了

点击文末公众号卡片&#xff0c;找对地方&#xff0c;轻松参会 由于录用邮件没说录用率&#xff0c;导致大家都不知道录用率是多少。 据一位群友的反馈&#xff0c;其小老板是meta review。该群友原话“接受率应该是42%”。 ICASSP2023投稿量6000&#xff0c;在投稿量大涨的…

会声会影2023官方新功能介绍

深入简单直观的视频编辑&#xff01;使用 Corel VideoStudio会声会影2023&#xff0c;将您最美好的时刻和生活体验变成令人惊叹的电影&#xff0c;这是一款有趣且直观的视频编辑器&#xff0c;包含高级工具和高级效果。从自定义标题和过渡&#xff0c;到 Mask Creator、Color G…

数据结构与算法系列之插入排序

&#x1f497; &#x1f497; 博客:小怡同学 &#x1f497; &#x1f497; 个人简介:编程小萌新 &#x1f497; &#x1f497; 如果博客对大家有用的话&#xff0c;请点赞关注再收藏 &#x1f31e; 什么是插入排序 有一个已经有序的数据序列&#xff0c;要求在这个已经排好的数…