【消息队列】细说Kafka消费者位移机制

news2025/6/9 23:23:29

什么是位移

在这里插入图片描述
位移说白了就是消费者消费对应的Topic的分区的消费位置,之前存储到ZK中,后来转移到Kafka默认的Topic中。结构是采用key+value形势存储的,key是groupId+topic+分区号,value是offset的值。
而上述的存储就在_consumer_offsets,即位移主题。

为什么Kafka从ZK转移到Kafka内部呢?
我们知道针对位移这种操作是一个频繁的写操作,而ZK本身不支持高频的写操作。并且由于Kafka本身提供高持久性和高频的写操作,所以将位移管理移交给Kafka就是水到渠成的事情。

Kafak的位移主题和普通的主题有什么区别嘛?
其实位移主题和普通的主题没有区别,只不过是专门存储位移相关的数据。但是一般不建议程序员对位移主题进行写入和消费。Kafka Consumer API会自动完成这件事情。

位移主题的格式是什么样子呢?
如何让我们来设计的话,那么如何标记消费者所在的位移,而GroupId就是唯一值,不仅仅对于一个消费者适用,对于多个消费者也是适用的。而消费者消费的是分区级别的数据,所以就是<GroupId、Topic、分区>,这就是Key的形式,但是具体的Value呢,除了保存基本的位移数据外,还保存了Consumer Group的信息以及删除Group的信息。

位移主题如何创建的?

  • Kafka集群的第一个消费者程序启动的时候,自动创建位移主题。默认是50个分区,offsets.topic.num.partitions 进行设置。副本数默认是3,offsets.topic.replication.factor。

位移的提交方式

上面我们说了位移是什么,以及具体格式,创建时机,但是什么时候提交位移,Kafka有两种方式,一种是自动提交,另一种是手动提交,可以通过参数来进行设置。enable.auto.commit,默认是true。自动提交。
从用户的角度来说分为手动提交和自动提交,从Consumer端的角度来说,位移提交分为同步提交和异步提交

自动提交

在这里插入图片描述

自动提交位移是默认机制,但是存在一个问题,就是会无限制的向位移主题写入消息。
假设生产者写入一条消息,位移为1,消费者消费了1,生产者之后没有写入,那么消费者会一直给位移主题不停写入位移=1的消息,其实本身值保留一条就可以,时间久了对于磁盘会撑满。

Compact策略
既然提交会有磁盘爆满的风险,那么kafka是如何处理的呢,其实Kafka会有整理的过程,适用Compact策略,说白了就是如果K1的提交时刻早于K2,那么K1其实就没有必要保存,删除K1就可以。以下是Kafka官网的Compact的过程。因为K1的V1版本早于K1的V4版本,所以值保存一份。同理别的数据也是一样的。
在这里插入图片描述
对于专门处理Compact策略,有一个后台线程即 Log Cleaner

		Properties properties = new Properties();
		
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"2000");

        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList("test1"));

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> c : consumerRecords) {
                System.out.printf("offset = %d, key = %s, values = %s%n",c.offset(),c.key(),c.value());
            }
        }

以上就是自动提交位移,Kafka Consumer API会每5秒自动提交一次位移。

手动提交

手动提交 enable.auto.commit 设置为false, 只是告诉Kafka ConsumerAPI不要自动提交位移,但是需要调用相应的API手动提交位移。

同步提交

同步提交的方式是通过consumer.commitSync(),这个方法会一直等待位移提交结果,除非出现一次,被catch住。

但是我们知道,只有当从poll中获取消息处理完毕业务逻辑之后,才进行位移提交。如果过早的提交位移,那么可能出现消费数据丢失。
自动提交 存在重复消费 而针对自动位移提交来说,虽然可以保证从顺序上消息按照批次进行消费,不会出现消息的丢失,但是可能存在消息重复消费的问题,这也就是一般消费者需要有幂等机制进行限制。
比如默认是5S提交一次位移,但是当提交位移之后的3S发生了rebalance,那么下次消费者消费的位移会从3S之前的位移开始消费,也就是说可能3S前的数据重新消费一次。

		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
		
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //处理消息
            try {
                kafkaConsumer.commitSync();
            } catch (Exception e) {
                //处理异常
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }

同步提交其实虽然可以很灵活的控制提交时间和频率,但是因为需要同步等待,必须等待Broker返回结果流程才能执行下去,而这种是非常影响系统的TPS,因为非资源限制而导致的阻塞是系统的瓶颈。如果拉长提交时间,那么可能consumer重启之后,重新消费的消息更多。所以也就引出了异步提交的方式。

异步提交

同步的问题是影响系统的TPS,而异步不会,会通过回调函数来进行结果的通知,成功的情况是是没有问题,但是失败的情况下,如果重试的话,因为是异步的方式,所以可能A消息因为各种原因导致消费失败,但是此时已经执行了B、C消息,如果重试的话,那么A小时可能已经过期或者不是最新值。所以异步提交的方式重试没有意义。

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //处理消息
            try {
                kafkaConsumer.commitAsync((offsets, exception) -> {
                    if (Objects.nonNull(exception)) {
                        //处理异常
                    }
                });
            }  finally {
                kafkaConsumer.close();
            }
        }

一般来说我们更加推荐在实际的生产环境中使用同步+异步的方式提交,这样即避免了因同步带来的无效等待,也可以保证位移可以准确提交。

try {
            while (true) {
                        ConsumerRecords<String, String> records = 
                                    consumer.poll(Duration.ofSeconds(1));
                        process(records); // 处理消息
                        commitAysnc(); // 使用异步提交规避阻塞
            }
} catch (Exception e) {
            handle(e); // 处理异常
} finally {
            try {
                        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
	} finally {
	     consumer.close();
    }
}

精细化的提交

需要注意的是Kafka默认每次提交位移都是按照消费位移的最大值提交,比如消费者从Broker拉取了100条消息,那么位移如何原来是0的话,当kafka消费完这批次的数据,位移提交会提交100。这样就存在一个问题那就是我们可能不想每次都消费完100在提交,如果在这个过程中出现不可控异常,那么需要全部重新消费一次,我们可以通过配置比如消费50个消费就提交。
其实以上的思想就是将大事务切分成小事务,这样即使出现异常情况,也只需要从已经提交的位移处开始消费,具体的方式就是通过 commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>) 具体大家可以自行Google。这里就不细说了。

生产经验

漏消息和重复消费

重复消费:已经消费了,但是offset没有提交
漏消息:提交了offset但是没有消费
在这里插入图片描述

小结

本篇文章主要描述了位移以及相关位移提交的方式。
首先针对位移这是消费者消费位置通过在Broker进行持久化数据的过程,位移有相应的位移主题,以及位移提交方式,自动提交和手动提交。自动提交中可能存在消息重复提交的情况,通过compact策略来解决,而手动提交的方式有同步和异步方式,显然两者都存在一定的优缺点,所以需要两种方式结合,也可以根据不同的业务来进行精细化的提交。其实可以从代码层面看,不同的方式有不同的优缺点,而提高的更高纬度的架构,也是一种trade-off,这可能就是架构与代码设计的艺术吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/412405.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序02

小程序tabBar 普通页面跳转到 带有tabBar页面的时候不能使用 wx.navigateTo() 小程序中跳转到选项卡页面使用 wx.switchTab()跳转 到底部 onReachBottom() 函数 &#xff0c;&#xff0c; 在下拉刷新显示取消loading &#xff1a; wx.showNavigationBarLoading() wx.hideNavi…

深入讲解Linux内核中常用的数据结构和算法

Linux内核代码中广泛使用了数据结构和算法&#xff0c;其中最常用的两个是链表和红黑树。 链表 Linux内核代码大量使用了链表这种数据结构。链表是在解决数组不能动态扩展这个缺陷而产生的一种数据结构。链表所包含的元素可以动态创建并插入和删除。链表的每个元素都是离散存…

【网络原理】网络通信与协议

✨个人主页&#xff1a;bit me&#x1f447; ✨当前专栏&#xff1a;Java EE初阶&#x1f447; 目 录一. 网络发展史二. 网络通信基础1. IP地址2. 端口号3. 认识协议&#xff08;核心概念&#xff09;4. 五元组5. 协议分层6. 封装和分用一. 网络发展史 独立模式&#xff1a;计…

C++入门demo(从最简单的案例学习C++)

通过案例学习Cdemo01 在屏幕上输出内容demo02 规格不同的箱子&#xff08;变量&#xff09;demo03 物品存放&#xff08;变量赋值&#xff09;demo04 交换物品&#xff08;变量之间交换数值&#xff09;demo05 消失的重量&#xff08;隐式类型变换&#xff09;demo06 游泳池的容…

Melis4.0[D1s]:7.lvgl添加物理按键

文章目录1.lvgl注册keypad驱动1.1 在melis的ADC按键中发送消息1.1.1 创建消息队列&#xff0c;并初始化1.1.2 扫描按键时&#xff0c;发送按下和松开消息1.2 编写读取按键的回调函数1.3 lvgl按键驱动注册2.在gui中测试物理按键效果2.1 测试效果参考资料&#xff1a; 1.韦东山老…

第七章 基于 RNN 的生成文本

目录7.1 使用语言模型生成文本7.1.1 使用 RNN 生成文本的步骤7.1.2 文本生成的实现7.1.3 更好的文本生成7.2 seq2seq 模型7.2.1 seq2seq 的原理7.2.2 时序数据转换的简单尝试7.2.3 可变长度的时序数据7.2.4 加法数据集7.3 seq2seq 的实现7.3.1 Encoder类7.3.2 Decoder类7.3.3 S…

静态时序分析Static Timing Analysis3——特殊路径(多周期、半周期、伪路径)的时序检查

文章目录前言一、多周期路径1、建立时间检查2、保持时间检查二、半周期路径1、建立时间检查2、保持时间检查三、伪路径前言 2023.4.12 一、多周期路径 对于建立时间&#xff0c;要设置为N&#xff08;向后移&#xff09;&#xff1b;对于保持时间&#xff0c;要设置为N-1&…

9.8.0.32:ProEssentials数据可视化2D和3D图表:Crack

下面是我们的Winforms、Wpf、C MFC、VCL、ActiveX图表组件示例项目中的屏幕捕获。 有关下图&#xff0c;请参见我们的示例项目和演示中的030。 ProEssentials Winforms 图表, WPF 图表, C/MFC/VCL 图表. Gigasoft拥有20多年帮助企业开发大型客户端和嵌入式图表项目的经验。图…

JavaScript基础-02

常量&#xff08;字面量&#xff09;&#xff1a;数字和字符串 常量也称之为“字面量”&#xff0c;是固定值&#xff0c;不可改变。看见什么&#xff0c;它就是什么。 常量有下面这几种&#xff1a; 数字常量&#xff08;数值常量&#xff09;字符串常量布尔常量自定义常量…

传输线的物理基础(九):N 截面集总电路模型

理想的传输线电路元件是一种分布式元件&#xff0c;可以非常准确地预测实际互连的测量性能。下图显示了 1 英寸长传输线在频域中的实测阻抗和仿真阻抗对比。我们看到甚至高达 5 GHz 的测量带宽也能达成出色的协议。 1英寸长、50欧姆传输线的测量&#xff08;圆圈&#xff09;和…

Java实现hdfs的8个api操作

Java实现hdfs的8个api操作一、预处理准备1. 配置本地hadoop3.1.3目录文件2. 配置环境变量二、Maven项目依赖三、Java源代码四、api操作的实现1. 实现前的准备2. 创建hdfs上的路径3. 删除hdfs上的路径4. 创建hdfs文件并写入数据5. 删除hdfs上的文件6. hdfs上的文件移动路径并改名…

算法笔记:Frechet距离度量

曲线之间相似性的度量&#xff0c;它考虑了沿曲线的点的位置和顺序 1 概念 1.1 直观理解 主人走路径A&#xff0c;狗走路径B&#xff0c;他们有不同的配速方案主人和狗各自走完这两条路径过程中所需要的最短狗绳长度 &#xff08;在某一种配速下需要的狗绳长度&#xff09;&a…

MySQL-高可用MHA(二)

目录 &#x1f341;通过keepalived方式 &#x1f342;安装keepalived &#x1f343;防火墙策略 &#x1f343;keep配置文件 &#x1f342;MHA应用keepalived &#x1f343;停止MHA &#x1f343;启动MHA &#x1f343;检查状态 &#x1f343;测试 &#x1f341;通过脚本实现VIP…

数据结构——线段树

线段树的结构 线段树是一棵二叉树&#xff0c;其结点是一条“线段”——[a,b]&#xff0c;它的左儿子和右儿子分别是这条线段的左半段和右半段&#xff0c;即[a, (ab)/2 ]和[(ab)/2 ,b]。线段树的叶子结点是长度为1的单位线段[a,a1]。下图就是一棵根为[1,10]的线段树&#xff1…

真题详解(UML图)-软件设计(四十七)

真题详解(Flynn分类)-软件设计&#xff08;四十六)https://blog.csdn.net/ke1ying/article/details/130072198 某搜索引擎在使用过程中&#xff0c;若要增加接受语音输入的功能&#xff0c;使用户可以通过语音来进行搜索&#xff0c;此时对应系统进行____维护&#xff1f; 正确…

基于逻辑回归构建肿瘤预测模型

使用逻辑回归构建肿瘤预测模型 描述 乳腺癌数据集包括569个样本&#xff0c;每个样本有30个特征值&#xff08;病灶特征数据&#xff09;&#xff0c;每个样本都属于恶性&#xff08;0&#xff09;或良性&#xff08;1&#xff09;两个类别之一&#xff0c;要求使用逻辑回归&…

Python学习笔记--函数

&#xff08;一&#xff09; 函数介绍 1. 函数&#xff1a;是组织好的&#xff0c;可重复使用的&#xff0c;用来实现特定功能的代码段。 eg. len()&#xff1a;实现统计长度这一特定功能的代码段。 2. 函数好处&#xff1a; * 将功能封装在函数内&#xff0c;可随时随地重复…

eSearch使用教程大全

下载&#xff1a; https://www.xsoftnet.com/share/a0002tNuuOswc.html产品&#xff1a; eSearch 即拥有 截屏OCR搜索翻译贴图以图搜图录屏功能。 截屏 框选裁切 框选大小位置可调整(支持方向键或 WASD) 框选大小栏可输入四则运算式调整 取色器 放大镜 画笔&#xff08;自由画…

Kafka系统整理 一

一、Kafka 概述 1.1 定义 Kafka传统定义&#xff1a;Kafka是一个分布式的基于发布/订阅模式的消息队列 (Message Queue), 主要应用于大数据实时处理领域。 kafka最新定义&#xff1a;kafka是一个开源的分布式事件流平台&#xff08;Event Streaming Platform&#xff09;, 被…

PostgreSQL下载、安装、Problem running post-install step的解决、连接PostgreSQL

我是参考《SQL基础教程》来安装的&#xff0c;关于书的介绍、配套视频、相关代码可以参照下面的链接&#xff1a; SQL基础教程&#xff08;第2版&#xff09; (ituring.com.cn) 一、下载 我直接打开书中的下载链接时&#xff0c;显示的是这个界面&#xff1a; You are not …