1. Kafka测试命令行操作
1. 主题命令行操作
在上一节中我们安装了Kafka单机环境和集群环境,这一节来测试下Linux环境安装Kafka后的命令行操作。
我们之前在用Windows环境安装Kafka Kafka应用场景|基础架构|Windows安装|命令行操作 和命令行操作时,讲到主题命令行参数如下:
1. 创建主题
[root@localhost kafka-01]# bin/kafka-topics.sh --bootstrape-server localhost:9092 --create --topic test1 --partitions 3 --replication-factor 3
注意:这里之所以无法识别 --bootstrape-server 参数是因为kafka的版本低于2.2,我安装的kafka版本为kafka_2.12-2.2.1.tgz,应该使用 --zookeeper localhost:2181参数:
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 3
–zookeeper:指定了Kafka所连接的Zookeeper服务地址
–topic:指定了所要创建主题的名称
–partitions:指定了分区个数
–replication-factor:指定了副本因子
–create:创建主题的动作指令
2. 查看主题详情
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
3. 查看所有主题
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
2. 消费者命令行操作
[root@localhost kafka-01]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
–bootstrap-server 指定了连接Kafka集群的地址
–topic 指定了消费端订阅的主题
3. 生产者命令行操作
[root@localhost kafka-01]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
–broker-list 指定了连接的Kafka集群的地址
–topic 指定了发送消息时的主题
生产者发送消息:
消费者接收消息:
2. Java程序调用Kafka
① 创建kafka项目并引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.hh</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.20</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
② kafka生产者发送消息:
public class CustomProducer01 {
public static void main(String[] args) {
// kafka生产者属性配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.22:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// kafka生产者发送消息,默认是异步发送方式
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "hello,kafka");
try{
// 发送消息
kafkaProducer.send(producerRecord);
}catch (Exception e){
e.printStackTrace();
}
// 关闭资源
kafkaProducer.close();
}
}
③ 查看kafka消费者有没有消费消息:
④ kafka消费者消费消息:
查看kafka安装目录config/consumer.properties文件中的group.id:
public class CustomConsumer01 {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.22:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("test");
consumer.subscribe(topics);
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
⑤ 启动消费者程序后,再启动生产者程序发送消息,查看消费者控制台: