flink 消费 kafka 数据,提交消费组 offset 有三种类型
- 开启 checkpoint :在 checkpoint 完成后提交
- 开启 checkpoint,禁用 checkpoint 提交: 不提交消费组 offset
- 不开启 checkpoint:依赖kafka client 的自动提交
一个简单的 flink 程序: 读取kafka topic 数据,写到另一个 topic
| 1 | val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment | 
1 启动 checkpoint
开启checkpoint 默认值就是 消费组 offset 的提交方式是: ON_CHECKPOINTS
offsetCommitMode 提交方法在 FlinkKafkaConsumerBase open 的时候会设置:
FlinkKafkaConsumer 提交消费者的 offset 的行为在 FlinkKafkaConsumerBase open 的时候会设置:
| 1 | @Override | 
OffsetCommitModes.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // if checkpointing is enabled, the mode depends only on whether committing on
            // checkpoints is enabled
            return (enableCommitOnCheckpoint)
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        } else {
            // else, the mode depends only on whether auto committing is enabled in the provided
            // Kafka properties
            return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
        }
    }
可以看出,如果开启了checkpoint但是没开启enableCommitOnCheckpoint就会选择OffsetCommitMode.DISABLED(不提交offset)
当 flink 触发一次 checkpoint 的时候,会依次调用所有算子的 notifyCheckpointComplete 方法,kafka source 会调用到 FlinkKafkaConsumerBase.notifyCheckpointComplete
注:FlinkKafkaConsumerBase 是 FlinkKafkaConsumer 的父类
| 1 | 
 | 
最后调用了 AbstractFetcher.commitInternalOffsetsToKafka
| 1 | public final void commitInternalOffsetsToKafka( | 
AbstractFetcher.doCommitInternalOffsetsToKafka 的实现 KafkaFetcher.doCommitInternalOffsetsToKafka
使用 Map<KafkaTopicPartition, Long> offsets 构造提交 kafka offset 的 Map<TopicPartition, OffsetAndMetadata> offsetsToCommit
注:offset + 1 表示下一次消费的位置
| 1 | 
 | 
然后调用 KafkaConsumerThread.setOffsetsToCommit: 将待提交的 offset 放到 kafka 的消费线程对于的属性 nextOffsetsToCommit 中,等待下一个消费循环提交
| 1 | void setOffsetsToCommit( | 
然后就到了kafka 消费的线程,KafkaConsumerThread.run 方法中: 这里是消费 kafka 数据的地方,也提交对应消费组的offset
| 1 | 
 | 
到这里就能看到 flink 的offset 提交到了 kafka 中
kafka如果不提交offset,还能正常消费吗?
kafka client的offset是保存在自身的内存中的,启动的时候发现为空,就会去broker中获取,有多种获取策略。 然后后续的消费,就跟自身内存打交道了,只是无脑提交offset到kafka。后续重启才会再次和broker打交道拿offset。