使用
详情见https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/broadcast_state.html
1 |
|
如果colorPartitionedStream是一个KeyedStream,那么process使用的是KeyedBroadcastProcessFunction,如果是一个普通的DataStream,那么用的是BroadcastProcessFunction。
1 |
|
1 |
|
官网提供的demo
1 | new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() { |
需要注意的是,目前广播流的状态都保存在内存中,RocksDB 状态后端目前还不支持广播状态。
分析源码
MapStateDescriptor
首先要说明一些概念:
- Flink中包含两种基础的状态:Keyed State和Operator State。
- Keyed State和Operator State又可以 以两种形式存在:原始状态和托管状态。
- 托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。
- raw state即原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。
- MapState是托管状态的一种:即状态值为一个map。用户通过
put
或putAll
方法添加元素。
根据dataStream的不同决定了选用不同的state。
DataStream.broadcast
1 |
|
在初始构造的时候,做了如下的事情
- 将partitioning设置为BroadcastPartitioner,这个分区策略是将数据发送到所有的下游,如果有n个channel,那就是每个channel都发一份数据。
- 用dataStream和状态描述创建一个BroadcastStream
BroadcastStream没什么特别的,属性如下
1 | public class BroadcastStream<T> { |
在DataStream.connect(BroadcastStream)的时候构建出关键类BroadcastConnectedStream
1 |
|
在构造函数上,将dataStream和broadcastStream复制给input1和input2,作为TwoInput。
1 | protected BroadcastConnectedStream( |
用户代码DataStream.connect(BroadcastStream).process()的时候调用process方法将真正的计算操作交给了CoBroadcastWithKeyedOperator
1 |
|
CoBroadcastWithKeyedOperator
1 |
|
本质上,其实还是双流join,只是没有数据late或者其他watermark之类的判断,广播流的数据存储在用户定义的state中,另一条从state中获取,形成广播流的join效果。
状态的创建主要是在CoBroadcastWithKeyedOperator.open方法
1 | for (MapStateDescriptor<?, ?> descriptor: broadcastStateDescriptors) { |
OperatorStateBackEnd
OperatorStateBackEnd 主要管理OperatorState. 目前只有一种实现: DefaultOperatorStateBackend。
BroadcastState的实现也只有一个HeapBroadcastState,所以广播流的状态都保存在内存中。
1 |
|
1 | broadcastState = new HeapBroadcastState<>( |
可以看出这部分是放在内存中的
Reference
https://www.cnblogs.com/rossiXYZ/p/12594315.html
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/broadcast_state.html