Kafka系列四生产者

news2025/5/18 23:37:36

文章首发于个人博客,欢迎访问关注:https://www.lin2j.tech

一条消息从生产到发送成功,需要经过多个步骤,多个组件。首先要经过拦截器、序列化器、分区器对消息进行预处理,然后将消息按批次放入缓冲区,之后由 Sender 线程将消息发送到对应的节点以及分区。以下是大概流程
在这里插入图片描述

生产者的配置

生产者对应的类是 KafkaProducer<K,V>,它是个泛型,其中 K 和 V 分别对应 key.serializervalue.serializer 参数的类型。

实例化一个生产者需要有以下三个必填参数

  1. bootstrap.servers:指定生产者客户端要链接的 Kafka 集群地址,多个地址通过英文逗号分隔,比如 host1:ip1,host2:ip2 。可以不用配置全部的地址清单,因为生产者会从给定的 broker 里获取其他 broker 的信息。不过还是建议配置多几个,保证其中一个 broker 宕机后,还能连上 Kafka 集群。

  2. key.serializervalue.serializer:因为 broker 端接收到的消息必须以字节数组的形式存在,因此在将消息发送到 broker 之前,需要对消息的 key 和 value 进行序列化操作转换成字节数组。参数的值一定要是序列化器的类的全限定名称。以 StringSerializer 为例,一般在代码中用这种方式去获取全限定名称,以保证不会出错。

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSeriablizer.class.getName());
    

当然还有其他的参数可以根据实际需要去合理配置,比如 batch.sizeackslinger.msmax.request.size 等等。

发送

消息的构建

Kafka 中的消息对应的类是 ProducerRecord<K,V> 它是个泛型,其中 K 和 V 分别对应 key.serializervalue.serializer 参数的类型。一条消息要发送的时候,需要先构建消息,即创建 ProducerRecord 对象,此时 topic 和 value 属性是必填项。

public ProducerRecord(String topic, V value);

当需要直接指定分区或者 key 的时候,还可以使用以下的构造方法。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

消息的发送方式

发送消息时,可以调用 KafkaProducer 的 send 方法。这个方法有两个重载方法。

public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

可以看到调用之后,会返回一个 Future 对象,甚至还可以传入一个 Callback 用来作为消息发送之后的回调操作。

根据对以上两个方法的不同调用,可以实现三种发送方式:发后即忘(fire-and-forget)、同步(sync)及异步(async)

  • fire-and-forget:指发送过后,不管成功与否,都不会做任何后续操作。这意味着如果消息发送失败了,那么也就丢失了,可靠性变差,但是性能最高

  • sync:指消息发送之后,线程阻塞等待发送结果。需要使用到返回的 Future 对象。比如:

    try{
        Future<RecordMetadata> future = producer.send(data);
        // 调用 get 方法会阻塞当前线程直到方法有返回
        RecordMetadata result = future.get();
        // 对 result 进行操作
    } catch (ExecutionException | InterruptedException e) {
        // 异常处理
    }
    

    同步发送的可靠性高,只有两个结果:要么成功,要么异常。发生异常的时候,还可以根据情况去做处理。

  • async:异步发送需要用到 Callback 对象,在 Kafka 有响应的时候进行回调,要么成功,要么抛出异常。比如:

    producer.send(data, new Callback() {
       @Overrid
       public void onCompletion(RecordMetadata metadata, Exception exception){
           if(exception == null) {
               // 发送成功,可以打印 metadata 的信息等
           } else {
               // 发送失败,处理异常
           }
       } 
    });
    

    对于 onCompletion 的两个参数 metadata 和 exception ,他们是互斥的:消息发送成功,metadata 不为 null,exception 为 null;消息发送异常,metadata 为 null,exception 不为 null。

    • 如果使用 Futrue 对象来做回调,是否可以?

      理论上是可以的,但是当发送的消息很多时,就需要处理很多的 Future 对象。而且什么时候调用 get 方法,也是一个问题,会使问题处理起来变得麻烦,代码变得混乱。

异常的类型

在发送消息的过程,可能发生异常,这里的异常主要分为两种:可重试异常和不可重试异常。

常见的可重试异常有:NetworkException、LeaderNotAvailableException、NotEnoughRelicasException、UnknownTopicOrPartitionException 等。

发生可重试异常时,如果配置了 retries 参数(默认为0),那么就会在规定的次数里进行重试。

props.put(ProducerConf.RETRIES_CONFIG, 10);

不可重试异常比如 RecordTooLargeException ,说明发送的消息太大,这种是通过重试解决不了的,Kafka 会直接抛出异常。

序列化

前面提到,broker 接收到的消息必须以字节数组的形式存在,因此发送前需要对消息进行序列化。

Kafka 中的序列化器都实现了 org.apache.kafka.common.serialization.Serializer 接口,它有 3 个方法。

// 为序列化器做一些配置工作
void configure(Map<String, ?> configs, boolean isKey);

// 将消息转化为字节数组
byte[] serialize(String topic, T data);

// 关闭序列化器,这个方法时继承自 java.io.Closable 接口
// 一般情况下,该方法是个空方法,如果要实现,需要保证该方法的幂等性
void close();

与序列化器对应的是反序列化器,反序列化器用于在消费者将从 Kafka 获得的字节数组转换为响应的对象。

对于 String 类型的 key 和 value ,可以用 Kafka 自带的 StringSerializer 序列化器。除此之外,还有针对 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型的序列化器。

可以看一下 StringSerializer 类的实现,代码比较简单,可以对 Serializer 接口有更进一步的认识。

如果自带的序列化器无法满足应用需求,则可以使用入 Avro、JSON、ProtoBuf 等等通用的序列化工具来实现。

分区器

消息经过序列化之后,就要确定要发往的分区了。如果消息的 ProducerRecord 有指定 partition 字段,那么就不需要分区器的作用,因为 partition 代表所要发往的分区号。

如果没有指定 partition 字段,就需要用分区器计算 partition 字段。Kafka 中提供了一个默认的分区器 DefaultPartitioner,它实现了 Partitioner 接口,它定义了以下两个方法

// 根据消息计算分区
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

// 关闭分区器
public void close();

除此之外,它还继承了 Configurable 接口,因此还有一个方法用来配置分区器的信息和初始化数据。

void configure(Map<String, ?> configs);

DefaultPartitioner

DefaultPartitioner 在计算 partition 字段上时,会判断传入的 key 是否为 null。 DefaultPartitioner 的实现很简短,可以直接看代码。

public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}
    
    /**
     * @param topic 		topic的名称
     * @param key 			消息的key,可以为null
     * @param keyBytes 		消息的key对应的字节数组,可以为null
     * @param value 		消息的value,可以为null
     * @param valueBytes 	消息的value对应的字节数组,可以为null
     * @param cluster 		集群的信息
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 从集群中获取分区数目,用于对哈希值进行取模
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // 当 key 为null时,会在所有可用的分区中,选择一个分区
            int nextValue = nextValue(topic);
            // 获取所有可用分区
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                // 在可用分区中选择
                return availablePartitions.get(part).partition();
            } else {
                // 如果可用分区数为 0,那就在所有的分区中进行选择
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // 如果 key 不为 null,使用 MurmurHash2 算法进行哈希。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    /**
     * 可以理解为获取一个对应 topic 的随机数
     */
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}
}

自定义分区器

如果要按照自身的需要去设计分区逻辑,可以自定义一个分区器,只需要实现 Partitioner 接口,然后在启动的时候,指定对应的分区器即可。

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "自定义分区器的全限定名");

生产者拦截器

生产者拦截器可以实现在消息发送前与应答后做一些定制化的需求,比如过滤某些消息。

ProducerInterceptor 接口

生成者拦截器需要实现 ProducerInterceptor 接口,它继承了 Configurable 接口,并且定义了一下三个方法

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

public void onAcknowledgement(RecordMetadata metadata, Exception exception);

public void close();

onSend 方法在序列化和分区分配之前调用。如果在这一步修改了 key 或者 topic 的信息,会影响到下一步分区操作,因为分区时用的 key 和 topic 是来自拦截器的,而不是最开始的 key 和 topic。因此一般不要修改 ProducerRecord 的 key 、value、topic 等信息,否则可能会产生与预期不同的结果或者异常,同样会影响 broker 端的日志压缩。

onAcknowledgement 方法在消息已被 broker 端确认之后,或者发送到 broker 之前失败时调用,并且是先于 send 方法中指定的 Callback 调用的。这个方法一般运行在 I / O I/O I/O 线程中,因此越简单越好,否则会影响消息的发送速度。

close 方法主要是关闭拦截器,做资源清理工作。

拦截器的调用顺序

拦截器的配置方式如下

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "拦截器1的全限定名,拦截器2的全限定名");

如果有多个拦截器,可以通过英文逗号将这些拦截器的全限定名连接起来,并且拦截器之间的执行顺序是按照配置的时候的顺序调用的,比如这里拦截器1会早于拦截器2被执行。

尽量不要让后面的拦截器去依赖前面拦截器的执行结果

消息累加器

消息累加器(RecordAccumulator)也叫消息收集器。

消息客户端是由两个线程协调运行的

  • 主线程完成消息从生产到拦截器、序列化器、分区器的过程;
  • Sender 线程负责从消息累加器中获取消息并将其发送到 Kafka 中。

消息从主线程中生产后,会先缓存到消息累加器中,等待 Sender 线程批量发送。

消息累加器内部为每个分区都维护了一个双端队列 Deque 用来存储 ProducerBatch,ProducerBatch 中包含一个或者多个 ProducerRecord。使用 ProducerBatch 可以批量发送消息,减少网络请求次数,提高吞吐量。

消息累加器的大小通过参数 buffer-memory 决定,默认是 32M。

CopyOnWriteMap

上文提到消息累加器内部为每个分区维护了一个双端队列,这个对应关系是通过 CopyOnWriteMap 这个数据结构实现的。

CopyOnWriteMap 是 Kafka 实现的一个线程安全的 Map 类型的数据结构。

在 RecordAccumulator 中,声明一个该类型的成员变量 batches,这个对象的 key 是消息主题的分区,value 是一个双端队列。

之所以需要自定义一个数据结构,是因为

  • 为了维持分区到队列的关系,这个结构最好是 key-value 类型的,所以锁定 Map 类型的结构。

  • Kafka 的生产者会在大量生产消息时,会有大量的数据涌入消息累加器,所以这个数据结构需要是线程安全的。

  • 大量涌入数据的同时,对应的是对 batches 对象的大量读,而分区的数量一般不会有什么变化,因此面对的是一个读多写少的情况。Kafka 通过模仿 CopyOnWriteList 实现了 CopyOnWriteMap 数据结构,采用读写分离来解决读多写少的问题又保证了线程安全。

ProducerBatch

上文提到双端队列中存储的对象类型是 ProducerBatch ,而这个对象的大小是由 batch.size 参数决定的,默认是 16k ,整个缓存的大小是 32M。

在频繁的发送消息的过程中,势必会不断创建 ProducerBatch 对象,如果每次都通过 GC 来回收内存,这样会发生频繁的 GC,影响性能。

因此 Kafka 在缓存中设计了一个内存池,16k 的内存如果用完了,就返回给内存池,需要的时候再向内存池申请。这是一种提高性能的不错实践。

Sender线程

把消息放进缓冲区之后,与此同时会有一个独立线程Sender去把一个个Batch发送给对应的主机。

整体架构图

在这里插入图片描述

重要的参数

acks 消息验证

消息发送到服务器之后,有三种方式认为是否发送成功

  • 1:只要 leader 副本写入成功,就会收到来自服务器的成功响应;
  • 0:发送消息后不需要等待服务器响应就算发送成功;
  • -1:需要 ISR 中所有的副本写入成功才能收到来自服务器的成功响应;
  • all:同 -1 。

retries 重试次数

有时候发生网络错误可能发送不成功,但是下一秒就好了,因此可以设置重试机制。

batch.size 批次大小

ProducerBatch 的大小,默认是 16k,设置大一点可以提高吞吐量。

linger.size 发送时间限制

消息发送的条件有两个,一个是数据量达到 batch.size 指定的大小,一个是达到了 linger.size 指定的时间。达到时间限制后,就算数据量很小也会被发送。

buffer.memory 缓冲区大小

当发送速率比不上生产速率时,需要一个缓冲区来减小发送的压力。默认是 32M。

max.request.size 最大消息大小

用来控制发送的消息的最大大小,默认是 1M。当超过限制大小时就会抛出 RecordTooLargeException 异常。

request.timeout.ms 请求超时

消息发送之后的超时等待时间,默认时 30 秒。超过等待时间会抛出 TimeoutException 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/953325.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp移动端h5设计稿还原

思路 动态设置html的font-size大小 实现步骤 先创建一个public.css文件&#xff0c;设置初始的font-size大小 /* 注意这样写 只能使用css文件, scss 是不支持的, setProperty 只适用于原生css上 */ html {--gobal-font-size: 0.45px; } .gobal-font-size {font-size: var(--g…

CRM通过哪四个特点赢得不同类型的客户

1.设置正确的目标 首先&#xff0c;在CRM系统中设置正确的目标是非常重要的。不同类型的客户有不同的需求和预期&#xff0c;需要使用不同的方法去处理。如果企业想吸引新客户&#xff0c;那么企业需要更加侧重于建立品牌形象和提供相关的信息。如果企业想留住老客户&#xff…

matplotlib绘图常见设置总结

绘图 官方API 头文件、画布初始化 首先要导入头文件&#xff0c;初始化画布 from matplotlib import pyplot as plt from matplotlib.pyplot import MultipleLocator # 从pyplot导入MultipleLocator类&#xff0c;这个类用于设置刻度间隔 import numpy as np # 常用的数据…

SQL求解用户连续登录天数

数据分析面试过程中&#xff0c;一般都逃不掉对SQL的考察&#xff0c;可能是笔试的形式&#xff0c;也可能是面试过程中面试官当场提问&#xff0c;当场在纸上写出&#xff0c;或者简单说一下逻辑。 今天&#xff0c;就来分享一道面试中常常被问到的一类SQL问题&#xff1a;连…

Vue3响应式原理 私

响应式的本质&#xff1a;当数据变化后会自动执行某个函数映射到组件&#xff0c;自动触发组件的重新渲染。 响应式的实现方式就是劫持数据&#xff0c;Vue3的reactive就是通过Proxy劫持数据&#xff0c;由于劫持的是整个对象&#xff0c;所以可以检测到任何对象的修改&#xf…

【传输层】网络基础 -- UDP协议 | TCP协议

再谈端口号端口号范围划分netstatpidof UDPUDP的特点面向数据报UDP的缓冲区 基于UDP的应用层协议 TCP认识TCP协议的报头理解封装解包理解可靠性TCP工作模式16位窗口大小6位标志位URGACKPSHRSTSYNFIN 再谈端口号 端口号(Port)标识了一个主机上进行通信的不同的应用程序 在TCP/I…

积分游戏小程序模板源码

积分游戏小程序模板源码是一款可以帮助用户快速开发小程序的工具&#xff0c;此模板源码包含五个静态页面&#xff0c;分别是首页、任务列表、大转盘、猜拳等五个页面&#xff0c;非常适合进行积分游戏等相关开发。 此模板源码的前端部分非常简单易用&#xff0c;用户可以根据…

KiCad 封装原件类型与封装焊盘不匹配 预期SMD 实际通孔

KiCad 7.0.6 PCB ERC 检查时弹出不匹配错误&#xff0c;提示&#xff1a; 封装原件类型与封装焊盘不匹配 预期SMD 实际通孔&#xff1a; 但实际的封装已经是 SMD 了呀。为啥&#xff0c;因为自己绘制的封装中属性不对。将自绘封装中的 原件类型由 通孔 改为 贴片即可&#xff1…

Python钢筋混凝土结构计算.pdf-T001-混凝土强度设计值

以下是使用Python求解上述问题的完整代码&#xff1a; # 输入参数 f_ck 35 # 混凝土的特征抗压强度&#xff08;单位&#xff1a;MPa&#xff09; f_cd 25 # 混凝土的强度设计值&#xff08;单位&#xff1a;MPa&#xff09; # 求解安全系数 gamma_c f_ck / f_cd # …

提高工作效率,轻松实现IP地址批量ping

在实际操作中&#xff0c;我们经常需要对一系列已分配的IP进行ping检测&#xff0c;以确认其是否正在运行。然而&#xff0c;我们的表格仅有一个标签页&#xff0c;且仅包含一个ip地址列。 iP192.168.196.106192.168.196.107192.168.196.108192.168.196.109 实现思路 我们的…

面对银行分支机构,UPS监控该如何应对?

UPS系统确保在电力中断或故障时&#xff0c;银行的关键系统和设备能够继续正常运行&#xff0c;从而防止因电力波动而可能导致的数据丢失和业务中断。 为了实现有效的UPS监控&#xff0c;银行需要应用监控系统。银行可以实时监测UPS系统的状态&#xff0c;及时发现潜在问题并采…

智能感测型静电中和设备由哪些部分构成

智能感测型静电中和设备是一种利用先进的传感技术和自动控制系统&#xff0c;以及适应性算法来实现静电电荷的中和和消除的设备。它主要用于消除静电带来的问题&#xff0c;比如电子元件的损坏、电磁干扰、火灾等。 智能感测型静电中和设备通常包括以下几个主要部分&#xff1…

年轻人的新社交密码:高质量小众社交app皮雀,到底怎么玩?

新一代年轻人被各种生活、工作和强社交关系充斥&#xff0c;面临着巨大的社交压力&#xff0c;因此他们在社交的选择方向上&#xff0c;逐渐远离线下社交&#xff0c;去选择线上社交&#xff0c;不同于有心理负担的线下社交&#xff0c;线上社交具有更多的选择性。基于能为年轻…

地下管线三维自动建模软件MagicPipe3D V3.0发布

2023年9月1日经纬管网建模系统MagicPipe3D V3.0正式发布&#xff0c;该版本经过众多用户应用和反馈&#xff0c;在三维地下管网建模效果、效率、适配性方面均有显著提升&#xff01;MagicPipe3D本地离线参数化构建地下管网模型&#xff08;包括管道、接头、附属设施等&#xff…

2023固态U盘、移动硬盘对比

最近测试了几款固态U盘/移动硬盘&#xff0c;希望能大家的选购有点帮助。 1、移速逸动-2T&#xff08;500MB/s&#xff09;&#xff1a;799元某音 2、爱国者u397-1T&#xff08;1000MB/s&#xff09;&#xff1a;578元京东 3、梵想FF520-512G&#xff08;500MB/s&#xff09…

直播程式源码平台细讲HTTP协议:超文本传输

HTTP协议的简介 HTTP协议是一种数据通信协议&#xff0c;是浏览器与服务器之间的协议&#xff0c;HTTP协议的中文全称为超文本传输协议&#xff0c;HTTP协议在直播程式源码平台中&#xff0c;承载着数据传输的重要任务&#xff0c;用户可以通过HTTP协议获取直播程式源码平台中提…

Docker部署RustDesk Server 设置开机自启

三、Docker安装 Docker官方和国内daocloud都提供了一键安装的脚本&#xff0c;使得Docker的安装更加便捷。 官方的一键安装方式&#xff1a; curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 国内 daocloud一键安装命令&#xff1a; curl -sSL https://…

【LeetCode】1654:到家的最少跳跃次数的解题思路 关于力扣无法return的BUG的讨论

文章目录 一、题目二、题解与代码三、神奇的BUG3.1 无法执行的 return 和 break 语句3.2 通过另一个 break 解决 一、题目 有一只跳蚤的家在数轴上的位置 x 处。请你帮助它从位置 0 出发&#xff0c;到达它的家。 跳蚤跳跃的规则如下&#xff1a; 它可以 往前 跳恰好 a 个位…

Android OTA 相关工具(七) 使用 lpunpack 解包 super.img

文章目录 1. lpunpack 的编译2. lpunpack 的帮助信息3. lpunpack 的用法3.1 解包所有镜像3.2 解包指定名称分区镜像3.3 解包指定槽位分区镜像 4. 其它 从 Android 10(Q) 开始&#xff0c;引入了动态分区&#xff0c;伴随的就是一组动态分区内容数据增删改查相关的操作&#xff…