源码基于1.12.4
初始化
通常添加一个 kafka sink 的代码如下:
1 |
|
初始化执行 env.addSink 的时候会创建 StreamSink 对象,即 StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction));这里的 sinkFunction 就是传入的 FlinkKafkaProducer 对象,StreamSink 构造函数中将这个对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量
Task运行
StreamSink 会调用下面的方法发送数据
1 |
|
也就是实际调用的是 FlinkKafkaProducer#invoke 方法。在 FlinkKafkaProducer 的构造函数中需要指 FlinkKafkaProducer.Semantic.
1 | public enum Semantic { |
Semantic.NONE
这种方式不会做任何额外的操作,完全依靠 kafka producer 自身的特性,也就是FlinkKafkaProducer#invoke 里面发送数据之后,Flink 不会再考虑 kafka 是否已经正确的收到数据。
transaction.producer.send(record, callback);
Semantic.AT_LEAST_ONCE
这种语义下,除了会走上面说到的发送数据的流程外,如果开启了 checkpoint 功能,在 FlinkKafkaProducer#snapshotState 中会首先执行父类的 snapshotState方法,里面最终会执行 FlinkKafkaProducer#preCommit。
1 |
|
AT_LEAST_ONCE 会执行了 flush 方法,里面执行了:
1 | /** |
这个函数主要做的事就是transaction.producer.flush();
就是将 send 的数据立即发送给 kafka 服务端,详细含义可以参考 KafkaProducer api:http://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
EXACTLY_ONCE 语义也会执行 send 和 flush 方法,但是同时会开启 kafka producer 的事务机制。FlinkKafkaProducer 中 beginTransaction 的源码如下,可以看到只有是 EXACTLY_ONCE 模式才会真正开始一个事务。
1 |
|
和 AT_LEAST_ONCE 另一个不同的地方在于 checkpoint 的时候,会将事务相关信息保存到变量 nextTransactionalIdHintState 中,这个变量存储的信息会作为 checkpoint 中的一部分进行持久化。
1 |
|
完整调用流程
- snapshotState(开始checkPoint)
- preCommit
- flush
- beginTransactionInternal
- beginTransaction
- notifyCheckpointComplete (完成checkPoint并且上传到TM后回调)
- commit
- commitTransaction
完整性差别
如果Source->map->sink的topology中,如果完成下一次checkpoint前,已经出现了5条数据。
- none模式
5条数据已经在kafkaClient的send队列中了,是否发送取决于 LINGER_MS 和 BATCH_SIZE 两个参数,如果这两个参数过大,程序重启可能会丢数据,丢的数据是上几个checkpoint种还没来得及flush的数据,这次还没checkpoint的数据并没有丢
- AtLeastOnce
5条数据已经在kafkaClient的send队列中了,并且每次checkPoint的时候,都会flush kakfaClient的send队列,保证每次新的checkpoint没有残留上一个checkPoint的数据。如果send这次的数据出现程序重启,那么就会重新发送数据,但是不会出现丢数据的情况。
- exactly once
5条数据都跟着事务走,checkpoint的时候先preCommit,如果checkpoint完成并且tm回掉了,那么就提交事务commit。每个事务都是new一个producter,保存对应的事务id到状态中。
Reference
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
https://developer.aliyun.com/article/752225