Kafka的安装与使用(windows下python使用等)

news2025/5/18 15:59:07

一、下载

可以去官网下载:https://kafka.apache.org/downloads

版本可选择,建议下载比较新的,新版本里面自带zookeeper

 二、安装

创建一个目录,此处是D:\kafka,将文件放进去解压

如果文件后缀是gz,解压后没有文件夹,此时需要先将文件后缀修改为tgz,然后再解压

进入kafka目录,创建一个data和log目录,用作zookeeper和kafka的数据和日志目录

三、修改配置

1.修改zookeeper的配置,使用编译器打开 config文件夹下面的zookeeper.properties文件,然后修改

dataDir,这个值改为自己刚才创建的data目录地址,使用\\连接

clientPort=2181

修改后,确认保存

 2.修改kafka配置文件,打开config文件夹下的server.properties文件,修改内容

port,9092是默认的端口号

host.name,修改为自己的IP,一般是本机或者局域网IP

listeners,修改为自己的IP和端口

log.dirs,填写为自己刚才创建的log文件夹,\\连接

zookeeper.connect=127.0.0.1:2181,这个根据自己情况写IP和端口,上面配置的

四、启动服务

1.启动zookeeper服务

先进入kafka解压后的根目录,然后在cmd里面执行如下命令:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

2.启动kafka

先进入kafka解压后的根目录,然后在cmd里面执行如下命令:

bin\windows\kafka-server-start.bat config\server.properties

温馨提示:如果kafka没有正常关闭,可能下一次启动就会报错,可以删除data,log和logs目录里面的内容之后,再从启动zookeeper开始往下走

3.创建topic

创建一个test_topic

bin\windows\kafka-topics.bat --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

4.查看topic列表

.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

5.启动producer生产者

如果使用代码去创建生产者和消费者此步骤不需要

假设你要向名为 test_topic 的主题发送消息,并且 Kafka 代理地址是 localhost:9092,可以运行以下命令:

.\bin\windows\kafka-console-producer.bat --topic test_topic --bootstrap-server localhost:9092

 

启动后进入输入模式,等待生产者输入,输入测试的123,234

6.启动customer消费者

如果使用代码去创建生产者和消费者此步骤不需要

.\bin\windows\kafka-console-consumer.bat --topic test_topic --bootstrap-server localhost:9092 --from-beginning

启动后,可以看到生产者发送的内容:

五、Python代码实现生产者和消费者

pip install kafka-python

模拟生产者代码,通过输入进行内容控制:

import uuid

from kafka import KafkaProducer
import json
import time

# 配置 Kafka 生产者
producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 要发送的主题名称
topic = 'test_topic'

try:
    while True:
        content = input("内容:")
        message = {
            'id': uuid.uuid4().hex,
            'message': f'{content}'
        }
        # 发送消息
        future = producer.send(topic, value=message)
        # 等待消息发送结果
        result = future.get(timeout=10)
        print(f"Sent message {content} to partition {result.partition} at offset {result.offset}")

except Exception as e:
    print(f"Error sending message: {e}")
finally:
    # 刷新缓冲区并关闭生产者
    producer.flush()
    producer.close()

 消费者模拟:

from kafka import KafkaConsumer
import json

# 配置 Kafka 消费者
consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['127.0.0.1:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)

try:
    # 持续消费消息
    for message in consumer:
        print(f"Received message: {message.value} from partition {message.partition} at offset {message.offset}")
except KeyboardInterrupt:
    print("Consumer interrupted by user.")
finally:
    # 关闭消费者
    consumer.close()

在生产者输入 hello,yes进行测试:

 在消费者代码出进行获取:

注意消费者的message本身就是一个可迭代对象,是无穷尽的。

并且auto_offset_reset 参数控制了是从第一个开始获取还是从接入的时候再算起,移除参数就代表从接入开始获取message里面的数据,如果是 earliest 就会从第一个开始获取,即使已经处理了!

那么,如果是消费者掉线,生产者在掉线期间新增了若干条数据,如何让消费者上线后从没有处理的数据开始处理呢?可以给消费者设置三个参数并且手动commit:

auto_offset_reset='earliest',  # 从最新的消息位置开始消费
enable_auto_commit=False,  # 开启自动提交偏移量
group_id='aaa'

参考代码:

from kafka import KafkaConsumer
import json

# 配置 Kafka 消费者
consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['127.0.0.1:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',  # 从最新的消息位置开始消费
    enable_auto_commit=False,  # 关闭自动提交偏移量
    group_id='aaa'
)

try:
    # 持续消费消息
    for message in consumer:
        print(f"Received message: {message.value} from partition {message.partition} at offset {message.offset}")
        consumer.commit()  # 手动提交偏移量
except KeyboardInterrupt:
    print("Consumer interrupted by user.")
finally:
    # 关闭消费者
    consumer.close()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2328024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构初阶: 顺序表的增删查改

顺序表 概念 顺序表是⽤⼀段物理地址连续的存储单元依次存储数据元素的线性结构,⼀般情况下采⽤数组存储。如图1: 顺序表和数组有什么区别? 顺序表的底层是用数组实现的,是对数组的封装,实现了增删查改等接口。 分…

详解AI采集框架Crawl4AI,打造智能网络爬虫

大家好,Crawl4AI作为开源Python库,专门用来简化网页爬取和数据提取的工作。它不仅功能强大、灵活,而且全异步的设计让处理速度更快,稳定性更好。无论是构建AI项目还是提升语言模型的性能,Crawl4AI都能帮您简化工作流程…

【爬虫开发】爬虫开发从0到1全知识教程第14篇:scrapy爬虫框架,介绍【附代码文档】

本教程的知识点为:爬虫概要 爬虫基础 爬虫概述 知识点: 1. 爬虫的概念 requests模块 requests模块 知识点: 1. requests模块介绍 1.1 requests模块的作用: 数据提取概要 数据提取概述 知识点 1. 响应内容的分类 知识点&#xff1a…

SQLark:一款国产免费数据库开发和管理工具

SQLark(百灵连接)是一款面向信创应用开发者的数据库开发和管理工具,用于快速查询、创建和管理不同类型的数据库系统,目前可以支持达梦数据库、Oracle 以及 MySQL。 对象管理 SQLark 支持丰富的数据库对象管理功能,包括…

防爆对讲机VS非防爆对讲机,如何选择?

在通信设备的广阔市场中,对讲机以其高效、便捷的特点,成为众多行业不可或缺的沟通工具。而面对防爆对讲机与非防爆对讲机,许多用户常常陷入选择困境。究竟该如何抉择,且听我为您细细道来。 防爆对讲机,专为危险作业场…

微信小程序开发:开发实践

微信小程序开发实践研究 摘要 随着移动互联网的迅猛发展,微信小程序作为一种轻量化、无需安装的应用形式,逐渐成为开发者和用户的首选。本文以“个人名片”小程序为例,详细阐述了微信小程序的开发流程,包括需求分析、项目规划、…

操作 Office Excel 文档类库Excelize

Excelize 是 Go 语言编写的一个用来操作 Office Excel 文档类库,基于 ECMA-376 OOXML 技术标准。可以使用它来读取、写入 XLSX 文件,相比较其他的开源类库,Excelize 支持操作带有数据透视表、切片器、图表与图片的 Excel 并支持向 Excel 中插…

青铜与信隼的史诗——TCP与UDP的千年博弈

点击下面图片带您领略全新的嵌入式学习路线 🔥爆款热榜 88万阅读 1.6万收藏 第一章 契约之匣与自由之羽 熔岩尚未冷却的铸造台上,初代信使长欧诺弥亚将液态秘银倒入双生模具。左侧模具刻着交握的青铜手掌,右侧则是展开的隼翼纹章。当星辰…

「青牛科技」GC5849 12V三相无感正弦波电机驱动芯片

芯片描述: • 4 ~ 20V 工作电压, 30V 最大耐压 • 驱动峰值电流 2.0A ,连续电流 800mA 以内 • 芯片内阻: 900mΩ (上桥 下桥) • eSOP-8 封装,底部 ePAD 散热,引…

Java基础之反射的基本使用

简介 在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意属性和方法;这种动态获取信息以及动态调用对象方法的功能称为Java语言的反射机制。反射让Java成为了一门动…

大语言模型中的嵌入模型

本教程将拆解什么是嵌入模型、为什么它们在NLP中如此重要,并提供一个简单的Python实战示例。 分词器将原始文本转换为token和ID,而嵌入模型则将这些ID映射为密集向量表示。二者合力为LLMs的语义理解提供动力。图片来源:[https://tzamtzis.gr/2024/coding/tokenization-by-an…

【从零实现Json-Rpc框架】- 项目实现 - 服务端主题实现及整体封装

📢博客主页:https://blog.csdn.net/2301_779549673 📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! &…

开源的 LLM 应用开发平台Dify的安装和使用

文章目录 前提环境应用安装deocker desktop镜像源配置Dify简介Dify本地docker安装Dify安装ollama插件Dify安装硅基流动插件简单应用练习进阶应用练习数据库图像检索与展示助手echart助手可视化 前提环境 Windows环境 docker desktop魔法环境:访问Dify项目ollama电脑…

从零构建大语言模型全栈开发指南:第五部分:行业应用与前沿探索-5.1.2行业落地挑战:算力成本与数据隐私解决方案

👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 从零构建大语言模型全栈开发指南-第五部分:行业应用与前沿探索5.1.2 行业落地挑战:算力成本与数据隐私解决方案1. 算力成本挑战与优化策略1.1 算力成本的核心问题1.2 算力优化技术方案2. 数据隐私挑战…

NodeJS--NPM介绍使用

1、使用npm install命令安装模块 1.1、本地安装 npm install express 1.2、全局安装 npm install express -g 1.3、本地安装和全局安装的区别

DeepSeek与ChatGPT的优势对比:选择合适的工具来提升工作效率

选DeepSeek还是ChatGPT?这就像问火锅和披萨哪个香! "到底该用DeepSeek还是ChatGPT?” 这个问题最近在互联网圈吵翻天!其实这就跟选手机系统-样,安卓党iOS党都能说出一万条理由,但真正重要的是你拿它来干啥!&am…

25大唐杯赛道一本科B组知识点大纲(下)

5G/6G网络技术知识点(10%) 工程概论及通信工程项目实践(20%) 5G垂直行业应用知识点(20%) ⭐⭐⭐为重点知识,尽量要过一遍哦 大唐杯赛道一国一备赛思路 大唐杯国一省赛回忆录--有付出就会有收…

Python+Playwright自动化测试-1-环境准备与搭建

1、Playwright 是什么? 微软在 2020 年初开源的新一代自动化测试工具,它的功能类似于 Selenium、Pyppeteer 等,都可以驱动浏览器进行各种自动化操作。它的功能也非常强大,对市面上的主流浏览器都提供了支持,API 功能简…

生产管理系统如何破解汽车零部件行业追溯难痛点

在汽车零部件制造行业中,生产追溯一直是企业面临的核心挑战之一。随着市场竞争的加剧和客户需求的日益复杂,如何确保产品质量、快速定位问题源头、减少批次性返工,成为了每个企业亟待解决的问题。而生产管理系统,作为智能制造的重…

【XTerminal】【树莓派】Linux系统下的函数调用编程

目录 一、XTerminal下的Linux系统调用编程 1.1理解进程和线程的概念并在Linux系统下完成相应操作 (1) 进程 (2)线程 (3) 进程 vs 线程 (4)Linux 下的实践操作 1.2Linux的“虚拟内存管理”和stm32正式物理内存(内存映射)的区别 (1)Linux虚拟内存管…