Flink 使用调优(一)

flink任务调度

  • Flink中的执行资源通过任务槽(Task Slots)定义。每个TaskManager都有一个或多个任务槽,每个槽都可以运行一个并行任务管道(pipeline)。管道由多个连续的任务组成,例如第n个MapFunction并行实例和第n个ReduceFunction并行实例。Flink经常并发地执行连续的任务:对于流程序,这在任何情况下都会发生,对于批处理程序,它也经常发生。
  • 关于Flink调度,有两个非常重要的原则:
    • 1.同一个operator的各个subtask是不能呆在同一个SharedSlot中的,例如FlatMap[1]和FlatMap[2]是不能在同一个SharedSlot中的。
    • 2.Flink是按照拓扑顺序从Source一个个调度到Sink的。例如WordCount(Source并行度为1,其他并行度为2),那么调度的顺序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]。
      image

注意点

在flink中,相同并行度,只要不是热rebalance操作,都会在同一个slot中,可以加大吞吐,如果只是扩充维度,没有进行groupby 或者 聚合操作,那么出现rebalance是一个很不明智的做法,会出现反压等情况,需要加大下游的处理能力,才能抵掉rebalance带来的网络对吞吐的消耗。

shuffle 是我最不愿意见到的环节,因为一旦出现了非常多的 shuffle,就会占用大量的磁盘和网络 IO,从而导致任务进行得非常缓慢。

优化前

层级 并行度
source 10
flatmap 40
hash 40
sink 10
现象 出现反压

image

优化后

层级 并行度
source 10
flatmap 10
hash 30
sink 10
现象 数据稳定

image

算子调整

窗口后的聚合函数有两个模式,全量聚合processWindowFunction和增量聚合AggregateFunction。

性能对比差距与两倍以上。

差别

增量聚合是遇到一条数据就加一,全量数据是获取窗口内的所有数据进行计算,很容易导致吞吐上不去。

实例复用

1
2
3
4
5
6
7
8
9
stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...
// A new Tuple instance is created on every execution
collector.collect(new Tuple2<>(userName, changesCount));
}
}

上述例子中每一条记录都会创建一个Tuple2实例,而flink在传递实体的时候,会copy这个实体,因此在大量创建实体会导致频繁的垃圾回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
// Create an instance that we will reuse on every call
private Tuple2<String, Long> result = new Tuple<>();

@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...

// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Auto-boxing!! A new Long value may be created
result.f1 = changesCount;

// Reuse the same Tuple2 object
collector.collect(result);
}
}

例如Long,Int之类的可以通过IntValue, LongValue, StringValue, FloatValue等替换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
// Create a mutable count instance
private LongValue count = new IntValue();
// Assign mutable count to the tuple
private Tuple2<String, LongValue> result = new Tuple<>("", count);

@Override
// Notice that now we have a different return type
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, LongValue>> collector) throws Exception {
long changesCount = ...

// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Update mutable count value
count.setValue(changesCount);

// Reuse the same tuple and the same LongValue instance
collector.collect(result);
}
}

是否需要做到这一步,开发人员需要自行衡量

Reference

http://wuchong.me/blog/2016/04/26/flink-internals-how-to-handle-backpressure/

https://dzone.com/articles/four-ways-to-optimize-your-flink-applications