消息队列--Kafka

news2025/7/24 3:53:31
  1. Kafka简介
  2. 集群部署
  3. 配置Kafka
  4. 测试Kafka

1.Kafka简介

数据缓冲队列。同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

Kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

特性:

  • 高吞吐量:kafka每秒可以处理几十万条消息。

  • 可扩展性:kafka集群支持热扩展- 持久性、

  • 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

  • 高并发:支持数千个客户端同时读写

它主要包括以下组件:

话题(Topic):是特定类型的消息流。(每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。)
生产者(Producer):是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务).
消费者(Consumer):可以订阅一个或多个话题,从而消费这些已发布的消息。
服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。

partition(区):每个 topic 包含一个或多个 partition。
replication:partition 的副本,保障 partition 的高可用。
leader:replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
follower:replica 中的一个角色,从 leader 中复制数据。
zookeeper:kafka 通过 zookeeper 来存储集群的信息。

Zookeeper:

ZooKeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供的功能包括:配置维护、分布式同步等。Kafka的运行依赖ZooKeeper。  也是java微服务里面使用的一个注册中心服务

ZooKeeper主要用来协调Kafka的各个broker,不仅可以实现broker的负载均衡,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。

在Kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况与消费者当前消费的状态信息都会保存在ZooKeeper中。

2.集群部署

        2.1环境

系统:Centos-Stream7

节点:

192.168.26.166   es01 ​

192.168.26.170   es02 ​

192.168.26.171   es03

软件版本:kafka_2.12-3.0.2.tgz

        2.2  安装配置jdk8

#yum install -y java-1.8.0-openjdk

        2.3  安装配置zookeeper

在配置中要注意每个配置项后面不要有空格否则会导致zookeeper启动不起来!!!!

Kafka运行依赖ZK,Kafka官网提供的tar包中,已经包含了ZK,这里不再额外下载ZK程序。

配置相互解析---三台机器(在es集群上安装的kafka):

# vim /etc/hosts

192.168.26.166   es01 ​

192.168.26.170   es02 ​

192.168.26.171   es03

安装Kafka:

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.2/kafka_2.12-3.0.2.tgz

# tar xzvf kafka_2.12-2.8.0.tgz -C /usr/local/

# mv /usr/local/kafka_2.12-2.8.0/ /usr/local/kafka/

配置zookeeper:

在es01节点中:

# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties

# vim /usr/local/kafka/config/zookeeper.properties  #添加如下配置
dataDir=/opt/data/zookeeper/data  # 需要创建,所有节点一致
dataLogDir=/opt/data/zookeeper/logs # 需要创建,所有节点一致
clientPort=2181 
tickTime=2000 
initLimit=20 
syncLimit=10 

# 以下 IP 信息根据自己服务器的 IP 进行修改
server.1=192.168.19.20:2888:3888  //kafka集群IP:Port
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888


#创建data、log目录

# mkdir -p /opt/data/zookeeper/{data,logs}

#创建myid文件

# echo 1 > /opt/data/zookeeper/data/myid     #myid号按顺序排

在es02节点中:

# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties

# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data 
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181 
tickTime=2000 
initLimit=20 
syncLimit=10 
server.1=192.168.19.20:2888:3888
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888


#创建data、log目录

# mkdir -p /opt/data/zookeeper/{data,logs}

#创建myid文件

# echo 2 > /opt/data/zookeeper/data/myid

在es03节点中:

# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties

# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data 
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181 
tickTime=2000 
initLimit=20 
syncLimit=10 
server.1=192.168.19.20:2888:3888
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888


#创建data、log目录

# mkdir -p /opt/data/zookeeper/{data,logs}

#创建myid文件

# echo 3 > /opt/data/zookeeper/data/myid

配置项含义:

dataDir     ZK数据存放目录。
dataLogDir  ZK日志存放目录。
clientPort  客户端连接ZK服务的端口。
tickTime    ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。
initLimit   允许follower连接并同步到Leader的初始化连接时间,当初始化连接时间超过该值,则表示连接失败。
syncLimit   Leader与Follower之间发送消息时如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
server.1=192.168.19.20:2888:3888    2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。

3.配置Kafka

        3.1  配置

# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties

# vim /usr/local/kafka/config/server.properties  #在最后添加

broker.id=1  #改 
listeners=PLAINTEXT://192.168.19.20:9092   #改
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs  
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.19.20:2181,192.168.19.21:2181,192.168.19.22:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0


[root@es01 ~]# mkdir -p /opt/data/kafka/logs

        3.2  其他节点配置

只需把配置好的安装包直接分发到其他节点,修改 Kafka的broker.id和 listeners就可以了。

        3.3  配置项含义

broker.id 
    每一个broker在集群中的唯一标识,要求是正数。在改变IP地址,不改变broker.id的时不会影响consumers
listeners=PLAINTEXT://192.168.19.22:9092       
    监听地址
num.network.threads  
    broker 处理消息的最大线程数,一般情况下不需要去修改
num.io.threads
    broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
socket.send.buffer.bytes          
    socket的发送缓冲区
socket.receive.buffer.bytes        
    socket的接收缓冲区
socket.request.max.bytes
    socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
log.dirs        日志文件目录
num.partitions
num.recovery.threads.per.data.dir   每个数据目录(数据目录即指的是上述log.dirs配置的目录路径)用于日志恢复启动和关闭时的线程数量。
offsets.topic.replication.factor

transaction state log replication factor  事务主题的复制因子(设置更高以确保可用性)。 内部主题创建将失败,直到群集大小满足此复制因素要求

log.cleanup.policy = delete
    日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.interval.mins=1
    指定日志每隔多久检查看是否可以被删除,默认1分钟    
log.retention.hours
    数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据。log.retention.bytes和log.retention.minutes或者log.retention.hours任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

log.segment.bytes
    topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms 
    文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
zookeeper.connect   
    ZK主机地址,如果zookeeper是集群则以逗号隔开。
zookeeper.connection.timeout.ms     
    连接到Zookeeper的超时时间。

4.测试Kafka

        4.1  启动zookeeper集群

在三个节点依次执行:

# cd /usr/local/kafka

# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

查看端口:

# netstat -lntp | grep 2181

        4.2  启动Kafka

在三个节点依次执行:

# cd /usr/local/kafka

# nohup bin/kafka-server-start.sh config/server.properties &

        4.3  测验

验证  在192.168.26.166上创建topic:

# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

参数解释:

–zookeeper指定zookeeper的地址和端口,
–partitions指定partition的数量,
–replication-factor指定数据副本的数量

在26.170上面查询192.168.26.166上的topic:

[root@es03 kafka]# bin/kafka-topics.sh --zookeeper 192.168.26.166:2181 --list
testtopic

        (二)模拟消息生产和消费

发送消息到192.168.26.166:

[root@es01 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.19.20:9092 --topic testtopic

>hello
>你好呀
>

从192.168.26.171接受消息:

[root@es02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server  192.168.19.21:9092 --topic testtopic --from-beginning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/368400.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C/C++开发,无可避免的内存管理(篇一)-内存那些事

一、内存管理机制 任何编程语言在访问和操作内存时都会涉及大量的计算工作。但相对其他语言,c/c开发者必须自行采取措施确保所访问的内存是有效的,并且与实际物理存储相对应,以确保正在执行的任务不会访问不应该访问的内存位置。C/C语言及编译…

【Java】volatile

一、volatile volatile是Java虚拟机提供的轻量级的同步机制,它有3个特性: 1)保证可见性 2)不保证原子性 3)禁止指令重排 当写一个volatile变量时,JMM会把该…

openEuler部署Ceph集群(块存储)

openEuler部署Ceph集群1 目标2 环境2.1 服务器信息2.2 软件信息3 部署流程3.1 获取系统镜像3.2 创建虚拟机3.3 配置虚拟机3.3.1 配置互信3.3.2 关闭防火墙3.3.3 配置免密登录3.3.4 配置NTP3.3.4.1 安装NTP服务3.3.4.2 配置NTP服务端3.3.4.3 配置NTP客户端3.3.4.4 启动NTP服务3.…

pyqt5通过CANoe COM Server来操作CANoe仿真工程

文章目录前言一、COM接口技术二、UI界面设计三、功能实现四、工程运行测试前言 继续学习《CANoe开发从入门到精通》。 今天在《CANoe仿真工程开发》的基础上,开发实现pyqt5应用程序来操控CANoe工程。 一、COM接口技术 COM(Component Object Model&…

Linux基础命令-find搜索文件位置

文章目录 find 命令介绍 语法格式 命令基本参数 参考实例 1)在root/data目录下搜索*.txt的文件名 2)搜索一天以内最后修改时间的文件;并将文件删除 3)搜索777权限的文件 4)搜索一天之前变动的文件复制到test…

不懂什么是智慧工厂,看这篇文章就够了!

一、智慧工厂是什么? 一直以来,自动化在某种程度上始终是工厂的一部分,甚至高水平的自动化也非新生事物。然而,“自动化”一词通常表示单一且独立的任务或流程的执行。过去,机器自行“决策”的情况往往是以自动化为基…

【基础篇】9 # 排序:冒泡排序(Bubble Sort)、插入排序(Insertion Sort)、选择排序(Selection Sort)

说明 【数据结构与算法之美】专栏学习笔记 如何分析一个排序算法? 1、排序算法的执行效率 最好情况、最坏情况、平均情况时间复杂度时间复杂度的系数、常数 、低阶比较次数和交换(或移动)次数 2、排序算法的内存消耗 3、排序算法的稳定…

Fabric.js使用说明Part 2

目录一、Fabric.js使用说明Part 1Fabric.js简介 开始方法事件canvas常用属性对象属性图层层级操作复制和粘贴二、Fabric.js使用说明Part 2锁定拖拽和缩放画布分组动画图像滤镜渐变右键菜单删除三、Fabric.js使用说明Part 3自由绘画绘制背景图片绘制文本绘制线和路径一、锁定Fab…

传统豪华品牌引领?智能座舱进入「沉浸式娱乐体验」新周期

智能座舱正在进入硬件定型、软件(功能)升级以及多应用融合的新周期。 高工智能汽车研究院监测数据显示,2022年中国市场(不含进出口)乘用车搭载智能数字座舱(大屏语音车联网OTA)前装标配交付795…

【死磕数据库专栏启动】在CentOS7中安装 MySQL5.7版本实战

文章目录前言实验环境一. 安装MySQL1.1 配置yum源1.2 安装之前的环境检查1.3 下载MySQL的包1.4 开始使用yum安装1.5 启动并测试二. 设置新密码并重新启动2.1 设置新密码2.2 重新登录测试总结前言 学习MySQL是一件比较枯燥的事情,学习开始之前要先安装MySQL数据库&a…

【Linux修炼】14.磁盘结构/文件系统/软硬链接/动静态库

每一个不曾起舞的日子,都是对生命的辜负。 磁盘结构/文件系统/软硬链接/动静态库前言一.磁盘结构1.1 磁盘的物理结构1.2 磁盘的存储结构1.3 磁盘的逻辑结构二.理解文件系统2.1 对IO单位的优化2.2 磁盘分区与分组2.3 分组的管理方法2.4 文件操作三.软硬链接3.1理解硬…

测试4年裸辞失业,面试17k的测试岗被按在地上摩擦,结局让我崩溃大哭...

作为IT行业的大热岗位——软件测试,只要你付出了,就会有回报。说它作为IT热门岗位之一是完全不虚的。可能很多人回说软件测试是吃青春饭的,但放眼望去,哪个工作不是这样的呢?会有哪家公司愿意养一些闲人呢?…

「smardaten」上架钉钉应用中心!让进步再一次发生

使用钉钉的团队小伙伴们,smardaten给您送来福利啦~为了给更多团队提供更优质的应用开发体验,方便用户在线、快速使用无代码,数睿数据近期在【钉钉应用中心】发布smardaten在线版本。继与华为云、亚马逊云建立战略合作之后,smardat…

微信小程序实现分享到朋友圈的功能

分享朋友圈官方API:分享到朋友圈 1、分享到朋友圈接口设置事项 2、onShareTimeline()注意事项 3、分享朋友圈后,测试发现,没有数据请求。 用户在朋友圈打开分享的小程序页面,并不会真正打开小程序,而是进入一个“小程…

浏览器缓存策略

先走强缓存,再走协商缓存 强缓存 不发送请求,直接使用缓存的内容 状态码200 当前会话没有关闭的话就是走memory cache,否则就是disk cache 由响应头的 Pragma(逐渐废弃,优先级最高),catch-…

LeetCode 817. 链表组件

LeetCode 817. 链表组件 难度:middle\color{orange}{middle}middle 题目描述 给定链表头结点 headheadhead,该链表上的每个结点都有一个 唯一的整型值 。同时给定列表 numsnumsnums,该列表是上述链表中整型值的一个子集。 返回列表 numsnu…

自动驾驶仿真:ECU TEST 、VTD、VERISTAND连接配置

文章目录一、ECU TEST 连接配置简介二、TBC配置 test bench configuration三、TCF配置 test configuration提示:以下是本篇文章正文内容,下面案例可供参考 一、ECU TEST 连接配置简介 1、ECU TEST(简称ET),用于HIL仿…

MySQL tinyint(1) 、int(32) 与 varchar(255) 长度含义不同

MySQL tinyint(1) 、int(32) 与 varchar(255) 长度含义不同 发现 tinyint(1),int(32) 和 varchar(255) 这里面的数字的含义是不同的。 先说数字类型 tinyint 和 int 等 他们能存储的字节大小是与类型绑定的,即定义了 tinyint 或者 int 就确定了能存储…

【C++的OpenCV】第六课-OpenCV图像常用操作(三):OpenCV的图像的侵蚀和扩张

让我们继续一、图像的侵蚀和扩张1.1 侵蚀1.1.1 函数原型1.1.2 侵蚀的效果1.1.3 关于侵蚀的解释1.2 扩张1.2.1 函数原型1.2.2 扩张的效果二、实例一、图像的侵蚀和扩张 本章节中我们将会学习到: cv::erode() 函数详情cv::dilate() 函数详情 两个函数的基本使用方法…

java 接口 详解

目录 一、概述 1.介绍 : 2.定义 : 二、特点 1.接口成员变量的特点 : 2.接口成员方法的特点 : 3.接口构造方法的特点 : 4.接口创建对象的特点 : 5.接口继承关系的特点 : 三、应用 : 1.情景 : 2.多态 : ①多态的传递性 : ②关于接口的多态参数和多态…