kafka之操作示例

news2025/7/18 19:53:30

一、常用shell命令

#1、创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replications 1 --topic test

#2、查看创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

#3、生产者发布消息命令
(执行完此命令后在控制台输入要发送的消息,回车即可)
bin/kafka-console-producer.sh --broker-list 192.168.91.231:9092,192.168.91.231:9093,192.168.91.231:9094 --topic test

#4、消费者接受消息命令
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

#5、kafka启动
首先启动zookeeper zkServer.sh start(相当于一个server,kafka会连接这个server)
bin/kafka-server-start.sh config/server.properties # 启动kafka

#6、查看kafka节点数目
在zookeeper中查看,登录客户端bin/zkCli.sh 执行ls /brokers/ids 查看节点数目及节点ID,[0,1,2]

#7、kafka中的概念
生产者 Producer、代理Broker、消费者Consumer、主题Topic、分区 Partition、消费者组 Consumer Group

#8、查看主颗信息
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 [加其他选项]

eg:
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --describe
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test


#9、为主题创建分区
一共创建八个分区,编号分别为0~7
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --alter -partitions 8 -topic test

#10、查看kafka进程
ps -eflgrep server.properties
ps -eflgrep server-1.properties
ps -eflgrep server-2.properties

#11、kafka宕机重启后,消息不会丢失

#12、kafka其中一个broker宕机后,对消费者和生产者影响很小(命令行下测试)
消费者会尝试连接,连接不到,返回java.net.ConnectException:Connection refused异常 生产者可能会在发送消息的时候报异常,但会很快连接到其他broker,继续正常使用

#13.查看kafka消息队列的积压情况
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --describe --group console-consumer-37289

#14.kafka 中查看所有的group列表信息
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --list

二、python操作kafka

本地安装与启动(基于Docker)

#1、下载zookeeper镜像与kafka镜像:
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

#2、本地启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6

#3、本地启动kafka(注意下述代码,将kafka启动在9092端口)
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--enV KAFKA ZO0KEEPER CONNECT=zookeeper:2181 \
--enV KAFKA ADVERTISED HOST NAME=192.168.71.113 \
--enV KAFKA ADVERTISED PORT=9092 \
registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

上面写的localhost没有影响,查看端口如下
# netstat -tuanlp | grep 9092
tcp 0 0 0.0.0.0:9092 0.0.0.0:*LISTEN 102483/docker-proxy
tcp6 00:::9092 :::* LISTEN 102487/docker-proxy

#4、进入kafka bash
docker exec it kafka bash
cd /opt/kafka/bin

#5、创建Topic,分区为2,Topic name为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo

kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic egon

数据存在哪里
[root@web02 ~]# docker exec -it kafka bash
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/
kafka-logs-f33383f9c414
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# 1s /kafka/kafka-logs-f33383f9c414/
kafka_demo-0 kafka_demo-1
egon-0 egon-1
.........
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/kafka-logs-f33383f9c414/egon-0
00000000000000000000.index0000000000000000000.timeindex
00000000800000080000.1og leader-epoch-checkpoint

#6、查看当前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list

#7、命令行操作
$docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-producer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic
然后一行行输入,回车即发送一条消息
>111
>222
>333

另外一个终端
$ docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-consumer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic --from-beginning可以收到消息
111
222
333

#8、安装kafka-python
pip install kafka-python

代码示例:

# pip3 install kafka-python  # 版本是2.0.2
from kafka import KafkaProducer, KafkaConsumer
import json
import threading
import time
 
# Kafka broker address
bootstrap_servers = '192.168.71.113:9092'
 
# Topic name
topic = 'test_topic'
 
 
# Producer function
def kafka_producer():
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
 
    try:
        for i in range(10):
            message = {'message': f'Hello Kafka! Message {i}'}
            producer.send(topic, value=message)
            print(f"Sent: {message}")
            time.sleep(1)
        else:
            print("发送完成")
    except Exception as ex:
        print(f"Exception occurred: {ex}")
    finally:
        producer.close()
 
 
# Consumer function
def kafka_consumer():
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=bootstrap_servers,
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=5000)  # 设置超时时间为1秒
 
    try:
        for message in consumer:
            print(f"Received: {message.value}")
        else:
            print("消费完毕,等5000毫秒超时即可结束,执行finally内的代码")
    except Exception as ex:
        print(f"Exception occurred: {ex}")
    finally:
        print("消费者结束")
        consumer.close()
 
 
# Create threads for producer and consumer
producer_thread = threading.Thread(target=kafka_producer)
consumer_thread = threading.Thread(target=kafka_consumer)
 
# Start both threads
producer_thread.start()
consumer_thread.start()
 
# Wait for threads to complete
producer_thread.join()
consumer_thread.join()
 
print("Kafka producer and consumer threads have finished.")
 

执行结果:

                  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2387621.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

车载网关策略 --- 车载网关通信故障处理机制深度解析

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 钝感力的“钝”,不是木讷、迟钝,而是直面困境的韧劲和耐力,是面对外界噪音的通透淡然。 生活中有两种人,一种人格外在意别人的眼光;另一种人无论…

三天掌握PyTorch精髓:从感知机到ResNet的快速进阶方法论

本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 一、分析式AI基础与深度学习核心概念 1.1 深度学习三要素 数学基础: f(x;W,b)σ(Wxb)(单层感知机) 1.2 PyTorch核心组件 张量操作示例…

分布式缓存:三万字详解Redis

文章目录 缓存全景图PreRedis 整体认知框架一、Redis 简介二、核心特性三、性能模型四、持久化详解五、复制与高可用六、集群与分片方案 Redis 核心数据类型概述1. String2. List3. Set4. Sorted Set(有序集合)5. Hash6. Bitmap7. Geo8. HyperLogLog Red…

BiLSTM与Transformer:位置编码的隐式vs显式之争

BiLSTM 与使用位置编码的LLM(如Transformer)的核心区别 一、架构原理对比 维度BiLSTM带位置编码的LLM(如Transformer)基础单元LSTM单元(记忆细胞、门控机制)自注意力机制(Self-Attention)信息传递双向链式传播(前向+后向LSTM)并行多头注意力,全局上下文关联位置信息…

html5视频播放器和微信小程序如何实现视频的自动播放功能

在HTML5中实现视频自动播放需设置autoplay和muted属性&#xff08;浏览器策略要求静音才能自动播放&#xff09;&#xff0c;并可添加loop循环播放、playsinline同层播放等优化属性。微信小程序通过<video>组件的autoplay属性实现自动播放&#xff0c;同时支持全屏按钮、…

关于vue结合elementUI输入框回车刷新问题

问题 vue2项目结合elementUI&#xff0c;使用el-form表单时&#xff0c;第一次打开浏览器url辞职&#xff0c;并且是第一次打开带有这个表单的页面时&#xff0c;输入框输入内容&#xff0c;回车后会意外触发页面自动刷新。 原因 当前 el-form 表单只有一个输入框&#xff0…

vue项目表格甘特图开发

🧩 甘特图可以管理项目进度,生产进度等信息,管理者可以更直观的查看内容。 1. 基础环境搭建 引入 dhtmlx-gantt 插件引入插件样式 dhtmlxgantt.css引入必要的扩展模块(如 markers、tooltip)创建 Vue 组件并挂载 DOM 容器初始化 gantt 图表配置2. 数据准备与处理 定义任务…

Spark 中,创建 DataFrame 的方式(Scala语言)

在 Spark 中&#xff0c;创建 DataFrame 的方式多种多样&#xff0c;可根据数据来源、结构特性及性能需求灵活选择。 一、创建 DataFrame 的 12 种核心方式 1. 从 RDD 转换&#xff08;需定义 Schema&#xff09; import org.apache.spark.sql.{Row, SparkSession} import o…

Python----目标检测(MS COCO数据集)

一、MS COCO数据集 COCO 是一个大规模的对象检测、分割和图像描述数据集。COCO有几个 特点&#xff1a; Object segmentation&#xff1a;目标级的分割&#xff08;实例分割&#xff09; Recognition in context&#xff1a;上下文中的识别&#xff08;图像情景识别&#xff0…

塔能科技:有哪些国内工业节能标杆案例?

在国内工业领域&#xff0c;节能降耗不仅是响应国家绿色发展号召、践行社会责任的必要之举&#xff0c;更是企业降低运营成本、提升核心竞争力的关键策略。塔能科技在这一浪潮中脱颖而出&#xff0c;凭借前沿技术与创新方案&#xff0c;成功打造了多个极具代表性的工业标杆案例…

OpenCV CUDA模块图像过滤------创建一个行方向的一维积分(Sum)滤波器函数createRowSumFilter()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::cuda::createRowSumFilter 是 OpenCV CUDA 模块中的一个函数&#xff0c;用于创建一个行方向的一维积分&#xff08;Sum&#xff09;滤波器。…

Frequent values/gcd区间

Frequent values 思路&#xff1a; 这题它的数据是递增的&#xff0c;ST表&#xff0c;它的最多的个数只会在在两个区间本身就是最多的或中间地方产生&#xff0c;所以我用map数组储存每个值的左右临界点&#xff0c;在ST表时比较多一个比较中间值的个数就Ok了。 #define _…

08SpringBoot高级--自动化配置

目录 Spring Boot Starter 依赖管理解释 一、核心概念 二、工作原理 依赖传递&#xff1a; 自动配置&#xff1a; 版本管理&#xff1a; 三、核心流程 四、常用 Starter 示例 五、自定义 Starter 步骤 创建配置类&#xff1a; 配置属性&#xff1a; 注册自动配置&a…

Deep Evidential Regression

摘要 翻译&#xff1a; 确定性神经网络&#xff08;NNs&#xff09;正日益部署在安全关键领域&#xff0c;其中校准良好、鲁棒且高效的不确定性度量至关重要。本文提出一种新颖方法&#xff0c;用于训练非贝叶斯神经网络以同时估计连续目标值及其关联证据&#xff0c;从而学习…

「Python教案」循环语句的使用

课程目标 1&#xff0e;知识目标 能使用for循环和while循环设计程序。能使用循环控制语句&#xff0c;break、continue、else设计程序。能使用循环实际问题。 2&#xff0e;能力目标 能根据需求合适的选择循环结构。能对嵌套循环代码进行调试和优化。能利用循环语句设计&am…

linux快速入门-VMware安装linux,配置静态ip,使用服务器连接工具连接,快照和克隆以及修改相关配置信息

安装VMWare 省略&#xff0c;自己检索 安装操作系统-linux 注意&#xff1a;需要修改的我会给出标题&#xff0c;不要修改的直接点击下一步就可以 选择自定义配置 选择稍后安装操作系统 选择合适的内存 选择NAT模式 仅主机模式 虚拟机只能和主机通信&#xff0c;不能上网…

轻量化开源方案——浅析PdfPatcher实际应用

PDF处理在实际工作中十分重要&#xff0c;今天浅析PdfPatcher在PDF处理中的实际应用。 核心功能实测 批量处理能力 支持修改文档属性/页码编号/页面链接 一键清除复制/打印限制&#xff08;实测WPS加密文档可解锁&#xff09; 自动清理隐藏冗余数据&#xff08;经测试可平均…

Ansible常用Ad-Hoc 命令

1.配置sshpass yum install sshpass -y ssh-keygen -t dsa -f ~/.ssh/id_dsa -P "" # ssh-keygen密钥生成工具 -t密钥类型为dsa -f指定生成的密钥文件的路径。 -P&#xff1a;指定私钥的密码。 for i in seq 128 130; do sshpass -p123456 ssh-copy-id -i ~/.s…

[论文阅读]Pandora: Jailbreak GPTs by Retrieval Augmented Generation Poisoning

Pandora: Jailbreak GPTs by Retrieval Augmented Generation Poisoning [2402.08416] Pandora: Jailbreak GPTs by Retrieval Augmented Generation Poisoning 间接越狱攻击 GPT的RAG增强过程分四个阶段&#xff1a;❶GPT首先组织不同的用户上传的文档类型&#xff08;PDF、…

云效流水线Flow使用记录

概述 最近在频繁使用阿里云云效的几款产品&#xff0c;如流水线。之前写过一篇&#xff0c;参考云效流水线缓存问题。 这篇文章来记录更多问题。 环境变量 不管是云效流水线Flow还是应用交付AppStack&#xff08;基于流水线&#xff0c;后文不再赘述&#xff09;&#xff0…