前置
使用方法
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
总结
先上总结,kafka的分区发现基本还是基于另起线程,在另外的线程内,通过kafka的client的基础功能,获取集群所有的topic,与正则进行匹配(居然是框架的功能不是kafka的功能)过滤出符合的topic,通过kafka client获取所有的partition。
flink会保存已经发现过的partition,将新发现的partition和topic名字的hascode还有子任务的id进行一个计算,保证任务可以均衡的分布在每个子任务(等于partition是自我认领的方式)
源码解析
FlinkKafkaConsumerBase.java
1 |
|
如果这个discoverIntervalMillis设置为Long.MIN_VALUE就走kafkaFetcher.runFetchLoop();的方法。
如果设置了这个值,并且不是最小值,那么就会走runWithPartitionDiscovery方法。
discoverIntervalMillis是一个设置了的必须大于等于0或者等于Long.min_value。
如果启用了,最终会调用createAndStartDiscoveryLoop()方法,启动一个单独的线程,负责以discoveryIntervalMillis为周期发现新的topic/partition,并传递给KafkaFetcher。
核心代码在FlinkKafkaConsumerBase的createAndStartDiscoveryLoop
1 | private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) { |
关键代码在这边:
1 | try { |
先关注下AbstractPartitionDiscoverer.discoverPartitions,
主要做了下面这几件事
- 会根据传入的是单个固定的topic还是由正则表达式指定的多个topics获取所有的可能的partition。
- 排除旧的partition和不需要这个subtask需要订阅的partition
如果是固定topic调用getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
如果是基于正则,需要先获取这个集群上的所有的topic,进行正则的匹配(原来这个功能是在框架进行的,不是kafka的功能)。然后调用和固定topic一样的函数getAllPartitionsForTopics
1 | List<String> matchedTopics = getAllTopics(); |
KafkaPartitionDiscoverer.getAllPartitionsForTopics主要是用于发现kafka对应topic的partition。这个函数主要是使用kafka自带的client获取这个topic所有的partition
1 | final List<PartitionInfo> kafkaPartitions = kafkaConsumer.partitionsFor(topic); |
获取所有的topic也是一样调用kafka client自带的方法
1 | kafkaConsumer.listTopics() |
回归主线,看第二步,如何排除无关的partition。关键是AbstractPartitionDiscoverer.setAndCheckDiscoveredPartition.
AbstractPartitionDiscoverer这个类中有一个变量discoveredPartitions存储已经被发现过的partition.非该存储中的均为新发现的partition。
如何判断这个partition是否是这个subtask需要消费的。通过getRuntimeContext().getIndexOfThisSubtask()获取该任务的index,通过getRuntimeContext().getNumberOfParallelSubtasks()获取总任务的并行度。调用assign进行判断。
assign方法进行了比较多的考虑,产生的单个主题的分区分布具有以下特点
- 均匀地分布在子任务中
- 通过使用分区ID作为起始索引的偏移量
topicA 可能是从subtask 10 ,11 , 12 … 开始分布
topicB 可能是从subtask 2 ,3 ,4 … 开始分布
为了错开,进行讲topic名字的hashCode作为计算分部的一部分。
1 | /** |
Reference
https://cloud.tencent.com/developer/article/1677272
https://www.cnblogs.com/Springmoon-venn/p/12023350.html
https://juejin.cn/post/6844903972835344391