文章目录
- Kafka概述
- 定义
- 应用场景
- 缓冲/削峰
- 解耦
- 异步通信
 
- 应用模式
- 点对点模式
- 发布/订阅模式
 
- 基础架构
 
- Kafka集群部署
- 集群规划
- 下载解压
- 修改配置文件
- 分发安装包
- hadoop103、hadoop104修改配置文件
- 配置环境变量
- 启动集群
- 先启动Zookeeper集群
- 然后启动Kafka
 
- 关闭集群
 
- 集群启停脚本
- 脚本编写
- 添加执行权限
- 启动集群脚本命令
- 停止集群脚本命令
 
- Docker启动Kafka集群
- docker-compose.yml编写
- 启动compose
- 命令行验证
- Python验证
 
 
Kafka概述
定义
kafka是一种分布式的,基于发布/订阅的消息队列 (MessageQueue)。它可以处理消费者在网站中的所有动作流数据。
Kafka是一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。(既想处理消息队列,又想处理数据)
 
 
应用场景
缓冲/削峰
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
 
 
解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
 
 
异步通信
允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。
 
 
应用模式
点对点模式
消费者主动拉去数据,消息收到后清除消息
 
 
发布/订阅模式
• 可以有多个topic主题(浏览、点赞、收藏、评论等)
• 消费者消费数据之后,不删除数据
• 每个消费者相互独立,都可以消费到数据
 
 
基础架构
 
 
 
 
(1)Producer:消息生产者,就是向Kafka broker发消息的客户端。
 (2)Consumer:消息消费者,向Kafka broker取消息的客户端。
 (3)Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
 (4)Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
 (5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
 (6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
 (7)Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。
 (8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
 (9)Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
Kafka集群部署
集群规划
| hadoop102 | hadoop103 | hadoop104 | 
|---|---|---|
| zk | zk | zk | 
| kafka | kafka | kafka | 
下载解压
官网地址;https://kafka.apache.org/downloads
# 下载
cd /opt/module
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
# 解压
tar -zxvf kafka_2.12-3.4.0.tgz -C /opt/module/
# 改名
mv kafka_2.12-3.4.0/ kafka
 
 
修改配置文件
路径:config/server.properties
- broker.id=0
- log.dirs=/opt/module/kafka/datas
- zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
分发安装包
这一步的作用是让另外几台服务器也都有 kafka/ 这套文件
- 命令:xsync kafka/
- 下面是xsync.sh脚本的内容
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
    echo ====================  $host  ====================
    #3. 遍历所有目录,挨个发送
    for file in $@
    do
        #4. 判断文件是否存在
        if [ -e $file ]
            then
                #5. 获取父目录
                pdir=$(cd -P $(dirname $file); pwd)
                #6. 获取当前文件的名称
                fname=$(basename $file)
                ssh $host "mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo $file does not exists!
        fi
    done
done
hadoop103、hadoop104修改配置文件
# broker.id不得重复,整个集群中唯一
# hadoop103对应的
broker.id=1
# hadoop104对应的
broker.id=2
配置环境变量
路径:vim /etc/profile.d/my_env.sh
注意:集群内的机子都需要配一遍
# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新一下环境变量
source /etc/profile
启动集群
先启动Zookeeper集群
kafka/zk.sh start
vim kafka/zk.sh
#!/bin/bash
case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 启动 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
	done
}
;;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 停止 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
}
;;
"status"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 状态 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
	done
}
;;
esac
然后启动Kafka
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
关闭集群
bin/kafka-server-stop.sh
集群启停脚本
脚本编写
vim kf.sh
#! /bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac
添加执行权限
chmod +x kf.sh
启动集群脚本命令
kf.sh start
停止集群脚本命令
kf.sh stop
注意: 停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
Docker启动Kafka集群
docker-compose.yml编写
version: '3'
services:
  li-zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "7181:2181"
    networks:
      - li-kafka-net
  li-kafka-1:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7091
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7091:9092"
    networks:
      - li-kafka-net
  li-kafka-2:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7092
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7092:9092"
    networks:
      - li-kafka-net
  li-kafka-3:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7093
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7093:9092"
    networks:
      - li-kafka-net
  li-kafka-map:
    image: dushixiang/kafka-map:latest
    environment:
      KAFKA_MAP_KAFKA_SERVERS: li-kafka-1:9092,li-kafka-2:9092,li-kafka-3:9092
      KAFKA_MAP_USERNAME: admin
      KAFKA_MAP_PASSWORD: admin
    ports:
      - "8080:8080"
    networks:
      - li-kafka-net
networks:
  li-kafka-net:
    driver: bridge
启动compose
docker-compose up -d
命令行验证
docker exec -it kafka-server bash
cd /opt/bitnami/kafka
列出所有topics
kafka-topics.sh --bootstrap-server 139.196.169.148:7091 --list
创建一个topic
kafka-topics.sh --bootstrap-server 139.196.169.148:7092 --create --partitions 1 --replication-factor 3 --topic first
生产者
kafka-console-producer.sh --bootstrap-server 139.196.169.148:7091  --topic first 
消费者
kafka-console-consumer.sh --bootstrap-server 139.196.169.148:7092 --from-beginning --topic first
Python验证
-  安装包 pip install kafka-python
-  生产者 from kafka import KafkaProducer from kafka.errors import KafkaError # Kafka 集群地址 bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093'] # Kafka 主题名称 topic = 'first' # 创建 Kafka 生产者 producer = KafkaProducer(bootstrap_servers=bootstrap_servers) # 发送消息到 Kafka 主题 def send_message(message): try: producer.send(topic, message.encode('utf-8')) producer.flush() print('Message sent successfully:', message) except KafkaError as e: print('Failed to send message:', e) # 测试发送消息 send_message('Hello, Kafka!')
-  消费者 from kafka import KafkaConsumer # Kafka 集群地址 bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093'] # Kafka 主题名称 topic = 'first' # 创建 Kafka 消费者 consumer = KafkaConsumer( topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', # 从最早的消息开始消费 enable_auto_commit=True, # 自动提交消费位移 group_id='my-group') # 消费者组名称 # 从 Kafka 主题消费消息 def consume_message(): for message in consumer: print('Message received:', message.value.decode('utf-8')) # 测试消费消息 consume_message()



















