Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍

news2025/6/3 23:12:31

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、分区分配策略基础

二、Range 分区分配策略

(一)原理

(二)案例

(三)Range 分区分配再平衡案例

三、RoundRobin 分区分配策略

(一)原理

(二)案例

(三)RoundRobin 分区分配再平衡案例

四、Sticky 分区分配策略

(一)原理

(二)案例

(三)Sticky 分区分配再平衡案例

五、CooperativeSticky 分区分配策略

六、消费者事务

七、数据积压(消费者如何提高吞吐量)

八、总结


        在 Kafka 的消费任务处理中,分区的分配以及再平衡是至关重要的环节。合理的分区分配策略能够确保消费者高效地处理消息,而理解再平衡机制则有助于应对消费者组在运行过程中的动态变化。本文将深入探讨 Kafka 中不同的分区分配策略,包括 Range、RoundRobin、Sticky 和 CooperativeSticky,以及它们在各种场景下的再平衡表现,并结合实际案例进行详细分析,并对消费者事务和数据积压进行简单介绍。

一、分区分配策略基础

        在一个 Kafka 消费者组中,包含多个消费者,而一个主题则由多个分区组成。关键问题在于确定哪个消费者来消费哪个分区的数据。Kafka 提供了四种主流的分区分配策略,并且可以通过配置参数 partition.assignment.strategy 来修改分区的分配策略,默认策略是 Range + CooperativeSticky。同时,还有一些相关的重要参数:

参数名称

描述

heartbeat.interval.ms

Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于session.timeout.ms,也不应该高于 session.timeout.ms 的 1/3。

session.timeout.ms

Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡

partition.assignment.strategy

消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。

可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

二、Range 分区分配策略

(一)原理

        Range 分区分配策略是基于主题的分区数量和消费者数量进行分配。它会按照顺序将连续的分区分配给每个消费者,尽可能平均地分配分区,但可能会导致不同消费者分配到的分区数量不一致。

(二)案例

首先,将主题 first 修改为 7 个分区:

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

注意,分区数可增加但不能减少,主题的副本数修改需要制定计划执行,不能直接修改。


         由三个消费者 CustomConsumerCustomConsumer1CustomConsumer2 组成消费者组,组名都为 “test”,同时启动这 3 个消费者。


        启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区(修改发送次数为 500 次)。

package com.bigdata.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.235.128:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 500; i++) {
            // 添加回调
            kafkaProducer.send(new ProducerRecord<>("first",
                    "bigdata " + i), new Callback() {
                // 该方法在 Producer 收到 ack 时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception exception) {
                    if (exception == null) {
                        // 没有异常,输出信息到控制台
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition());
                    } else {
                        // 出现异常打印
                        exception.printStackTrace();
                    }
                }
            });
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(20);
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

默认是Range,但是在经过一次升级之后,会自动变为CooperativeSticky。这个是官方给出的解释。

默认的分配器是[RangeAssignor, CooperativeStickyAssignor],默认情况下将使用RangeAssignor,但允许通过一次滚动反弹升级到CooperativeStickyAssignor,该滚动反弹会将RangeAssignor从列表中删除。


        观察消费情况,发现一个消费者消费了 5,6 分区,一个消费了 0,1,2 分区,一个消费了 3,4 分区。这是按照 Range 策略分配的结果。

此时并没有修改分区策略,原因是默认是Range.

(三)Range 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),此时 1 号消费者消费到 3、4 号分区数据,2 号消费者消费到 5、6 号分区数据,0 号的数据无人消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 0、1、2、3 号分区数据,2 号消费者消费到 4、5、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

 

三、RoundRobin 分区分配策略

(一)原理

        RoundRobin 分区分配策略以轮询的方式将分区分配给消费者,确保每个消费者尽可能均衡地获取分区,不考虑主题的因素,只要是消费者组内的分区都会按照轮询顺序分配。

(二)案例

        在 CustomConsumerCustomConsumer1CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin(指定 org.apache.kafka.clients.consumer.RoundRobinAssignor),并修改消费者组为 test2

package com.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerWithFenPei {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");

        // 指定分区的分配方案
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者订阅主题,主题有数据就会拉取数据
        // 指定消费的主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        // 一个消费者可以订阅多个主题
        kafkaConsumer.subscribe(topics);
        while(true){
            //1 秒中向kafka拉取一批数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> record :records) {
                // 打印一条数据
                System.out.println(record);
                // 可以打印记录中的很多内容,比如 key  value  offset topic 等信息
                System.out.println(record.value());
            }

        }

    }
}

修改一下消费者组为test2

 

重启 3 个消费者,重复发送消息步骤并观察分区结果。

 

 

(三)RoundRobin 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),1 号消费者消费到 2、5 号分区数据,2 号消费者消费到 4、1 号分区数据,0 号消费者以前对应的数据无人消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 0、2、4、6 号分区数据,2 号消费者消费到 1、3、5 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

四、Sticky 分区分配策略

(一)原理

        粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

(二)案例

1)需求

设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察

消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

2)步骤

(1)修改分区分配策略为粘性。

注意:3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等

会再重启,或者修改为全新的消费者组。

// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

(3)使用同样的生产者发送 500 条消息。

可以看到会尽量保持分区的个数近似划分分区。

(三)Sticky 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),1 号消费者消费到 2、5、3 号分区数据,2 号消费者消费到 4、6 号分区数据,0 号消费者的任务无人顶替。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需

要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 2、3、5 号分区数据,2 号消费者消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

五、CooperativeSticky 分区分配策略

        CooperativeSticky 是新添加的策略。在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,并且在运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。其好处是实现负载均衡,但多次平衡会浪费性能,它采用动态平衡,在消费过程中实施再平衡,而不是等到某个消费者退出再平衡。

六、消费者事务

        若要实现 Kafka 消费端的精准一次性消费,需要将消费过程和提交 offset 过程做原子绑定。此时可将 Kafka 的 offset 保存到支持事务的自定义介质(如 MySQL),这部分知识将在后续项目中深入涉及,事务具有 ACID 四大特征,例如转账场景(张三 --> 李四)就需要事务的保障来确保数据的准确性和完整性。

七、数据积压(消费者如何提高吞吐量)

        当面临数据积压问题时,消费者可以采取多种方式提高吞吐量,例如增加消费者数量、优化消费者代码处理逻辑、调整相关参数(如 max.poll.interval.ms 等)以适应更高的处理负载等。后续将深入探讨数据积压场景下的优化策略。

八、总结

        通过对 Kafka 分区分配以及再平衡策略的深入理解和实践,可以更好地构建和优化 Kafka 消费任务处理流程,确保系统的高效稳定运行。在实际应用中,需要根据具体的业务需求和场景特点选择合适的分区分配策略,并合理处理再平衡过程中的各种情况。

        消费者事务方面,为实现精准一次性消费,需将消费与提交 offset 原子绑定,可将 offset 存于支持事务的自定义介质如 MySQL 中。在数据积压场景下,消费者可通过增加数量、优化代码处理逻辑、调整参数等方式提高吞吐量,后续会深入探讨相关优化策略。这些知识对于深入理解和优化 Kafka 消费者的性能、可靠性和数据处理准确性具有极为重要的意义,有助于在实际应用中更好地构建和管理基于 Kafka 的系统架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2247369.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络-VPN虚拟专用网络概述

前面我们学习了在企业内部的二层交换机网络、三层路由网络包括静态路由、OSPF、IS-IS、NAT等&#xff0c;现在开始学习下VPN&#xff08;Virtual Private Network&#xff0c;虚拟专用网络&#xff09;&#xff0c;其实VPN可能很多人听到第一反应就是梯子&#xff0c;但是其实这…

《第十部分》1.STM32之通信接口《精讲》之IIC通信---介绍

经过近一周的USART学习&#xff0c;我深刻体会到通信对单片机的重要性。它就像人类的手脚和大脑&#xff0c;只有掌握了通信技术&#xff0c;单片机才能与外界交互&#xff0c;展现出丰富多彩的功能&#xff0c;变得更加强大和实用。 单片机最基础的“语言”是二进制。可惜&am…

【蓝桥杯C/C++】深入解析I/O高效性能优化:std::ios::sync_with_stdio(false)

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: 蓝桥杯C/C 文章目录 &#x1f4af;前言&#x1f4af;C 语言与 C 语言的输入输出对比1.1 C 语言的输入输出1.2 C 语言的输入输出 &#x1f4af; std::ios::sync_with_stdio(false) 的作用与意义2.1 什么是 std::ios::s…

初识Linux—— 基本指令(下)

前言&#xff1a; 本篇继续来学习Linux的基础指令&#xff0c;继续加油&#xff01;&#xff01;&#xff01; 本篇文章对于图片即内容详解&#xff0c;已同步到本人gitee&#xff1a;Linux学习: Linux学习与知识讲解 Linux指令 1、查看文件内容的指令 cat ​ cat 查看文件…

VM虚拟机装MAC后无法联网,如何解决?

✨在vm虚拟机上&#xff0c;给虚拟机MacOS设置网络适配器。选择NAT模式用于共享主机的IP地址 ✨在MacOS设置中设置网络 以太网 使用DHCP ✨回到本地电脑上&#xff0c;打开 服务&#xff0c;找到VMware DHCP和VMware NAT&#xff0c;把这两个服务打开&#xff0c;专一般问题就…

MCGSMCGS昆仑通态触摸屏

MCGS昆仑通态触摸屏应用实例详解 1目录设置 本案例讲了两个窗口的互相调用 创建工程 首先创建一个新工程 打开软件 McgsPro组态软件 菜单栏&#xff1a;文件&#xff1a;新建工程 打开工程设置窗口 HMI配置中应该是对应的不同型号的触摸屏&#xff0c; 选择一个类型&#x…

aws ses生产环境申请

* aws ses生产环境申请经验&#xff1a; 要有域名邮箱作为反馈联系邮箱 且有收发记录 最好使用aws的WorkMail要说明清晰的使用用途、预估量、如何处理退信和投诉、防spam策略 等内容&#xff0c;这里可以先问问AI&#xff08;比如&#xff1a;如何处理退信和投诉&#xff1f;…

MongoDB相关问题

视频教程 【GeekHour】20分钟掌握MongoDB Complete MongoDB Tutorial by Net Ninja MongoDB开机后调用缓慢的原因及解决方法 问题分析&#xff1a; MongoDB开机后调用缓慢&#xff0c;通常是由于以下原因导致&#xff1a; 索引重建&#xff1a; MongoDB在启动时会重建索引…

pytest日志总结

pytest日志分为两类&#xff1a; 一、终端&#xff08;控制台&#xff09;打印的日志 1、指定-s&#xff0c;脚本中print打印出的信息会显示在终端&#xff1b; 2、pytest打印的summary信息&#xff0c;这部分是pytest 的默认输出&#xff08;例如测试结果PASSED, FAILED, S…

mysql系列1—mysql架构和协议介绍

背景&#xff1a; 本文开始整理mysql相关的文章&#xff0c;用于收集数据库相关内容&#xff1b;包括mysql架构和存储方式、索引结构和查询优化、数据库锁等内容。思考如何根据具体的业务给出最优的分表规划和表设计、字段选择和索引设计、优化的SQL语句&#xff0c;以及数据库…

Opencv+ROS实现摄像头读取处理画面信息

一、工具 ubuntu18.04 ROSopencv2 编译器&#xff1a;Visual Studio Code 二、原理 图像信息 ROS数据形式&#xff1a;sensor_msgs::Image OpenCV数据形式&#xff1a;cv:Mat 通过cv_bridge()函数进行ROS向opencv转换 cv_bridge是在ROS图像消息和OpenCV图像之间进行转…

docker容器化部署springboot项目

前言 docker安装 下载官网 选择自己的系统 然后安装文档内给的命令按顺序执行即可。设置仓库&#xff0c;安装docker. 一、更换镜像源 一般情况下,docker原本自带的镜像网站不一定连的上,就很容易导致下载镜像失败,因此需要换源. 创建/etc/docker/daemon.json并填入数据…

实时数据开发|简单理解Flink流计算中解决乱序的机制--水位线

今天继续学习Flink的关键机制–水位线&#xff0c;虽然看文字有种浮于表面、难以理解的感觉&#xff0c;但是我觉得等开发中使用到的时候就会融会贯通了。 定义 Fink 相比其他流计算技术的一个重要特性是支持基于事件时间(event time)的窗口操作。但是事件时间来自于源头系统…

Edify 3D: Scalable High-Quality 3D Asset Generation 论文解读

目录 一、概述 二、相关工作 1、三维资产生成 2、多视图下的三维重建 3、纹理和材质生成 三、Edify 3D 1、文本生成多视角图像的扩散模型 2、文本和多视角图像生成法线图像的ControlNet 3、重建与渲染模型 4、多视角高分辨率RGB图像生成 四、训练 1、训练过程 2、…

2025-2026财年美国CISA国际战略规划(下)

文章目录 前言四、加强综合网络防御&#xff08;一&#xff09;与合作伙伴共同实施网络防御&#xff0c;降低集体风险推动措施有效性衡量 &#xff08;二&#xff09;大规模推动标准和安全&#xff0c;以提高网络安全推动措施有效性衡量 &#xff08;三&#xff09;提高主要合作…

uniapp实现开发遇到过的问题(持续更新中....)

1. 在ios模拟器上会出现底部留白的情况 解决方案&#xff1a; 在manifest.json文件&#xff0c;找到开源码视图配置&#xff0c;添加如下&#xff1a; "app-plus" : {"safearea":{"bottom":{"offset" : "none" // 底部安…

文小言1:

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

Oracle 23ai 对应windows版本安装配置PLSQL导入pde文件navicat连接Oracle

因为有一个pde文件需要查看里面的数据&#xff0c;所以这次需要配置本地oracle数据库&#xff0c;并且导入数据&#xff0c;因为还有navicat&#xff0c;所以就想用navicat去连接查看。 1、找到官网。 Get Started with Oracle Database 23ai | Oracle 2、下载windows版本。…

【热门主题】000062 云原生后端:开启高效开发新时代

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 【热…

Python 版本的 2024详细代码

2048游戏的Python实现 概述&#xff1a; 2048是一款流行的单人益智游戏&#xff0c;玩家通过滑动数字瓷砖来合并相同的数字&#xff0c;目标是合成2048这个数字。本文将介绍如何使用Python和Pygame库实现2048游戏的基本功能&#xff0c;包括游戏逻辑、界面绘制和用户交互。 主…