文章目录
- Pre
- 思路
- 示例
- 配置文件
- 自定义 拦截器
- 使用
- 测试
- 小结

Pre
Apache Kafka - ConsumerInterceptor 实战 (1) 用代码的方式实现了ConsumerInterceptor , 接下来我们用 配置的方式来实现一下 。
思路
如何找配置类
KafkaProperties
有些属性是很明显的有的,其他没有的一般都在 Map里
那map的 key value 从哪里找呢?
找原生的配置 Kafka Consumer的 都在 ConsumerConfig
找到
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
OK,继续
示例
配置文件
自定义 拦截器
package net.zf.module.system.kafka.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author artisan
*/
@Slf4j
@Component
public class FailureRateInterceptor implements ConsumerInterceptor<Object, Object> {
/**
* 消息消费前的拦截处理
*
* @param consumerRecords
* @return
*/
@Override
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
// TODO
log.info("FailureRateInterceptor#onConsume");
// 根据设定的规则计算失败率,并进行判断是否跳过消息的消费
// 返回ConsumerRecords对象, 继续执行下游的消费逻辑或者直接返回空的ConsumerRecords对象
return consumerRecords;
}
/**
* 消息提交前进行拦截处理
*
* @param map
*/
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
log.info("FailureRateInterceptor#onCommit");
}
/**
* 拦截器关闭前进行拦截处理(如果有的话)
*/
@Override
public void close() {
log.info("FailureRateInterceptor#close");
}
/**
* 初始化配置(如果有的话)
*
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
log.info("FailureRateInterceptor#configure");
}
}
使用
测试
启动服务,发送消息,进行消费
小结
在Spring Boot中配置Kafka消费者的拦截器需要进行以下步骤:
- 首先,创建一个拦截器类,实现Kafka的
ConsumerInterceptor
接口,定义拦截器的逻辑。 - 在应用的配置文件(例如
application.properties
或application.yml
)中,添加拦截器相关的配置项,其中包括设置interceptor.class
属性为拦截器类的全限定名。
下面是一个示例,演示如何在Spring Boot中配置Kafka消费者的拦截器:
- 创建拦截器类:
@Slf4j
@Component
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
@Override
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
// 在消息消费前的处理逻辑
// ...
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
// 在消息提交前的处理逻辑
// ...
}
@Override
public void close() {
// 拦截器关闭前的处理逻辑
// ...
}
@Override
public void configure(Map<String, ?> configs) {
// 初始化配置的处理逻辑
// ...
}
}
- 在应用的配置文件中设置拦截器相关的配置项:
spring.kafka.consumer.properties.interceptor.classes=com.example.MyConsumerInterceptor
或者在application.yml
文件中:
spring:
kafka:
consumer:
properties:
interceptor.classes: com.example.MyConsumerInterceptor
这样配置之后,Spring Boot会自动创建Kafka消费者,并将指定的拦截器应用于消费者。在消费者处理消息的过程中,拦截器的方法将会被调用,可以在这些方法中编写自定义的逻辑来处理消息或拦截操作。