flink 消费 kafka 数据,提交消费组 offset 有三种类型
- 开启 checkpoint :在 checkpoint 完成后提交
- 开启 checkpoint,禁用 checkpoint 提交: 不提交消费组 offset
- 不开启 checkpoint:依赖kafka client 的自动提交
一个简单的 flink 程序: 读取kafka topic 数据,写到另一个 topic
1 | val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment |
1 启动 checkpoint
开启checkpoint 默认值就是 消费组 offset 的提交方式是: ON_CHECKPOINTS
offsetCommitMode 提交方法在 FlinkKafkaConsumerBase open 的时候会设置:
FlinkKafkaConsumer 提交消费者的 offset 的行为在 FlinkKafkaConsumerBase open 的时候会设置:
1 | @Override |
OffsetCommitModes.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on
// checkpoints is enabled
return (enableCommitOnCheckpoint)
? OffsetCommitMode.ON_CHECKPOINTS
: OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided
// Kafka properties
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}
可以看出,如果开启了checkpoint但是没开启enableCommitOnCheckpoint就会选择OffsetCommitMode.DISABLED(不提交offset)
当 flink 触发一次 checkpoint 的时候,会依次调用所有算子的 notifyCheckpointComplete 方法,kafka source 会调用到 FlinkKafkaConsumerBase.notifyCheckpointComplete
注:FlinkKafkaConsumerBase 是 FlinkKafkaConsumer 的父类
1 |
|
最后调用了 AbstractFetcher.commitInternalOffsetsToKafka
1 | public final void commitInternalOffsetsToKafka( |
AbstractFetcher.doCommitInternalOffsetsToKafka 的实现 KafkaFetcher.doCommitInternalOffsetsToKafka
使用 Map<KafkaTopicPartition, Long> offsets 构造提交 kafka offset 的 Map<TopicPartition, OffsetAndMetadata> offsetsToCommit
注:offset + 1 表示下一次消费的位置
1 |
|
然后调用 KafkaConsumerThread.setOffsetsToCommit: 将待提交的 offset 放到 kafka 的消费线程对于的属性 nextOffsetsToCommit 中,等待下一个消费循环提交
1 | void setOffsetsToCommit( |
然后就到了kafka 消费的线程,KafkaConsumerThread.run 方法中: 这里是消费 kafka 数据的地方,也提交对应消费组的offset
1 |
|
到这里就能看到 flink 的offset 提交到了 kafka 中
kafka如果不提交offset,还能正常消费吗?
kafka client的offset是保存在自身的内存中的,启动的时候发现为空,就会去broker中获取,有多种获取策略。 然后后续的消费,就跟自身内存打交道了,只是无脑提交offset到kafka。后续重启才会再次和broker打交道拿offset。