Window Join and CoGroup
Window Join 操作,顾名思义,是基于时间窗口对两个流进行关联操作。相比于 Join 操作, CoGroup 提供了一个更为通用的方式来处理两个流在相同的窗口内匹配的元素。 Join 复用了 CoGroup 的实现逻辑。它们的使用方式如下:
1 | stream.join(otherStream) |
从 JoinFunction 和 CogroupFunction 接口的定义中可以大致看出它们的区别:
1 |
|
可以看出来,JoinFunction 主要关注的是两个流中按照 key 匹配的每一对元素,而 CoGroupFunction 的参数则是两个中 key 相同的所有元素。JoinFunction 的逻辑更类似于 INNER JOIN,而 CoGroupFunction 除了可以实现 INNER JOIN,也可以实现 OUTER JOIN。
Window Join 的是被转换成 CoGroup 进行处理的:
1 | public class JoinedStreams<T1, T2> { |
那么 CoGroup 又是怎么实现两个流的操作的呢?
union + map 组合完成。
Flink 其实是通过一个变换(mapfunction),将两个流转换成一个流进行处理,转换之后数据流中的每一条消息都有一个标记来记录这个消息是属于左边的流还是右边的流,这样窗口的操作就和单个流的实现一样了。等到窗口被触发的时候,再按照标记将窗口内的元素分为左边的一组和右边的一组,然后交给 CoGroupFunction 进行处理。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73public class CoGroupedStreams<T1, T2> {
public static class WithWindow<T1, T2, KEY, W extends Window> {
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
.map(new Input1Tagger<T1, T2>())
.setParallelism(input1.getParallelism())
.returns(unionType); //左边流
DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
.map(new Input2Tagger<T1, T2>())
.setParallelism(input2.getParallelism())
.returns(unionType); //右边流
//合并成一个数据流
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
// we explicitly create the keyed stream to manually pass the key type information in
windowedStream =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
.window(windowAssigner);
if (trigger != null) {
windowedStream.trigger(trigger);
}
if (evictor != null) {
windowedStream.evictor(evictor);
}
if (allowedLateness != null) {
windowedStream.allowedLateness(allowedLateness);
}
return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
}
//将 CoGroupFunction 封装为 WindowFunction
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
private static final long serialVersionUID = 1L;
public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
super(userFunction);
}
public void apply(KEY key,
W window,
Iterable<TaggedUnion<T1, T2>> values,
Collector<T> out) throws Exception {
List<T1> oneValues = new ArrayList<>();
List<T2> twoValues = new ArrayList<>();
//窗口内的所有元素按标记重新分为左边的一组和右边的一组
for (TaggedUnion<T1, T2> val: values) {
if (val.isOne()) {
oneValues.add(val.getOne());
} else {
twoValues.add(val.getTwo());
}
}
//调用 CoGroupFunction
wrappedFunction.coGroup(oneValues, twoValues, out);
}
}
}
从上面的源码来看,Iterable
join和cogroup没有什么太大差别,还是需要将数据全部转为list,只是交给function是单个还是多个而已。
Interval Join
Window Join 的一个局限是关联的两个数据流必须在同样的时间窗口中。但有些时候,我们希望在一个数据流中的消息到达时,在另一个数据流的一段时间内去查找匹配的元素。更确切地说,如果数据流 b 中消息到达时,我们希望在数据流 a 中匹配的元素的时间范围为 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound;同样,对数据流 a 中的消息也是如此。在这种情况,就可以使用 Interval Join。具体的用法如下:
1 | stream |
Interval Join 是基于 ConnectedStreams 实现的:
1 | public class KeyedStream<T, KEY> extends DataStream<T> { |
将两条keyStream通过connect转为ConnectedStreams 。
在 IntervalJoinOperator 中,使用两个 MapState 分别保存两个数据流到达的消息,MapState 的 key 是消息的时间。当一个数据流有新消息到达时,就会去另一个数据流的状态中查找时间落在匹配范围内的消息,然后进行关联处理。每一条消息会注册一个定时器,在时间越过该消息的有效范围后从状态中清除该消息。
1 | public class IntervalJoinOperator<K, T1, T2, OUT> |
Reference
https://blog.jrwang.me/2019/flink-source-code-two-stream-join/