一、前言
数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。

通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。
整理下消息重复的几个场景:
- 生产端: 遇到异常,基本解决措施都是 重试。 
  - 场景一:leader分区不可用了,抛LeaderNotAvailableException异常,等待选出新leader分区。
- 场景二:Controller所在Broker挂了,抛NotControllerException异常,等待Controller重新选举。
- 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException异常,等待网络恢复。
 
- 场景一:
- 消费端: poll一批数据,处理完毕还没提交offset,机子宕机重启了,又会poll上批数据,再度消费就造成了消息重复。
怎么解决?
先来了解下消息的三种投递语义:
- 最多一次(at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如:mqtt中QoS = 0。
- 至少一次(at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如:mqtt中QoS = 1
- 精确一次(exactly once): 消息精确发一次,消息不会丢失,也不会被重复发送。例如:mqtt中QoS = 2。
了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:
- Kafka幂等性- Producer: 保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)
- Kafka事务: 保证生产端发送消息幂等。解决幂等- Producer的局限性。
- **消费端幂等: ** 保证消费端接收消息幂等。蔸底方案。
1)Kafka 幂等性 Producer
**幂等性指:**无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。
幂等性使用示例:在生产端添加对应配置即可
Properties props = new Properties();
props.put("enable.idempotence", ture);  // 1. 设置幂等
props.put("acks", "all");               // 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
复制代码- 设置幂等,启动幂等。
- 配置 acks,注意:一定要设置acks=all,否则会抛异常。
- 配置 max.in.flight.requests.per.connection需要<= 5,否则会抛异常OutOfOrderSequenceException。- 0.11 >= Kafka < 1.1,- max.in.flight.request.per.connection = 1
- Kafka >= 1.1,- max.in.flight.request.per.connection <= 5
 
为了更好理解,需要了解下 Kafka 幂等机制:

- Producer每次启动后,会向- Broker申请一个全局唯一的- pid。(重启后- pid会变化,这也是弊端之一)
- Sequence Numbe:针对每个- <Topic, Partition>都对应一个从0开始单调递增的- Sequence,同时- Broker端会缓存这个- seq num
- 判断是否重复: 拿 <pid, seq num>去Broker里对应的队列ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在- 如果 nextSeq == lastSeq + 1,即服务端seq + 1 == 生产传入seq,则接收。
- 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。
- 反之,要么重复,要么丢消息,均拒绝。
 
- 如果 

这种设计针对解决了两个问题:
- 消息重复: 场景 Broker保存消息后还没发送ack就宕机了,这时候Producer就会重试,这就造成消息重复。
- 消息乱序: 避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。
那什么时候该使用幂等:
- 如果已经使用 acks=all,使用幂等也可以。
- 如果已经使用 acks=0或者acks=1,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。
2)Kafka 事务
使用
Kafka事务解决幂等的弊端:单会话且单分区幂等。
Tips: 这块篇幅较长,这先稍微提及下使用,之后另起一篇。
事务使用示例:分为生产端 和 消费端
Properties props = new Properties();
props.put("enable.idempotence", ture);  // 1. 设置幂等
props.put("acks", "all");               // 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待数
props.put("transactional.id", "my-transactional-id");  // 4. 设定事务 id
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事务
producer.initTransactions();
try{
    // 开始事务
    producer.beginTransaction();
    // 发送数据
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
 
    // 数据发送及 Offset 发送均成功的情况下,提交事务
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 数据发送或者 Offset 发送出现异常时,终止事务
    producer.abortTransaction();
} finally {
    // 关闭 Producer 和 Consumer
    producer.close();
    consumer.close();
}
复制代码这里消费端 Consumer 需要设置下配置:isolation.level 参数
-  read_uncommitted: 这是默认值,表明Consumer能够读取到Kafka写入的任何消息,不论事务型Producer提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型Producer,那么对应的Consumer就不要使用这个值。
-  read_committed: 表明Consumer只会读取事务型Producer成功提交事务写入的消息。当然了,它也能看到非事务型Producer写入的所有消息。
3)消费端幂等
“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。
只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。
典型的方案是使用:消息表,来去重:

- 上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id新增到本地消息表中,同时更新订单信息。
- 如果消息重复,则新增操作 insert会异常,同时触发事务回滚。
  
二、案例:Kafka 幂等性 Producer 使用
 
环境搭建可参考:链接
准备工作如下:
-  Zookeeper:本地使用Docker启动$ docker run -d --name zookeeper -p 2181:2181 zookeeper a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4 复制代码
-  Kafka:版本2.7.1,源码编译启动(看上文源码搭建启动)
-  启动生产者: Kafka源码中exmaple中
-  启动消息者:可以用 Kafka提供的脚本# 举个栗子:topic 需要自己去修改 $ cd ./kafka-2.7.1-src/bin $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic 复制代码
创建 topic : 1副本,2 分区
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2
# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
复制代码生产者代码:

public class KafkaProducerApplication {
    private final Producer<String, String> producer;
    final String outTopic;
    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }
    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord 
            = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if(e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
        );
    }
    public void shutdown() {
        producer.close();
    }
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        final String topic = "myTopic";
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);
        String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream().filter(l -> !l.trim().isEmpty())
                    .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}
复制代码启动生产者后,控制台输出如下:

启动消费者:
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
复制代码
修改配置 acks
启用幂等的情况下,调整 acks 配置,生产者启动后结果是怎样的:
- 修改配置 acks = 1
- 修改配置 acks = 0
会直接报错:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. 
Otherwise we cannot guarantee idempotence.
复制代码
修改配置 max.in.flight.requests.per.connection
启用幂等的情况下,调整此配置,结果是怎样的:
- 将 max.in.flight.requests.per.connection > 5会怎样?

当然会报错:
Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
复制代码








![[附源码]计算机毕业设计基于springboot的桌游信息管理系统](https://img-blog.csdnimg.cn/ca727bbeae324275a4c2f96a302502ff.png)










