Kafka生产者之分区

news2025/7/20 17:25:28

一、分区好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果;

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据;

在这里插入图片描述


二、分区策略

在IDEA中全局查找(ctrl +n)ProducerRecord类,在类中可以看到如下构造方法:
在这里插入图片描述


(1)指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0

(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那
么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

(3)既没有partition值又没有key值的情况下Kafka采用Sticky Partition(黏性分区器)会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。



三、自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器;

1)需求

例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区;

2)实现步骤

2.1)定义类实现 Partitioner 接口

2.2)重写 partition()方法

/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
/**
 * 返回信息对应的分区
 * @param topic 主题
 * @param key 消息的 key
 * @param keyBytes 消息的 key 序列化后的字节数组
 * @param value 消息的 value
 * @param valueBytes 消息的 value 序列化后的字节数组
 * @param cluster 集群元数据可以查看分区信息
 * @return
 */
 @Override
 public int partition(String topic, Object key, byte[] 
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
 // 获取消息
 String msgValue = value.toString();
 // 创建 partition
 int partition;
 // 判断消息是否包含 atguigu
 if (msgValue.contains("atguigu")){
 partition = 0;
 }else {
 partition = 1;
 }
 // 返回分区号
 return partition;
 }
 // 关闭资源
 @Override
 public void close() {
 }
 // 配置方法
 @Override
 public void configure(Map<String, ?> configs) {
 }
}

2.3)使用分区器的方法,在生产者的配置中添加分区器参数

// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
gu.kafka.producer.MyPartitioner");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/37735.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【畅购商城】购物车模块之查看购物车

目录 分析 接口 后端实现 前端实现&#xff1a;显示页面 前端实现&#xff1a;显示购物车信息 分析 用户如果没有登录&#xff0c;购物车存放在浏览器端的localStorage处&#xff0c;且以数组的方式进行存储。用户如果登录了&#xff0c;购物车存放在redis中&#xff0c…

项目实战——对战回放和排行榜

目录 一、天梯积分更新 二、实现对局列表页面 三、前端测试 四、实现查看录像功能 五、实现分页功能 六、后端实现查询排行耪 七、前端展示 八、限制Bot数量 一、天梯积分更新 可以自己定义一下规则 存之前算一下两名玩家的天梯积分 实现更新&#xff0c;实现后重启看一…

挂耳式蓝牙耳机性价比推荐,盘点五款性能高的耳机分享

众所周知&#xff0c;骨传导耳机之所以能够受到人们的喜欢&#xff0c;是因为其佩戴不需入耳&#xff0c;尤其是针对于运动爱好者来说&#xff0c;在户外运动的时候不但可以听见音乐&#xff0c;还可以听见外界的声音&#xff0c;进一步的将危险系数拉低&#xff0c;其次也是因…

音乐信息提取-1-音频表示

音频信号是声音的一种表示&#xff0c;它表示由振动引起的气压随时间的波动&#xff08;数字信号处理-1-关于声音与波&#xff09;。 1 波形与时域 音频信号在时域上的表示就是波形随时间的变化&#xff0c;可以将波形的幅度值理解为声压。 声音是连续的&#xff0c;但数字记…

Spring创建、Bean对象的存储和读取

文章目录1、创建Spring项目1.1 创建Maven项目1.2 添加 Spring 框架支持1.3 添加启动类并添加main&#xff08;非必要步骤&#xff09;2、存储Bean对象2.1 添加Spring配置文件&#xff08;第一次添加&#xff09;2.2 创建Bean对象2.3 将Bean对象注册到Spring容器中3、读取并使用…

秋招挂麻了,就差去送外卖了,10w字Java八股啃完,春招必拿下

最近看到一名前腾讯员工发的帖子&#xff0c;总结的近期面试结果&#xff0c;真的就是那三个字&#xff1a;挂麻了…… 一个毕业后就在腾讯的高级程序员&#xff0c;由于种种原因&#xff0c;离职出来了。趁着金九银十求职季&#xff0c;互联网大厂小厂面试了一圈&#xff0c;感…

AcWing 搜素与图论

搜索 DFS 全排列 代码 #include<iostream> using namespace std;int vis[10], a[10];void dfs(int step, int n) {if (step n 1){for (int i 1; i < n; i)printf("%d ", a[i]);printf("\n");return;}for (int i 1; i < n; i){if (!vis[i…

计算机网络笔记5 传输层

文章目录前言一、运输层概述二、运输层的端口与复用、分用的概念三、UDP协议 和 TCP协议 对比用户数据报协议 UDP&#x1f4a6;&#xff08;User Datagram Protocol&#xff09;传输控制协议 TCP&#x1f4a6;&#xff08;Transmission Control Protocol&#xff09;四、TCP协议…

作用域和作用域链

文章目录1.作用域&#xff08;Scope&#xff09;1.1 什么是作用域1.2 全局作用域1.3 函数作用域1.3 块级作用域2. 作用域链2.1 自由变量2.2 作用域链2.3 *自由变量的取值2.4 作用域与执行上下文的区别3.总结1.作用域&#xff08;Scope&#xff09; 1.1 什么是作用域 当前的执…

easy-rules规则引擎最佳落地实践

写作目的 这是一个头部互联网公司中的一个问题。因为有很多业务产品线&#xff0c;作为一个新人或者团队外的人员是很难区分不同的产品线之间的区别的&#xff0c;因此需要给某个产品线一个描述。但是随着业务的发展&#xff0c;产品线下可能又根据某个字段进一步划分&#xf…

招生CRM系统|基于Springboot实现培训机构招生CRM管理系统

作者主页&#xff1a;编程指南针 作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、掘金特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容&#xff1a;Java项目、毕业设计、简历模板、学习资料、面试题库、技术互助 收藏点赞不迷路 关注作者有好处 文末获取源…

Mongodb操作基础 分片

Mongodb分片 MongoDB分片是MongoDB支持的另一种集群形式&#xff0c;它可以满足MongoDB数据量呈爆发式增长的需求。当MongoDB存储海量的数据时&#xff0c;一台机器可能无法满足数据存储的需求&#xff0c;也可能无法提供可接受的读写吞吐量&#xff0c;这时&#xff0c;我们就…

基于内容的个性化推荐算法

一、什么是推荐算法 随着移动互联网的高速发展与智能手机的普及&#xff0c;海量的有用信息虽然为人们提供了更多的价值&#xff0c;然而信息的泛滥也意味着为了寻找合适的信息必须付出更多的时间成本。事实上&#xff0c;有时候仅仅是浏览和简单的查询来寻找有用的信息变得相…

「强烈收藏」Python第三方库资源大全,1000+工具包

前言 awesome-python 是 vinta 发起维护的 Python 资源列表&#xff0c;内容包括&#xff1a;Web 框架、网络爬虫、网络内容提取、模板引擎、数据库、数据可视化、图片处理、文本处理、自然语言处理、机器学习、日志、代码分析等。 &#xff08;文末送读者福利&#xff09; …

超全!程序员必备的20个学习网站,看这一篇就够了!

之前一直想出个程序员学习清单&#xff0c;终于腾出时间弄出来了&#xff0c;也趁此机会整理了收藏夹。 此篇对于新手程序员比较有用&#xff0c;技术老鸟们也可以查缺补漏。 话不多说&#xff0c;纯纯干货呈上&#xff0c;赶紧点个赞收藏&#xff0c;以后会用得上&#xff01;…

CMake中include_directories的使用

CMake中include_directories命令用于在构建(build)中添加包含目录,其格式如下: include_directories([AFTER|BEFORE] [SYSTEM] dir1 [dir2 ...]) 将给定的目录添加到编译器(compiler)用于搜索包含文件的目录。相对路径被解释为相对于当前源目录。 包含目录被添加到当前C…

【网络】tcpdump、Wireshark 案例超详细介绍

文章目录网络分层应用层找到服务器的 IP查接口、对象的耗时删除指定网站的Cookie表示层、会话层tcpdump、wireshard传输层telnet: 路径可达性测试nc: 路径可达性测试netstat&#xff1a;查看当前连接状态iftop&#xff1a;查看当前连接的传输速率netstat -s: 查看丢包和乱序的统…

万字 HashMap 详解,基础(优雅)永不过时

本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 提问。 前言 大家好&#xff0c;我是小彭。 在上一篇文章里&#xff0c;我们聊到了散列表的整体设计思想&#xff0c;在后续几篇文章里&#xff0c;我们将以 Java 语言为例&#xff…

【王道计算机组成原理Note】5.5 指令流水线

5 指令流水线 5.1 指令流水的定义 一条指令的执行过程可以分成多个阶段(或过程)。根据计算机的不同&#xff0c;具体的分法也不同。 取指&#xff1a;根据Pc内容访问主存储器&#xff0c;取出一条指令送到IR中。分析&#xff1a;对指令操作码进行译码&#xff0c;按照给定的寻…

Visio 安装暴雷记录

Visio 安装记录起因&#xff1a; office2016家庭学生版中&#xff0c;安装visio2021后&#xff0c;插入word的vsdx图形右键显示unkown类型&#xff0c;无法识别&#xff0c;给学习工作带来很多麻烦&#xff01;   搜查一圈没找到对应可用的方法&#xff0c;想着可能是visio20…