本文从kafka中的实际应用场景分析,讲述kafka的一些基本概念。再讲述zookeeper集群环境的构建;kafka的搭建以及脚本文件编写;最后是一个快速入门的demo.内容会比较多,希望大家能有所收获!
1.Kafka(MQ)实战应用场景剖析
Kafka(mq)应用场景
 1、Kafka之异步化、服务解耦、削峰填谷
 2、Kafka海量日志收集
 3、Kafka之数据同步应用
 4、Kafka之实时计算分析
Kafka异步化实战:
 异步化解耦,消息推送,持久化存储保证数据可靠性,避免需要重传。
 
 Kafka服务解耦,削峰填谷:
 下单请求-redis实现库存校验(list存储)
 
 实现服务解耦指的是请求和接收方可以是两个完全独立的系统;
 通常消息队列是作为不同服务之间信息传递的桥梁。
KAFKA海量日志收集:(logstash日志解析)
 
KAFKA之数据同步实战:
 
 KAFKA实时计算分析:
 
2.Kafka基础概念
1集群架构简介
 
 
Topic主题与Partition分区(一对多的关系)
 一个分区只能属于单个主题,同一个主题下可以有不同的分区。
 分区里面包含不同的消息。分区可以理解为是一个可追加的日志文件。
 Kafka可是通过offset保证消息在分区内的顺序性的。
 路由规则有关分区区号,分区选择那个分区;
 分区的目的是分散磁盘io;创建主题的时候也可以去制定分区的个数的。
 通过增加分区,实现水平扩展
2副本概念(replica)
 
 绿色p1是个主分片或主副本,
 Broker2上的紫色p1是副本。副本分片。优点类似elasticsearch
 通过replica,实现集群故障转移,保证了集群高可用。
 用空间换取数据的高可靠性,稳定性。
3ISR详解
 
 ISR集合的理解:数据录取及时进入ISR集合(比如p1s1的拉取时间为50ms,p1s2的pull时间为200ms,都小于规定超时时间1s。两者皆进入ISR.)跟数据一直性相关,副本从主副本的拉取时间
 超时的话会进入OSR集合。
 Leader副本(p1)主要是维护和跟踪ISR集合中的滞后状态。
 ISR集合跟OSR集合是动态的。比如后面有消息来后,p1s2的时间超过规定时间1s,
 则将p1s2放入OSR集合。
 当p1宕机,一般是从ISR选取副本进行替代成为leader副本,即新的P1。
ISR只是模型,映射到实际的存储的一些基本概念。
 HW:high watermark ,高水位线,消费者只能最多拉取到高水位线的消息
 LEO:log end offset,日志文件的最后一条记录offset(偏移量)
 ISR集合与HW和LEO直接存在着密不可分的关系
 
 高水位线下一次需要拉取的数据是6
 
 理解:当写入消息3和4时,此时leader已经成功写入,hw没变,
 Leo变成4.HW没变是因为还没有完成数据同步,3和4目前consumer
 目前还没有办法处理。
 
 消息3同步的比较快,此时hw变成3.LEO变为4.
 因为follower2对于数据同步的比较慢,所以暂定hw为3
3.构建zookeeper集群环境

 
4.开机启动与连接工具介绍
实现开机自启动步骤:
 cd /etc/rc.d/init.d/
 Touch zookeeper
 Chmod 777 zookeeper
 vi zookeeper
开启启动zookeeper脚本:
#!/bin/bash
#chkconfig:2345 20 90
#description:zookeeper
#processname:zookeeper
export JAVA_HOME=/usr/java/jdk1.8.0_301
export PATH=$JAVA_HOME/bin:$PATH
case $1 in
          start) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh start;;
          stop) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh stop;;
          status) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh status;;
          restart) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh restart;;
          *) echo "require start|stop|status|restart" ;;
esac
 
使用ZooInspector图形界面连接zookeeper。
 使用到的命令:java -jar.\zookeeper-dev-ZooInspector.jar
5.Kafka环境搭建
Kafka版本:kafka_2.12
 管控台:kafkaManager2.0.0.2
 协调服务:zookeeper-3.4.6
 Kafka环境验证(操作台控制)
Kafka环境搭建:
 准备zookeeper环境(zookeeper-3.6.2)
 下载kafka安装包:kafka
 上传到虚拟机:/home/software6
 解压到/usr/local目录下:
 tar -zxvf kafka_2.12-2.1.0.tgz.gz -C /usr/local
 重命名压缩的文件:mv kafka_2.12-2.1.0 kafka_2.12
 修改kafka配置文件:
cd kafka_2.12/config
vi server.properties
  ## 修改内容:
  ## The id of the broker. This must be set to a unique integer for each broker
  broker.id=0
  port=9092
  host.name=192.168.11.221
  dvertised.host.name=192.168.11.221
  log.dirs=/usr/local/kafka_2.12/kafka-logs
  num.partitions=5
  zookeeper.connect=192.168.11.221:2181,192.168.11.222:2181,192.168.11.223:2181
 
创建kafka存储消息(log日志数据)的目录
mkdir /usr/local/kafka_2.12/kafka-logs
 
kafka配置成功,执行启动命令,启动kafka。
/usr/local/kafka_2.12/bin/kafka-server-start.sh  /usr/local/kafka_2.12/config/server.properties &
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
 
Kafka-manager管控台搭建与脚本测试验证
 安装kafka Manager可视化管控台:安装到192.168.56.107上
 解压zip文件:
 unzip kafka-manager-2.0.0.2.zip -d /usr/local
 修改配置文件: vi /usr/local/kafka-manager-2.0.0.2/conf/appication.conf
 修改的内容:
 kafka-manager.zkhosts=“192.168.56.107:2181,192.168.56.110:2181,
 192.168.56.111:2181”
192.168.56.107节点启动kafka manager 控制台
 /usr/local/kafka-manager-2.0.0.2/bin/kafka-manager &
浏览器访问控制台:默认端口号是9000
 http://192.168.56.107:9000/
集群验证:
 通过控制台创建了一个topic为"test" 2个分区 1个副本
消费发送与接收验证
 cd /usr/local/kafka_2.12/bin
 ## 启动发送消息的脚本
 ## --broker-list 192.168.11.221 指的是kafka broker的地址列表
 ## --topic test 指的是把消息发送到test主题
 ./kafka-console-producer.sh --broker-list 192.168.56.107:9092 --topic topic-quickstart
 ## 启动接收消息的脚本
 ./kafka-console-consumer.sh --bootstrap-server 192.168.56.107:9092 --topic topic-serial
6.Kafka快速入门
Kafka急速入门:producer:
 配置生产者参数属性,创建生产者对象;构建消息producerrecord
 发送消息send;关闭生产者
 Kafka快速入门:Consumer
 配置消费者参数写构造消费者对象;订阅主题;
 拉取消息并进行消费处理;提交消费偏移量,关闭消费者
 kafka是通过序列化好的key然后去进行分区的。
Kafka急速入门:producer:
import com.alibaba.fastjson.JSON;
import com.bfxy.kafka.api.Const;
import com.bfxy.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class QuickStartProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //1.配置生产者启动的关键属性参数
        //1.1 BOOTSTRAP_SERVERS_CONFIG,连接kafka集群服务列表,如果有多个,使用","进行分隔
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.107:9092");
        //1.2 CLIENT_ID_CONFIG: 这个属性的目的是标记kafka cilent的ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"quickstart-producer");
        //1.3对kafka的key和value做序列化(kafka只能识别二进制数据)
        //org.apache.kafka.common.serialization.Serialization
        //key;是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //value:实际发送消息的内容
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //2.创建kafka生产者对象,传递properties属性参数集合
        KafkaProducer<String ,String> producer = new KafkaProducer<>(properties);
        //3.构造消息内容
        User user = new User("003","张飞");
        //需要将user对象转化为string,arg1:topic  arg2:实际消息体的内容
        ProducerRecord<String,String> record =
                new ProducerRecord<String,String>(Const.TOPIC_QUICKSTART,
                        JSON.toJSONString(user));
        //4.发送消息
        producer.send(record);
        //5.关闭生产者,生产环境中一般不关闭
        producer.close();
    }
}
 
Kafka快速入门:Consumer
import com.bfxy.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
 * @author nly
 */
public class QuickStartConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        //1.配置属性参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.107:9092");
        //key;是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //value:实际发送消息的内容
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //非常重要的属性配置,与我们消费者订阅组有关系
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-quickstart");
        //常规属性:会话连接超时时间
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        //常规属性:消费者提交offset:自动提交&手工提交,默认是自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,5000);
        //2.创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //3.订阅你感兴趣的主题:Const.TOPIC_QUICKSTART
        consumer.subscribe(Collections.singletonList(Const.TOPIC_QUICKSTART));
        System.err.println("quickstart consumer started ...");
        //4.采用拉取的方式消费数据
        while (true){
                 //等待多久进行一次数据的拉取
                 //拉取TOPIC_QUICKSTART主题里面所有的消息
                 //topic和partition是一对多的关系,一个topic可以有多个partition
                 ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(3000));
                 //因为消息是在partition中存储的,所以需要遍历partition集合
                 for (TopicPartition topicPartition : records.partitions()){
                     //通过topicPartition获取制定的消息集合,就是获取到当前topicPartition下面所有的消息
                     // 在records对象中的数据集合
                     List<ConsumerRecord<String,String>> partitionRecords = records.records(topicPartition);
                     //获取到每一个TopicPartition
                     String topic =topicPartition.topic();
                     //获取当前topicPartition下的消息条数
                     int size = partitionRecords.size();
                     System.out.println(String.format("--- 获取topic: %s,  分区位置 : %s, 消息总数: %s",
                             topic,
                             topicPartition.partition(),
                             size));
                     for(int i = 0; i <  size; i++){
                         ConsumerRecord<String,String> consumerRecord = partitionRecords.get(i);
                         // 实际数据内容
                         String value = consumerRecord.value();
                         // 当前获取的消息的偏移量
                         long offset = consumerRecord.offset();
                         //ISR : High Watermark,如果要提交的话,比如提交当前消息的offset
                         //表示下一次从什么位置(offset)拉取消息
                         long commitOffset = offset + 1;
                         System.err.println(String.format("获取实际消息value: %s,  消息offset: %s, 提交offset: %s",
                                 value,offset,commitOffset));
                     }
                 }
             }
    }
}
                
![vcpkg install libtorch[cuda] -allow-unsupported-compiler](https://i-blog.csdnimg.cn/direct/36b15e279696405e9efe0e8ac15df564.png)

















