一、背景
告警模块,作为大多数应用都存在的一个基础功能,今天我们就以开源项目openjob 为例,分析其设计及实现。
首先,我们梳理一下需求:
- 支持多种告警方式,包括钉钉、飞书、微信和webhook。
- 方便业务模块的接入,这里采用本地事件驱动的方式来解耦模块间的依赖。
本文将针对这两个问题,先画出多个告警的类及接口设计,再讲述本地事件驱动,最后是异步消费队列中的任务。
二、接口及类的设计


 源码内容就不在这里赘述了。
四个实现类的差异在于方法send()和channel(),大多数公共的实现在抽象类AbstractChannel中。
三、模块架构图

 我们可以看到,通过本地事件驱动机制,告警模块和其他业务模块做到了解耦。
事件监听者,订阅事件,转换为任务存入到LinkedBlockingQueue队列中。
同时,启动两个线程池(pullExecutor线程池负责拉取队列中的任务,consumerExecutor线程池负责执行任务,也即告警)

 还有两个基础类 io.openjob.common.task.BaseConsumer和io.openjob.common.task.TaskQueue。
 
 TaskQueue是对LinkedBlockingQueue的一个简单封装,入队在本地事件监听者,出队则在下一个类。
 抽象类BaseConsumer包括两个线程池:pullExecutor和consumerExecutor。
- 线程池pullExecutor的作用是:读取TaskQueue中的任务保存至第二个线程池consumerExecutor里
- 线程池consumerExecutor的作用是:异步执行任务,调用AlarmService.alarm()。
四、本地事件驱动机制
1、定义事件AlarmEvent
package io.openjob.server.alarm.event;
import io.openjob.server.alarm.dto.AlarmEventDTO;
import org.springframework.context.ApplicationEvent;
/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
public class AlarmEvent extends ApplicationEvent {
    public AlarmEvent(AlarmEventDTO alarmEventDTO) {
        super(alarmEventDTO);
    }
}
2、定义事件的发布者AlarmEventPublisher
package io.openjob.server.alarm.event;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Component
public class AlarmEventPublisher implements ApplicationEventPublisherAware {
    private static ApplicationEventPublisher applicationEventPublisher;
    /**
     * Publish event
     *
     * @param applicationEvent applicationEvent
     */
    public static void publishEvent(ApplicationEvent applicationEvent) {
        applicationEventPublisher.publishEvent(applicationEvent);
    }
    @Override
    public void setApplicationEventPublisher(@NonNull ApplicationEventPublisher applicationEventPublisher) {
        AlarmEventPublisher.applicationEventPublisher = applicationEventPublisher;
    }
}
业务模块发布事件:
AlarmEventPublisher.publishEvent(new AlarmEvent(alarmEventDTO));
3、事件监听者AlarmEventListener(重点)
类初始化的时候,初始化任务队列TaskQueue,然后启动两个线程池。
作为事件监听者,使用注解EventListener,把事件内容保存至任务队列TaskQueue。
package io.openjob.server.alarm.event;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.alarm.dto.AlarmEventDTO;
import io.openjob.server.alarm.task.EventTaskConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Slf4j
@Component
public class AlarmEventListener {
    private final TaskQueue<AlarmEventDTO> queue;
    @Autowired
    public AlarmEventListener() {
        queue = new TaskQueue<>(0L, 1024);
        EventTaskConsumer consumer = new EventTaskConsumer(
                0L,
                1,
                4,
                "Openjob-heartbeat-executor",
                1024,
                "Openjob-heartbeat-consumer",
                queue
        );
        consumer.start();
    }
    /**
     * Alarm listener
     *
     * @param alarmEvent alarmEvent
     */
    @EventListener
    public void alarmListener(AlarmEvent alarmEvent) {
        try {
            AlarmEventDTO event = (AlarmEventDTO) alarmEvent.getSource();
            // 取得事件的内容,放入任务队列
            queue.submit(event);
        } catch (Throwable throwable) {
            log.error("Alarm event add failed!", throwable);
        }
    }
}
- start()方法,包括两个线程池:consumerExecutor和pullExecutor,
 consumerExecutor这里只有初始化,并没有放入任务,待类EventTaskConsumer的consume()实现。pullExecutor详见下文。
		consumerExecutor = new ThreadPoolExecutor(
                this.consumerCoreThreadNum,
                this.consumerMaxThreadNum,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10240),
                new ThreadFactory() {
                    private final AtomicInteger index = new AtomicInteger(1);
                    @Override
                    public Thread newThread(@Nonnull Runnable r) {
                        return new Thread(r, String.format("%s-%d-%d", consumerThreadName, id, index.getAndIncrement()));
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        consumerExecutor.allowCoreThreadTimeOut(true);
4、生产线程池pullExecutor
- start()方法,每次从队列中拉取一定数量的任务
		this.pullExecutor = new ThreadPoolExecutor(
                1,
                1,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(1), r -> new Thread(r, "pull"));
        this.pullExecutor.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    List<T> tasks = this.pollTasks();
                    if (tasks.size() < this.pollSize) {
                        if (tasks.isEmpty()) {
                            Thread.sleep(this.pollIdleTime);
                            continue;
                        }
                        Thread.sleep(this.pollSleepTime);
                    }
                }
            } catch (Throwable ex) {
                log.warn("Task consumer failed! message={}", ex.getMessage());
            }
        });
- pollTasks(),每次拉取一定量的任务,转放入消费线程池(消费逻辑不一)
    private synchronized List<T> pollTasks() {
        // 每次拉取一定量的任务
        List<T> tasks = queues.poll(this.pollSize);
        if (!tasks.isEmpty()) {
            this.activePollNum.incrementAndGet();
            // 放入消费线程池,异步执行任务
            this.consume(id, tasks);
        }
        return tasks;
    }
其中consume()是一个抽象方法,交由具体类去实现。见下文类EventTaskConsumer
public abstract void consume(Long id, List<T> tasks);
5、消费线程EventTaskConsumer
package io.openjob.server.alarm.task;
import io.openjob.server.alarm.dto.AlarmEventDTO;
import io.openjob.server.alarm.service.AlarmService;
import io.openjob.common.OpenjobSpringContext;
import io.openjob.common.task.BaseConsumer;
import io.openjob.common.task.TaskQueue;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Slf4j
public class EventTaskConsumer extends BaseConsumer<AlarmEventDTO> {
    public EventTaskConsumer(Long id,
                             Integer consumerCoreThreadNum,
                             Integer consumerMaxThreadNum,
                             String consumerThreadName,
                             Integer pollSize,
                             String pollThreadName, TaskQueue<AlarmEventDTO> queues) {
        super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 5000L, 5000L);
    }
    @Override
    public void consume(Long id, List<AlarmEventDTO> tasks) {
         // 异步执行任务
        this.consumerExecutor.submit(new EventTaskRunnable(tasks));
    }
    private static class EventTaskRunnable implements Runnable {
        private final List<AlarmEventDTO> tasks;
        private EventTaskRunnable(List<AlarmEventDTO> tasks) {
            this.tasks = tasks;
        }
        @Override
        public void run() {
            try {
                // 执行告警,详细实现见后
                OpenjobSpringContext.getBean(AlarmService.class).alarm(this.tasks);
            } catch (Throwable throwable) {
                log.error("Alarm event consume failed!", throwable);
            }
        }
    }
}
五、执行告警任务
前文我们讲到告警的策略有多种,具体采用哪种策略,是由任务决定。
 所以,首先保存策略对应的实现,再取得任务的属性后,反查其实现类,最后执行调用。
- 保存策略对应的告警实现
@Service
public class AlarmService {
    private final AlertRuleDAO alertRuleDAO;
    private final DelayDAO delayDAO;
    private final JobDAO jobDAO;
    private final AppDAO appDAO;
    private final Map<String, AlarmChannel> channelMap = new HashMap<>();
    @Autowired
    public AlarmService(List<AlarmChannel> channels, AlertRuleDAO alertRuleDAO, DelayDAO delayDAO, JobDAO jobDAO, AppDAO appDAO) {
        this.alertRuleDAO = alertRuleDAO;
        this.delayDAO = delayDAO;
        this.jobDAO = jobDAO;
        this.appDAO = appDAO;
        channels.forEach(c -> channelMap.put(c.channel().getType(), c));
    }
注入接口AlarmChannel的所有实现类(这种写法的好处是不需要枚举),在类实例化的时候,遍历所有的实现类,保存至Map集合。
private AlarmChannel getChannel(String alertMethod) {
        return Optional.ofNullable(this.channelMap.get(alertMethod))
                .orElseThrow(() -> new RuntimeException("Alarm method not supported! method=" + alertMethod));
    }
根据任务的属性,反查接口的实现类。
AlarmChannel channel = this.getChannel(r.getMethod());
channel.send(alarmDTO);
最后调用实现类的send()方法,这样就实现了告警的灵活配置。
六、总结
当遇到一个接口多种实现的时候,利用jdk的多态性和抽象类,源码实现可以看到设计模式中的策略模式与工厂模式。
通过本文,使用事件驱动机制,降低模块之间的耦合。
顺便说一下,openjob是支持延迟任务的,不过它的实现比较复杂,并没有采用常见的开源方案。
本文就告警模块的源码给出了一个梳理与分析,希望可以帮助到你。



















