文章目录
- 高性能消息中间件 - Kafka3.x(四)
- Kafka监控-Kafka eagle(EFAK 2.1.0版本)⭐
- 修改每个kafka的配置信息
- 启动MySQL
- 在mysql中创建名为ke的数据库
- 开始安装并启动kafka-eagle
- Kafka的Kraft模式(新版Kafka特性,很重要)⭐
- Kraft模式概述
- 搭建kafka-Kraft 3.2.1集群⭐
- 搭建kafka01服务器的Kraft⭐
- 搭建kafka02服务器的Kraft⭐
- Kafka整合Spring Boot⭐
- 环境搭建
- pom.xml
- application.yml
- ProducerController类(生产者)
- MyConsumer类(消费者)
- 性能调优⭐
- 案例:根据场景选择硬件
- 根据场景选择服务器台数
- 根据场景选择磁盘
- 根据场景选择内存
- 根据场景选择CPU
- 根据场景选择CPU
- 提高系统吞吐量
- 提升生产者吞吐量
- 提高消费者吞吐量
- 设置合适的分区数
高性能消息中间件 - Kafka3.x(四)
Kafka监控-Kafka eagle(EFAK 2.1.0版本)⭐
修改每个kafka的配置信息
- 1:关闭每个kafka服务器:
kafka-server-stop.sh
- 2:修改文件:
cd /usr/local/kafka/bin/
vim kafka-server-start.sh
找到下面这个:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
将上面这个代码删除掉,并粘贴如下代码:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
- 3:重新启动kafka:
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
启动MySQL
- 1:编写配置文件:
mkdir -p /kf-mysql/conf
vim /kf-mysql/conf/my.cnf
内容如下:
[client]
# 指定编码格式为utf8,默认的MySQL会有中文乱码问题
default_character_set=utf8
[mysqld]
collation_server=utf8_general_ci
character_set_server=utf8
# 全局唯一id(不允许有相同的)
server_id=100
binlog-ignore-db=mysql
# 指定MySQL二进制日志
log-bin=order-mysql-bin
# binlog最大容量
binlog_cache_size=1M
# 二进制日志格式(这里指定的是混合日志)
binlog_format=mixed
# binlog的有效期(单位:天)
expire_logs_days=7
slave_skip_errors=1062
- 2:运行mysql:
docker run -p 3308:3306 \
-v /kf-mysql/log:/var/log/mysql \
-v /kf-mysql/data:/var/lib/mysql \
-v /kf-mysql/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
--name mysql-kafka \
-d mysql:5.7
在mysql中创建名为ke的数据库
CREATE DATABASE ke;
开始安装并启动kafka-eagle
- 1:下载EFAK(本次版本为2.1.0),解压成tar.gz包并上传服务器:
下载地址
- 2:查看EFAK的包:
[root@kafka01 ~]# ll | grep kafka-eagle
-rw-r--r--. 1 root root 83512603 9月 16 23:55 kafka-eagle-bin-2.1.0.tar.gz
- 3:解压:
tar -zvxf kafka-eagle-bin-2.1.0.tar.gz
cd kafka-eagle-bin-2.1.0
tar -xzvf efak-web-2.1.0-bin.tar.gz -C /usr/local
cd /usr/local/
mv efak-web-2.1.0 kafka-eagle/
- 4:修改配置文件
cd /usr/local/kafka-eagle/conf
cp system-config.properties system-config.properties_bak
vi system-config.properties
我的配置文件如下:
需要修改的内容如下:
1:efak.zk.cluster.alias=cluster1 #Kafka使用的Zookeeper集群别名,多个集群用逗号分隔。
2:cluster1.zk.list=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 #zookeeper.connect地址
3:cluster1.efak.offset.storage=kafka #存储消费信息的类型,一般在0.9版本之前,消费信息会默认存储在Zookeeper中,在0.10版本之后,消费者信息默认存储在 Kafka中,存储类型需要设置为kafka。
4:efak.driver=com.mysql.cj.jdbc.Driver #链接mysql8.0驱动,低版本数据库需修改5:efak.url= #mysql数据库的url(注意端口号)
6:efak.username= #mysql数据库用户名
7:efak.password= #mysql数据库密码
efak.zk.cluster.alias=cluster1
cluster1.zk.list=kafka01:2181,kafka02:2181,kafka03:2181/kafka
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
cluster1.efak.broker.size=20
kafka.zk.limit.size=16
efak.webui.port=8048
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
efak.metrics.charts=true
efak.metrics.retain=15
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
efak.topic.token=keadmin
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://kafka01:3308/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
- 5:添加环境变量
vim /etc/profile
在最后添加内容如下:
# kafka-eagle
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
刷新配置:
source /etc/profile
- 6:启动kafka-eagle(前提是要启动zookeeper和kafka)
ke.sh start
- 7:访问kafka-eagle:(ip+8048)
Kafka的Kraft模式(新版Kafka特性,很重要)⭐
Kraft模式概述
-
早期版本的kafka是要强依赖于zookeeper的,没有zookeeper的话kafka便启动不了。但是新版的kafka打破了这一垄断,可以完全不需要zookeeper了,这就是Kraft模式。
-
Kraft模式的好处有:
- 1:kafka不用依赖于zookeeper这些外部中间件,不用担心哪一天zookeeper停更了。
- 2:kafka可以不用从zookeeper中获取数据了,使得kafka集群的性能大幅提高。
- 3:kafka不依赖于zookeeper,可以不用担心zookeeper的性能问题影响到kafka的性能。
搭建kafka-Kraft 3.2.1集群⭐
IP地址 | 主机名 | 需要安装的资源 | 操作系统 | node-id |
---|---|---|---|---|
192.168.184.201 | kafka01 | jdk、Docker、zookeeper、Kafka | centos7.9 | 21 |
192.168.184.202 | kafka02 | jdk、Docker、zookeeper、Kafka | centos7.9 | 22 |
搭建kafka01服务器的Kraft⭐
- 1:下载并上传kafka的tgz包到服务器:
[root@kafka01 ~]# ll | grep kafka_2.13-3.2.1
-rw-r--r--. 1 root root 103956099 8月 30 11:56 kafka_2.13-3.2.1.tgz
- 2:解压:
mkdir -p /usr/local/kafka-kraft
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/kafka-kraft
- 3:改名:
cd /usr/local/kafka-kraft
mv kafka_2.13-3.2.1/ kafka
- 4:修改配置文件(一定是在kraft目录下的文件)
cd /usr/local/kafka-kraft/kafka/config/kraft/
rm -f server.properties
vi server.properties
内容如下:(下面几点要进行修改)
1:node.id要修改成全局唯一。(kafka01和kafka02的nodeid分别规划为21、22)
2:controller.quorum.voters 的格式为: node.id1@host1:port1,node.id2@host2:port2。(比如21@kafka01:9093,22@kafka02:9093)
3:advertised.listeners:kafka对外暴露地址(比如PLAINTEXT://kafka01:9092)
4:log.dirs:kafka数据存储目录
#kafka 的角色(controller相当于主机、broker节点相当于从机,主机类似zk功能
process.roles=broker,controller
#节点id,要唯一(要修改)
node.id=21
# (要修改)
controller.quorum.voters=21@kafka01:9093,22@kafka02:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
#kafka对外暴露地址(要修改)
advertised.listeners=PLAINTEXT://kafka01:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
#kafka存储目录(要修改)
log.dirs=/usr/local/kafka-kraft/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
-
5:初始化集群数据目录:
- 5.1:生成存储目录唯一 ID:(例如生成的id为HP-6Pp5sSXuyhuK8nXiG6Q)
[root@kafka01 bin]# /usr/local/kafka-kraft/kafka/bin/kafka-storage.sh random-uuid HP-6Pp5sSXuyhuK8nXiG6Q
- 5.2:用该 ID格式化 kafka存储目录
- 把 -t 后面的字符串修改成你上面生成的id。
/usr/local/kafka-kraft/kafka/bin/kafka-storage.sh format -t HP-6Pp5sSXuyhuK8nXiG6Q -c /usr/local/kafka-kraft/kafka/config/kraft/server.properties
-
6:启动Kraft模式的kafka集群:(注意:-daemon后面的路径是kraft目录下的server.properties,而不是普通kafka目录下的server.properties)
kafka-server-start.sh -daemon /usr/local/kafka-kraft/kafka/config/kraft/server.properties
搭建kafka02服务器的Kraft⭐
- 1:下载并上传kafka的tgz包到服务器:
[root@kafka01 ~]# ll | grep kafka_2.13-3.2.1
-rw-r--r--. 1 root root 103956099 8月 30 11:56 kafka_2.13-3.2.1.tgz
- 2:解压:
mkdir -p /usr/local/kafka-kraft
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/kafka-kraft
- 3:改名:
cd /usr/local/kafka-kraft
mv kafka_2.13-3.2.1/ kafka
- 4:修改配置文件(一定是在kraft目录下的文件)
cd /usr/local/kafka-kraft/kafka/config/kraft/
rm -f server.properties
vi server.properties
内容如下:(下面几点要进行修改)
1:node.id要修改成全局唯一。(kafka01和kafka02的nodeid分别规划为21、22)
2:controller.quorum.voters 的格式为: node.id1@host1:port1,node.id2@host2:port2。(比如21@kafka01:9093,22@kafka02:9093)
3:advertised.listeners:kafka对外暴露地址(比如PLAINTEXT://kafka02:9092)
4:log.dirs:kafka数据存储目录
#kafka 的角色(controller相当于主机、broker节点相当于从机,主机类似zk功能
process.roles=broker,controller
#节点id,要唯一(要修改)
node.id=22
# (要修改)
controller.quorum.voters=21@kafka01:9093,22@kafka02:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
#kafka对外暴露地址(要修改)
advertised.listeners=PLAINTEXT://kafka02:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
#kafka存储目录(要修改)
log.dirs=/usr/local/kafka-kraft/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
-
5:初始化集群数据目录:
- 5.1:生成存储目录唯一 ID:(例如生成的id为oMtDFcKWT7yx0G827t3qhQ)
[root@kafka02 kraft]# /usr/local/kafka-kraft/kafka/bin/kafka-storage.sh random-uuid oMtDFcKWT7yx0G827t3qhQ
- 5.2:用该 ID格式化 kafka存储目录
- 把 -t 后面的字符串修改成你上面生成的id。
/usr/local/kafka-kraft/kafka/bin/kafka-storage.sh format -t oMtDFcKWT7yx0G827t3qhQ -c /usr/local/kafka-kraft/kafka/config/kraft/server.properties
-
6:启动Kraft模式的kafka集群:(注意:-daemon后面的路径是kraft目录下的server.properties,而不是普通kafka目录下的server.properties)
/usr/local/kafka-kraft/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka-kraft/kafka/config/kraft/server.properties
- 7:停止kafka集群:
/usr/local/kafka-kraft/kafka/bin/kafka-server-stop.sh
Kafka整合Spring Boot⭐
环境搭建
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka-springboot-test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- springboot版本-->
<spring-boot.version>2.6.7</spring-boot.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- spring整合kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<!-- springboot-web依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
application.yml
# 端口
server:
port: 8010
# 配置kafka
spring:
kafka:
# 配置kafka集群地址列表
bootstrap-servers:
- 192.168.184.201:9092
- 192.168.184.202:9092
listener:
#手动提交第1步:ack设置为手动(enable-auto-commit要设置为false)
# manual_immediate:每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
ack-mode: manual_immediate
# kafka生产者配置
producer:
# 指定key和value的序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 指定压缩类型(这里指定为snappy)
compression-type: snappy
# 每批次大小。16384=16k(默认值也是这个)
batch-size: 16384
# 缓冲区大小,默认32M,建议改成64M(也就是67108864)。
buffer-memory: 67108864
# 重试次数(默认int的最大值)
retries: 3
properties:
# 配置linger.ms,建议5-100ms(默认是0ms)
linger.ms: 10
# 应答等级(设置为-1)
acks: -1
# kafka消费者配置
consumer:
# 配置kafka集群地址列表
bootstrap-servers:
- 192.168.184.201:9092
- 192.168.184.202:9092
# 指定key和value的反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组id
group-id: boot-group
#手动提交第2步:开启手动提交offset(true的话就是消费完一条消息自动会提交)
# 为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 消费者最大能够拉取的消息数量(默认是500条),可以设置为1000
max-poll-records: 1000
ProducerController类(生产者)
package com.boot.controller;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Autowired;
@RestController
@Slf4j
public class ProducerController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
// topic名称
private static final String TOPIC="java-api-test";
@GetMapping(path = "/send/{msg}")
public String send(@PathVariable("msg") String msg){
try {
/**
* TOPIC:要发送的主题
* msg:要发送的数据
*/
//开始发送消息,并添加回调函数
kafkaTemplate.send(TOPIC, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>(){
@Override
public void onFailure(Throwable ex) {
log.error("消息发生失败----onFailure:"+ex.getMessage());
throw new RuntimeException("消息发生失败抛出异常");
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("消息发生成功----onSuccess:"+result.toString());
}
});
return "消息发送成功--200";
}catch (Exception e){
return "消息发送失败--404";
}
}
}
MyConsumer类(消费者)
package com.boot.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyConsumer {
// topic名称
private static final String TOPIC="java-api-test";
/**
*
* @param consumerRecord 消费的数据
* @param ack 手动确认消息
*/
//topics:需要监听的topic名称
@KafkaListener(topics = TOPIC)
public void consumer(ConsumerRecord<String,String> consumerRecord, Acknowledgment ack){
try {
//获取消费的数据内容
String data = consumerRecord.value();
log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}",
consumerRecord.topic(), consumerRecord.key(), consumerRecord.partition(), consumerRecord.offset(), data);
//最后,如果上面的代码没有报错的情况下,就可以确认消息了。(很重要)
ack.acknowledge();
}catch (Exception e){
throw new RuntimeException("kafka消费失败");
}
}
}
性能调优⭐
案例:根据场景选择硬件
场景:假如我们的系统每天都有200w人访问,每个人每天产生的日志大概是100条。
计算结果如下:
每天的总日志数是:200w*100条=2亿条
每秒产生的日志数是:2亿/24小时/60分钟/60秒=2314条
每条日志大小:假设为1k
生产者平时每秒数据量:2314*1k≈2.3m/s
高峰期每秒消息数(假设会比平时多20倍):2314*20=46280条;
生产者高峰期每秒数据量:46280*1k≈46m/s
根据场景选择服务器台数
- 公式:
- 服务器台数(有小数要去掉小数再+1)=2(生产者高峰期每秒数据量 * 副本/100)+1*
- 根据公式来计算上面场景需要多少台服务器:(假设副本数为3)
- 服务器台数:2(46 * 3 / 100)+1 = 2*1.38+1=4台*
根据场景选择磁盘
-
计算如下:
-
磁盘类型建议选择:机械硬盘(因为kafka底层是顺序写)
-
每天总数据量:2亿条*1k=200G
-
200G*副本数(假设副本数为3) * 保存时间7天/0.7=6T
-
总结:上述场景建议选择机械硬盘,并且3台服务器硬盘总大小为6T。
根据场景选择内存
-
kafka内存=堆内存+页缓存
-
堆内存:建议每个kafka节点都要分配10g-15g。
- 修改kafka-server-start.sh配置为如下:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G" fi
-
页缓存:每个kafka节点的页缓存大小 =(分区数 * 1g (一个segment大小) 25%)/ 节点数。例如 10 个分区,页缓存大小 =(10 * 1g * 25%)/ 3 ≈ 1g*
-
-
根据上面可以计算出每一个kafka服务器的内存大小建议为:10G+1G=11G
根据场景选择CPU
-
cpu总核心数:32
-
num.io.threads = 16 负责写磁盘的线程数,整个参数值要占总核数的 50%。
-
num.replica.fetchers = 5 副本拉取线程数,这个参数占总核数的 50%的 1/3。
-
num.network.threads = 11 数据传输线程数,这个参数占总核数的 50%的 2/3。
根据场景选择CPU
- 推荐的网络带宽 = 我们系统的高峰期数据量 ( 46MB/s )= 千兆网络(1000Mbps=125m/s)即可
- 百兆的网络(100Mbps =12.5m/s)、千兆的网络(1000Mbps=125m/s)、万兆的网络(10000Mbps=1250m/s)
提高系统吞吐量
提升生产者吞吐量
-
1:buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m。
-
2:batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
-
3:linger.ms,这个值默认是 0,意思就是消息必须立即被发送。推荐设置5-100
毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。 -
4:compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不
错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的 CPU 开销。 -
5:增加分区
提高消费者吞吐量
- 1:调整 fetch.max.bytes 大小,默认是 50m。
- 2:调整 max.poll.records 大小,默认是 500 条。
- 3:增加下游消费者处理能力
设置合适的分区数
- 假设我们系统的吞吐量数据为:producer 吞吐量 = 46m/s;consumer 吞吐量 = 80m/s,我们最终期望吞吐量为150m/s;
- 分区数选择 = (期望吞吐量)/ (生产者吞吐量)=150 /46≈4个分区
- 分区数一般设置为:3-10 个