
下面尝试一下Kafka 的生产者客户端和消费者客户端的实现。
1、客户端简介
生产者就是负责向Kafka发送消息的应用程序,消费者就是拉取Kafka消息的应用程序。
 在Kafka的历史版本中,主要的客户端如下:
- 基于Scala语言编写的客户端,称为旧客户端,已废弃;
- 基于Java语言编写的客户端(从Kafka0.9.x开始),称为新客户端,它弥补了就客户端中存在的诸多设计缺陷;
 pom依赖
		<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.6.1</version>
        </dependency>
2、Java生产者客户端
public class KafkaProducerTest {
    public static final String bootStrap = "localhost:9092";
    public static final String topic = "topic_1";
    public static void main(String[] args) {
        // 1、配置客户端参数
        Properties properties = new Properties();
        // 指定生产者客户端连接Kafka集群所需的broker地址列表,具体的内容格式为host1:port1,host2:port2
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrap);
        // key序列化,转换成字节数组以满足broker端接收的消息形式
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value序列化,转换成字节数组以满足broker端接收的消息形式
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 配置重试次数,10次之后抛异常,可以在回调中处理
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // 配置客户端id
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.1");
        // 2、构造KafkaProducer客户端实例
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        // 3、构建待发送消息
        ProducerRecord<String, String> record = new ProducerRecord(topic, "hello world");
        // 4、发送消息,分为3种模式,发后即忘、同步(sync)、异步(async)
        // 发后即忘,只管往Kafka中发送消息而并不关心消息是否正确到达
//        kafkaProducer.send(record);
        // 同步,堵塞等待Kafka的响应,直到消息发送成功
        try {
            kafkaProducer.send(record).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
//        // 异步
//        kafkaProducer.send(record, new CallBackTest(record.topic(), record.key(), record.value()));
    }
    public static class CallBackTest implements Callback {
        private String topic;
        private String key;
        private String value;
        public CallBackTest(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("send message to topic success, topic:" + metadata.topic() + ",partition:" + metadata.partition() + ",offset:" + metadata.offset());
            }
        }
    }
}
生产者客户端实现需要以下几步:
- 配置生产者客户端参数;
- 构造KafkaProducer客户端实例;
- 构建待发送消息;
- 发送消息,分为3种方式:发后即忘,同步,异步;
3、Java消费者客户端
public class KafkaConsumerTest {
    public static final String bootStrap = "localhost:9092";
    public static final String topic = "topic_1";
    public static final String groupId = "group.123";
    public static void main(String[] args) {
        // 1、配置客户端参数
        Properties properties = new Properties();
        // 指定消费者客户端连接Kafka集群所需的broker地址列表,具体的内容格式为host1:port1,host2:port2
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrap);
        // key序列化,转换成字节数组以满足broker端接收的消息形式
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value序列化,转换成字节数组以满足broker端接收的消息形式
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // 配置客户端id
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.1");
        // 2、构造KafkaConsumer客户端实例
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        // 3、订阅主题
        kafkaConsumer.subscribe(Pattern.compile("topic_1"));
        // 4、消费消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> item : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", item.offset(), item.key(), item.value());
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}
消费者客户端实现需要以下几步:
- 配置消费者客户端参数;
- 构造KafkaConsumer客户端实例;
- 订阅相应主题;
- 拉取消息然后消费,最好提交消费位移数据;
4、测试
先启动消费者,再启动生产者,结果如下
 
 若有错误之后,欢迎留言指正,感谢~



















