Taking flatMap as an example
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
/**
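Besides building and returning the SingleOutputStreamOperator instance, the logic above contains one more statement:
getExecutionEnvironment().addOperator(resultTransform);
This adds the OneInputTransformation instance built above to the transformations property of StreamExecutionEnvironment, which is a List.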
The keyBy transformation
/**
The KeyedStream constructor first builds a KeyGroupStreamPartitioner instance from the keySelector, and then uses it to build a PartitionTransformation instance.
/**
 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
 * to partition operator state by key.
 *
 * @param dataStream
 *            Base stream of data
 * @param keySelector
 *            Function for determining state partitions
 */
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
    this(
        dataStream,
        new PartitionTransformation<>(
            dataStream.getTransformation(),
            new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
        keySelector,
        keyType);
}
KeyGroupStreamPartitioner
This partitioner selects the downstream channel id for each record.
/**
 * Computes the downstream channel id to select.
 * Assigns the given key to a parallel operator index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @param parallelism the current parallelism of the operator
 * @return the index of the parallel operator to which the given key should be routed.
 */
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

// Derives a key-group id from the key and the max parallelism:
// the key's hashCode is passed through murmurHash and taken modulo the max parallelism.
public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

// Maps the key-group id to the downstream channel id.
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}
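- First, the key's hashCode (after murmurHash) is taken modulo maxParallelism, which yields an integer in [0, maxParallelism). This is the key-group id;
- Then the formula keyGroupId * parallelism / maxParallelism produces an integer in [0, parallelism). This is the downstream channel id, and it is what implements the partitioning.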
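The two steps above can be tried out in isolation. The following is a minimal sketch, not Flink's actual code: the class name KeyGroupDemo and the mixHash function are hypothetical, and mixHash is only a stand-in for MathUtils.murmurHash that returns a non-negative value. The mapping from key-group id to channel id uses the same integer formula as the source.

```java
public class KeyGroupDemo {

    // Hypothetical stand-in for Flink's MathUtils.murmurHash (an assumption for
    // illustration only): scrambles the bits and returns a non-negative int.
    public static int mixHash(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit
    }

    // Step 1: key -> key-group id in [0, maxParallelism)
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return mixHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key-group id -> channel id in [0, parallelism)
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"a", "b", "c"}) {
            int keyGroup = assignToKeyGroup(key, maxParallelism);
            int channel = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println(key + " -> key group " + keyGroup + " -> channel " + channel);
        }
    }
}
```

With maxParallelism = 128 and parallelism = 4, the integer division sends key groups 0 to 31 to channel 0, 32 to 63 to channel 1, 64 to 95 to channel 2, and 96 to 127 to channel 3, so each channel owns a contiguous range of key groups.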
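Differences between keyBy and flatMap
- In flatMap, the flatMapper Function passed in is used to build an instance of a subclass of the StreamOperator interface, whereas in keyBy the keySelector is used to build an instance of a subclass of the ChannelSelector interface;
- The StreamTransformation instance built in keyBy is not added to the transformations list of StreamExecutionEnvironment.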
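The second difference can be illustrated with a toy model. Everything in this sketch (TransformationModel, Node, Env, Stream, and the operator method) is hypothetical and greatly simplified. It only mirrors the behavior described above: operator-style transformations are registered with the environment, while the partition transformation is not, and in this model it stays reachable only as the input of the next registered transformation.

```java
import java.util.ArrayList;
import java.util.List;

public class TransformationModel {

    // Hypothetical stand-in for a transformation node with a link to its input.
    public static class Node {
        public final String name;
        public final Node input; // null for the first node in this sketch
        public Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }
    }

    // Hypothetical stand-in for the environment holding the transformations list.
    public static class Env {
        public final List<Node> transformations = new ArrayList<>();
        public void addOperator(Node node) {
            transformations.add(node);
        }
    }

    public static class Stream {
        public final Env env;
        public final Node node;
        public Stream(Env env, Node node) {
            this.env = env;
            this.node = node;
        }

        // Models flatMap-like operators: build the node AND register it.
        public Stream operator(String name) {
            Node next = new Node(name, node);
            env.addOperator(next);
            return new Stream(env, next);
        }

        // Models keyBy: build a partition node but do NOT register it.
        public Stream keyBy() {
            return new Stream(env, new Node("Partition", node));
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        // How sources are registered is out of scope for this sketch.
        Stream source = new Stream(env, new Node("Source", null));
        source.operator("FlatMap").keyBy().operator("Window");
        for (Node n : env.transformations) {
            System.out.println(n.name + " <- " + n.input.name);
        }
    }
}
```

In this model the list ends up holding only the FlatMap and Window nodes, and the Partition node appears as the input of Window.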
The timeWindow transformation
- KeyedStream has the following window method, which timeWindow ends up calling:
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
    return new WindowedStream<>(this, assigner);
}
WindowAssigner
The most important method of the abstract class WindowAssigner is the one that assigns windows to each element:
/**
* Returns a {@code Collection} of windows that should be assigned to the element.
*
* @param element The element to which windows should be assigned.
* @param timestamp The timestamp of the element.
* @param context The {@link WindowAssignerContext} in which the assigner operates.
*/
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
- The SlidingProcessingTimeWindows implementation of this method
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    // Use the current processing time instead of the element's timestamp
    timestamp = context.getCurrentProcessingTime();
    // Create the result list, sized for the number of windows an element falls into
    List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
    // Start time of the latest window that contains this timestamp
    long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
    // Walk backwards from that start in steps of slide, creating every window that still covers the timestamp
    for (long start = lastStart;
        start > timestamp - size;
        start -= slide) {
        windows.add(new TimeWindow(start, start + size));
    }
    return windows;
}
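To see which windows the loop produces, here is a minimal standalone sketch. The class SlidingWindowDemo is hypothetical, and windows are represented as plain {start, end} arrays instead of TimeWindow. The helper windowStartWithOffset is written on the assumption that TimeWindow.getWindowStartWithOffset computes timestamp - (timestamp - offset + windowSize) % windowSize. The window size of 10 s and slide of 5 s are example values chosen here, not taken from the source.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowDemo {

    // Assumed equivalent of TimeWindow.getWindowStartWithOffset.
    public static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Same loop as assignWindows above, with each window returned as {start, end}.
    public static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        long lastStart = windowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long timestamp = 1520406257000L; // sample processing time in milliseconds
        long size = 10_000L;             // example window size: 10 s
        long slide = 5_000L;             // example slide: 5 s
        for (long[] w : assignWindows(timestamp, size, slide, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For these values the element falls into size / slide = 2 windows: [1520406255000, 1520406265000) and [1520406250000, 1520406260000). The next candidate start, 1520406245000, is not greater than timestamp - size = 1520406247000, so the loop stops there.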
a. timestamp = 1520406257000 // 2018-03-07 15:04:17