Java连接Kafka示例
1、引入依赖dependency groupIdorg.apache.kafka/groupId artifactIdkafka_2.12/artifactId version2.1.0/version scopeprovided/scope /dependency !-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -- dependency groupIdorg.apache.kafka/groupId artifactIdkafka-clients/artifactId version2.1.1/version /dependency dependency groupIdorg.apache.kafka/groupId artifactIdkafka-streams/artifactId version1.0.0/version /dependency dependency groupIdorg.apache.commons/groupId artifactIdcommons-lang3/artifactId version3.12.0/version /dependency2、生产者import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class KafkaProducerTest implements Runnable { private final KafkaProducerString, String producer; private final String topic; private String clientid; public KafkaProducerTest(String topicName,String clientid) { Properties props new Properties(); props.put(bootstrap.servers, 10.1.11.212:32765,10.1.11.212:32766,10.1.11.212:32767); props.put(acks, all); props.put(retries, 0); props.put(batch.size, 16384); props.put(key.serializer, StringSerializer.class.getName()); props.put(value.serializer, StringSerializer.class.getName()); // 你要的 SASL 认证配置 props.put(security.protocol, SASL_PLAINTEXT); props.put(sasl.mechanism, PLAIN); props.put(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\admin\ password\HcCloud01\;); this.producer new KafkaProducerString, String(props); this.topic topicName; this.clientid clientid; } Override public void run() { int messageNo 1; try { for(;;) { String messageStr 你好这是第messageNo条数据 clientid clientid; producer.send(new ProducerRecordString, String(topic, Message, messageStr)); //生产了100条就打印 if(messageNo%1000){ System.out.println(发送的信息: messageStr); } //生产1000条就退出 if(messageNo 1000){ System.out.println(成功发送了messageNo条); break; } messageNo; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } public static void main(String args[]) { KafkaProducerTest test1 new KafkaProducerTest(logstash-08-04, clientid1); Thread thread1 new Thread(test1); thread1.start(); } }2、消费者package com.example.demo; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class KafkaConsumerTest implements Runnable { private final KafkaConsumerString, String consumer; private ConsumerRecordsString, String msgList; private final String topic; private String clientid; private static final String GROUPID groupA; public KafkaConsumerTest(String topicName,String clientid) { Properties props new Properties(); props.put(bootstrap.servers, 10.1.11.212:32765,10.1.11.212:32766,10.1.11.212:32767); props.put(group.id, GROUPID); props.put(enable.auto.commit, true); props.put(auto.commit.interval.ms, 1000); props.put(session.timeout.ms, 30000); props.put(auto.offset.reset, earliest); props.put(key.deserializer, StringDeserializer.class.getName()); props.put(value.deserializer, StringDeserializer.class.getName()); // 你要的 SASL 认证配置 props.put(security.protocol, SASL_PLAINTEXT); props.put(sasl.mechanism, PLAIN); props.put(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\admin\ password\HcCloud01\;); this.consumer new KafkaConsumerString, String(props); this.topic topicName; this.consumer.subscribe(Arrays.asList(topic)); this.clientid clientid; } Override public void run() { int messageNo 1; System.out.println(---------开始消费---------); try { for (;;) { msgList consumer.poll(1000); if(null!msgListmsgList.count()0){ for (ConsumerRecordString, String record : msgList) { //消费100条就打印 ,但打印的数据不一定是这个规律的 if(messageNo%1000){ System.out.println(messageNo成功消费receive: key record.key() , value record.value() offsetrecord.offset()); } //当消费了1000条就退出 if(messageNo 1000){ break; } messageNo; } }else{ Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) { KafkaConsumerTest test1 new KafkaConsumerTest(logstash-08-04, clientid1); Thread thread1 new Thread(test1); thread1.start(); } }4、logback.xml可选logger nameorg.apache.kafka.clients.consumer.ConsumerConfig levelon /
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2505446.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!