高性能消息中间件 - Kafka3.x(四)

news2025/7/20 13:48:27

文章目录

    • 高性能消息中间件 - Kafka3.x(四)
      • Kafka监控-Kafka eagle(EFAK 2.1.0版本)⭐
        • 修改每个kafka的配置信息
        • 启动MySQL
        • 在mysql中创建名为ke的数据库
        • 开始安装并启动kafka-eagle
      • Kafka的Kraft模式(新版Kafka特性,很重要)⭐
        • Kraft模式概述
        • 搭建kafka-Kraft 3.2.1集群⭐
        • 搭建kafka01服务器的Kraft⭐
        • 搭建kafka02服务器的Kraft⭐
      • Kafka整合Spring Boot⭐
        • 环境搭建
          • pom.xml
          • application.yml
          • ProducerController类(生产者)
          • MyConsumer类(消费者)
      • 性能调优⭐
        • 案例:根据场景选择硬件
          • 根据场景选择服务器台数
          • 根据场景选择磁盘
          • 根据场景选择内存
          • 根据场景选择CPU
          • 根据场景选择CPU
        • 提高系统吞吐量
          • 提升生产者吞吐量
          • 提高消费者吞吐量
          • 设置合适的分区数

高性能消息中间件 - Kafka3.x(四)

Kafka监控-Kafka eagle(EFAK 2.1.0版本)⭐

修改每个kafka的配置信息
  • 1:关闭每个kafka服务器:
kafka-server-stop.sh 
  • 2:修改文件:
cd /usr/local/kafka/bin/
vim kafka-server-start.sh

找到下面这个:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

将上面这个代码删除掉,并粘贴如下代码:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

在这里插入图片描述

  • 3:重新启动kafka:
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
启动MySQL
  • 1:编写配置文件:
mkdir -p /kf-mysql/conf
vim /kf-mysql/conf/my.cnf

内容如下:

[client]
# 指定编码格式为utf8,默认的MySQL会有中文乱码问题
default_character_set=utf8
[mysqld]
collation_server=utf8_general_ci
character_set_server=utf8

# 全局唯一id(不允许有相同的)
server_id=100
binlog-ignore-db=mysql
# 指定MySQL二进制日志
log-bin=order-mysql-bin
# binlog最大容量
binlog_cache_size=1M
# 二进制日志格式(这里指定的是混合日志)
binlog_format=mixed
# binlog的有效期(单位:天)
expire_logs_days=7
slave_skip_errors=1062
  • 2:运行mysql:
docker run -p 3308:3306 \
-v /kf-mysql/log:/var/log/mysql \
-v /kf-mysql/data:/var/lib/mysql \
-v /kf-mysql/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
--name mysql-kafka \
-d mysql:5.7
在mysql中创建名为ke的数据库
CREATE DATABASE	ke;

在这里插入图片描述

开始安装并启动kafka-eagle
  • 1:下载EFAK(本次版本为2.1.0),解压成tar.gz包并上传服务器:

下载地址

  • 2:查看EFAK的包:
[root@kafka01 ~]# ll | grep kafka-eagle
-rw-r--r--. 1 root root  83512603 916 23:55 kafka-eagle-bin-2.1.0.tar.gz
  • 3:解压:
tar -zvxf kafka-eagle-bin-2.1.0.tar.gz
cd kafka-eagle-bin-2.1.0
tar -xzvf efak-web-2.1.0-bin.tar.gz -C /usr/local
cd /usr/local/
mv efak-web-2.1.0 kafka-eagle/
  • 4:修改配置文件
cd /usr/local/kafka-eagle/conf
cp system-config.properties system-config.properties_bak
vi system-config.properties

我的配置文件如下:

需要修改的内容如下:

1:efak.zk.cluster.alias=cluster1 #Kafka使用的Zookeeper集群别名,多个集群用逗号分隔。
2:cluster1.zk.list=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 #zookeeper.connect地址
3:cluster1.efak.offset.storage=kafka #存储消费信息的类型,一般在0.9版本之前,消费信息会默认存储在Zookeeper中,在0.10版本之后,消费者信息默认存储在 Kafka中,存储类型需要设置为kafka。
4:efak.driver=com.mysql.cj.jdbc.Driver #链接mysql8.0驱动,低版本数据库需修改

5:efak.url= #mysql数据库的url(注意端口号)

6:efak.username= #mysql数据库用户名
7:efak.password= #mysql数据库密码

efak.zk.cluster.alias=cluster1
cluster1.zk.list=kafka01:2181,kafka02:2181,kafka03:2181/kafka

cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

cluster1.efak.broker.size=20

kafka.zk.limit.size=16

efak.webui.port=8048

efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085

cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk

cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

efak.metrics.charts=true
efak.metrics.retain=15

efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

efak.topic.token=keadmin

cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://kafka01:3308/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
  • 5:添加环境变量
vim /etc/profile

在最后添加内容如下:

# kafka-eagle
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin

刷新配置:

source /etc/profile
  • 6:启动kafka-eagle(前提是要启动zookeeper和kafka)
ke.sh start

在这里插入图片描述

  • 7:访问kafka-eagle:(ip+8048)

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

Kafka的Kraft模式(新版Kafka特性,很重要)⭐

Kraft模式概述
  • 早期版本的kafka是要强依赖于zookeeper的,没有zookeeper的话kafka便启动不了。但是新版的kafka打破了这一垄断,可以完全不需要zookeeper了,这就是Kraft模式。

  • Kraft模式的好处有:

    • 1:kafka不用依赖于zookeeper这些外部中间件,不用担心哪一天zookeeper停更了。
    • 2:kafka可以不用从zookeeper中获取数据了,使得kafka集群的性能大幅提高。
    • 3:kafka不依赖于zookeeper,可以不用担心zookeeper的性能问题影响到kafka的性能。
搭建kafka-Kraft 3.2.1集群⭐
IP地址主机名需要安装的资源操作系统node-id
192.168.184.201kafka01jdk、Docker、zookeeper、Kafkacentos7.921
192.168.184.202kafka02jdk、Docker、zookeeper、Kafkacentos7.922
搭建kafka01服务器的Kraft⭐
  • 1:下载并上传kafka的tgz包到服务器:
[root@kafka01 ~]# ll | grep kafka_2.13-3.2.1
-rw-r--r--. 1 root root 103956099 830 11:56 kafka_2.13-3.2.1.tgz
  • 2:解压:
mkdir -p /usr/local/kafka-kraft
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/kafka-kraft
  • 3:改名:
cd /usr/local/kafka-kraft
mv kafka_2.13-3.2.1/ kafka
  • 4:修改配置文件(一定是在kraft目录下的文件)
cd /usr/local/kafka-kraft/kafka/config/kraft/
rm -f server.properties
vi server.properties

内容如下:(下面几点要进行修改)

1:node.id要修改成全局唯一。(kafka01和kafka02的nodeid分别规划为21、22)

2:controller.quorum.voters 的格式为: node.id1@host1:port1,node.id2@host2:port2。(比如21@kafka01:9093,22@kafka02:9093)

3:advertised.listeners:kafka对外暴露地址(比如PLAINTEXT://kafka01:9092)

4:log.dirs:kafka数据存储目录

#kafka 的角色(controller相当于主机、broker节点相当于从机,主机类似zk功能
process.roles=broker,controller

#节点id,要唯一(要修改)
node.id=21

# (要修改)
controller.quorum.voters=21@kafka01:9093,22@kafka02:9093

listeners=PLAINTEXT://:9092,CONTROLLER://:9093

inter.broker.listener.name=PLAINTEXT

#kafka对外暴露地址(要修改)
advertised.listeners=PLAINTEXT://kafka01:9092

controller.listener.names=CONTROLLER

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

#kafka存储目录(要修改)
log.dirs=/usr/local/kafka-kraft/data

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000
  • 5:初始化集群数据目录:

    • 5.1:生成存储目录唯一 ID:(例如生成的id为HP-6Pp5sSXuyhuK8nXiG6Q)
    [root@kafka01 bin]# /usr/local/kafka-kraft/kafka/bin/kafka-storage.sh random-uuid
    
    HP-6Pp5sSXuyhuK8nXiG6Q
    
    • 5.2:用该 ID格式化 kafka存储目录
      • 把 -t 后面的字符串修改成你上面生成的id。
    /usr/local/kafka-kraft/kafka/bin/kafka-storage.sh format -t HP-6Pp5sSXuyhuK8nXiG6Q -c /usr/local/kafka-kraft/kafka/config/kraft/server.properties
    
  • 6:启动Kraft模式的kafka集群:(注意:-daemon后面的路径是kraft目录下的server.properties,而不是普通kafka目录下的server.properties)

kafka-server-start.sh -daemon /usr/local/kafka-kraft/kafka/config/kraft/server.properties
搭建kafka02服务器的Kraft⭐
  • 1:下载并上传kafka的tgz包到服务器:
[root@kafka01 ~]# ll | grep kafka_2.13-3.2.1
-rw-r--r--. 1 root root 103956099 830 11:56 kafka_2.13-3.2.1.tgz
  • 2:解压:
mkdir -p /usr/local/kafka-kraft
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/kafka-kraft
  • 3:改名:
cd /usr/local/kafka-kraft
mv kafka_2.13-3.2.1/ kafka
  • 4:修改配置文件(一定是在kraft目录下的文件)
cd /usr/local/kafka-kraft/kafka/config/kraft/
rm -f server.properties
vi server.properties

内容如下:(下面几点要进行修改)

1:node.id要修改成全局唯一。(kafka01和kafka02的nodeid分别规划为21、22)

2:controller.quorum.voters 的格式为: node.id1@host1:port1,node.id2@host2:port2。(比如21@kafka01:9093,22@kafka02:9093)

3:advertised.listeners:kafka对外暴露地址(比如PLAINTEXT://kafka02:9092)

4:log.dirs:kafka数据存储目录

#kafka 的角色(controller相当于主机、broker节点相当于从机,主机类似zk功能
process.roles=broker,controller

#节点id,要唯一(要修改)
node.id=22

# (要修改)
controller.quorum.voters=21@kafka01:9093,22@kafka02:9093

listeners=PLAINTEXT://:9092,CONTROLLER://:9093

inter.broker.listener.name=PLAINTEXT

#kafka对外暴露地址(要修改)
advertised.listeners=PLAINTEXT://kafka02:9092

controller.listener.names=CONTROLLER

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

#kafka存储目录(要修改)
log.dirs=/usr/local/kafka-kraft/data

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000
  • 5:初始化集群数据目录:

    • 5.1:生成存储目录唯一 ID:(例如生成的id为oMtDFcKWT7yx0G827t3qhQ)
    [root@kafka02 kraft]# /usr/local/kafka-kraft/kafka/bin/kafka-storage.sh random-uuid
    
    oMtDFcKWT7yx0G827t3qhQ
    
    • 5.2:用该 ID格式化 kafka存储目录
      • 把 -t 后面的字符串修改成你上面生成的id。
    /usr/local/kafka-kraft/kafka/bin/kafka-storage.sh format -t oMtDFcKWT7yx0G827t3qhQ -c /usr/local/kafka-kraft/kafka/config/kraft/server.properties
    
  • 6:启动Kraft模式的kafka集群:(注意:-daemon后面的路径是kraft目录下的server.properties,而不是普通kafka目录下的server.properties)

/usr/local/kafka-kraft/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka-kraft/kafka/config/kraft/server.properties
  • 7:停止kafka集群:
/usr/local/kafka-kraft/kafka/bin/kafka-server-stop.sh

Kafka整合Spring Boot⭐

环境搭建
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-springboot-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!--  springboot版本-->
        <spring-boot.version>2.6.7</spring-boot.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!--        spring整合kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>

        <!--        springboot-web依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--        lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


    </dependencies>

</project>
application.yml
# 端口
server:
  port: 8010
# 配置kafka
spring:
  kafka:
    # 配置kafka集群地址列表
    bootstrap-servers:
      - 192.168.184.201:9092
      - 192.168.184.202:9092
    listener:
      #手动提交第1步:ack设置为手动(enable-auto-commit要设置为false)
      # manual_immediate:每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
    # kafka生产者配置
    producer:
      # 指定key和value的序列化器
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 指定压缩类型(这里指定为snappy)
      compression-type: snappy
      # 每批次大小。16384=16k(默认值也是这个)
      batch-size: 16384
      # 缓冲区大小,默认32M,建议改成64M(也就是67108864)。
      buffer-memory: 67108864
      # 重试次数(默认int的最大值)
      retries: 3
      properties:
        # 配置linger.ms,建议5-100ms(默认是0ms)
        linger.ms: 10
      # 应答等级(设置为-1)
      acks: -1
    # kafka消费者配置
    consumer:
      # 配置kafka集群地址列表
      bootstrap-servers:
        - 192.168.184.201:9092
        - 192.168.184.202:9092
      # 指定key和value的反序列化器
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #指定消费者组id
      group-id: boot-group
      #手动提交第2步:开启手动提交offset(true的话就是消费完一条消息自动会提交)
      # 为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 消费者最大能够拉取的消息数量(默认是500条),可以设置为1000
      max-poll-records: 1000
ProducerController类(生产者)
package com.boot.controller;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import org.springframework.beans.factory.annotation.Autowired;

@RestController
@Slf4j
public class ProducerController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    // topic名称
    private static final String TOPIC="java-api-test";


    @GetMapping(path = "/send/{msg}")
    public String send(@PathVariable("msg") String msg){

        try {
            /**
             * TOPIC:要发送的主题
             * msg:要发送的数据
             */
            //开始发送消息,并添加回调函数
            kafkaTemplate.send(TOPIC, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>(){

                @Override
                public void onFailure(Throwable ex) {

                    log.error("消息发生失败----onFailure:"+ex.getMessage());
                    throw new RuntimeException("消息发生失败抛出异常");
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {

                    log.info("消息发生成功----onSuccess:"+result.toString());
                }

            });
            return "消息发送成功--200";
        }catch (Exception e){
            return "消息发送失败--404";
        }
    }
}
MyConsumer类(消费者)
package com.boot.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyConsumer {

    // topic名称
    private static final String TOPIC="java-api-test";

    /**
     *
     * @param consumerRecord 消费的数据
     * @param ack 手动确认消息
     */
    //topics:需要监听的topic名称
    @KafkaListener(topics = TOPIC)
    public void consumer(ConsumerRecord<String,String> consumerRecord, Acknowledgment ack){

        try {
            //获取消费的数据内容
            String data = consumerRecord.value();
            log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}",
                    consumerRecord.topic(), consumerRecord.key(), consumerRecord.partition(), consumerRecord.offset(), data);
            //最后,如果上面的代码没有报错的情况下,就可以确认消息了。(很重要)
            ack.acknowledge();
        }catch (Exception e){
            throw new RuntimeException("kafka消费失败");
        }
    }
}

性能调优⭐

案例:根据场景选择硬件

场景:假如我们的系统每天都有200w人访问,每个人每天产生的日志大概是100条

计算结果如下:

每天的总日志数是:200w*100条=2亿条

每秒产生的日志数是:2亿/24小时/60分钟/60秒=2314条

每条日志大小:假设为1k

生产者平时每秒数据量:2314*1k≈2.3m/s

高峰期每秒消息数(假设会比平时多20倍):2314*20=46280条;

生产者高峰期每秒数据量:46280*1k≈46m/s

根据场景选择服务器台数
  • 公式:
    • 服务器台数(有小数要去掉小数再+1)=2(生产者高峰期每秒数据量 * 副本/100)+1*
  • 根据公式来计算上面场景需要多少台服务器:(假设副本数为3)
    • 服务器台数:2(46 * 3 / 100)+1 = 2*1.38+1=4台*
根据场景选择磁盘
  • 计算如下:

  • 磁盘类型建议选择:机械硬盘(因为kafka底层是顺序写)

  • 每天总数据量:2亿条*1k=200G

  • 200G*副本数(假设副本数为3) * 保存时间7天/0.7=6T

  • 总结:上述场景建议选择机械硬盘,并且3台服务器硬盘总大小为6T。

根据场景选择内存
  • kafka内存=堆内存+页缓存

    • 堆内存:建议每个kafka节点都要分配10g-15g。

      • 修改kafka-server-start.sh配置为如下:
      if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
      export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
      fi
      
    • 页缓存:每个kafka节点的页缓存大小 =(分区数 * 1g (一个segment大小) 25%)/ 节点数。例如 10 个分区,页缓存大小 =(10 * 1g * 25%)/ 3 ≈ 1g*

  • 根据上面可以计算出每一个kafka服务器的内存大小建议为:10G+1G=11G

根据场景选择CPU
  • cpu总核心数:32

  • num.io.threads = 16 负责写磁盘的线程数,整个参数值要占总核数的 50%

  • num.replica.fetchers = 5 副本拉取线程数,这个参数占总核数的 50%的 1/3

  • num.network.threads = 11 数据传输线程数,这个参数占总核数的 50%的 2/3

根据场景选择CPU
  • 推荐的网络带宽 = 我们系统的高峰期数据量 ( 46MB/s )= 千兆网络(1000Mbps=125m/s)即可
  • 百兆的网络(100Mbps =12.5m/s)、千兆的网络(1000Mbps=125m/s)、万兆的网络(10000Mbps=1250m/s)
提高系统吞吐量
提升生产者吞吐量
  • 1:buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m

  • 2:batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

  • 3:linger.ms,这个值默认是 0,意思就是消息必须立即被发送。推荐设置5-100
    毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

  • 4:compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不
    错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的 CPU 开销。

  • 5:增加分区

提高消费者吞吐量
  • 1:调整 fetch.max.bytes 大小,默认是 50m。
  • 2:调整 max.poll.records 大小,默认是 500 条。
  • 3:增加下游消费者处理能力
设置合适的分区数
  • 假设我们系统的吞吐量数据为:producer 吞吐量 = 46m/s;consumer 吞吐量 = 80m/s,我们最终期望吞吐量为150m/s
  • 分区数选择 = (期望吞吐量)/ (生产者吞吐量)=150 /46≈4个分区
  • 分区数一般设置为:3-10 个

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1157336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Redis】redis的过期策略如何实现有关定时器的补充

文章目录 redis的过期策略如何实现关于定时器的补充基于优先级队列/堆实现的定时器基于时间轮实现的定时器 redis的过期策略如何实现 注意&#xff1a;不能直接遍历所有的key来判断当前key是否过期&#xff0c;这样子效率非常低&#xff0c;redis整体策略是&#xff1a;定期删…

系列十五、idea全局配置

一、全局Maven配置 IDEA启动页面>Customize>All settings>Build,Execution,Deployment>Build Tools>Maven 二、全局编码配置 IDEA启动页面>Customize>All settings>Editor>File Encodings 三、全局激活DevTools配置 IDEA启动页面>Customize>A…

企业通过ISO/IEC 27001的必要性阐述

文章目录 什么是ISO 27001?ISO 27001认证的必要性1&#xff0c;保护信息资产2&#xff0c;合规性要求3&#xff0c;提高客户信任4&#xff0c;降低安全风险5&#xff0c;提高内部效率6&#xff0c;改进供应链安全7&#xff0c;提高员工意识8&#xff0c;连续改进 推荐阅读 什么…

二叉树问题——前中后遍历数组构建二叉树

摘要 利用二叉树的前序&#xff0c;中序&#xff0c;后序&#xff0c;有序数组来构建相关二叉树的问题。 一、构建二叉树题目 105. 从前序与中序遍历序列构造二叉树 106. 从中序与后序遍历序列构造二叉树 889. 根据前序和后序遍历构造二叉树 617. 合并二叉树 226. 翻转二…

Hadoop相关知识点

文章目录 一、主要命令二、配置虚拟机2.1 设置静态ip2.2 修改主机名及映射2.3 修改映射2.4 单机模式2.5 伪分布式2.6 完全分布式 三、初识Hadoop四、三种模式的区别4.1、单机模式与伪分布式模式的区别4.2、特点4.3、配置文件的差异4.3.1、单机模式4.3.2、伪分布式模式4.3.3、完…

JVM虚拟机:堆结构的逻辑分区

堆内存的逻辑分区 堆内存的逻辑分区如下所示: 堆内存中分为新生代和老年代,二者空间大小1:3。在新生代里面分为两类区域(eden、survivor),三个区域(eden、survivor、survivor),三个区大小比例为8:1:1。 对象存放的位置 栈 当我们new一个对象的时候,首先会将对象…

Java之SpringCloud Alibaba【七】【Spring Cloud微服务网关Gateway组件】

一、网关简介 大家都都知道在微服务架构中&#xff0c;一个系统会被拆分为很多个微服务。那么作为客户端要如何去调用这么多的微服务呢?如果没有网关的存在&#xff0c;我们只能在客户端记录每个微服务的地址&#xff0c;然后分别去用。 这样的架构&#xff0c;会存在着诸多…

我在Vscode学OpenCV 处理图像

既然我们是面向Python的OpenCV&#xff08;OpenCV for Python&#xff09;那我们就必须要熟悉Numpy这个库&#xff0c;尤其是其中的数组的库&#xff0c;Python是没有数组的&#xff0c;唯有借助他库才有所实现想要的目的。 # 老三样库--事先导入 import numpy as np import c…

高性能消息中间件 - Kafka3.x(二)

文章目录 高性能消息中间件 - Kafka3.x&#xff08;二&#xff09;Kafka生产者⭐生产者发生原理⭐RecordAccumulator源码简单分析⭐Java Api生产者的重要参数⭐环境准备创建一个名为java-api-test的topic主题⭐命令行开启一个consumer消费者监听名为java-api-test的topic⭐pom.…

5、QtCharts 曲线美化

文章目录 效果ui 设置dialog.hdialog.cpp 效果 ui 设置 dialog.h #ifndef DIALOG_H #define DIALOG_H#include <QDialog> #include <QtCharts> #include <QLineSeries> #include <QGraphicsScene> #include <QTimer> #include <QSplineSerie…

陕西某小型水库雨水情测报及大坝安全监测项目案例

项目背景 根据《陕西省小型病险水库除险加固项目管理办法》、《陕西省小型水库雨水情测报和大坝安全监测设施建设与运行管理办法》的要求&#xff0c;为保障水库安全运行&#xff0c;对全省小型病险水库除险加固&#xff0c;建设完善雨水情测报、监测预警、防汛道路、通讯设备、…

如何有效使用蜂邮EDM和vba批量发送邮件?

蜂邮EDM和vba批量发送邮件的方法&#xff1f;怎么使用蜂邮EDM和vba代码群发电子邮件&#xff1f; 批量发送邮件已经成为一种不可或缺的沟通方式。蜂邮EDM和VBA是两个功能强大的工具&#xff0c;可以帮助您在邮件营销和业务通信中实现高效的批量发送邮件操作。接下来将介绍如何…

关于Goby反制上线CS中的各种问题

前言 ​ Goby作为新一代网络安全技术&#xff0c;通过为目标建立完整的资产数据库&#xff0c;实现快速的安全应急&#xff0c;日常为广大师傅提供了便捷的渗透体验。最近有观察到有关于某些蜜罐出现了Goby反制的指纹&#xff0c;顿时就起了兴趣进行研究Goby的反制&#xff0c…

AIGC究竟是什么?为什么今年大家都在讨论?

目录 一、什么是AIGC 二、AIGC发展阶段 三、AIGC的技术应用 AIGC的应用场景 四、AIGC的伦理、风险与未来 五、说在最后 在23年初&#xff0c;大家的视野范围内突然出现了一种叫ChatGPT的产品&#xff0c;这是由OpenAI研发的一种基于深度学习和自然语言处理技术的文本生成…

SpringBoot_mybatis-plus使用json字段

mybatis-plus使用json字段 1.前言2.方案分析2.1 为什么是json2.2 数据库的选择 3. 实战3.1 使用text字段(h2数据库)3.1.1 建表语句3.1.2 数据操作与查询 3.2 使用json字段(mysql数据库)3.2.1 建表语句3.2.2 数据操作与查询 4. 附录4.1 MySQL JSON索引用法 5. 参考文档 1.前言 …

无需服务器内网穿透Windows下快速搭建个人WEB项目

&#x1f4d1;前言 本文主要是windows下内网穿透文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日一句&#xff1a;努力…

今日温馨早安问候语,祝大家平安健康早安吉祥

用清晨的阳光沐浴&#xff0c;给你舒展;用清新的空气洗漱&#xff0c;给你舒心;伴清莹的雨露散步&#xff0c;给你舒情;向美好的一天欢呼&#xff0c;给你舒怀&#xff0c;用快乐的词汇凝聚&#xff0c;给你祝福&#xff0c;祝你在绚丽的晨光中走好每一天。朋友&#xff0c;早安…

算法升级之路(六)

给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]] 示例 2: 输入: numRows 1 输出: [[1]] 解题思路&…

中国教育企业出海 新兴技术助力抢占先机

继游戏、电商、短视频等领域轮番出海之后&#xff0c;国内教育企业纷纷开启了出海之路。近日发布的《2023年教育应用出海市场洞察》报告显示&#xff0c;在中国教育企业出海市场中&#xff0c;语言学习是最主要的赛道&#xff0c;但赛道竞争更为激烈。 报告指出&#xff0c;全…

高性能消息中间件 - Kafka3.x(三)

文章目录 高性能消息中间件 - Kafka3.x&#xff08;三&#xff09;Kafka Broker ⭐Kafka Broker概念Zookeeper&#xff08;新版本可以不使用zk了&#xff09;⭐Zookeeper的作用 Kafka的选举1&#xff1a;Broker选举Leader⭐Broker核心参数⭐案例&#xff1a;服役新节点和退役旧…