kafka
- 概述
 - kafka版本
 - 流程
 - 启动zk
 - 配置zk
 - 启动命令
 
- 启动kafka
 - 修改server.properties
 - 启动命令
 
- kafka脚本-命令行操作
 - 命令行
 - 创建主题脚本
 - 查看主题
 - 主题详情
 - 修改主题
 - 删除主题
 - 大量日志
 - 解决方案
 
- 控制台生产者消费者
 - 代码 生产者 消费者
 - kafka-tool
 - kafka数据文件
 
- 扩展
 - 横向扩展
 - 纵向扩展
 - 分区
 - 消费者组
 - 备份
 - 分区
 - Controller选举
 - 图解kafka架构
 
- Windows集群
 - cluster目录
 - zk
 - broker_1
 - broker_2
 - broker_3
 - kafka & zk启动脚本
 - cluster.bat cluster-clear-data.bat
 - znode节点
 - 临时节点 持久化节点
 - watch 节点监听
 - kafka在zookeeper中的组成
 
- zk选举kafka
 - 副本分配
 - 副本分配策略
 
- 发送数据流程
 - 生产数据
 - 消费数据
 - 拦截器
 - 添加拦截器
 - 实现自定义拦截器
 
- 分区器 分区计算策略
 - 自定义分区器
 - 分区计算策略
 
- 数据收集器
 - 数据发送者
 - 异步数据发送回调
 - 同步发送数据回调
 - 应答处理级别
 - ack == 0
 - ack == 1
 - ack == -1(all)
 
- kafka幂等性
 - 初始化事务
 
- 存储文件类型
 - 刷写数据条件
 
- 存储数据
 
概述
课程地址为 https://www.bilibili.com/video/BV1Gp421m7UN/
kafka版本
2.12-3.6.1
 
流程

启动zk
配置zk
需要启动zk,作为注册中心
 kafka内置了zk,直接命令启动即可
 在 kafka 的 config目录下,修改zookeeper.properties 配置文件
# 这里面配置的是有关数据存放的目录
dataDir=E:/kafka/kafka2.xxx/data/zk
 
启动命令
在 kafka/bin/windows 下,找到 zookeeper-server-start.bat,运行如下命令…/…/config/zookeeper.properties 文件
zookeeper-server-start.bat  ../../config/zookeeper.properties
 
这么一来,可以直接启动zk.出现如下图: 启动成功
 
启动kafka
修改server.properties
修改 log.dirs 的值
 
启动命令
kafka-server-start.bat config.properties
kafka脚本-命令行操作
提供了一定的脚本,通过脚本操作kafka
可以进行创建主题 发送消息等操作
 
命令行
创建主题脚本
相关脚本
 kafka-topics.bat
 kafka提供的主题脚本
 创建 test 主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create
 
查看主题
kafka-topics.bat --bootstrap-server localhost:9092 --list
主题详情
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --describe
修改主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions
 
 将红圈部分改为2
 效果如下
 
删除主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete
 windows环境下会导致kafka停止运行的错误
 由于权限或进程锁定,会导致kafka被关闭
大量日志
由于JDK版本问题导致
解决方案
- 切换JDK17
 - 在kafka-run-class.bat 中设置 java_home

 
控制台生产者消费者
通过控制台控制生产
 启动生产者脚本
 消费者启动脚本
 生产者发送消息,消费者自动获取消息
 
代码 生产者 消费者
只引入 kafka 的依赖.进行代码
 生产者 创建 topic 以及 消息
 消费者 订阅 topic 以及消费消息
<dependencies>
	<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.6.1</version>
	</dependency>
</dependencies>
 
具体代码请百度.只引入 kafka-client即可.
kafka-tool

 这里百度去学习
kafka数据文件
.log结尾的就是数据文件
 因为 kafka 以前是做日志传输的.
扩展
横向扩展
增加机器
纵向扩展
增加内存 硬盘 宽带等…
分区
运行多个kafka实例. 对 kafkaBroker 的相同topic进行编号.
 这叫做分区
 生产者向不同分区传送数据
消费者组
消费者消费所有的分区topic
备份
可以对数据文件进行备份.
 分区备份
 同一 topic 的是数据文件备份到其他分区的broker上面
 
 但是多个副本,只有一个副本能够提供数据的读写.其他是从节点,负责备份
分区
Leader Follower
 Partition 分区
 Broker 服务节点集群
 Broker Controller: Broker 服务节点集群管理者
 Broker Controller Standby : 服务集群管理者备份
Controller选举
BrokerController down之后,Standby启动.
 BrokerStandbyController启动.
 BrokerStandbyController出现问题,可以继续选择其他主体 Broker ,选举出来一个BrokerController
图解kafka架构
KogManager: 数据管理器
 
Windows集群
cluster目录
创建cluster目录在硬盘根目录.解压 kafka,复制到该目录.如图
 
zk
修改kafka自带的zkdata存放目录.然后启动
 
broker_1

 
broker_2
和 broker_1 一样
broker_3
和 broker_2 一样
kafka & zk启动脚本
略...
cluster.bat cluster-clear-data.bat
略...
znode节点
临时节点 持久化节点
watch 节点监听
监听kafka的客户端.
kafka在zookeeper中的组成
略…
zk选举kafka
假设一共三个kafka节点
 brokerController1 down了.
 剩下的两个节点,监听zk,发现broker1掉了之后,会发起请求,谁的请求先到.谁就是 新的 brokerController
副本分配
对副本进行分配,放到不同的broker中
 
副本分配策略
…
发送数据流程

 总的就是这张图
 
生产数据
消费数据
拦截器
添加拦截器

实现自定义拦截器
public class ValueInterceptorTest implements ProducerInterceptor<String>{
	
}
 

分区器 分区计算策略
发送消息.
自定义分区器
实现接口重写方法
public class myKafkaParationer implements Partitioner{}
 

分区计算策略
略...
数据收集器
数据发送者
异步数据发送回调
producer.send(msg,callBackMethod);
 

同步发送数据回调

 这就同步操作了
应答处理级别
ack 的值
ack == 0
ack == 1
ack == all(-1)
 
ack == 0
ack == 1
ack == -1(all)
kafka幂等性
初始化事务

 prodicer.initTransaction(); // 开启事务
prodicer.commit(); // 提交事务
prodicer.abortTranscation(); // 终止事务
存储文件类型
刷写数据条件

一条数据就从内存刷到硬盘上



















