kafka(windows)

news2025/6/9 7:18:39

目录

介绍

下载

配置 

测试

介绍

Kafka是一个分布式流媒体平台,类似于消息队列或企业信息传递系统。

下载

Kafka对于Zookeeper是强依赖,所以安装Kafka之前必须先安装zookeeper

官网:Apache Kafka

下载此安装包并解压 

配置 

 新建logs文件夹存放日志文件

打开config文件夹,找到 zookeeper.properties文件。将dataDir改为上面logs文件夹路径再加上/zookeeper,表示zookeeper的日志文件

打开config文件夹,找到 server.properties文件。将log.dirs改为上面logs文件夹路径再加上/kafka,表示kafka的日志文件

新建zk.cmd文件,里面放zookeeper启动脚本

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

新建kfk.cmd文件,里面放kafka启动脚本 

call bin/windows/kafka-server-start.bat config/server.properties

 先双击zk.cmd

再双击kafka.cmd(关闭的话相反)

测试

打开两个cmd窗口(bin下的windows),在第一个cmd窗口创建topic

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create

在第二个cmd窗口 创建生产者

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

在第一个cmd窗口 创建消费者

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

回到第二个cmd窗口输入hello kafka 观察第一个cmd窗口是否有输出hello kafka

在idea中创建一个项目kafka-demo

添加kafka依赖

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
ProducerQuickStart
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        /**
         * 封装发送的消息
         * 参数一:topic
         * 参数二:消息的key
         * 参数三:消息的value
         */
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first","100001","hello kafka");

        //3.发送消息
        producer.send(record);

        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }

}
ConsumerQuickStart
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消费者
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //消费者组(必须设置)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("topic-first"));

        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }

}

先运行ConsumerQuickStart再运行ProducerQuickStart

回到ConsumerQuickStart的控制台

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2405098.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于安卓的文件管理器程序开发研究源码数据库文档

摘 要 伴随着现代科技的发展潮流&#xff0c;移动互联网技术快速发展&#xff0c;各种基于通信技术的移动终端设备做的也越来越好了&#xff0c;现代智能手机大量的进入到了我们的生活中。电子产品的各种软硬技术技术的发展&#xff0c;操作系统的不断更新换代&#xff0c;谷歌…

EMC VNXe 存储系统日志收集方法

写在前面 有朋友找来看看VNXe的故障&#xff0c;这种问题总是要收集日志&#xff0c;顺便这里也分享给大家。 注意&#xff0c;VNXe和VNX 属于完全不同的产品&#xff0c;不要看名字很类似&#xff0c;操作系统已经完全重构了&#xff0c;如果说是否有联系&#xff0c;大概就…

从“人找政策”到“政策找人”:智能退税ERP数字化重构外贸生态

离境退税新政核心内容与外贸企业影响 &#xff08;一&#xff09;政策核心变化解析 退税商店网络扩容 新政明确鼓励在大型商圈、旅游景区、交通枢纽等境外旅客聚集地增设退税商店&#xff0c;并放宽备案条件至纳税信用M级企业。以上海为例&#xff0c;静安区计划新增1000家退…

以人类演示视频为提示,学习可泛化的机器人策略

25年5月来自清华大学、上海姚期智研究院和星动纪元&#xff08;RoboEra&#xff09;公司的论文“Learning Generalizable Robot Policy with Human Demonstration Video as a Prompt”。 最近的机器人学习方法通​​常依赖于从通过遥操作收集的大量机器人数据集中进行模仿学习…

SOC-ESP32S3部分:36-适配自己的板卡

飞书文档https://x509p6c8to.feishu.cn/wiki/RP4UwPrsKi4xuQkKLAAcKxD3n1b 如果你自己画了PCB板&#xff0c;需要把自己绘制的板卡配置小智AI工程&#xff0c;可以参考此文档。 下载源码 克隆或下载源码到本地&#xff0c;这里以1.5.5为例&#xff0c;大家可以自行修改其它版…

LLMs 系列科普文(8)

八、模型的自我认知 接下来我们聊聊另一种问题&#xff0c;即模型的自我认知。 网上经常经常可以看到人们会问大语言模型一些关于认知方面的问题&#xff0c;比如“你是什么模型&#xff1f;谁创造了你&#xff1f;” 说实话&#xff0c;其实这个问题有点无厘头。 之所以这么…

机器学习基础相关问题

机器学习相关的基础问题 K-means是否一定会收敛 K-means是否一定会收敛 K-means算法在有限步数内一定会收敛&#xff0c;但收敛到的可能是局部最优解而非全局最优解。以下是详细分析&#xff1a; K-means 的优化目标是最小化 样本到其所归属簇中心的距离平方和&#xff08;SSE…

验证负载均衡与弹性伸缩

什么是弹性伸缩&#xff08;Auto Scaling&#xff09;&#xff1f; 弹性伸缩是指 云计算平台根据实时负载自动调整计算资源&#xff08;如服务器实例、容器Pod&#xff09;数量&#xff0c;以确保系统在高峰时保持稳定&#xff0c;在低谷时节省成本。 什么时候会触发弹性伸缩&…

Three.js中AR实现详解并详细介绍基于图像标记模式AR生成的详细步骤

文档地址 Three.js中AR实现详解 以下是Three.js中实现AR功能的详细解析&#xff0c;涵盖技术原理、实现步骤、核心组件及优化策略&#xff1a; &#x1f9e9; 一、技术基础 AR.js框架的核心作用 AR.js是Three.js实现AR的基石&#xff0c;提供以下核心能力&#xff1a; 多模…

GeoBoundaries下载行政区划边界数据(提供中国资源shapefile)

要下载山东省济南市各个区的行政区划边界数据&#xff0c;你可以通过 geoBoundaries 提供的数据来实现。下面是详细步骤&#xff0c;包括网页操作和可选的 Python 自动化方式。 目录 ✅ 一、通过 geoBoundaries 官网手动下载1. 打开官网&#xff1a;2. 查找中国数据&#xff1a…

大模型如何选型?嵌入模型如何选型?

欢迎来到啾啾的博客&#x1f431;。 记录学习点滴。分享工作思考和实用技巧&#xff0c;偶尔也分享一些杂谈&#x1f4ac;。 有很多很多不足的地方&#xff0c;欢迎评论交流&#xff0c;感谢您的阅读和评论&#x1f604;。 目录 引言模型优劣认知与模型选择大模型&#xff08;L…

开源大模型网关:One API实现主流AI模型API的统一管理与分发

以下是对One API的简单介绍&#xff1a; One API是一个使用go语言开发的大语言模型 API 管理与分发系统支持Docker一键快速部署&#xff0c;且资源占用小&#xff0c;高性能开箱支持多平台大模型快速接入&#xff0c;包括OpenAI、Gemini、xAI、Grop、Anthropic Claude、Ollama…

智慧充电:新能源汽车智慧充电桩的发展前景受哪些因素影响?

全球能源结构转型与碳中和目标的推进&#xff0c;新能源汽车产业迎来爆发式增长&#xff0c;而智慧充电桩作为其核心基础设施&#xff0c;发展前景备受关注。智慧充电不仅关乎用户充电体验的优化&#xff0c;更是电网平衡、能源效率提升的关键环节。 然而&#xff0c;其发展并…

【网站建设】不同类型网站如何选择服务器?建站项目实战总结

做了几个建站项目后,深刻体会到一件事:不同类型的网站,所采用的服务器策略是完全不同的。 如果选错了服务器方案,可能带来过高的成本、过低的性能,甚至上线失败。 这篇文章分享一下我在实战中的经验,供正在做建站项目的朋友参考。 🚩 1️⃣ 纯展示型网站 —— 静态服务…

iptables实验

实验一&#xff1a;搭建web服务&#xff0c;设置任何人能够通过80端口访问。 1.下载并启用httpd服务器 dnf -y install httpd 开启httpd服务器 systemctl start httpd 查看是否启用 下载并启用iptables&#xff0c;并关闭firewalld yum install iptable…

前后端分离开发 和 前端工程化

来源&#xff1a;黑马程序员JavaWeb开发教程&#xff0c;实现javaweb企业开发全流程&#xff08;涵盖SpringMyBatisSpringMVCSpringBoot等&#xff09;_哔哩哔哩_bilibili 前后端混合开发&#xff1a; 需要使用前端的技术栈开发前端的功能&#xff0c;又需要使用Java的技术栈…

web端rtmp推拉流测试、抽帧识别计数,一键式生成巡检报告

本文旨在实现无人机城市交通智慧巡检中的一个模块——无人机视频实时推拉流以及识别流并在前端展示&#xff0c;同时&#xff0c;统计目标数量以及违停数量&#xff0c;生成结果评估&#xff0c;一并发送到前端展示。对于本文任何技术上的空缺&#xff0c;可在博主主页前面博客…

Excel 表格内批量添加前缀与后缀的实用方法

我们经常需要为 Excel 表格中的内容统一添加前缀或后缀&#xff0c;例如给编号加“NO.”、给姓名加“会员_”等。手动操作效率低&#xff0c;本文将介绍几种实用的方法&#xff0c;帮助你快速完成批量添加前缀和后缀的操作。 使用“&”运算符添加前缀或后缀&#xff08;推…

2024 CKA题库+详尽解析| 15、备份还原Etcd

目录 免费获取题库配套 CKA_v1.31_模拟系统 15、 备份还原Etcd 题目&#xff1a; 开始操作: 1&#xff09;、切换集群 2&#xff09;、登录master并提权 3&#xff09;、备份Etcd现有数据 4&#xff09;、验证备份数据快照 5&#xff09;、查看节点和Pod状态 6&am…

西门子 S7-1200 PLC 海外远程运维技术方案

西门子 S7-1200 PLC 海外远程运维技术方案 一、面向海外场景的核心优势 针对跨国企业、海外项目及远程技术支持需求&#xff0c;本方案基于巨控GRM552Y-CHE模块提供无缝的全球化远程PLC运维能力&#xff0c;突破地域及时差限制&#xff0c;显著提升国际项目响应效率。 二、海…