背景
对于实时的流式处理系统来说,我们需要关注数据输入、计算和输出的及时性,所以处理延迟是一个比较重要的监控指标,特别是在数据量大或者软硬件条件不佳的环境下。Flink早在FLINK-3660(https://issues.apache.org/jira/browse/FLINK-3660 ) 就为用户提供了开箱即用的链路延迟监控功能,只需要配置好metrics.latency.interval参数,再观察TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency这个metric即可。本文简单walk一下源码,看看它是如何实现的,并且简要说明注意事项。
LatencyMarker的产生
与通过水印来标记事件时间的推进进度相似,Flink也用一种特殊的流元素(StreamElement)作为延迟的标记,称为LatencyMarker。
LatencyMarker的数据结构甚简单,只有3个field,即它被创建时携带的时间戳、算子ID和算子并发实例(sub-task)的ID。
1 | public final class LatencyMarker extends StreamElement { |
LatencyMarker和水印不同,不需要通过用户抽取产生,而是在Source端自动按照metrics.latency.interval参数指定的周期生成。StreamSource专门实现了一个内部类LatencyMarksEmitter用来发射LatencyMarker,而它又借用了负责协调处理时间的服务ProcessingTimeService,如下代码所示。
1 |
|
AbstractStreamOperator是所有Flink Streaming算子的基类,在它的初始化方法setup()中,会先创建用于延迟统计的LatencyStats实例。
1 | final String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY); |
LatencyStats中的延迟最终会转化为直方图表示,通过直方图就可以统计出延时的最大值、最小值、均值、分位值(quantile)等指标。以下是reportLatency()方法的源码。
1 | public void reportLatency(LatencyMarker marker) { |
延迟是由当前时间戳减去LatencyMarker携带的时间戳得到的,所以在Sink端统计到的就是全链路延迟了。
整体流程如下
注意事项
- LatencyMarker不参与window、MiniBatch的缓存计时,直接被中间Operator下发
- Metric路径:TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency
- 每个中间Operator、以及Sink都会统计自己与Source节点的链路延迟,我们在监控页面,一般展示Source至Sink链路延迟
- 延迟粒度细分到Task,可以用来排查哪台机器的Task时延偏高,进行对比和运维排查
- 从实现原理来看,发送时延标记间隔配置大一些(例如20秒一次),一般不会影响系统处理业务数据的性能
Reference
https://blog.csdn.net/nazeniwaresakini/article/details/106615777