
文章目录
更多相关内容可查看
Kafka介绍
Apache Kafka是一个开源的分布式事件流平台,由LinkedIn公司开发并于2011年贡献给Apache软件基金会。Kafka设计用于处理大规模实时数据,它能够处理每秒数百万条消息,因此被广泛应用于大数据和实时分析领域。
Kafka的主要特点包括:
-  
高吞吐量:Kafka能够处理每秒数百万条消息,满足大规模数据处理的需求。
 -  
分布式:Kafka通过分布式系统设计,提供数据冗余和容错能力。
 -  
实时性:Kafka能够实时处理数据,适合需要快速响应的场景。
 -  
持久性:Kafka将数据存储在磁盘上,即使系统崩溃,数据也不会丢失。
 -  
可扩展性:Kafka可以通过添加更多的服务器来扩展处理能力。
 
Kafka使用场景
自媒体用户发布文章成功之后需要进行文章的审核 , 审核通过之后才会发布到APP端供用户查看 , 审核功能因为耗时较久 , 长时间阻塞会影响用户体验 , 而且长时间阻塞会严重影响系统的吞吐量
 所以为了实现功能之间的解耦 , 提升用户体验 , 我们可以抽取一个独立的审核服务 , 文章发布成功之后自媒体服务通过MQ通知审核服务进行文章审核 , 如下图所示 :
 
为什么要选择使用Kafka作为消息中间件
- 因为我们后期会使用MQ进行行为数据采集 , 对于消息的吞吐量要求更高
 - 因为后期会进行文章的实时推荐 , 会使用到一些实时流计算技术 , Kafka提供这么一个技术 Kafka Stream , 开发成本和运维成本会更低一些
 
kafka概述和安装
kafka概述
消息中间件对比
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka | 
|---|---|---|---|---|
| 开发语言 | java | erlang | java | scala(JVM) | 
| 单机吞吐量 | 万级 | 万级 | 10万级 | 100万级 | 
| 时效性 | ms | us | ms | ms级以内 | 
| 可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) | 
| 功能特性 | 成熟的产品、较全的文档、各种协议支持好 | 并发能力强、性能好、延迟低 | MQ功能比较完善,扩展性佳 | 只支持主要的MQ功能,主要应用于大数据领域 | 
消息中间件对比-选择建议
| 消息中间件 | 建议 | 
|---|---|
| Kafka | 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务 | 
| RocketMQ | 可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验 | 
| RabbitMQ | 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ | 
kafka介绍
 Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。
 kafka官网:http://kafka.apache.org/

kafka介绍-名词解释
 
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
 - topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
 - consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
 - broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
 
kafka安装配置
可查看文章【Linux】Docker安装kafka教程(保姆篇)
kafka快速入门
- 生产者发送消息,多个消费者只能有一个消费者接收到消息
 - 生产者发送消息,多个消费者都可以接收到消息
 
创建项目

 
导入依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>
 
发送消息
启动引导类
@SpringBootApplication
public class KafkaProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}
 
在kafka-producer项目中编写生产者代码发送消息 , 创建application.yml配置文件, 配置Kafka连接信息
spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
 
配置消息主题
@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic newTopic(){
        return TopicBuilder.name("topic.my-topic1").build();
    }
}
 
发送消息到Kafka
package com.heima.kafka;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
import java.util.concurrent.ExecutionException;
@SpringBootTest
public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Test
    public void testSend() throws ExecutionException, InterruptedException {
        kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka !");
    }
}
 
接收消息
启动引导类
@SpringBootApplication
public class KafkaConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}
 
在kafka-consumer项目中编写消费者代码接收消息 , 创建application.yml配置文件, 配置Kafka连接信息
spring:
  application:
    name: kafka-consumer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 
创建监听器类, 监听kafka消息
@Component
public class KafkaConsumerListener {
    @KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
    public void listenTopic1group1(ConsumerRecord<String, String> record) {
        String key = record.key();
        String value = record.value();
        System.out.println("group1中的消费者接收到消息:" + key + " : " + value+"));
    }
}
 
kafka生产者详解
发送类型
同步发送: 使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功,因为如果发送操作失败,kafkaTemplate.send().get()会抛出异常,而不会返回SendResult。如果SendResult被成功返回,那么就意味着消息已经被成功发送到Kafka。
消息偏移量:是Kafka用于标识消息在主题中的位置的一个数字。每个新的消息都会被赋予一个比前一个消息大的偏移量。
@Test
public void testSend() throws ExecutionException, InterruptedException {
    同步发送
    SendResult result = (SendResult) kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka").get();
    //打印发送结果中的消息偏移量。
    System.out.println(result.getRecordMetadata().offset());
}
 
异步发送 :调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
@Test
public void testSend() throws ExecutionException, InterruptedException {
    //异步发送
    ListenableFuture future = kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka");
    future.addCallback(result -> {
        //消息发送成功执行
        SendResult sendResult = (SendResult) result;
        System.out.println(sendResult.getRecordMetadata().offset());
    }, throwable -> {
        //消息发送失败执行
        System.out.println("发送消息出现异常:" + throwable);
    });
    Thread.sleep(1000);
}
 
参数详解
签收机制 : acks
 
 代码的配置方式:
spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 10  # 重试次数
      compression-type: gzip  # 消息压缩算法
      batch-size: 16KB  #批量提交的数据大小
      acks: all  # 消息确认机制  0: 不签收 , 1 : leader签收 , all : leader和follower都签收
 
参数的选择说明
| 确认机制 | 说明 | 
|---|---|
| acks=0 | 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 | 
| acks=1(默认值) | 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 | 
| acks=all | 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 | 
追求极致的吞吐量和性能使用 acks=0
追求是数据安全, 消息发送不丢失 , acks=all
既要吞吐量也要可靠性 : acks=1 (折中方案)
重试机制 : retries
 
 生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
 代码中配置方式:
spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 10  # 重试次数
 
为了提高消息投递的成功率, 可以将重试次数设为一个很大的值 , 例如 : 999999999999999
kafka消费者详解
消息有序性
所谓消息有序性就是保证Kafka消息消费的顺序和发送的顺序保持一致 ,
 应用场景 : 比如客户开车出事故了需要保险公司来处理,至少要有以下几个步骤: 报案、查勘定损、立案、收单理算支付、结案等环节,这些环节是严格有序的。保险公司每完成一个环节,需要给中保信(监管保险公司的)推送数据,如果推送顺序有问题,会返回错误,比如上一个环节还没有完成。同样电商行业也是如此,下单、支付、发货都是有序的。
Kafka消息有序性
我们知道Kafka中的每个分区中的数据是有序的,但有序性仅限于当前的分区中。比如我们现在往一个topic中发送消息 , 这个topic有两个分区 , 默认采用轮询策略, 那么这个topic分区0中插入数据 1,3,5,然后在分区1中插入数据2,4,6 , 这时如果消费者想要读取这个topic的数据,他就可能随机从分区0和分区1中读取数据,比如读出结果为1,3,2,5,4,6。这时可以看到读到的数据顺序已经不是插入的顺序了。
方法一 : 一个 Topic 只对应一个 Partion
 消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。每次添加消息到 Partition(分区) 的时候都会采用尾加法,如下图所示。Kafka 只能为我们保证 Partition(分区) 中的消息有序,而不能保证 Topic(主题) 中的 Partition(分区) 的有序。
 
 所以,我们就有一种很简单的保证消息消费顺序的方法:一个 Topic 只对应一个 Partion , 这种方式影响Kafka效率
方法二 : 按键路由
 Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 partion 的话,所有消息都会被发送到指定的 partion。并且,同一个 key 的消息可以保证只发送到同一个 partition ! 这样我们就可以为需要保证顺序的消息设置同一个Key , 这样就能保证这组消息都发送到同一个分区中 , 从而保证消息顺序性
@Test
public void sendTopic3() {
    kafkaTemplate.send("test.topic03", "order_1001", "kafka!");
}
 
提交和偏移量
Kafka会记录每条消息的offset(偏移量) , 消费者可以使用offset来追踪消息在分区的位置 , 所以在Kafka中消息消费采用的是pull模型, 由消费者主动去Kafka Brocker中拉取消息
 之前说过Kafka的消费者再均衡机制 : 如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡 , 例如:
 
 如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费
 
 再均衡后不可避免会出现一些问题(消息丢失&消息重复消费)
问题一:消息重复消费
 
 如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
 问题二:消息丢失
 
 如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
 如果想要解决这些问题,还要知道目前kafka提交偏移量的方式 , 提交偏移量的方式有两种,分别是自动提交偏移量和手动提交
- 自动提交偏移量 :当
enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去 - 手动提交偏移量 : 当
enable.auto.commit被设置为false , 需要程序员手动提交偏移量 
手动提交偏移量 : 同步提交
 把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
 只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
    //同步提交偏移量
    consumer.commitSync();  
}
 
手动提交偏移量 : 异步提交
 手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
    //异步提交偏移量
    consumer.commitAsync();
}
 
手动提交偏移量 : 同步和异步组合提交
 异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
    //同步异步, 结合提交
    try {
        consumer.commitAsync();
    } catch (Exception e) {
        e.printStackTrace();
        consumer.commitSync();
    }
}
 


![[分布式网络通讯框架]----RPC通信原理以及protobuf的基本使用](https://img-blog.csdnimg.cn/direct/b8c3b99b3e674a86abda9d5dcdb4fca0.png)










![[C++][设计模式][抽象工厂]详细讲解](https://img-blog.csdnimg.cn/direct/2a30ef7b5c264a949fa95ff925719992.png)





