Apache kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,是消息中间件的一种,用于构建实时数据管道和流应用程序。
   
 
    
   
    Kafka官网:http://kafka.apache.org/
   
 
    
  
    安装环境:
   
 
    
   
    Kafka集群环境搭建,依赖于zookeeper环境,前面已经搭建好了。
   
 
   
    可以参考之前的 
    Zookeeper 集群安装_俗尘某某的博客-CSDN博客
   
 
    
  kafka集群
| 
         主机
        | 
         IP
        | 
         SoftWare
        | 
         Port
        | 
         OS
        | 
| 
         node1
        | 
         192.168.230.128
        | 
         kafka_2.13-3.3.1
        | 
         9092
        | 
         Centos 7
        | 
| 
         node2
        | 
         192.168.230.129
        | 
         kafka_2.13-3.3.1
        | 
         9092
        | 
         Centos 7
        | 
| 
         node3
        | 
         192.168.230.130
        | 
         kafka_2.13-3.3.1
        | 
         9092
        | 
         Centos 7
        | 
    一、集群部署
   
 
    
   
    1、下载kafka并解压(3台都执行)
   
 
    
   
    tar -xf kafka_2.13-3.3.1.tgz -C /usr/local/
   
 
    
   
    2、编辑配置文件
   
 
  
    vim /usr/local/kafka_2.13-3.3.1/config/server.properties
   
 
    
  # 每个broker在集群中的唯一标识,不能重复
broker.id=1
# 端口
port=9092
listeners=PLAINTEXT://192.168.230.128:9092 # Listener name, hostname and port the broker will advertise to clients. # If not set, it uses the value for "listeners". advertised.listeners=PLAINTEXT://192.168.230.128:9092
# broker处理消息的线程数
num.network.threads=3
# broker处理磁盘io的线程数
num.io.threads=8
# socket发送数据缓冲区
socket.send.buffer.bytes=102400
# socket接收数据缓冲区
socket.receive.buffer.bytes=102400
# socket接收请求最大值
socket.request.max.bytes=104857600
# kafka数据日志文件存放位置
log.dirs=/usr/local/kafka_2.13-3.3.1/kafka-logs
# topic默认的分区数
num.partitions=1
# 恢复线程数
num.recovery.threads.per.data.dir=1
# 默认副本数
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 消息日志最大存储时间,这里是7天
log.retention.hours=168
# 每个日志分段文件大小,这里是1g
log.segment.bytes=1073741824
# 消息日志文件大小检查间隔时间
log.retention.check.interval.ms=300000
# zookeeper集群地址
zookeeper.connect=192.168.230.128:2181,192.168.230.129:2181,192.168.230.130:2181
#zookeeper.connect=zknode1:2181,zknode2:2181,zknode3:2181
# zookeeper连接超时时间
zookeeper.connection.timeout.ms=6000
# 推迟初始消费者再平衡时间
group.initial.rebalance.delay.ms=0
   特此说明:如果需要修改为主机名连接则需修改三台主机的hosts文件设置节点名称
  
 
   
    
   
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
   
 
   
    broker.id                     默认是0,node1配置为1,node2配置为2,node3配置为3
   
 
    
   
    host.name                   新增这个字段,node1配置为192.168.10.201,node2配置为192.168.10.202,node3配置为192.168.10.203
   
 
    
   
    log.dirs                        默认路径为/tmp/kafka-logs,修改为/usr/local/kafka_2.13-3.3.1/kafka-logs
   
 
    
   
    zookeeper.connect       默认为localhost:2181,修改为zookeeper集群的地址192.168.230.128:2181,192.168.230.129:2181,192.168.230.130:2181
   
 
   
    其他保持默认配置
   
 
   
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
   
 
  
    3、启动kafka
   
 
   
    (3个节点都启动)
   
 
   
    #到bin目录下执行
   
 
   
    cd /usr/local/kafka_2.13-3.3.1/bin/
   
 
   
    #后台启动加参数-daemon ./kafka-server-start.sh -daemon ../config/server.properties
   
 
   
     4、停止kafka 
    
 
   
     ./kafka-server-stop.sh     //不带任何参数即可
    
 
     
    
     # kafka集群部署成功后,会在zookeeper集群里面写入数据,通过zkCli.sh或者zkui可以看见kafka生成的数据:
    
 
   
    如果加了环境变量,可以执行下面命令:
   
 
    
   
    kafka-server-start.sh /usr/local/kafka_2.13-3.3.1/config/server.properties
   
 
  
   ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  
 
  
    5. 配置环境变量 三台节点都配置/etc/profile文件
   
 
   


    # 配置Kafka环境变量 
   
 
   
    export KAFKA_HOME=/usr/local/kafka_2.13-3.3.1 
   
 
   
    export PATH=$PATH:$KAFKA_HOME/bin
   
 
    
   
     #让新环境变量生效
    
 
    
     source /etc/profile
    
 
   
     6. 配置防火墙
    
 
    
     firewall-cmd --zone=public --add-port=2181/tcp --permanent         #zk端口
    
 
    
     firewall-cmd --zone=public --add-port=9092/tcp --permanent firewall-cmd --reload
    
 
   
    7. 创建主题
   
 
   
      ./kafka-topics.sh --create --bootstrap-server zknode1:9092 --replication-factor 1 --partitions 1 --topic iot
     
 
    
     8. 查看主题
    
 
    
      ./kafka-topics.sh --bootstrap-server 192.168.230.128:9092 --list
     
 
     
    
     9. 删除主题
    
 
    
     ./kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】
    
 
     
   
    10. 订阅主题
   
 
   
    ./kafka-topics.sh --bootstrap-server zknode1:9092 --describe --topic iot
   
 
   
     #./kafka-topics.sh --bootstrap-server 192.168.230.128:9092 --describe --topic iot
    
 
     
    
     11. 生产消息
    
 
    
      ./kafka-console-producer.sh --bootstrap-server zknode1:9092 -topic iot
     
 
    
     #./kafka-console-producer.sh --bootstrap-server 192.168.230.128:9092 --topic iot
    
 
     
    
    12. 消费消息
   
 
   
    ./kafka-console-consumer.sh --bootstrap-server zknode2:9092 --topic iot
   
 
   
    #./kafka-console-consumer.sh --bootstrap-server 192.168.230.129:9092 --topic iot
   
 
    
   
 #后续持续完善中
                


![【Unity学习笔记】[Unity中文课堂教程] C#中级编程代码](https://img-blog.csdnimg.cn/41ea862f48384c81985a480d049422c4.png)














