背景
参考了腾讯Oceanus和阿里Blink中minibatch的思路,针对热流进行预聚合的方式解决数据倾斜的问题。
Local Keyed Streams
现实中,很多数据具有幂律分布(,幂律就是两个通俗的定律,一个是“长尾”理论,只有少数大的门户网站是很多人关注的,但是还有一个长长的尾巴,就是小网站,小公司。长尾理论就是对幂律通俗化的解释。另外一个通俗解释就是马太效应,穷者越穷富者越富)。在处理这类数据时,作业执行性能就会由于负载倾斜而急剧下降。
以WordCount程序作为示例。为了统计每个出现word的次数,我们需要将每个word送到对应的aggregator上进行统计。当有部分word出现的次数远远超过其他word时,那么将只有少数的几个aggregator在执行,而其他的aggregator将空闲。当我们增加更多的aggregator时,因为绝大部分word仍然只会被发送到少数那几个aggregator上,程序性能也不会得到任何提高。
解决思路
将数据在分发前提前聚合
设计思路
- 处理每一条记录的时候,先缓存
- 到达触发条件仅需预聚合
- 讲这批数据向下发放
实现一个基于条数为触发条件的LocalAgg
- 首先需要有一个方法,方法主要是将新加的元素和存储在内存中相同key的元素仅需合并,做到处理一条数据合并一条数据,而不是一起合并。
- 达到阈值的时候,出发flush动作将数据下发。
1 | import org.apache.flink.api.common.functions.Function; |
作为一个算子,因为是单流输入,需要继承AbstractStreamOperator和OneInputStreamOperator。
这个算子需要下面变量
- keySelect获取key
- Map存储数据
- 计数
- 用户函数
processElement处理元素的时候,先获取对应的key,然后内存中的对应key的元素,再交给用户函数去合并数据,并将返回的数据塞入内存中对应key的value。如果计数满足flush条件,调用用户函数的flinsh方法提交数据。
1 | import org.apache.flink.api.common.functions.util.FunctionUtils; |
StreamRecordCollector主要是实现StreamRecord的复用。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class StreamRecordCollector<T> implements Collector<T> {
private final StreamRecord<T> element = new StreamRecord<>(null);
private final Output<StreamRecord<T>> underlyingOutput;
public StreamRecordCollector(Output<StreamRecord<T>> output) {
this.underlyingOutput = output;
}
public void collect(T record) {
underlyingOutput.collect(element.replace(record));
}
public void close() {
underlyingOutput.close();
}
}
构建流,其实有两种模式,一种是直接改在DataStream这个类中,一种是另外构建一个类。
1 |
|
到这里就完成了开发。看下如何调用.构建一个TestLocalAggFunction类,实现其中的方法,如果有相同的key,字符串拼接,如果满足条件以key=value的形式下发。
1 | public class TestLocalAggFunction extends LocalAggFunction<String, String, Tuple2<String, String>, String> { |
结果1
2
3gy=s,s
gy1=s
guoy1=s,s
优化
- LocalAggOperation里关于count触发计算可以抽出一个抽象类叫做CountLocalTrigger,将触发相关的交给用户来实现。
- 可以实现count和time一起避免数据没到一直不下发的情况。(不过会用到localAgg说明数据量不小了,应该不会出现这种,实现价值不高)
参考code
1 | package study.flink.bundle; |
1 | package study.flink.bundle; |
1 | package study.flink.bundle; |
1 | package study.flink.bundle; |
1 | package study.flink.bundle; |
1 | package study.flink.bundle.trigger; |
1 | package study.flink.bundle.trigger; |
1 | package study.flink.bundle.trigger; |