Kafka 如何保证不重复消费

news2025/6/5 22:35:06

在消息队列的使用场景中,避免消息重复消费是保障数据准确性和业务逻辑正确性的关键。对于 Kafka 而言,保证不重复消费并非单一机制就能实现,而是需要从生产者、消费者以及业务层等多个维度协同配合。接下来,我们将结合图文详细解析 Kafka 保证不重复消费的核心策略与实现方式。

一、消费者端:精确控制偏移量提交

在 Kafka 中,偏移量(Offset)是标识分区内消息位置的关键要素,消费者通过提交偏移量来标记已消费的消息位置。而合理管理偏移量提交,是避免重复消费的重要一环。

1.1 禁用自动提交,启用手动提交

自动提交偏移量(enable.auto.commit=true)是 Kafka 消费者的默认设置,但这种方式存在风险。因为自动提交可能在消息尚未完全处理完成时就执行,一旦消费者在此期间出现故障,重启后就会从已提交的偏移量位置开始消费,导致部分消息被重复处理。因此,为了更精确地控制消费进度,我们通常会禁用自动提交,改用手动提交。

props.put("enable.auto.commit", "false"); // 禁用自动提交

1.2 手动提交的正确时机

手动提交偏移量需要确保在消息完全处理成功后进行。以下是一段示例代码,展示了手动提交的逻辑:

try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record); // 处理消息
    }
    consumer.commitSync(); // 批量提交偏移量(仅当所有消息处理完成)
} catch (Exception e) {
    // 处理失败,不提交偏移量,重启后重新消费
}

在上述代码中,只有当processMessage(record)方法成功处理完所有拉取到的消息后,才会调用consumer.commitSync()提交偏移量。如果在处理过程中出现异常,偏移量不会被提交,消费者重启后将重新消费这些消息,从而保证消息至少被处理一次(At-Least-Once)。结合后续的去重逻辑,即可实现不重复消费(Exactly-Once)。

1.3 异步提交与回调处理

除了同步提交,Kafka 还支持异步提交偏移量,通过consumer.commitAsync()方法实现。异步提交不会阻塞线程,适用于对实时性要求较高的场景。不过,异步提交存在并发问题,例如旧偏移量可能覆盖新偏移量。因此,通常会搭配回调函数处理提交失败的情况:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed: {}", exception.getMessage());
        // 可重试或记录日志
    }
});

消费者偏移量提交逻辑示意图如下:

二、生产者端:幂等性与事务机制

如果生产者重复发送消息,即便消费者端精确管理了偏移量,仍然可能导致重复消费。为此,Kafka 在生产者端提供了幂等性和事务机制来解决这一问题。

2.1 幂等性生产者

幂等性生产者(Idempotent Producer)是 Kafka 从 0.11.0.0 版本开始引入的特性。其核心原理是 Kafka 为每个生产者分配唯一的Producer ID(PID),并为每条消息生成递增的Sequence Number。当生产者因网络问题等原因重复发送同一消息时,Broker 会根据 PID 和 Sequence Number 过滤掉重复消息,确保相同消息仅被写入一次。

开启幂等性生产者非常简单,只需在生产者配置中设置:

props.put("enable.idempotence", "true"); // 默认为 true

不过,需要注意的是,幂等性生产者仅能保证单分区内的幂等性,无法跨分区或跨会话保证消息不重复。

2.2 事务性生产者

对于需要跨分区或跨会话保证消息不重复的场景,就需要使用事务性生产者(Transactional Producer)。事务性生产者通过Transactional ID将多个分区的消息写入操作封装为一个原子操作,确保这些操作要么全部成功,要么全部回滚。

事务性生产者的关键操作步骤如下:

  1. 初始化事务:producer.beginTransaction();
  2. 发送消息到多个分区:producer.send(...);
  3. 提交事务:producer.commitTransaction();
  4. 若中途失败,回滚事务:producer.abortTransaction();

通过事务性生产者,即使生产者重启,新实例也能通过相同的 Transactional ID 继承旧 PID,避免重复消息的产生。同时,配合消费者的偏移量管理,能够实现端到端的不重复消费语义。

生产者幂等性与事务机制示意图如下:

三、业务层:添加去重逻辑

尽管 Kafka 在生产者和消费者端提供了多种机制来避免重复消费,但在一些极端情况下,例如下游系统处理消息时出现异常重试,仍然可能导致重复数据。因此,在业务层添加去重逻辑是保证不重复消费的最后一道防线。

3.1 为消息添加唯一标识

一种常见的去重方式是为每条消息添加唯一标识,例如 UUID。消费者在处理消息时,首先检查本地是否已处理过该标识的消息。如果已处理,则直接跳过;否则,进行正常的消息处理流程,并在处理完成后将该标识记录下来。

3.2 利用数据库特性

在将消息写入数据库时,可以利用数据库的特性实现去重。例如,在 MySQL 中使用INSERT IGNORE语句,当插入重复数据时,数据库会自动忽略该操作;或者结合版本号(Version)或时间戳(Timestamp)实现乐观锁,确保同一数据不会被重复更新。

以下是一个简单的伪代码示例,展示了业务层去重逻辑:

void processMessage(ConsumerRecord record) {
    String messageId = record.value().getMessageId();
    if (isProcessed(messageId)) { // 检查本地缓存或数据库
        return; // 已处理,跳过
    }
    saveToDatabase(record.value()); // 写入业务系统
    markAsProcessed(messageId); // 标记为已处理
}

四、不同场景下的配置组合与实践建议

在实际应用中,需要根据具体的业务场景选择合适的配置组合来保证不重复消费:

场景

生产者配置

消费者配置

去重方式

单分区,不跨会话

开启幂等性(默认)

手动提交偏移量

可选(幂等性已保障)

多分区,需跨会话

开启事务性(transactional.id)

手动提交偏移量 + 事务性消费

可选(事务机制保障)

下游系统无去重能力

幂等性 / 事务性 + 消息唯一标识

手动提交偏移量

业务层去重(必选)

此外,在实际操作中还应注意以下几点:

  • 监控消费者的consumer_lag(消费滞后量)和生产者的transactional_id_expiry(事务 ID 过期时间)等关键指标,及时发现潜在问题。
  • 合理调整max.in.flight.requests.per.connection等参数,控制未确认请求数,避免重试时出现消息乱序。

Kafka 保证不重复消费是一个多机制协同工作的过程,需要从生产者、消费者和业务层等多个层面综合考虑和配置。通过正确运用这些机制和策略,能够在分布式消息处理场景中高效、可靠地避免重复消费,确保数据的准确性和业务的稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2397943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RNN结构扩展与改进:从简单循环网络到时间间隔网络的技术演进

本文系统介绍 RNN 结构的常见扩展与改进方案。涵盖 简单循环神经网络&#xff08;SRN&#xff09;、双向循环神经网络&#xff08;BRNN&#xff09;、深度循环神经网络&#xff08;Deep RNN&#xff09; 等多种变体&#xff0c;解析其核心架构、技术特点及应用场景&#xff0c;…

类 Excel 数据填报

类 Excel 填报模式&#xff0c;满足用户 Excel 使用习惯 数据填报&#xff0c;可作为独立的功能模块&#xff0c;用于管理业务流程、汇总采集数据&#xff0c;以及开发各类数据报送系统&#xff0c;因此&#xff0c;对于报表工具而言&#xff0c;其典型场景之一就是利用报表模…

Office文档图片批量导出工具

软件介绍 本文介绍一款专业的Office文档图片批量导出工具。 软件特点 这款软件能够批量导出Word、Excel和PPT中的图片&#xff0c;采用绿色单文件设计&#xff0c;体积小巧仅344KB。 基本操作流程 使用方法十分简单&#xff1a;直接将Word、Excel或PPT文件拖入软件&#xf…

【iOS】ARC 与 Autorelease

ARC 与 Autorelease 文章目录 ARC 与 Autorelease前言何为ARC内存管理考虑方式自己生成的对象,自己持有非自己生成的对象,自己也可以持有不再需要自己持有的对象时释放非自己持有的对象无法释放 ARC的具体实现编译期和运行期ARC做的事情ARC实现: __autoreleasing 与 Autoreleas…

铁电液晶破局 VR/AR:10000PPI 重构元宇宙显示体验

一、VR/AR 沉浸感困境&#xff1a;传统显示技术的天花板在哪&#xff1f; &#xff08;一&#xff09;纱窗效应与眩晕感&#xff1a;近眼显示的双重枷锁 当用户戴上 VR 头显&#xff0c;眼前像素网格形成的 “纱窗效应” 瞬间打破沉浸感。传统液晶 500-600PPI 的像素密度&…

竞争加剧,美团的战略升维:反内卷、科技与全球化

5月26日&#xff0c;美团发布2025年第一季度业绩报告&#xff0c;交出了一份兼具韧性与创新性的成绩单。 报告显示&#xff0c;公司一季度总营收866亿元&#xff0c;同比增长18%&#xff1b;核心本地商业收入643亿元&#xff0c;同比增长18%&#xff1b;季度研发投入58亿元&a…

(17)课36:窗口函数的例题:例三登录时间与连续三天登录,例四球员的进球时刻连续进球。

&#xff08;89&#xff09;例三登录时间 &#xff1a; 保留代码版本 &#xff1a; CREATE TABLE sql_8( user_id varchar(2), login_date date ); insert into sql_8(user_id,login_date) values(A,2024-09-02),(A,2024-09-03),(A,2024-09-04),(B,2023-11-25),(B,2023-12- 3…

高性能分布式消息队列系统(二)

上一篇博客将C进行实现消息队列的用到的核心技术以及环境配置进行了详细的说明&#xff0c;这一篇博客进行记录消息队列进行实现的核心模块的设计 五、项目的需求分析 5.1、项目框架的概念性理解 5.1.1、消息队列的设计和生产消费者模型的关系 在现代系统架构中&#xff0c;…

华为OD机试真题——天然蓄水库(2025A卷:200分)Java/python/JavaScript/C++/C语言/GO六种最佳实现

2025 A卷 200分 题型 本文涵盖详细的问题分析、解题思路、代码实现、代码详解、测试用例以及综合分析; 并提供Java、python、JavaScript、C++、C语言、GO六种语言的最佳实现方式! 2025华为OD真题目录+全流程解析/备考攻略/经验分享 华为OD机试真题《天然蓄水库》: 目录 题目…

【Harmony OS】数据存储

目录 数据存储概述 首选项数据存储 关系型数据库 数据存储概述 • 数据存储 是为了解决应用数据持久化问题&#xff0c;使得数据能够存储在外存中&#xff0c;达到保存或共享目的。 • 鸿蒙应用数据存储包括 本地数据存储 和 分布式数据存储 。 • 本地数据存储 为应用…

MybatisPlus--核心功能--service接口

Service接口 基本用法 MyBatisPlus同时也提供了service接口&#xff0c;继承后一些基础的增删改查的service代码&#xff0c;也不需要去书写。 接口名为Iservice&#xff0c;而Iservice也继承了IRepository&#xff0c;这里提供的方法跟BaseMapper相比只多不少&#xff0c;整…

uniapp调试,设置默认展示的toolbar内容

uniapp调试&#xff0c;设置默认展示的toolbar内容 设置pages.json中 pages数组中 json的顺序就可以只需要调整顺序&#xff0c;不会影响该bar在页面中的显示默认展示第一条page

笔记本电脑开机无线网卡自动禁用问题

1.问题环境 电脑品牌&#xff1a;华硕笔记本天选4 电脑型号&#xff1a;FX507VV 电脑系统&#xff1a;windows 11_x64_24h2 文档编写时间&#xff1a;2025年6月 2.问题现象 1. 笔记本电脑开机之后自动禁用无线网卡 使用USB转RJ45转接头同样无效&#xff0c;这个网卡也给禁…

推荐一款使用html开发桌面应用的工具——mixone

简介 mixone是开发桌面应用&#xff08;Win、Mac、Linux&#xff09;的一款工具、其基于electron实现。其拥有简单的工程结构。以为熟悉前端开发的程序员可以很轻松的开发出桌面应用&#xff0c;它比electron的其他框架更简单&#xff0c;因为那些框架基本上还需要了解electro…

【云原生开发】如何通过client-go来操作K8S集群

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

八.MySQL复合查询

一.基本查询回顾 分组统计 group by 函数作用示例语句说明count(*)统计记录条数select deptno, count(*) from emp group by deptno;每个部门有多少人&#xff1f;sum(sal)某字段求和select deptno, sum(sal) from emp group by deptno;每个部门总工资avg(sal)求平均值select…

FastMCP vs MCP:协议标准与实现框架的协同

你好&#xff0c;我是 shengjk1&#xff0c;多年大厂经验&#xff0c;努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注&#xff01;你会有如下收益&#xff1a; 了解大厂经验拥有和大厂相匹配的技术等 希望看什么&#xff0c;评论或者私信告诉我&#xff01; 文章目录 一…

AI视频“入驻”手机,多模态成智能终端的新战场

文&#xff5c;乐乐 今天&#xff0c;无线蓝牙耳机&#xff08;TWS&#xff09;已经成为人人都用得起的产品。 但退回到9年前&#xff0c;苹果AirPods是全球第一款真正意义上的无线蓝牙耳机。靠着自研并申请专利的Snoop监听技术&#xff0c;苹果解决了蓝牙耳机左右延时和能耗…

nginx+tomcat负载均衡群集

一 案例部署Tomcat 目录 一 案例部署Tomcat 1.案例概述 1.1案例前置知识点 &#xff08;1&#xff09;Tomcat简介 &#xff08;2&#xff09;应用场景 2.实施准备 &#xff08;1&#xff09;关闭Linux防火墙 &#xff08;2&#xff09;安装Java 2.1 安装配置TOMACT …

建造者模式:优雅构建复杂对象

引言 在软件开发中&#xff0c;有时我们需要创建一个由多个部分组成的复杂对象&#xff0c;这些部分可能有不同的变体或配置。如果直接在一个构造函数中设置所有参数&#xff0c;代码会变得难以阅读和维护。当对象构建过程复杂&#xff0c;且需要多个步骤时&#xff0c;我们可…