@KafkaListener
是 Spring Kafka 提供的一个核心注解,用于标记一个方法作为 Kafka 消息的消费者。下面是对该注解的详细解析:
基本用法
@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
主要属性
1. 必需属性
- topics / topicPattern:指定监听的 topic
topics
:逗号分隔的 topic 列表- `topicPattern**:使用正则表达式匹配 topic
@KafkaListener(topics = "topic1,topic2")
// 或
@KafkaListener(topicPattern = "test.*")
2. 消费者配置
- groupId:指定消费者组 ID
- containerFactory:指定使用的 KafkaListenerContainerFactory
@KafkaListener(topics = "myTopic", groupId = "myGroup",
containerFactory = "myFactory")
3. 消息处理
- id:为监听器指定唯一 ID
- concurrency:设置并发消费者数量
@KafkaListener(id = "myListener", topics = "myTopic", concurrency = "3")
4. 高级配置
- containerGroup:指定容器组(Spring Kafka 2.5+)
- errorHandler:指定错误处理器
- idIsGroup:是否使用监听器 ID 作为组 ID(默认 false)
消息处理方法签名
监听器方法可以接受多种形式的参数:
-
简单消息处理:
@KafkaListener(topics = "myTopic") public void listen(String message) { ... }
-
带元数据的消息处理:
@KafkaListener(topics = "myTopic") public void listen(ConsumerRecord<?, ?> record) { ... }
-
批量消息处理:
@KafkaListener(topics = "myTopic") public void listen(List<String> messages) { ... }
-
带确认的消息处理:
@KafkaListener(topics = "myTopic") public void listen(String message, Acknowledgment ack) { // 处理消息后手动确认 ack.acknowledge(); }
配置选项
可以通过 @KafkaListener
的 containerFactory
属性引用自定义配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@KafkaListener(topics = "myTopic", containerFactory = "myFactory")
public void listen(String message) { ... }
错误处理
可以通过以下方式处理错误:
-
配置错误处理器:
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setErrorHandler(new SeekToCurrentErrorHandler()); return factory; }
-
使用 @SendTo 发送到死信队列:
@KafkaListener(topics = "myTopic", groupId = "myGroup") @SendTo("myDltTopic") public String listen(String message) { // 处理失败时返回错误消息 return "error"; }
注意事项
- 监听器方法应该是 public 的
- 避免在监听器方法中执行长时间运行的操作
- 考虑消息处理的幂等性
- 对于批量处理,确保方法参数是 List 类型
- 在 Spring Boot 中,许多配置可以通过 application.properties/yml 设置
完整示例
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
@Service
public class KafkaMessageListener {
@KafkaListener(topics = "myTopic", groupId = "myGroup",
containerFactory = "kafkaListenerContainerFactory")
public void listen(String message, Acknowledgment ack) {
try {
System.out.println("Received Message: " + message);
// 业务处理逻辑
ack.acknowledge();
} catch (Exception e) {
// 错误处理
}
}
}
@KafkaListener
注解提供了灵活的方式来消费 Kafka 消息,开发者可以根据具体需求进行配置和扩展。
ConcurrentKafkaListenerContainerFactory详解
在Spring Kafka中,ConcurrentKafkaListenerContainerFactory
是一个核心配置类,用于创建并发消息监听容器,支持多线程消费Kafka消息,以下是其详细介绍:
1、核心作用
- 并发消费支持:通过创建多个
KafkaMessageListenerContainer
实例(每个对应一个线程),实现多线程并发消费消息。例如设置concurrency=3
会创建3个消费者线程,每个线程处理分配到的分区。 - 线程安全保障:生成的
ConcurrentMessageListenerContainer
内部委托给多个单线程的KafkaMessageListenerContainer
实例,保证线程安全性(Kafka Consumer本身非线程安全)。
2、关键特性
-
并发度配置:
- 通过
setConcurrency()
方法设置并发消费者数量,可提高消息处理速度和吞吐量。 - 配置规则为
concurrency<=分区数/应用实例数
,设置过多会导致线程闲置。
- 通过
-
批量处理支持:
- 通过
setBatchListener(true)
启用批量消费 - 配合
MAX_POLL_RECORDS_CONFIG
参数控制单次poll最大返回记录数
- 通过
-
错误处理机制:
- 可配置自定义错误处理器(如
SeekToCurrentErrorHandler
) - 支持重试策略集成
- 可配置自定义错误处理器(如
-
分区分配控制:
- 可自定义分区分配逻辑
- 配合
group.id
实现消费者组协调
3、配置示例
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 设置并发消费者数量
factory.setBatchListener(true); // 启用批量消费
factory.getContainerProperties().setPollTimeout(3000); // 设置轮询超时
factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 设置错误处理器
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量消费配置
return new DefaultKafkaConsumerFactory<>(props);
}
}
4、使用场景
- 高吞吐量需求:通过增加并发消费者数量提升处理能力
- 批量数据处理:需要批量处理消息的场景
- 复杂错误处理:需要自定义错误处理逻辑的场景
- 多主题监听:需要同时监听多个主题的场景
5、注意事项
- 顺序性问题:并发消费可能导致消息顺序混乱,需业务保证
- 重复处理问题:需实现幂等性处理机制
- 数据库访问:需注意并发访问控制
- 资源限制:并发度设置需考虑系统资源限制