一、假设需求:
- 某系统在MySQL某表中操作了一条数据
 - 在其他系统中,实时获取最新被操作数据的数据库名、数据表名、操作类型、数据内容
 
应用场景:
按最近项目的一个需求来说:
1.当某子系统向报警表中新增了一条报警数据;
2.项目中各个子系统需要获取刚刚新增的报警数据;
3.如果使用传统入库查库方式:
- 大批量插入时获取最新的报警数据需要新增查询逻辑
 - 频繁获取最新新增数据效率较低
 
二、实现思路
- 使用ApplicationListener监听数据库
 - 将监听到的数据同步并发布到Redis消息队列中
 - 其他系统订阅Redis消息队列频道获取新增的最新数据
 
三、代码实现
- 引入redis客户端依赖(SpringBoot并未集成)
 
		<dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>5.0.0</version>
        </dependency>
 
- 创建数据同步事件
 
public class MessageEvent extends ApplicationEvent {
    private CdcMessage message;
    /**
     * 初始化对象
     * 
     * @param source
     */
    public MessageEvent(Object source, CdcMessage message) {
        super(source);
        this.message = message;
    }
    @Override
    public Object getSource() {
        return super.getSource();
    }
    public CdcMessage getMessage() {
        return this.message;
    }
    public void setMessage(CdcMessage message) {
        this.message = message;
    }
}
 
- 创建数据信息类CdcMessage
 
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CdcMessage implements Serializable {
    /**
     * 数据
     */
    private JSONObject data;
    /**
     * 数据库类型
     */
    private String dbType;
    /**
     * 处理类型(UPDATE DELETE CREATE)
     */
    private String handleType;
    /**
     * 数据库名
     */
    private String database;
    /**
     * 表名
     */
    private String table;
    
    /**
     * JSON 转对象
     *
     * @param clazz 转换类型
     * @param <T>   泛型
     * @return 集合结果
     */
    public <T> List<T> toBean(Class<T> clazz) {
        List<T> rst = new LinkedList<>();
        rst.add(JSON.toJavaObject(data, clazz));
        return rst;
    }
}
 
- 创建数据同步方法(实现ApplicationListener数据监听接口,实现onApplicationEvent方法)
 
@Slf4j
@Component
public class Process implements ApplicationListener<MessageEvent> {
    
    @Override
    public void onApplicationEvent(MessageEvent event) {
        CdcMessage message = event.getMessage();
        // 当TableName表进行新增操作时,执行数据同步操作
        if ("TableName".equalsIgnoreCase(message.getTable()) && "CREATE".equals(message.getHandleType())) {
            // 创建Jedis对象,连接到Redis服务器
            Jedis jedis = new Jedis("ip", 6379);
            // 设置认证密码
            jedis.auth("psssword");
            JSONObject messageData = message.getData();
            // 发布消息给消费者
            jedis.publish("频道名称", JSON.toJSONString(messageData ));
            // 关闭Jedis连接
            jedis.close();
        }
    }
}
 
四、测试
- 编写测试代码(消息订阅)
 
@Test
    public void test() {
        // 创建Jedis对象,连接到Redis服务器
        Jedis jedis = new Jedis("ip", 6379);
        // 设置认证密码
        jedis.auth("password");
        // 创建消息订阅器对象
        JedisPubSub jedisPubSub = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                // 在接收到消息时执行的逻辑,可以根据实际需求进行编写
                System.out.println(message);
            }
        };
        // 订阅指定频道
        jedis.subscribe(jedisPubSub, "频道名称");
        // 关闭Jedis连接
        jedis.close();
    }
 
- 新增数据
 

- 获取消息订阅数据
 

五、总结
该功能主要实现方式为传统数据监听+MQ消息发布/订阅。由于该项目系统MQ只集成了Redis,所以未使用四大MQ从而使用Redis。

















![CVE-2023-2766:泛微E-Office信息泄露漏洞复现 [附POC]](https://img-blog.csdnimg.cn/de6bfaa83f3149519e7076e39be8ee3c.png)

