1.简介
Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset
参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()
方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。
2.指定消费位置
2.1.从特定偏移量开始消费
使用seek(TopicPartition partition, long offset)
指定具体偏移量。
源码分析:
seek()
方法更新消费者内部的subscriptions
对象的position
字段,记录目标偏移量。- 后续
poll()
时,Fetcher
类根据此位置向Broker发送拉取请求。
代码示例:
consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 确保分配到分区
while (assignment.isEmpty()) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
// 设置所有分区从offset=100开始消费
assignment.forEach(tp -> consumer.seek(tp, 100));
2.2.从时间戳开始消费
使用offsetsForTimes()
获取时间戳对应的偏移量,再调用seek()
。
源码分析:
offsetsForTimes()
向Broker发送ListOffsetRequest
,查询满足时间戳条件的最早或最新偏移量。
代码实例:
Map<TopicPartition, Long> timestamps = assignment.stream()
.collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 获取24小时前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {
if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});
2.3.从分区首尾消费
使用seekToBeginning()
或seekToEnd()
,或通过beginningOffsets()
/endOffsets()
获取首尾偏移量后手动设置。
代码实例:
// 从分区末尾开始消费(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));
2.4.注意事项
-
分区分配与poll()的依赖
seek()
必须在分区分配完成后调用,否则会抛出IllegalStateException
。需通过循环poll()
确保分配到分区。 -
数据过期问题
若指定偏移量对应的消息已被删除(如日志清理导致),seek()
将失效。此时需使用beginningOffsets()
获取当前最小有效偏移量。 -
异步提交与位移覆盖风险
异步提交(commitAsync()
)失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync()
)保证原子性。 -
seek()
方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。
3.完整代码实例
public class SeekToTimestampDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "seek-demo");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("test-topic"));
// 等待分区分配
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.isEmpty()) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
// 获取24小时前的时间戳对应偏移量
Map<TopicPartition, Long> timestamps = assignment.stream()
.collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
// 指定位移
offsets.forEach((tp, offsetAndTs) -> {
if (offsetAndTs != null) {
consumer.seek(tp, offsetAndTs.offset());
} else {
// 处理无有效偏移量的情况(如从头开始)
consumer.seekToBeginning(Collections.singleton(tp));
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));
}
}
}