[特殊字符] 使用增量同步+MQ机制将用户数据同步到Elasticsearch

news2025/5/25 13:40:04

在开发用户搜索功能时,我们通常会将用户信息存储到 Elasticsearch(简称 ES) 中,以提高搜索效率。本篇文章将详细介绍我们是如何实现 MySQL 到 Elasticsearch 的增量同步,以及如何通过 MQ 消息队列实现用户信息实时更新 的机制。

一、整体思路

为了保证用户数据在 MySQL 与 ES 之间保持一致,我们采用了以下 双通道同步策略

  1. 定时任务 + 游标机制:实现 MySQL 到 ES 的增量同步

  2. 通过 MQ(消息队列) 实现实时同步用户更新/删除操作到 ES


二、定时任务增量同步逻辑详解

我们定义了一个定时任务 syncUserDataToESJob,主要用于从 user 表中 增量拉取变动数据,并同步到 ES。

✨ 增量拉取机制

为了避免全量同步的高开销,我们使用了 “更新时间 + 主键 ID”双重游标,实现分页增量同步:

List<User> usersBatch = userClient.selectIncrementalUsers(lastSyncTime, lastMaxId, PAGE_SIZE);

其中:

  • lastSyncTime 表示上次同步的最大更新时间

  • lastMaxId 用于处理相同更新时间下的并发写入

🧠 同步逻辑核心代码如下:

@XxlJob("syncUserDataToESJob")
@GlobalTransactional
public void syncUserData() {
    Date lastSyncTime = syncPointService.getLastSyncTime();
    Long lastMaxId = syncPointService.getLastMaxId();

    if (lastSyncTime == null) {
        lastSyncTime = new Date(0); // 默认从最早开始
        lastMaxId = 0L;
    }

    Date maxUpdateTime = lastSyncTime;
    Long maxId = lastMaxId;

    boolean hasNewData = false;

    while (true) {
        List<User> usersBatch = userClient.selectIncrementalUsers(lastSyncTime, lastMaxId, PAGE_SIZE);
        if (usersBatch.isEmpty()) break;
        hasNewData = true;

        List<EsUserDoc> esDocs = usersBatch.stream()
            .map(this::convertToEsDoc)
            .collect(Collectors.toList());
        esClient.bulkIndex(esDocs);

        for (User u : usersBatch) {
            Date updateTime = u.getUpdateTime();
            if (updateTime.after(maxUpdateTime)) {
                maxUpdateTime = updateTime;
                maxId = u.getId();
            } else if (updateTime.equals(maxUpdateTime) && u.getId() > maxId) {
                maxId = u.getId();
            }
        }

        lastSyncTime = maxUpdateTime;
        lastMaxId = maxId;
    }

    // 同步删除数据
    List<Long> deletedUserIds = userClient.selectDeletedUserIds(syncPointService.getLastSyncTime(), syncPointService.getLastMaxId());
    if (!deletedUserIds.isEmpty()) {
        esClient.bulkDeleteByIds(deletedUserIds);
    }

    if (hasNewData) {
        log.info("更新同步点:maxUpdateTime = {}, maxId = {}", maxUpdateTime, maxId);
        syncPointService.updateLastSyncPoint(maxUpdateTime, maxId);
    } else {
        log.info("本次没有增量数据,不更新同步点");
    }
}

📝 特别说明:

  • syncPointService 用于记录上次同步的时间点和 ID,保证每次定时任务可重复安全执行。

  • 如果服务中断重启,也不会造成数据丢失或重复。


三、用户修改通过 MQ 实时同步到 ES

虽然定时任务可以周期性同步,但如果用户更新昵称、头像、标签等信息,等待下一次定时任务才能生效,可能会造成 数据延迟

为此,我们引入了 消息队列机制,实现实时更新:

✅ 使用 MQ 的同步方案

  1. 用户信息发生变化时,在业务服务中发送一条消息:

UserUpdateMessage message = new UserUpdateMessage(userId);
rabbitTemplate.convertAndSend("user.topic.exchange", "user.update", message);
  1. 在 ES 同步服务中监听消息并处理:

@RabbitListener(queues = "user.update.queue")
public void onUserUpdate(UserUpdateMessage msg) {
    User user = userClient.getUserById(msg.getUserId());
    if (user != null) {
        EsUserDoc doc = convertToEsDoc(user);
        esClient.index(doc);
    }
}

💡 好处:

  • 实时:用户更新后立即同步到 ES

  • 解耦:业务逻辑与搜索逻辑分离

  • 高性能:避免频繁更新 ES


四、总结与展望

通过“定时任务 + 增量游标” 和 “消息队列实时更新” 的结合方案,我们实现了对用户数据高效且可靠的同步到 Elasticsearch。

同步方式特点使用场景
定时任务批量、容错性强周期性同步新增/修改/删除
MQ 实时快速、解耦用户主动更新资料时快速生效

未来我们还可以扩展以下能力:

  • 引入 Canal + Binlog 监听实现更彻底的实时同步

  • 支持多租户分库分表的场景下数据同步

  • 引入失败重试机制保障消息不丢


希望本文对你在做数据同步或 ES 架构设计时有所启发,欢迎点赞、收藏、评论交流!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2385375.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

打卡第27天:函数的定义与参数

知识点回顾&#xff1a; 1.函数的定义 2.变量作用域&#xff1a;局部变量和全局变量 3.函数的参数类型&#xff1a;位置参数、默认参数、不定参数 4.传递参数的手段&#xff1a;关键词参数 5.传递参数的顺序&#xff1a;同时出现三种参数类型时 作业&#xff1a; 题目1&a…

python训练营day34

知识点回归&#xff1a; CPU性能的查看&#xff1a;看架构代际、核心数、线程数GPU性能的查看&#xff1a;看显存、看级别、看架构代际GPU训练的方法&#xff1a;数据和模型移动到GPU device上类的call方法&#xff1a;为什么定义前向传播时可以直接写作self.fc1(x) 作业 复习今…

人工智能在医疗影像诊断上的最新成果:更精准地识别疾病

摘要&#xff1a;本论文深入探讨人工智能在医疗影像诊断领域的最新突破&#xff0c;聚焦于其在精准识别疾病方面的显著成果。通过分析深度学习、多模态影像融合、三维重建与可视化以及智能辅助诊断系统等关键技术的应用&#xff0c;阐述人工智能如何提高医疗影像诊断的准确性和…

塔能节能平板灯:点亮苏州某零售工厂节能之路

在苏州某零售工厂的运营成本中&#xff0c;照明能耗占据着一定比例。为降低成本、提升能源利用效率&#xff0c;该工厂与塔能科技携手&#xff0c;引入塔能节能平板灯&#xff0c;开启了精准节能之旅&#xff0c;并取得了令人瞩目的成效。 一、工厂照明能耗困境 苏州该零售工厂…

3DMAX插件UV工具UV Tools命令参数详解

常规: 打开UV工具设置对话框。 右键点击: 隐藏/显示主界面。 添加 为选定对象添加展开修改器。 将从下拉菜单中选择映射通道。 Ctrl+点击: 克隆任何当前的修饰符。 右键点击: 找到第一个未展开的修改器。 地图频道 设置展开映射通道。 Ctrl+Click:添加选定的映射通道的展开…

Docker 与微服务架构:从单体应用到容器化微服务的迁移实践

随着软件系统规模和复杂性的日益增长,传统的单体应用(Monolithic Application)在开发效率、部署灵活性和可伸缩性方面逐渐暴露出局限性。微服务架构(Microservice Architecture)作为一种将大型应用拆分为一系列小型、独立、松耦合服务的模式,正成为现代企业构建弹性、敏捷…

《岁月深处的童真》

在那片广袤而质朴的黄土地上&#xff0c;时光仿佛放慢了脚步&#xff0c;悠悠地流淌着。画面的中央&#xff0c;是一个扎着双髻的小女孩&#xff0c;她静静地伫立着&#xff0c;宛如一朵绽放在岁月缝隙中的小花。 小女孩身着一件略显陈旧的中式上衣&#xff0c;布料的纹理间似乎…

文件夹图像批处理教程

前言 因为经常对图像要做数据清洗&#xff0c;又很费时间去重新写一个&#xff0c;我一直在想能不能写一个通用的脚本或者制作一个可视化的界面对文件夹图像做批量的修改图像大小、重命名、划分数据训练和验证集等等。这里我先介绍一下我因为写过的一些脚本&#xff0c;然后我…

RL电路的响应

学完RC电路的响应&#xff0c;又过了一段时间了&#xff0c;想必很多人都忘了RC电路响应的一些内容。我们这次学习RL电路的响应&#xff0c;以此同时&#xff0c;其实也是带大家一起回忆一些之前所学的RC电路的响应的一些知识点。所以&#xff0c;这次的学习&#xff0c;其实也…

30-消息队列

一、消息队列概述 队列又称消息队列&#xff0c;是一种常用于任务间通信的数据结构&#xff0c;队列可以在任务与任务间、 中断和任务间传递信息&#xff0c;实现了任务接收来自其他任务或中断的不固定长度的消息&#xff0c;任务能够从队列里面读取消息&#xff0c;当队列中的…

Thinkphp6使用token+Validate验证防止表单重复提交

htm页面加 <input type"hidden" name"__token__" value"{:token()}" /> Validate 官方文档 ThinkPHP官方手册

AppAgentx 开源AI手机操控使用分享

项目地址: https://appagentx.github.io/?utm_sourceai-bot.cn GitHub仓库: https://github.com/Westlake-AGI-Lab/AppAgentX/tree/main arXiv技术论文:https://arxiv.org/pdf/2503.02268 AppAgentx是什么: AppAgentX 是西湖大学推出的一种自我进化式 GUI 代理框架。它通过…

Axure设计之带分页的穿梭框原型

穿梭框&#xff08;Transfer&#xff09;是一种常见且实用的交互组件&#xff0c;广泛应用于需要批量选择或分配数据的场景。 一、应用场景 其典型应用场景包括&#xff1a; 权限管理系统&#xff1a;批量分配用户角色或系统权限数据筛选工具&#xff1a;在大数据集中选择特…

电机控制储备知识学习(五) 三项直流无刷电机(BLDC)学习(四)

目录 电机控制储备知识学习&#xff08;五&#xff09;一、三项直流无刷电机(BLDC)学习&#xff08;四&#xff09;1&#xff09;软件方法控制电机转速2&#xff09;PWM概念和PWM的产生3&#xff09;转子位置检测和霍尔传感器的工作原理分析4&#xff09;霍尔传感器安装角度和电…

Java—— 网络爬虫

案例要求 https://hanyu.baidu.com/shici/detail?pid0b2f26d4c0ddb3ee693fdb1137ee1b0d&fromkg0 http://www.haoming8.cn/baobao/10881.html http://www.haoming8.cn/baobao/7641.html上面三个网址分别表示百家姓&#xff0c;男生名字&#xff0c;女生名字&#xff0c;如…

Baklib内容中台的主要构成是什么?

Baklib内容中台核心架构 Baklib作为一站式知识管理平台的核心载体&#xff0c;其架构设计围绕智能搜索引擎优化技术与多终端适配响应系统展开。通过模块化内容组件的灵活配置&#xff0c;企业可快速搭建知识库、FAQ页面及帮助中心等标准化场景&#xff0c;同时借助可视化数据看…

深度解析 Java 中介者模式:重构复杂交互场景的优雅方案

一、中介者模式的核心思想与设计哲学 在软件开发的历史长河中&#xff0c;对象间的交互管理一直是架构设计的核心难题。当多个对象形成复杂的网状交互时&#xff0c;系统会陷入 "牵一发而动全身" 的困境。中介者模式&#xff08;Mediator Pattern&#xff09;作为行…

untiy实现汽车漫游

实现效果 汽车漫游 1.创建汽车模型 导入汽车模型(FBX格式或其他3D格式),确保模型包含车轮、车身等部件。 为汽车添加碰撞体(如 Box Collider 或 Mesh Collider),避免穿透场景物体。 添加 Rigidbody 组件,启用重力并调整质量(Mass)以模拟物理效果。 2.编写汽车控制脚本…

PID项目---硬件设计

该项目是立创训练营项目&#xff0c;这些是我个人学习的记录&#xff0c;记得比较潦草 1.硬件-电路原理电赛-TI-基于MSPM0的简易PID项目_哔哩哔哩_bilibili 这个地方接地是静电的考量 这个保护二极管是为了在电源接反的时候保护电脑等设备 大电容的作用&#xff1a;当电机工作…

Pluto实验报告——基于FM的音频信号传输并解调恢复

目录 一、实验目的 ................................ ................................ ................................ .................. 3 二、实验内容 ................................ ................................ ................................ ......…