问题
这有两个相同代码的程序:
1 |
|
同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?
而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的
解答
在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。
注意
,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
源码分析
先看下社区的分析:
Internally, the Flink Kafka connectors don’t use the consumer group management functionality because they are using lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each parallel instance for more control on individual partition consumption. So, essentially, the “group.id” setting in the Flink Kafka connector is only used for committing offsets back to ZK / Kafka brokers.
flink的版本没有用到group id这个属性。。
初步结论
https://issues.apache.org/jira/browse/FLINK-11325
connecter消费数据的时候,使用 ./bin/kafka-consumer-groups.sh就是无法获取 CONSUMER-ID HOST CLIENT-ID等值。因为Flink实现connecter的时候,就没有使用到kafka的这个feature。此时我们需要通过Flink的metric可以看到消费情况。
进一步看源码吧:
KafkaConsumer实现
我们使用kafka-client的时候,一般使用KafkaConsumer构建我们的消费实例,使用poll来获取数据:
1 | KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); |
我们看下org.apache.kafka.clients.consumer.KafkaConsumer核心部分
1 | private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { |
从这里我们大致了解了KafkaConsumer的构建过程,知道了client-id是由当参数传递给KafkaConsumer,或者是有client按“consumer-序列号”规则生成。
而consumer-id是有服务端生成,其过程:
KafkaConsumer实例构建后,会向服务端发起JOIN_GROUP操作kafkaApis1
ApiKeys.JOIN_GROUP;
handleJoinGroupRequest=> handleJoinGroup
1 | case Some(group) => |
第一次请求时服务端没有分配memberId(即consumerId),按isUnknownMember处理
1 | // doUnknownJoinGroup |
1 | def generateMemberIdSuffix = UUID.randomUUID().toString |
FlinkKafkaConsumer实现
Flink的通用kafka-connector部分源码:
FlinkKafkaConsumer
1 | private FlinkKafkaConsumer( |
connector拉取数据的逻辑见org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher
1 |
|
org.apache.kafka.clients.consumer.ConsumerRecords从指定partition获取数据
1 | /** |
在flink中,会通过1
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
1 | partitionRecords = |
去拉去对应partition的数据。
那么consumer的partition如何重新分配的呢
在KafkaComsumerThreader. run的时候会分配
1 | if (newPartitions != null) { |
1 | /** |
一句话总结
connector自己实现了FlinkKafkaConsumer,且没有按照kafka的feature实现coordinator以及JOIN_GROUOP的逻辑。消费数据,是通过将partition重新分配给consumer,直接poll,不是走coordinator逻辑。
总结
配置相同的group.id消费相同的topic
不管有没有开启checkPoint
两个程序相互隔离,同一条数据,两个程序都可以消费到。
差别在于消费的位置
如果配置startFromLast,都会从最新的 数据开始消费
如果采用默认配置,第一次消费的时候从上面kafka上的offset开始消费,后面就开始各管各的。
Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
http://apache-flink.147419.n8.nabble.com/FlinkKafkaConsumer-td6818.html