0%

Kafka 重置偏移量到某个时间点


背景

有需求排查某个时间的数据是否重复,因此需要将消费者组重置到某个时间点

操作

kafka-consumer-groups.sh 使用 –to-datetime 参数可以做到,需要注意这个–to-datetime是utc时间,需要减去8个小时

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group --reset-offsets --all-topics --to-datetime 2020-12-31T02:00:00.000 --execute

当然可以根据时区设置时间,以东8时区进行设置,把对应时间改为 2020-12-31T10:00:00.000+08:00

重置前有个前提是consumer group状态必须是inactive的,即不能是处于正在工作中的状态。

原理

Message body

Kafka从0.10.0.0版本起,在消息内新增加了个timestamp字段

时间戳的类型有两种:可以设定为producer创建消息的时间(CreateTime),也可以设定为该消息写入Broker的时间(LogAppendTime)。默认为CreateTime,可通过参数message.timestamp.type 实现Topic级别的类型更改,Broker级别的时间戳类型参数为log.message.timestamp.type

有关Kafka Message新增时间戳的相关细节,可详见Kafka官方Doc KIP-32 - Add timestamps to Kafka message

Log Segment

从Kafka 0.10开始,对于日志文件,新增一个.timeindex文件,即每个Segment分别由.log、.index.timeindex这三个文件组成。

有关Log Segment 新增.timeindex相关细节,可详见Kafka官方Doc KIP-33 - Add a time based log index

Kafka API提供了一个 offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量。

扩展

Kafka 还支持其他位移重设策略,感兴趣的可以自行阅读Kafka consumer group位移重设


参考链接