高性能消息中间件 - Kafka3.x(二)

news2025/7/20 17:31:45

文章目录

    • 高性能消息中间件 - Kafka3.x(二)
      • Kafka生产者⭐
        • 生产者发生原理⭐
        • RecordAccumulator源码简单分析⭐
        • Java Api生产者的重要参数⭐
        • 环境准备
          • 创建一个名为java-api-test的topic主题⭐
          • 命令行开启一个consumer消费者监听名为java-api-test的topic⭐
          • pom.xml
        • 案例1:异步发送消息⭐
          • MyProducerAsync类(生产者)⭐
        • 案例2:带回调函数的异步发送消息⭐
          • MyProducerAsyncCallback类(生产者)⭐
        • 案例3:同步发送消息⭐
          • MyProducerAsyncCallback类(生产者)⭐
        • 生产者默认分区策略⭐
        • 自定义分区器⭐
          • MyPartitionerImpl类
          • MyProducerPartitioner类
        • 生产调优1:如何提高生产者的吞吐量?⭐
        • 消息累加器(RecordAccumulator)
        • 消息发送线程(Sender线程)
        • 生产调优2:如何保证消息不丢失?(消息可靠性)⭐
          • 消息确认机制-acks⭐
        • ISR、OSR、AR之间的关系
        • 生产调优3:如何保证消息不重复?(消息去重)⭐
          • 消息通信的基本概念⭐
          • 幂等性实现消息去重原理⭐
          • 开启幂等性⭐
          • 消息事务⭐
        • 生产调优4:如何保证消息不会乱序(单分区下的消息顺序性)?⭐

高性能消息中间件 - Kafka3.x(二)

Kafka生产者⭐

生产者发生原理⭐

当我们通过producer(生产者)调用send方法发送消息,该消息就会依次经过拦截器(一般不使用)、序列化器(要对key、value进行序列化)、分区器(提供了默认的分区分配策略,也可以自定义分区器),通过分区器计算出该消息发往哪个分区,此时就会把消息先发送给RecordAccumulate缓存(而不是直接发生给broker的分区),到达RecordAccumulate缓存中,需要满足两个条件(满足两者之一即可发送)才会通过sender线程将数据发送给kafka broker(1:batch.size默认是16kb,如果批次消息到达16kb,则会发送;2:linger.ms默认是0ms,也就是说只要消息在缓存中待了0ms还没有发送则也会自动发生。),当上面两个消息发送条件满足其中一个之后,就会通过sender线程进行发送消息到kafka broker的分区,当收到某条发送成功ack之后就会把RecordAccumulate中的对应的消息给移除掉,如果没有收到发生成功ack的话,则会进行重试(重试次数默认是int的最大值。),不过默认kafka可以缓存5条未ack的消息。

在这里插入图片描述

RecordAccumulator源码简单分析⭐
public class RecordAccumulator {
    private final Logger log;
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    private final int batchSize;
    private final CompressionType compression;
    private final int lingerMs;
    private final long retryBackoffMs;
    private final int deliveryTimeoutMs;
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;
    private final Set<TopicPartition> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;
    private long nextBatchExpiryTimeMs = 9223372036854775807L;
    
    省略......
    
}
  • batchSize:批次大小(默认:16kb)
  • compression:压缩类型(默认:不压缩)
  • lingerMs:等待时间(默认是0ms)
  • ConcurrentMap<TopicPartition, Deque< ProducerBatch >> batches:缓存(默认是32M)
    • 由于TopicPartition(主题分区)是key,Deque< ProducerBatch >(双端队列)是值。
    • 可以看出每一个topic的分区(0、1、2…)都会创建一个对应的双端队列用于存储消息,以便后面通过sender线程发送给kafka broker。
  • transactionManager:事务管理器
Java Api生产者的重要参数⭐
参数描述
bootstrap.servers指定连接的Kafka Broker服务器的主机名称和端口号。(可以设置 1 个或者多个,中间用逗号隔开即可。
key.serializervalue.serializer指定发送消息的 key和value 的序列化类。要写全类名。
buffer.memoryRecordAccumulator缓冲区的总大小,默认大小为32m(生产环境可以调大,比如调到64m)
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender线程等待linger.ms之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms之间。
acks0:生产者发送过来的数据,不需要应答ack。
1:生产者发送过来的数据,Leader收到数据后应答ack。
-1(all):生产者发送过来的数据,Leader 和 isr 队列
里面的所有节点都要应答ack。默认值是-1,-1 和
all是等价的。
max.in.flight.requests.per.connection允许最多没有应答ack的次数,默认为 5,开启幂等性后(幂等性默认是开启的),要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
enable.idempotence是否开启幂等性,默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd。(生产环境可以使用snappy或者gzip
环境准备
创建一个名为java-api-test的topic主题⭐
kafka-topics.sh --bootstrap-server=192.168.184.201:9092 --topic=java-api-test --partitions=5 --replication-factor=2  --create
命令行开启一个consumer消费者监听名为java-api-test的topic⭐
kafka-console-consumer.sh --bootstrap-server=192.168.184.201:9092 --topic=java-api-test
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>


    <dependencies>
<!--        kafka3.2.1-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.1</version>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>

        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.12</version>
        </dependency>
    </dependencies>
</project>
案例1:异步发送消息⭐
MyProducerAsync类(生产者)⭐
  • 1:编写代码如下:
package com.kafka01.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author youzhengjie 2022-09-12 20:21:18
 */
public class MyProducerAsync {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 基本配置(都可以在ProducerConfig找到对应的配置)
        // 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");
        // 2:配置key和value的字符串序列化器
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer
        // StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息
        //我们使用的是ProducerRecord(String topic, V value)构造方法。
        //topic:消息要发送到哪个topic名称
        //value:消息内容
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<String,String>("java-api-test","java api发送的消息:"+i));
        }

        //5:关闭KafkaProducer
        kafkaProducer.close();
    }

}
  • 2:运行MyProducerSync类的main方法发送kafka消息:
  • 3:查看我们刚刚启动的consumer:

在这里插入图片描述

案例2:带回调函数的异步发送消息⭐
MyProducerAsyncCallback类(生产者)⭐

回调函数会在producer生产者收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(Record Metadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

package com.kafka01.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MyProducerAsyncCallback {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 基本配置(都可以在ProducerConfig找到对应的配置)
        // 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");
        // 2:配置key和value的字符串序列化器
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer
        // StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息
        //我们使用的是ProducerRecord(String topic, V value)构造方法。
        //topic:消息要发送到哪个topic名称
        //value:消息内容
        for (int i = 0; i < 10; i++) {

            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("java-api-test", "MyProducerAsyncCallback发送的消息:" + i);

            //带回调函数的异步发送
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                    //如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。
                    if(e==null) {
                        System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());
                    }
                    else {
                        System.out.println("消息发送失败。");
                    }

                }
            });
        }

        //5:关闭KafkaProducer
        kafkaProducer.close();
    }

}
案例3:同步发送消息⭐
  • 同步发送:意思是当我们一次性发送多条消息的时候,会按顺序一条一条来发送。
MyProducerAsyncCallback类(生产者)⭐
package com.kafka01.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducerSyncCallback {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 基本配置(都可以在ProducerConfig找到对应的配置)
        // 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");
        // 2:配置key和value的字符串序列化器
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer
        // StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //3:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //4:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息
        //我们使用的是ProducerRecord(String topic, V value)构造方法。
        //topic:消息要发送到哪个topic名称
        //value:消息内容
        for (int i = 0; i < 10; i++) {

            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("java-api-test", "MyProducerSyncCallback发送的消息:" + i);

            //带回调函数的同步发送
            try {
                kafkaProducer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                        //如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。
                        if(e==null) {
                            System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());
                        }
                        else {
                            System.out.println("消息发送失败。");
                        }

                    }
                }).get(); //只需要在send方法后面加上get()就是同步了。
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        //5:关闭KafkaProducer
        kafkaProducer.close();
    }

}
生产者默认分区策略⭐
  • 生产者默认分区策略:

  • 1:指定partition分区的情况下,直接将指定的值作为partition值;

  • 2:没有指明partition:

    • 2.1:但是指定了key的情况下,将key的hashcode值与topic的partition数进行取模得到partition值;
    • 2.2:既没有partition值又没有key值的情况下,kafka默认使用粘性分区器(StrickyPartition),会随机选择一个分区,并一直使用这个分区,直到这个分区满了才会随机选择下一个分区。
自定义分区器⭐
  • 实现步骤:

    • 1:新建一个自定义分区器类去实现 Partitioner 接口。

    • 2:重写 partition()方法。

    • 3:把写好的自定义分区器类配置到生产者配置上。

MyPartitionerImpl类
package com.kafka01.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/*
自定义分区器类
 */
public class MyPartitionerImpl implements Partitioner {


    //只需要重写这个方法
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        //将value转换成String类型
        String val = value.toString();

        //如果val内容为part-3则分配到3号分区中,反之分配到2号分区
        if(val!=null&& "part-3".equals(val)){

            //分配到3号分区
            return 3;
        }
        //分配到2号分区
        return 2;
    }


    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
MyProducerPartitioner类
package com.kafka01.producer;

import com.kafka01.config.MyPartitionerImpl;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducerPartitioner {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // 基本配置(都可以在ProducerConfig找到对应的配置)
        // 1:连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");
        // 2:配置key和value的字符串序列化器
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG相当于key.serializer
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG相当于value.serializer
        // StringSerializer.class.getName()相当于org.apache.kafka.common.serialization.StringSerializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());


        //3:配置自定义分区器类。ProducerConfig.PARTITIONER_CLASS_CONFIG相当于partitioner.class
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitionerImpl.class.getName());


        //4:创建KafkaProducer对象,泛型的<String,String>分别是key和value的类型。
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //5:调用send方法发送消息。ProducerRecord类用于封装我们发送的消息
        //我们使用的是ProducerRecord(String topic, V value)构造方法。
        //topic:消息要发送到哪个topic名称
        //value:消息内容
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("java-api-test", "part-" + i);
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    //如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。
                    if(exception==null) {
                        System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());
                    }
                    else {
                        System.out.println("消息发送失败。");
                    }
                }
            }).get();
        }

        //6:关闭KafkaProducer
        kafkaProducer.close();
    }

}
生产调优1:如何提高生产者的吞吐量?⭐
  • 方案如下:

    • 1:batch.size:批次大小,默认是16k,可以适当调整,不是越大越好
    • 2:linger.ms:等待时间,默认是0ms,建议修改成5-100ms之间
    • 3:compression.type:压缩类型,默认是不压缩,建议修改成snappy或者gzip
    • 4:buffer.memory(RecordAccumulator):缓冲区大小,默认是32M,建议修改成64M
  • 调整上面四个参数后的代码:

package com.kafka02.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MyProducer01 {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 1:kafka基本配置

        // 连接集群机器。ProducerConfig.BOOTSTRAP_SERVERS_CONFIG相当于bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.201:9092");
        // 配置key和value的字符串序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 2:配置生产者优化参数

        // batch.size:批次大小,默认 16K,还是保持16k
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*16);
        // linger.ms:等待时间,默认 0,修改成10ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // RecordAccumulator:缓冲区大小,默认 32M,修改成64M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,(1024*1024*64));
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");


        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("java-api-test", "消息---" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    //如果Exception为null则发送成功,反之则发送失败,发送失败则会自动重试。
                    if(exception==null) {
                        System.out.println("消息发送成功。分区为:"+recordMetadata.partition()+",topic为:"+recordMetadata.topic());
                    }
                    else {
                        System.out.println("消息发送失败。");
                    }
                }
            });
        }

        kafkaProducer.close();

    }
}
消息累加器(RecordAccumulator)
  • 消息累加器(RecordAccumulator)默认大小是32m,这个默认值在生产环境中很容器就能达到,就会导致消息堆满消息累加器,从而阻塞。
  • 消息累加器的底层是一个Map集合(key为TopicPartition,value为Deque< ProducerBatch >),可以看出我们kafka每一个分区都会在消息累加器中有对应的一个双端队列。
  • 为了防止消息太大,我们通常需要指定一个压缩类型(比如snappy或者gzip),这样也可以减少网络传输的带宽占用。
消息发送线程(Sender线程)
  • Sender线程的作用就是把符合条件(满足batch.size或者linger.ms两者满足一个即可)的消息发送给kafka broker。
  • Sender线程是基于Java nio的selector,通过selector发送消息。
  • Sender线程默认可以容忍5个消息未被确认,当消息发送失败时会进行重试(重试次数默认为int的最大值),
生产调优2:如何保证消息不丢失?(消息可靠性)⭐
消息确认机制-acks⭐
  • 消息确认机制acks一共有三种:

    • 0
      • 表示生产者当消息发送出去就不管了,不管leader副本和follower副本是否接受到数据。(这种方式效率最高,但是消息可靠性最低,很容易丢失消息)
    • 1
      • 表示生产者发送消息后,只有leader副本接受到了数据(确认了消息),消息才算发送成功。(这种方式效率中等,可靠性也是中等,有丢失消息的风险)
      • 丢失消息过程:当生产者成功把消息发送给leader副本,但是此时follower宕机了,由于acks为1,所以这条消息被确认了不会重试消息了,但是过了一会儿,follower节点恢复了,但是leader副本的节点却宕机了,此时由于follower没有消息的备份,所以导致消息丢失。
    • -1(或者all)
      • 表示生产者发送消息后,需要等leader副本和follower副本都接收到消息并且确认消息后才算成功。(这种方式效率最低,但是可靠性最高,在副本>2的情况下基本没有丢失数据风险。)
  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

ISR、OSR、AR之间的关系
  • AR=ISR+OSR

  • ISR:表示在指定时间内和leader保存数据同步的副本集合;

  • ORS:表示不能在指定的时间内和leader保持数据同步副本集合,称为OSR(Out-Sync Relipca set)。

生产调优3:如何保证消息不重复?(消息去重)⭐
消息通信的基本概念⭐
  • 最少一次ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量>=2。可以保证数据不丢失,但是不能保证数据不重复。
  • 最多一次:ack设置为0,可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次最少一次+幂等性(或者最少一次+幂等性+事务)。
幂等性实现消息去重原理⭐
  • 重复消息判断标准:当两条及以上的消息的三个条件(pid, Partition, SeqNumber)都相同时,Broker只会持久化一条,其中pid(ProducerId)是kafka每次重启都会分配一个新的,Partition分区号,SeqNumber序列化号(单调递增)。

  • broker中会在内存维护一个pid+分区对应的序列号。如果收到的序列号正好比内存序列号数字大1,才存储消息如果小于内存序列号,意味着消息重复,那么会丢弃消息,并应答如果远大于内存序列号,意味着消息丢失,会抛出异常

  • 注意:幂等性只能保证的是在单分区单会话内不重复!!(所以要引入事务)

开启幂等性⭐
  • 只需要给生产者的enable.idempotence参数设置为true即可开启幂等性(默认就是true,也就是幂等性默认是开启的。)
消息事务⭐
  • 首先必须注意:开启事务的前提是必须要开启幂等性!!!!!

  • 为什么要引入事务?

  • 由于幂等性不能跨分区运作,为了保证同时发的多条消息,要么全成功,要么全失败。kafka引入了事务的概念。

  • 开启事务:需要producer设置transactional.id的值并同时开启幂等性。

  • 事务的方法如下:(KafkaProducer对象调用)

    // 1 初始化事务
    void initTransactions();
    // 2 开启事务
    void beginTransaction() throws ProducerFencedException;
    // 3 在事务内提交已经消费的偏移量(主要用于消费者)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
     String consumerGroupId) throws 
    ProducerFencedException;
    // 4 提交事务
    void commitTransaction() throws ProducerFencedException;
    // 5 放弃事务(类似于回滚事务的操作)
    void abortTransaction() throws ProducerFencedException;
    
生产调优4:如何保证消息不会乱序(单分区下的消息顺序性)?⭐
  • kafka只能保证单分区下的消息顺序性。

  • 没有开启幂等性:

    • 需要把 max.in.flight.requests.per.connection 设置为1。(缓冲队列最多放置1个请求)
  • 开启幂等性:

    • 需要 max.in.flight.requests.per.connection 设置为小于5。
  • 这是因为broker端会缓存5条消息,能够保证最近5个消息是有序的。(当开启了幂等性,且消息小于5个则会进行重新排序

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1157324.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5、QtCharts 曲线美化

文章目录 效果ui 设置dialog.hdialog.cpp 效果 ui 设置 dialog.h #ifndef DIALOG_H #define DIALOG_H#include <QDialog> #include <QtCharts> #include <QLineSeries> #include <QGraphicsScene> #include <QTimer> #include <QSplineSerie…

陕西某小型水库雨水情测报及大坝安全监测项目案例

项目背景 根据《陕西省小型病险水库除险加固项目管理办法》、《陕西省小型水库雨水情测报和大坝安全监测设施建设与运行管理办法》的要求&#xff0c;为保障水库安全运行&#xff0c;对全省小型病险水库除险加固&#xff0c;建设完善雨水情测报、监测预警、防汛道路、通讯设备、…

如何有效使用蜂邮EDM和vba批量发送邮件?

蜂邮EDM和vba批量发送邮件的方法&#xff1f;怎么使用蜂邮EDM和vba代码群发电子邮件&#xff1f; 批量发送邮件已经成为一种不可或缺的沟通方式。蜂邮EDM和VBA是两个功能强大的工具&#xff0c;可以帮助您在邮件营销和业务通信中实现高效的批量发送邮件操作。接下来将介绍如何…

关于Goby反制上线CS中的各种问题

前言 ​ Goby作为新一代网络安全技术&#xff0c;通过为目标建立完整的资产数据库&#xff0c;实现快速的安全应急&#xff0c;日常为广大师傅提供了便捷的渗透体验。最近有观察到有关于某些蜜罐出现了Goby反制的指纹&#xff0c;顿时就起了兴趣进行研究Goby的反制&#xff0c…

AIGC究竟是什么?为什么今年大家都在讨论?

目录 一、什么是AIGC 二、AIGC发展阶段 三、AIGC的技术应用 AIGC的应用场景 四、AIGC的伦理、风险与未来 五、说在最后 在23年初&#xff0c;大家的视野范围内突然出现了一种叫ChatGPT的产品&#xff0c;这是由OpenAI研发的一种基于深度学习和自然语言处理技术的文本生成…

SpringBoot_mybatis-plus使用json字段

mybatis-plus使用json字段 1.前言2.方案分析2.1 为什么是json2.2 数据库的选择 3. 实战3.1 使用text字段(h2数据库)3.1.1 建表语句3.1.2 数据操作与查询 3.2 使用json字段(mysql数据库)3.2.1 建表语句3.2.2 数据操作与查询 4. 附录4.1 MySQL JSON索引用法 5. 参考文档 1.前言 …

无需服务器内网穿透Windows下快速搭建个人WEB项目

&#x1f4d1;前言 本文主要是windows下内网穿透文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日一句&#xff1a;努力…

今日温馨早安问候语,祝大家平安健康早安吉祥

用清晨的阳光沐浴&#xff0c;给你舒展;用清新的空气洗漱&#xff0c;给你舒心;伴清莹的雨露散步&#xff0c;给你舒情;向美好的一天欢呼&#xff0c;给你舒怀&#xff0c;用快乐的词汇凝聚&#xff0c;给你祝福&#xff0c;祝你在绚丽的晨光中走好每一天。朋友&#xff0c;早安…

算法升级之路(六)

给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]] 示例 2: 输入: numRows 1 输出: [[1]] 解题思路&…

中国教育企业出海 新兴技术助力抢占先机

继游戏、电商、短视频等领域轮番出海之后&#xff0c;国内教育企业纷纷开启了出海之路。近日发布的《2023年教育应用出海市场洞察》报告显示&#xff0c;在中国教育企业出海市场中&#xff0c;语言学习是最主要的赛道&#xff0c;但赛道竞争更为激烈。 报告指出&#xff0c;全…

高性能消息中间件 - Kafka3.x(三)

文章目录 高性能消息中间件 - Kafka3.x&#xff08;三&#xff09;Kafka Broker ⭐Kafka Broker概念Zookeeper&#xff08;新版本可以不使用zk了&#xff09;⭐Zookeeper的作用 Kafka的选举1&#xff1a;Broker选举Leader⭐Broker核心参数⭐案例&#xff1a;服役新节点和退役旧…

多测师肖sir_高级金牌讲师_jmeter 反向代理录制脚本

jemeter自带的录制脚本功能&#xff0c;是利用代理服务器来进行录制的 1&#xff0c;新建一个线程组 2&#xff0c;新建一个代理服务器 右击工作台-添加-非测试元件-http代理服务器 3&#xff0c; 配置http代理服务器 端口&#xff1a; 默认为8888&#xff0c;可修改。但…

InfoHound:一款针对域名安全的强大OSINT工具

关于InfoHound InfoHound是一款针对域名安全的强大OSINT工具&#xff0c;在该工具的帮助下&#xff0c;广大研究人员只需要提供一个Web域名&#xff0c;InfoHound就可以返回大量跟目标域名相关的数据。 在网络侦查阶段&#xff0c;攻击者会搜索有关其目标的任何信息&#xff…

理解springboot那些过滤器与调用链、包装或封装、设计模式相关等命名规范,就可以读懂80%的springboot源代码,和其他Java框架代码

紧接上面《 理解springboot那些注册与回调、监控与统计等命名规范,就可以读懂70%的springboot源代码》、《 理解springboot那些约定俗成的框架类名、全局context等命名规范,就可以读懂一半springboot的源代码》2篇文章,此片将汇总springboot那些过滤器与调用链、包装或封装…

(三)上市企业实施IPD成功案例分享之——五菱

2022年对汽车产业而言是极为不平凡的一年。这一年&#xff0c;企业受到疫情反复、芯片短缺、原材料价格上涨等负面因素影响&#xff0c;汽车产业的变革持续加速。面对变革与挑战&#xff0c;五菱汽车仍逆势交出一份超出市场预期的成绩单。上半年&#xff0c;五菱发布2022年财报…

职场晋升力加分利器:巧用ChatGPT快速搞定数据分析

&#x1f482; 个人网站:【工具大全】【游戏大全】【神级源码资源网】&#x1f91f; 前端学习课程&#xff1a;&#x1f449;【28个案例趣学前端】【400个JS面试题】&#x1f485; 寻找学习交流、摸鱼划水的小伙伴&#xff0c;请点击【摸鱼学习交流群】 第一部分&#xff1a; C…

Java设置日期时间的毫秒数为0

背景 做一个发送短信的需求&#xff0c;采用RabbitMQ来实现定时发送。发送时需要验证发送短信任务的预计发送时间和生产者传过来的时间是否一致&#xff0c;一致才发送。 结果在调试的时候&#xff0c;却发现任务一直没法触发。一步步调试&#xff0c;发现是两个时间不相等。明…

【Proteus仿真】【51单片机】贪吃蛇游戏

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真51单片机控制器&#xff0c;使用8*8LED点阵、按键模块等。 主要功能&#xff1a; 系统运行后&#xff0c;可操作4个按键控制小蛇方向。 二、软件设计 /* 作者&#xff1a;嗨小易…

2023第二十五届深圳高交会将于11月15日在深启幕

10月31日下午&#xff0c;第二十五届中国国际高新技术成果交易会&#xff08;以下简称高交会&#xff09;新闻发布会在市政府新闻发布厅举行。以“激发创新活力 提升发展质量”为主题的第二十五届高交会将于11月15日-19日在深圳会展中心&#xff08;福田展区&#xff09;和深圳…

el-date-picker如何选择规定范围内的时间(十天以内的时间)

这个需求是可以选择之后来计算,选择当前日期之后自动计算当前日期前后的十天以内的日期 如下图 就是19号前面十天的日期 以及后面十天的日期(包含当天) 需要用到elementUI el-date-picker是Element UI库中的一个组件&#xff0c;用于日期选择 <el-date-picker v-model&q…