文章目录
- 前言
- ElasticJob-Lite 3.x 集成springBoot 实战 (一次性作业、定时作业)
- 1. ElasticJob简介
- 2. ElasticJob-Lite 是什么
- 3. 功能列表
- 4. 所需依赖包
- 5. 定时作业配置
- 5.1. 作业:
- 5.2. yml配置:
- 5.3. 测试
 
- 6. 一次性任务配置、并手动触发
- 6.1. 作业:
- 6.2. yml配置:
- 6.3. 测试
 
- 7. 其他
- 7.1. Zookeeper命令查看elasticjob注册的信息
- 7.2. 如果注册发生未知错误,如何清理elasticjob在zookeeper上注册的命令空间
 
 
 
 
前言
  如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
   而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
ElasticJob-Lite 3.x 集成springBoot 实战 (一次性作业、定时作业)
1. ElasticJob简介
使用 ElasticJob 能够让开发工程师不再担心任务的线性吞吐量提升等非功能需求,使他们能够更加专注于面向业务编码设计; 同时,它也能够解放运维工程师,使他们不必再担心任务的可用性和相关管理需求,只通过轻松的增加服务节点即可达到自动化运维的目的。
2. ElasticJob-Lite 是什么
定位为轻量级无中心化解决方案,使用 jar 的形式提供分布式任务的协调服务。
 
3. 功能列表
弹性调度:
- 支持任务在分布式场景下的分片和高可用
- 能够水平扩展任务的吞吐量和执行效率
- 任务处理能力随资源配备弹性伸缩
资源分配:
- 在适合的时间将适合的资源分配给任务并使其生效
- 相同任务聚合至相同的执行器统一处理
- 动态调配追加资源至新分配的任务
作业治理:
- 失效转移
- 错过作业重新执行
- 自诊断修复
作业依赖(TODO):
- 基于有向无环图(DAG)的作业间依赖
- 基于有向无环图(DAG)的作业分片间依赖
作业开放生态:
- 可扩展的作业类型统一接口
- 丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等
- 易于对接业务作业,能够与 Spring 依赖注入无缝整合
可视化管控端:
- 作业管控端
- 作业执行历史数据追踪
- 注册中心管理
4. 所需依赖包
        <dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-error-handler-wechat</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy-all</artifactId>
        <version>2.4.15</version>
            
其中:
        <dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-error-handler-wechat</artifactId>
            <version>3.0.1</version>
        </dependency>
这个是配置企业微信通知策略 ,即当作业抛异常时,发送企业微信消息通知,但不中断作业执行。
5. 定时作业配置
5.1. 作业:
package org.example.jobs;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.quartz.DisallowConcurrentExecution;
import org.springframework.stereotype.Component;
/**
 * @author yangzhenyu
 * @version 1.0
 * @description:
 * @date 2023/4/26 14:33
 */
@Slf4j
@Component
@DisallowConcurrentExecution
public class TestJobMany implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        //作业名称
        String jobName = context.getJobName();
        //分片序列号
        int shardingItem = context.getShardingItem();
        //分片序列号对应的value值
        String shardingParameter = context.getShardingParameter();
        //作业分片总数
        int jobNameshardingTotalCount = context.getShardingTotalCount();
        log.info("{} begin, 作业分片总数: {}, 分片序列号: {},分片序列号对应的value值:{}",  jobName,jobNameshardingTotalCount, shardingItem,shardingParameter);
    }
}
5.2. yml配置:
elasticjob:
  regCenter:
    # 连接Zookeeper服务器的列表,包括IP地址和端口号,多个地址用逗号分隔
    serverLists: 127.0.0.1:2181
    # Zookeeper的命名空间
    namespace: yzy
    # 等待重试的间隔时间的初始值   单位:毫秒
    base-sleep-time-milliseconds: 10000
    # 等待重试的间隔时间的最大值
    max-sleep-time-milliseconds: 30000
    # 最大重试次数
    max-retries: 3
    # 会话超时时间 单位: 毫秒
    session-timeout-milliseconds: 600000
    # 连接超时时间 单位: 毫秒
    connection-timeout-milliseconds: 600000
    # elasticjob.jobs 是一个 Map,
    # key 为作业名称,value 为作业类型与配置。
    # Starter 会根据该配置自动创建 OneOffJobBootstrap 或 ScheduleJobBootstrap 的实例
    # 并注册到 Spring 容器中。
  jobs:
    TestJobMany:
      # elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
      elasticJobClass: org.example.jobs.TestJobMany
      # 作业分片总数
      shardingTotalCount: 20
      # 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
      # LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
      # THROW : 抛出异常策略	抛出系统异常并中断作业执行
      # IGNORE : 忽略异常策略	忽略系统异常且不中断作业执行
      # WECHAT : 企业微信通知策略	发送企业微信消息通知,但不中断作业执行
      jobErrorHandlerType: LOG
      cron: 0/5 * * * * ?
      # 作业线程池处理策略
      jobExecutorServiceHandlerType: "SINGLE_THREAD"
      # 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
      overwrite: true
      # 是否开启错过任务重新执行
      misfire: true
      #是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
      failover: true
如果:jobErrorHandlerType选择WECHAT
 那么配置如下:
# Job 配置
elasticjob:
  regCenter:
    # 连接Zookeeper服务器的列表,包括IP地址和端口号,多个地址用逗号分隔
    serverLists: 127.0.0.1:2181
    # Zookeeper的命名空间
    namespace: yzy
    # 等待重试的间隔时间的初始值   单位:毫秒
    base-sleep-time-milliseconds: 10000
    # 等待重试的间隔时间的最大值
    max-sleep-time-milliseconds: 30000
    # 最大重试次数
    max-retries: 3
    # 会话超时时间 单位: 毫秒
    session-timeout-milliseconds: 600000
    # 连接超时时间 单位: 毫秒
    connection-timeout-milliseconds: 600000
  props:
    wechat:
      # 企业微信机器人的 webhook 地址
      webhook: xxxxxx
      # 与企业微信服务器建立连接的超时时间
      connectTimeout: 3000
      # 从企业微信服务器读取到可用资源的超时时间
      readTimeout: 5000
  # elasticjob.jobs 是一个 Map,
  # key 为作业名称,value 为作业类型与配置。
  # Starter 会根据该配置自动创建 OneOffJobBootstrap 或 ScheduleJobBootstrap 的实例
  # 并注册到 Spring 容器中。
  jobs:
    TestJobMany:
      # elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
      elasticJobClass: org.example.jobs.TestJobMany
      # 作业分片总数
      shardingTotalCount: 20
      # 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
      # LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
      # THROW : 抛出异常策略	抛出系统异常并中断作业执行
      # IGNORE : 忽略异常策略	忽略系统异常且不中断作业执行
      # WECHAT : 企业微信通知策略	发送企业微信消息通知,但不中断作业执行
      jobErrorHandlerType: WECHAT 
      cron: 0/5 * * * * ?
      # 作业线程池处理策略
      jobExecutorServiceHandlerType: "SINGLE_THREAD"
      # 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
      overwrite: true
      # 是否开启错过任务重新执行
      misfire: true
      #是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
      failover: true
5.3. 测试

6. 一次性任务配置、并手动触发
6.1. 作业:
package org.example.jobs;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.example.entity.TestData;
import org.quartz.DisallowConcurrentExecution;
import org.springframework.stereotype.Component;
/**
 * @author yangzhenyu
 * @version 1.0
 * @description:
 * @date 2023/4/26 14:33
 */
@Slf4j
@Component
@DisallowConcurrentExecution
public class TestJobOne implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        //作业名称
        String jobName = context.getJobName();
        //分片序列号
        int shardingItem = context.getShardingItem();
        //分片序列号对应的value值
        String shardingParameter = context.getShardingParameter();
        //作业分片总数
        int jobNameshardingTotalCount = context.getShardingTotalCount();
        log.info("{} begin, 作业分片总数: {}, 分片序列号: {},分片序列号对应的value值:{}",  jobName,jobNameshardingTotalCount, shardingItem,shardingParameter);
    }
}
6.2. yml配置:
# Job 配置
elasticjob:
  regCenter:
    # 连接Zookeeper服务器的列表,包括IP地址和端口号,多个地址用逗号分隔
    serverLists: 127.0.0.1:2181
    # Zookeeper的命名空间
    namespace: yzy
    # 等待重试的间隔时间的初始值   单位:毫秒
    base-sleep-time-milliseconds: 10000
    # 等待重试的间隔时间的最大值
    max-sleep-time-milliseconds: 30000
    # 最大重试次数
    max-retries: 3
    # 会话超时时间 单位: 毫秒
    session-timeout-milliseconds: 600000
    # 连接超时时间 单位: 毫秒
    connection-timeout-milliseconds: 600000
  props:
    wechat:
      # 企业微信机器人的 webhook 地址
      webhook: xxxxxx
      # 与企业微信服务器建立连接的超时时间
      connectTimeout: 3000
      # 从企业微信服务器读取到可用资源的超时时间
      readTimeout: 5000
  # elasticjob.jobs 是一个 Map,
  # key 为作业名称,value 为作业类型与配置。
  # Starter 会根据该配置自动创建 OneOffJobBootstrap 或 ScheduleJobBootstrap 的实例
  # 并注册到 Spring 容器中。
  jobs:
    # 如果配置了 cron 属性则为定时调度作业,Starter 会在应用启动时自动启动; 否则为一次性调度作业,
    # 需要通过 jobBootstrapBeanName 指定 OneOffJobBootstrap Bean 的名称,
    # 在触发点注入 OneOffJobBootstrap 的实例并手动调用 execute() 方法。
    myOneOffJob:
      jobBootstrapBeanName: myOneOffJob
      # elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
      elasticJobClass: org.example.jobs.TestJobOne
      # 作业分片总数
      shardingTotalCount: 20
      # 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
      # LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
      # THROW : 抛出异常策略	抛出系统异常并中断作业执行
      # IGNORE : 忽略异常策略	忽略系统异常且不中断作业执行
      # WECHAT : 企业微信通知策略	发送企业微信消息通知,但不中断作业执行
      jobErrorHandlerType: LOG
      # 作业线程池处理策略
      jobExecutorServiceHandlerType: "SINGLE_THREAD"
      # 分片序列号和参数用等号分隔,多个键值对用逗号分隔
      # 分片序列号从0开始,不可大于或等于作业分片总数
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      # 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
      overwrite: true
      # 是否开启错过任务重新执行
      misfire: true
      #是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
      failover: true
6.3. 测试
controller调用代码:
    @GetMapping("/execute")
    public String executeOneOffJob() {
        OneOffJobBootstrap myOneOffJob = SpringUtil.getBean("myOneOffJob");
        myOneOffJob.execute();
        return "success";
    }
执行:
 
 
7. 其他
7.1. Zookeeper命令查看elasticjob注册的信息
查看作业空间:
 yml全配置:
# Job 配置
elasticjob:
  regCenter:
    # 连接Zookeeper服务器的列表,包括IP地址和端口号,多个地址用逗号分隔
    serverLists: 127.0.0.1:2181
    # Zookeeper的命名空间
    namespace: yzy
    # 等待重试的间隔时间的初始值   单位:毫秒
    base-sleep-time-milliseconds: 10000
    # 等待重试的间隔时间的最大值
    max-sleep-time-milliseconds: 30000
    # 最大重试次数
    max-retries: 3
    # 会话超时时间 单位: 毫秒
    session-timeout-milliseconds: 600000
    # 连接超时时间 单位: 毫秒
    connection-timeout-milliseconds: 600000
  props:
    wechat:
      # 企业微信机器人的 webhook 地址
      webhook: xxxxxx
      # 与企业微信服务器建立连接的超时时间
      connectTimeout: 3000
      # 从企业微信服务器读取到可用资源的超时时间
      readTimeout: 5000
  # elasticjob.jobs 是一个 Map,
  # key 为作业名称,value 为作业类型与配置。
  # Starter 会根据该配置自动创建 OneOffJobBootstrap 或 ScheduleJobBootstrap 的实例
  # 并注册到 Spring 容器中。
  jobs:
    TestJobMany:
      # elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
      elasticJobClass: org.example.jobs.TestJobMany
      # 作业分片总数
      shardingTotalCount: 20
      # 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
      # LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
      # THROW : 抛出异常策略	抛出系统异常并中断作业执行
      # IGNORE : 忽略异常策略	忽略系统异常且不中断作业执行
      # WECHAT : 企业微信通知策略	发送企业微信消息通知,但不中断作业执行
      jobErrorHandlerType: LOG
      cron: 0/5 * * * * ?
      # 作业线程池处理策略
      jobExecutorServiceHandlerType: "SINGLE_THREAD"
      # 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
      overwrite: true
      # 是否开启错过任务重新执行
      misfire: true
      #是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
      failover: true
    # 如果配置了 cron 属性则为定时调度作业,Starter 会在应用启动时自动启动; 否则为一次性调度作业,
    # 需要通过 jobBootstrapBeanName 指定 OneOffJobBootstrap Bean 的名称,
    # 在触发点注入 OneOffJobBootstrap 的实例并手动调用 execute() 方法。
    myOneOffJob:
      jobBootstrapBeanName: myOneOffJob
      # elasticJobClass 与 elasticJobType 互斥,每项作业只能有一种类型
      elasticJobClass: org.example.jobs.TestJobOne
      # 作业分片总数
      shardingTotalCount: 20
      # 错误处理策略:记录日志策略、记录作业异常日志,但不中断作业执行
      # LOG : 记录日志策略、记录作业异常日志,但不中断作业执行
      # THROW : 抛出异常策略	抛出系统异常并中断作业执行
      # IGNORE : 忽略异常策略	忽略系统异常且不中断作业执行
      # WECHAT : 企业微信通知策略	发送企业微信消息通知,但不中断作业执行
      jobErrorHandlerType: LOG
      # 作业线程池处理策略
      jobExecutorServiceHandlerType: "SINGLE_THREAD"
      # 分片序列号和参数用等号分隔,多个键值对用逗号分隔
      # 分片序列号从0开始,不可大于或等于作业分片总数
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      # 本地配置是否可覆盖注册中心配置 , 如果可覆盖,每次启动作业都以本地配置为准
      overwrite: true
      # 是否开启错过任务重新执行
      misfire: true
      #是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
      failover: true
Zookeeper 查询:
 ls /
 
 查看注册作业:
 
 其中 TestJobMany 是我们注册的定时任务、myOneOffJob 是我们注册的一次性任务
查看分片数:
 ls /yzy/TestJobMany/sharding
 
 ls /yzy/myOneOffJob/sharding
 
7.2. 如果注册发生未知错误,如何清理elasticjob在zookeeper上注册的命令空间
清除我们elasticjob在zookeeper上注册的命令空间
 deleteall /yzy
 其中yzy是你注册的命名空间
 
 查看:
 
 清除成功,然后重新启动服务,重新注册即可。











![[Android 模块化配置实践] Java + Gradle7配置模块化实践记录](https://img-blog.csdnimg.cn/2b51b68b3649414fb414341cae4d59d0.png)







