Kafka - 04 Java客户端实现消息发送和订阅

news2025/7/19 6:24:00

1. Kafka测试命令行操作

1. 主题命令行操作

在上一节中我们安装了Kafka单机环境和集群环境,这一节来测试下Linux环境安装Kafka后的命令行操作。

我们之前在用Windows环境安装Kafka Kafka应用场景|基础架构|Windows安装|命令行操作 和命令行操作时,讲到主题命令行参数如下:

在这里插入图片描述

1. 创建主题

[root@localhost kafka-01]# bin/kafka-topics.sh --bootstrape-server localhost:9092 --create --topic test1 --partitions 3 --replication-factor 3

在这里插入图片描述

注意:这里之所以无法识别 --bootstrape-server 参数是因为kafka的版本低于2.2,我安装的kafka版本为kafka_2.12-2.2.1.tgz,应该使用 --zookeeper localhost:2181参数:

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 3

在这里插入图片描述

–zookeeper:指定了Kafka所连接的Zookeeper服务地址
–topic:指定了所要创建主题的名称
–partitions:指定了分区个数
–replication-factor:指定了副本因子
–create:创建主题的动作指令

2. 查看主题详情

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

在这里插入图片描述

3. 查看所有主题

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --list

在这里插入图片描述

2. 消费者命令行操作

在这里插入图片描述

[root@localhost kafka-01]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

–bootstrap-server 指定了连接Kafka集群的地址
–topic 指定了消费端订阅的主题

在这里插入图片描述

3. 生产者命令行操作

在这里插入图片描述

[root@localhost kafka-01]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

–broker-list 指定了连接的Kafka集群的地址
–topic 指定了发送消息时的主题

生产者发送消息:

在这里插入图片描述

消费者接收消息:

在这里插入图片描述

2. Java程序调用Kafka

① 创建kafka项目并引入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/>
    </parent>

    <groupId>com.hh</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.20</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

② kafka生产者发送消息:

public class CustomProducer01 {
    public static void main(String[] args) {
        // kafka生产者属性配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.22:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // kafka生产者发送消息,默认是异步发送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "hello,kafka");
        try{
            // 发送消息
            kafkaProducer.send(producerRecord);
        }catch (Exception e){
            e.printStackTrace();
        }
        // 关闭资源
        kafkaProducer.close();
    }
}

③ 查看kafka消费者有没有消费消息:

在这里插入图片描述

④ kafka消费者消费消息:

查看kafka安装目录config/consumer.properties文件中的group.id:

在这里插入图片描述

public class CustomConsumer01 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.22:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        ArrayList<String> topics = new ArrayList<>();
        topics.add("test");
        consumer.subscribe(topics);

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

⑤ 启动消费者程序后,再启动生产者程序发送消息,查看消费者控制台:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/37933.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

哪种类型的蓝牙耳机好?超高性价比蓝牙耳机推荐

朋友让我推荐蓝牙耳机的时候&#xff0c;总是喜欢问哪款蓝牙耳机的性能更强&#xff0c;想要直接入手那款性能更强的蓝牙耳机&#xff0c;以此节省对比的时间。但是用户自行进行对比的步骤&#xff0c;显然是不能省的&#xff0c;所以推荐这四款高性价比的蓝牙耳机&#xff0c;…

华为云桌面Workspace,让你的办公更加舒适惬意

在各行各业转型的过程中&#xff0c;企业对于线上办公的需求不断增多&#xff0c;越来越需要一个云办公平台&#xff0c;为企业更好实现数字化网络化办公降本增效。正逢佳节之际&#xff0c;在此为各大企业推荐一个高效的办公神器——华为云桌面Workspace。相信作为企业决策者的…

详解设计模式:抽象工厂模式

工厂方法模式&#xff0c;又称工厂模式、多态工厂模式和虚拟构造器模式&#xff0c;通过工厂父类定义负责创建产品的公共接口&#xff0c;子类负责生产具体对象。可以理解为简单工程模式的升级&#xff0c;解决简单工厂模式的弊端。 &#xff5e; 本篇内容包括&#xff1a;关于…

Executors-四种创建线程的手段

1 Executors.newCachedThreadPool() 从构造方法可以看出&#xff0c;它创建了一个可缓存的线程池。当有新的任务提交时&#xff0c;有空闲线程则直接处理任务&#xff0c;没有空闲线程则创建新的线程处理任务&#xff0c;队列中不储存任务。线程池不对线程池大小做限制&#x…

ESP三相SVPWM控制器的simulink仿真

目录 1.算法描述 2.仿真效果预览 3.MATLAB核心程序 4.完整MATLAB程序 1.算法描述 SVPWM则以三相的合成矢量为出发点&#xff0c;其基本思想为&#xff1a;在数学意义上的abc轴也好&#xff0c;αβ轴也好&#xff0c;其产生的电压都应该等于dq轴合成的那个电压。那么只要让…

swiper在动态创建dom过程中的问题:数据从后端请求回来后加载到页面上,dom加载完发现swiper没用了

怎么动态创建div标签&#xff1a; 要轮播的数据是后端返回的&#xff0c;所以我们要发ajax请求接收数据&#xff1b; 下面演示的是已经接收回来的数据&#xff0c;动态创建div标签&#xff1a; setTimeout(()>{var list ["aaa","bbb","ccc&quo…

【Redis】从计算机组成原理开始讲解为什么会出现Redis

文章目录前置知识数据库的出现Redismemcache与redis的区别前置知识 首先需要知道的一个常识就是&#xff1a;数据是存放在磁盘里面的。 而磁盘有两个指标&#xff1a; 寻址&#xff1a;表示找到对应的数据所需要的时间&#xff0c;ms带宽&#xff1a;表示单位时间可以有多少个…

Python排序:冒泡排序,选择排序,插入排序,希尔排序

编程中的交换元素逻辑&#xff1a; # python中交换元素 有内置的三方底层逻辑 可以直接交换 a 2 b 3 a, b b, a print(a) # a为3# 其他编程需要有一个中间的变量来转换 变量设为temp a 2 b 3 temp a a b b temp print(a) # a为3 -----冒泡排序----- 相邻…

openfeign原理

openfeign原理 EnableFeignClients注解启用Feign客户端&#xff0c;通过Import注解导入了FeignClientsRegistrar类加载额外的Bean。FeignClientsRegistrar实现了ImportBeanDefinitionRegistrar接口&#xff0c;在Spring启动过程中会调用registerBeanDefinitions方法注册BeanDe…

自动化项目倍加福WCS-PG210E使用GSD文件

1&#xff0e;硬件电气连接 WCS-PG210E WCS3B WCS2B Pin 颜色 Pin 颜色 24V UB 1 BN棕色 2 WH白色 0V GND 3 BU蓝色 3 BU蓝色 RS485- RS485- 4 BK黑色 1 BN棕色 RS485 RS485 2 WH白色 4 BK黑色 保留 5 GY灰色 5 GY灰色 2. 安装W…

Nginx (4):nginx动静分离

什么是动静分离不解释了&#xff0c;网上说的很清楚&#xff0c;这里只说配置 目的 02虚拟机运行一个tomcat&#xff0c;处理动态请求&#xff0c;而对静态文件的访问则交给01虚拟机。操作 下面是01虚拟机的配置文件内容&#xff1a; server {listen 82;listen [::]:82;#root /…

六、nacos环境隔离、服务配置拉取和多环境配置共享

文章目录一、环境隔离-namespace1.namespace理解2.创建命名空间二、Nacos-实现配置管理三、nacos-实现服务配置拉取1.非热更新2.热更新&#xff1a;四、实现多环境配置共享1.开发环境&#xff1a;2.测试环境3.结论一、环境隔离-namespace 1.namespace理解 Nacos中服务存储和数…

Element Plus 组件库相关技术:7. 组件实现的基本流程及 Icon 组件的实现

前言 本章节我们将要实现 Icon 组件&#xff0c;Icon 组件应该是所有组件里面最简单的一个组件了&#xff0c;所以我们由简入深&#xff0c;循序渐进进行学习。Icon 组件虽然简单&#xff0c;但它却包含了一个组件的全部基础流程&#xff0c;通过实现 Icon 组件进行理解 Eleme…

疫情失业之下,测试的未来在哪里

前天和测试圈子里一个朋友聊了关于今年求职招聘市场行情和个人认知以及发展副业的话题。 聊起了今年的求职招聘行情&#xff0c;他说他们公司已经裁了一波人了&#xff0c;估计年底还会有一波裁员。 今年的市场冷的有点吓人&#xff0c;在这么下去&#xff0c;他也会担心自己…

nacos实现负载均衡、权重

文章目录一、nacos服务分级存储模型二、Nacos-NacosRule 实现负载均衡三、nacos-服务实例的权重设置一、nacos服务分级存储模型 修改 application.yml 配置文件&#xff1a; spring:cloud:nacos:server-addr: localhost:8848discovery:cluster-name: HZ #集群位置&#xff0c…

Linux C/C++ 学习笔记(九):百万并发的服务器实现

本文内容参考自(2条消息) Linux C/C 开发&#xff08;学习笔记十三)&#xff1a;百万并发的服务器实现_菊头蝙蝠的博客-CSDN博客_linux百万并发 一、connection_refuesed ---->文件系统最大的进程fd个数 nat 模式&#xff0c;物理机的VMnet8网卡&#xff0c;连接到了VMnet…

selenium--关闭窗口,指定窗口大小,前进,后退,刷新等等

关闭窗口跳转到指定页面窗口大小设置返回上个页面前进到下一个页面页面刷新关闭窗口 在selenium中执行完关闭窗口一般有两种方法&#xff1a; driver.close() driver.quit()这两个都是常用的方法&#xff0c;但是他们有什么区别呢&#xff1f; 对于driver.close(),他是关闭当…

【FME实战教程】003:FME读取地理空间数据(矢量、栅格、点云、三维模型、数据库、地理服务)大全

FME读取地理空间数据&#xff08;矢量、栅格、点云、三维模型、空间数据库、地理服务&#xff09;大全。 文章目录1. FME读取数据1.1 读取矢量1.1.1 读取Shapefile1.1.2 读取dwg1.2 读取栅格数据1.2.1 影像DOM1.3 读取地理数据库1.3.1 读取文件数据库&#xff08;.gdb&#xff…

机械原理复习试题

​ 编辑切换为居中 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; ​ 编辑切换为居中 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; ​ 编辑 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; ​ 编辑…

聚类分析的基本概念和方法

聚类分析的基本概念和方法 文章目录聚类分析的基本概念和方法前言一、什么是聚类分析1、聚类分析基本流程与步骤2、 什么是好的聚类方法3、聚类的模型评估4、聚类分析的比较5、聚类分析的挑战二、基本聚类方法概述三、划分算法1、基本概念2、k-means 聚类方法1、k-means 方法的…