1. Introduction
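KafkaConsumer is not thread-safe. It defines an acquire() method that checks whether only one thread is currently operating on the consumer; if another thread is, it throws a ConcurrentModificationException.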
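acquire() can be seen as a lightweight lock, but it never blocks. It only records which thread currently owns the consumer (by thread ID) together with a reentrancy counter, and uses them to detect concurrent access: a second thread gets an immediate exception instead of waiting. This guarantees that only one thread operates on the consumer at a time. acquire() and release() are always called in pairs, corresponding to locking and unlocking.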
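To make the mechanism concrete, here is a minimal sketch of a fail-fast, reentrant single-thread guard modeled on the behavior described above. The class name SingleThreadGuard and its fields are hypothetical; this is an illustration rather than the Kafka client source, although KafkaConsumer's private acquire()/release() follow the same idea of an owner thread ID plus a reference count.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard modeled on KafkaConsumer's acquire()/release() behavior (not the library code).
public class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    // ID of the thread currently inside the guarded object, or NO_OWNER.
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    // Reentrancy count: how many nested acquire() calls the owner has made.
    private final AtomicInteger refcount = new AtomicInteger(0);

    public void acquire() {
        long threadId = Thread.currentThread().getId();
        // Fail fast (never block) if a different thread already owns the guard.
        if (threadId != owner.get() && !owner.compareAndSet(NO_OWNER, threadId)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refcount.incrementAndGet();
    }

    public void release() {
        // Clear ownership only when the outermost acquire() is released.
        if (refcount.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }
}
```

A guarded method would call acquire() on entry and release() in a finally block, which is why the two calls always appear in pairs. Because the owner is cleared once the count returns to zero, handing the object to another thread sequentially is allowed; only overlapping access is rejected.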
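Although KafkaConsumer executes in a single thread, in some situations, for example when producers send messages much faster than the consumer can consume them, the consumer's lag keeps growing. Over a long period this can lead to message loss, because Kafka deletes old log segments once the retention limit is reached, whether or not they have been consumed. In such cases the consumer needs to consume with multiple threads to increase its throughput.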
2. Approaches to multithreaded consumption
2.1. Thread confinement (one consumer per thread)
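That is, each thread instantiates its own KafkaConsumer. As shown in the figure, each thread corresponds to one KafkaConsumer instance, and all consumer threads belong to the same consumer group.
The concurrency of this approach is limited by the actual number of partitions: within a consumer group each partition is assigned to at most one consumer, so any threads beyond the partition count receive no partitions and stay idle.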
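The partition limit can be illustrated with a simplified, single-topic model of range-style assignment. PartitionAssignmentDemo and its methods are hypothetical; Kafka's real assignors (such as RangeAssignor or RoundRobinAssignor) handle multiple topics and other details, but under any strategy at most min(partitions, consumers) consumers in a group can be active.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified single-topic model of range-style assignment in one consumer group.
public class PartitionAssignmentDemo {
    // Returns, for each consumer index, the partition numbers it would own.
    public static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            // The first `extra` consumers each get one additional partition.
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    // Number of consumers that receive at least one partition.
    public static int activeConsumers(int numPartitions, int numConsumers) {
        int active = 0;
        for (List<Integer> owned : assign(numPartitions, numConsumers)) {
            if (!owned.isEmpty()) {
                active++;
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // 5 consumer threads but only 3 partitions: two threads get nothing.
        System.out.println(assign(3, 5)); // prints [[0], [1], [2], [], []]
    }
}
```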
Example implementation:
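import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumer1 {
    // props: consumer configuration (bootstrap.servers, group.id, deserializers, ...);
    // its initialization is omitted here. All threads share the same group.id.
    public void consumeMultithreaded(Properties props, String topic) {
        int consumerThreadNum = 5;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props, topic).start();
        }
    }

    // Consumer thread: owns its own KafkaConsumer instance (thread confinement)
    public static class KafkaConsumerThread extends Thread {
        private final KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic) {
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // process the message
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Only this thread ever touches kafkaConsumer, so closing it here is safe
                kafkaConsumer.close();
            }
        }
    }
}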
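2.2. Multithreaded message processing
This approach further optimizes the previous one by adding multithreading to the message-processing stage: the consumer thread only polls, while a thread pool processes the fetched records, which further increases consumption speed. Unlike 2.1, processing parallelism is no longer capped by the number of partitions; the trade-off is that records from the same partition may be processed out of order.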
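import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumer2 {
    // props: consumer configuration; its initialization is omitted here (as in 2.1)
    public void consumeMultithreaded(Properties props, String topic) {
        int consumerThreadNum = 5;
        int handlerThreadNum = 4; // example value: handler threads per consumer thread
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props, topic, handlerThreadNum).start();
        }
    }

    // Consumer thread: polls with its own KafkaConsumer and hands batches to a thread pool
    public static class KafkaConsumerThread extends Thread {
        private final KafkaConsumer<String, String> kafkaConsumer;
        private final ExecutorService executorService;

        public KafkaConsumerThread(Properties props, String topic, int threadNumber) {
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Collections.singletonList(topic));
            // Fixed-size pool with a bounded queue; when the queue is full,
            // CallerRunsPolicy makes the consumer thread process the batch itself.
            this.executorService = new ThreadPoolExecutor(threadNumber, threadNumber,
                    0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        executorService.submit(new RecordsHandler(records));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                executorService.shutdown();
                kafkaConsumer.close();
            }
        }
    }

    // Processing task: a plain Runnable is enough because the pool supplies the threads
    public static class RecordsHandler implements Runnable {
        private final ConsumerRecords<String, String> records;

        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }

        @Override
        public void run() {
            // process records
        }
    }
}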
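The constructor above passes ThreadPoolExecutor.CallerRunsPolicy as the rejection handler. The following JDK-only sketch (the class name CallerRunsDemo is hypothetical) shrinks the pool to one worker and a one-slot queue to show what that policy does once the pool is saturated: the rejected task runs on the submitting thread, which in the consumer would be the polling thread. This throttles poll() naturally, but if the consumer thread spends too long processing, the gap between polls can exceed max.poll.interval.ms and trigger a rebalance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of CallerRunsPolicy back-pressure with a deliberately tiny pool.
public class CallerRunsDemo {
    // Submits three tasks and returns "taskN:threadName" entries in execution order.
    public static List<String> runThreeTasks() throws InterruptedException {
        List<String> ranOn = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        // Same construction as in the consumer code, but with 1 worker and a queue of 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        // Task 1 becomes the worker's first task and blocks it until the latch opens.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ranOn.add("task1:" + Thread.currentThread().getName());
        });
        // Task 2 fills the single queue slot.
        pool.execute(() -> ranOn.add("task2:" + Thread.currentThread().getName()));
        // Task 3 is rejected, so CallerRunsPolicy runs it right here on the caller.
        pool.execute(() -> ranOn.add("task3:" + Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<>(ranOn);
    }
}
```

Task 3 is recorded first, on the caller's own thread, while tasks 1 and 2 run later on the single pool worker.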
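Because records are processed asynchronously in the pool, this approach also requires a shared offsets structure for committing. Automatic commits (enable.auto.commit) may commit the offsets of records the pool has not finished processing, so a crash could lose them. Instead, the handler threads record the offsets they have processed in a shared map, and the consumer thread, the only thread allowed to use the KafkaConsumer, commits those offsets.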
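A minimal sketch of such a shared offsets structure, using only the JDK. SharedOffsets, markProcessed() and drain() are hypothetical names, and partitions are keyed by a plain string such as "my-topic-0" to keep the sketch dependency-free; with the real client the key would be a TopicPartition. Keeping the maximum offset per partition is a simplification: if a later batch of a partition finishes before an earlier one, the earlier batch could be committed before it is processed, so a stricter scheme would have to track contiguous processed ranges.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical shared-offset tracker: handler threads report progress, the consumer thread commits.
public class SharedOffsets {
    private final ConcurrentHashMap<String, Long> pending = new ConcurrentHashMap<>();

    // Called by a handler thread after it has finished the record at `offset`.
    // Kafka commits the offset of the *next* record to read, hence offset + 1.
    public void markProcessed(String partition, long offset) {
        pending.merge(partition, offset + 1, Math::max);
    }

    // Called by the consumer thread (e.g. once per poll loop iteration): removes and
    // returns the offsets accumulated since the previous call.
    public Map<String, Long> drain() {
        Map<String, Long> snapshot = new HashMap<>();
        for (String partition : pending.keySet()) {
            Long next = pending.remove(partition);
            if (next != null) {
                snapshot.put(partition, next);
            }
        }
        return snapshot;
    }
}
```

With the real client, the consumer thread would set enable.auto.commit=false, convert the drained map into a Map&lt;TopicPartition, OffsetAndMetadata&gt; after each poll(), and pass it to kafkaConsumer.commitSync(...), so that the KafkaConsumer itself is still only touched by its own thread.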