Kafka生产者分区

news2024/5/18 4:13:31

生产者分区

分区的原因

(1)便于合理使用存储资源,每个Patition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据
在这里插入图片描述

生产者分区策略

1.默认分区器DefaultPartitioner

public class DefaultPartitioner implements Partitioner {
… …
}

2.使用:
1) 我们需要将producer发送的数据封装成一个ProducerRecord对象。
2) 上述的分区策略,我们在ProducerRecord对象中进行配置。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
3)策略实现:

代码解释
ProducerRecord(topic,partition_num,……)指明partition的情况下直接发往指定的分区,key的分配方式将无效
ProducerRecord(topic,key,value)没有指明partition值但有key的情况下:将key的hash值与topic的partition个数进行取余得到分区号;
ProducerRecord(topic,value)既没有 partition 值又没有 key 值的情况下:kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区(绝对不会是上一个)进行使用.
  1. 案例实操:
    1)案例1:将数据发送到指定partition的情况下,如:将所有消息发送到分区1中。
package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 造数据
        for (int i = 0; i < 5; i++) {
            // 指定数据发送到1号分区,key为空(IDEA中ctrl + p查看参数)
            kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null){
                        System.out.println("主题:" + metadata.topic() + "->"  + "分区:" + metadata.partition()
                        );
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}

2)测试:
① 在hadoop102上开启kafka消费者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

② 在IDEA中执行代码,观察hadoop102上的消费者消费情况

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu0
atguigu1
kafka2
……

③ 观察IDEA中控制台输出
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
3)案例2:没有指明partition但是有key的情况下的消费者分区分配

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallbackKey {
    public static void main(String[] args) {
        // 1. 创建配置对象
        Properties properties = new Properties();
        // 2. 配置属性
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 创建生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 造数据
        for (int i = 1; i < 11; i++) {
            // 创建producerRecord对象
            final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                        "first", 
                        i + "",// 依次指定key值为i
                        "atguigu " + i);
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null){
                        System.out.println("消息:"+producerRecord.value()+", 主题:" + metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}

4)测试
观察IDEA中控制台输出
消息:atguigu 1, 主题:first->分区:0
消息:atguigu 5, 主题:first->分区:0
消息:atguigu 7, 主题:first->分区:0
消息:atguigu 8, 主题:first->分区:0
消息:atguigu 2, 主题:first->分区:2
消息:atguigu 3, 主题:first->分区:2
消息:atguigu 9, 主题:first->分区:2
消息:atguigu 4, 主题:first->分区:1
消息:atguigu 6, 主题:first->分区:1
消息:atguigu 10, 主题:first->分区:1

自定义分区器

  1. 生产环境中,我们往往需要更加自由的分区需求,我们可以自定义分区器。
  2. 需求:在上面的根据key分区案例中,我们发现与我们知道的hash分区结果不同。那么我们就实现一个。
    3.实现步骤:
    ① 定义类,实现Partitioner接口
    ② 重写partition()方法
  3. 代码实现:
package com.atguigu.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

/**
 * @author leon
 * @create 2020-12-11 10:43
 * 1. 实现接口Partitioner
 * 2. 实现3个方法:partition,close,configure
 * 3. 编写partition方法,返回分区号
 */
public class MyPartitioner implements Partitioner {
    /**
    *  分区方法
    **/
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       // 1. 获取key
        String keyStr = key.toString();
        // 2. 创建分区号,返回的结果
        int partNum;
        // 3. 计算key的hash值
        int keyStrHash = keyStr.hashCode();
        // 4. 获取topic的分区个数
        int partitionNumber = cluster.partitionCountForTopic(topic);
        // 5. 计算分区号
        partNum = Math.abs(keyStrHash) % partitionNumber;
        // 4. 返回分区号
        return partNum;
    }

    // 关闭资源
    @Override
    public void close() {
    }

    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
  1. 测试
    在生产者代码中,通过配置对象,添加自定义分区器
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG," com.atguigu.kafka.partitioner.MyPartitioner ");

在hadoop102上启动kafka消费者

[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

在IDEA中观察回调信息
消息:atguigu 2, 主题:first->分区:2
消息:atguigu 5, 主题:first->分区:2
消息:atguigu 8, 主题:first->分区:2
消息:atguigu 1, 主题:first->分区:1
消息:atguigu 4, 主题:first->分区:1
消息:atguigu 7, 主题:first->分区:1
消息:atguigu 10, 主题:first->分区:1
消息:atguigu 3, 主题:first->分区:0
消息:atguigu 6, 主题:first->分区:0
消息:atguigu 9, 主题:first->分区:0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/165131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何设计一个消息队列?

本文已经收录到Github仓库&#xff0c;该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点&#xff0c;欢迎star~ Github地址&#xff1a;https://github.com/…

用户登录、注册的简单案例: html+css+MyBatis+Servlet

用户登录一. 用户登录1. 流程与思路基本流程&#xff1a;详细过程&#xff1a;2. 准备环境建库建表Pojo实体类User&#xff1a;Maven坐标&#xff1a;mybatis核心配置文件&#xff1a;代理接口&#xff1a;3. 编写目录&#xff1a;HTML&#xff1a;Serlvet&#xff1a;4. 效果二…

【Leetcode】21. 合并两个有序链表

【Leetcode】21. 合并两个有序链表题目思路代码题目 思路 归并排序比较两个单链表每一个节点&#xff0c;将较小元素的节点值封装成一个新的节点添加到一个新的链表中如果两个单链表长度不一致&#xff0c;也就是有一个链表指针指向null ,那么将另一个链表中的所有节点全部添加…

【手写 Vue2.x 源码】第二十四篇 - 异步更新流程

一&#xff0c;前言 上篇&#xff0c;介绍了 Vue依赖收集的视图更新部分&#xff0c;主要涉及以下几点&#xff1a; 视图初始化时&#xff1a; render方法中会进行取值操作&#xff0c;进入 Object.defineProperty 的 get 方法get 方法中为数据添加 dep&#xff0c;并记录当…

20230115英语学习

Gold From Old Sim Cards Could Help Make Future Drugs SIM卡中回收的黄金&#xff0c;可用于制造药品 Chemists are paving a road to recycle discarded SIM cards, not for electronics, but for medicine. SIM cards, which allow your phone to connect to your netwo…

ATTCK 05

环境搭建 自行下载安装包 解压VMware中win7 win8 同样方法所要用到的攻击机为kali 调节kali的网络适配器为vmnet8 调节win7的网络适配器 增加vmnet5用来连接内网win8 vmnet5名称ip角色kali192.168.115.129攻击机win7192.168.115.150192.168.138.136win8192.168.138.138DC拓…

【 java 反射上篇】java反射机制不难吧?来看看这篇

&#x1f4cb; 个人简介 &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是阿牛&#xff0c;全栈领域优质创作者。&#x1f61c;&#x1f4dd; 个人主页&#xff1a;馆主阿牛&#x1f525;&#x1f389; 支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4d…

BMS系统—产生原因如何工作

1 为什么需要BMS系统 1.1 介绍 1&#xff09;BMS&#xff0c;battery management system,电池管理系统 2&#xff09;BMS是一套嵌入式系统&#xff0c;由硬件和软件共同组成 3&#xff09;BMS功能&#xff1a;管理多节锂电池组成的电池包&#xff0c;实现充放电管理、安全保护…

jsp动物园网上购票系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP 动物园网上购票系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统采用web模式开发&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#x…

Python:每日一题之FBI树(深度优先遍历)

题目描述 我们可以把由 “0” 和 “1” 组成的字符串分为三类&#xff1a;全 “0” 串称为 B 串&#xff0c;全 “1” 串称为 I 串&#xff0c;既含 “0” 又含 “1” 的串则称为 F 串。 FBI树是一种二叉树&#xff0c;它的结点类型也包括 F 结点&#xff0c;B 结点和 I 结点…

Anolis 8.6 部署 Kafka 3.3.1 安装和测试

龙蜥 8.6 安装 Kafka 3.3.1 并通过 SpringBoot 3.0.1 版本调试一.Kafka 安装1.下载编译后的文件2.拷贝到 Anolis 并解压3.启动服务3.常用命令1.Topic 增查删2.生产消费测试二.SpringBoot 连接 Kafka1.项目结构、依赖、配置文件和启动类2.生产者和生产监听3.消费者和消费监听4.自…

【Java寒假打卡】Java基础-并发工具类

【Java寒假打卡】Java基础-并发工具类HashMap在多线程下的问题ConcurrentHashMapCountDownLatchHashMap在多线程下的问题 package com.hfut.edu.test14;import com.sun.jdi.request.StepRequest;import java.util.HashMap;public class test2 {public static void main(String…

信号的时域和频域特性的区别到底是什么?

不严谨的说&#xff0c;时域和频域分析就是在不同的空间看待问题的&#xff0c;不同空间所对应的原子(基函数)是不同的。你想一下时域空间的基函数是什么&#xff1f;频域空间的基函数是什么&#xff1f;一般的时-频联合域空间的基函数是什么&#xff1f;小波域空间的基函数是什…

线索二叉树(c++)

1.引言&#xff1a; 二叉树的三种遍历方法能将二叉树中的结点按某种方式生成一个线性序列&#xff0c;能将一个非线性结构进行线性化操作。但随之也产生两个问题&#xff1a; 遍历效率低 在采用左右链表示方法作为二叉树的存储结构时&#xff0c;当二叉树更新后&#xff0c;并…

【博客586】ipvs的hook点位置以及hook点钩子函数剖析

ipvs的hook点位置以及hook点钩子函数剖析 ipvs实现负载均衡的基础 ipvs其实是基于netfilter框架来挂载hook点&#xff0c;从而对流量进行dnat等操作 ipvs的hook点剖析 IPVS的实现利用了Netfilter的三个Hook点&#xff0c;分别是&#xff1a;NF_INET_LOCAL_IN、NF_INET_LOCAL_O…

【nvidia CUDA 高级编程】NVSHMEM 直方图——复制式方法

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…

pandas时间序列,案列

一&#xff1a;pandas时间序列 1.1为什么要学习pandas中的时间序列 不管在什么行业&#xff0c;时间序列都是一种非常重要的数据形式&#xff0c;很多统计数据以及数据的规律也都和时间序列有着非常重要的联系&#xff0c;而且在pandas中处理时间序列是非常简单的 1.2生成一段时…

【Python技巧】:cmd查看Python版本号居然与自己电脑装的版本不一致,特此提出解决方案

项目场景&#xff1a; 大家好&#xff01;欢迎大家看我的博客&#xff0c;最近学习Python的GUI&#xff08;PyQt5&#xff09;的时候发现了自己电脑的一个python问题&#xff0c;我本想装一下PyQt5&#xff0c;顺手查了一下自己电脑的Python版本&#xff0c;没想到居然是Python…

哈希表(二)—— 开散列 / 拉链法 / 哈希桶的模拟实现

哈希表的基本思路是通过某种方式将某个值映射到对应的位置&#xff0c;这里的采取的方式是除留余数法&#xff0c;即将原本的值取模以后再存入到数组的对应下标&#xff0c;即便存入的值是一个字符串&#xff0c;也可以根据字符串哈希算法将字符串转换成对应的ASCII码值&#x…

Week 6 hw3-1 全连接网络反向传播推导

Week 6 hw3-1 全连接网络反向传播推导 折腾了半天&#xff0c;记录一下。 作业中网络由若干全连接层ReLU组成&#xff0c;输出层的函数为softmax&#xff0c;损失函数为交叉熵。 一、记号 设网络有nnn层。如图&#xff0c;当i<ni<ni<n时&#xff0c;我们有如下几条…