Timer在窗口机制中也有重要的地位。提起窗口自然就能想到Trigger,即触发器。
来看下Flink自带的EventTimeTrigger的部分代码,它是事件时间特征下的默认触发器。
1 |
|
ctx.registerEventTimeTimer(window.maxTimestamp());的内部调用了internalTimerService.registerEventTimeTimer(window, time);
internalTimerService是一个接口
1 | public interface InternalTimerService<N> { |
对应的实现类InternalTimerServiceImpl
1 | private final ProcessingTimeService processingTimeService; |
注册Timer实际上就是为它们赋予对应的时间戳、key和命名空间,并将它们加入对应的优先队列。
特别地,当注册基于处理时间的Timer时,会先检查要注册的Timer时间戳与当前在最小堆堆顶的Timer的时间戳的大小关系。如果前者比后者要早,就会用前者替代掉后者,因为处理时间是永远线性增长的,得先处理时间比较靠前的。
Timer注册好了之后是如何触发的呢?
eventTime
事件时间与内部时间戳无关,而与水印有关.
1 | public void advanceWatermark(long time) throws Exception { |
从队列头一个个取,如果获取的时间小于水位线,该任务需要处理,那就poll出来,调用trigger的onEvnetTime函数,执行方法。
向上追溯,回到InternalTimeServiceManager的同名方法。
1 | public void advanceWatermark(Watermark watermark) throws Exception { |
继续向上追溯,到达终点:算子基类AbstractStreamOperator中处理水印的方法processWatermark()。当水印到来时,就会按着上述调用链流转到InternalTimerServiceImpl中,并触发所有早于水印时间戳的Timer了。
1 | public void processWatermark(Watermark mark) throws Exception { |
processTime
1 | private void onProcessingTime(long time) throws Exception { |
当onProcessingTime()方法被触发回调时,就会按顺序从队列中获取到比时间戳time小的所有Timer,并挨个执行Triggerable.onProcessingTime()方法
然后从获取的最新的赋值给nextTimer。
这个方法用于构造函数的时候
1 | processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime); |
来到ProcessingTimeService的实现类SystemProcessingTimeService,它是用调度线程池实现回调的。
1 |
|
ScheduledTask
1 |
|
onProcessingTime()在TriggerTask线程中被回调,而TriggerTask线程按照Timer的时间戳来调度。到这里,处理时间Timer的情况就讲述完毕了。
两个模式,其实主要是有一个TimersQueue的队列,将对应的window加入队列中,如果是processTime,就会在ScheduledThreadPoolExecutor中调度一个又一个线程,每个线程的以延迟时间作为调度的条件。event不需要调度线程,根据watermark来调度,新的watermark到来就会触发。
如果出现timer的取消,就会删除TimersQueue的队列中的数据,但是ScheduledThreadPoolExecutor不会调出对应的数据,触发了找不到合适的数据而已。
TimerHeapInternalTimer
在InternalTimerServiceImpl中有这么一个类TimerHeapInternalTimer。
1 | /** The key for which the timer is scoped. */ |
可见,Timer的scope有两个,一是数据的key,二是命名空间。但是用户不会感知到命名空间的存在,所以我们可以简单地认为Timer是以key级别注册的(Timer四大特点之1)。正确估计key的量可以帮助我们控制Timer的量。
timerHeapIndex是这个Timer在优先队列里存储的下标。优先队列通常用二叉堆实现,而二叉堆可以直接用数组存储,所以让Timer持有其对应的下标可以较快地从队列里删除它。
comparePriorityTo()方法则用于确定Timer的优先级,显然Timer的优先队列是一个按Timer时间戳为关键字排序的最小堆。下面粗略看看该最小堆的实现。
HeapPriorityQueueSet
在InternalTimeServiceManager用于管理各个InternalTimeService。
有一个工厂用于创建各种Timer队列,根据设置的状态后台而定。
1 | private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue( |
PriorityQueueSetFactory目前有heap和rockersdb,主要看heap的。
要搞懂它,必须解释一下KeyGroup和KeyGroupRange。KeyGroup是Flink内部KeyedState的原子单位,亦即一些key的组合。一个Flink App的KeyGroup数量与最大并行度相同,将key分配到KeyGroup的操作则是经典的取hashCode+取模。而KeyGroupRange则是一些连续KeyGroup的范围,每个Flink sub-task都只包含一个KeyGroupRange。也就是说,KeyGroupRange可以看做当前sub-task在本地维护的所有key。
解释完毕。容易得知,上述代码中的那个HashMap<T, T>[]数组就是在KeyGroup级别对key进行去重的容器,数组中每个元素对应一个KeyGroup。以插入一个Timer的流程为例:
- 从Timer中取出key,计算该key属于哪一个KeyGroup;
- 计算出该KeyGroup在整个KeyGroupRange中的偏移量,按该偏移量定位到HashMap<T, T>[]数组的下标;
- 根据putIfAbsent()方法的语义,只有当对应HashMap不存在该Timer的key时,才将Timer插入最小堆中。
二叉堆
二叉堆是完全二元树或者是近似完全二元树,按照数据的排列方式可以分为两种:最大堆和最小堆。
最大堆:父结点的键值总是大于或等于任何一个子节点的键值;最小堆:父结点的键值总是小于或等于任何一个子节点的键值。
图文解析是以”最大堆”来进行介绍的。
最大堆的核心内容是”添加”和”删除”,理解这两个算法,二叉堆也就基本掌握了
1. 添加
假设在最大堆[90,80,70,60,40,30,20,10,50]种添加85,需要执行的步骤如下:
如上图所示,当向最大堆中添加数据时:先将数据加入到最大堆的最后,然后尽可能把这个元素往上挪,直到挪不动为止!
将85添加到[90,80,70,60,40,30,20,10,50]中后,最大堆变成了[90,85,70,60,80,30,20,10,50,40]。
1 |
|
insert(data)的作用:将数据data添加到最大堆中。mHeap是动态数组ArrayList对象。
当堆已满的时候,添加失败;否则data添加到最大堆的末尾。然后通过上调算法重新调整数组,使之重新成为最大堆。
2. 删除
假设从最大堆[90,85,70,60,80,30,20,10,50,40]中删除90,需要执行的步骤如下:
如上图所示,当从最大堆中删除数据时:先删除该数据,然后用最大堆中最后一个的元素插入这个空位;接着,把这个“空位”尽量往上挪,直到剩余的数据变成一个最大堆。
从[90,85,70,60,80,30,20,10,50,40]删除90之后,最大堆变成了[85,80,70,60,40,30,20,10,50]。
注意:考虑从最大堆[90,85,70,60,80,30,20,10,50,40]中删除60,执行的步骤不能单纯的用它的字节点来替换;而必须考虑到”替换后的树仍然要是最大堆”!
1 | /* |
最小堆的实现HeapPriorityQueue
在flink中因为要获取的是最小的Timer,用的是小顶堆。
如下是flink中添加数据的实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected void addInternal(@Nonnull T element) {
// 用数组实现的,需要先扩容
final int newSize = increaseSizeByOne();
// 将数据放入新扩容的位置,堆尾
moveElementToIdx(element, newSize);
// 开始调整堆
siftUp(newSize);
}
private void siftUp(int idx) {
final T[] heap = this.queue;
final T currentElement = heap[idx];
int parentIdx = idx >>> 1;
while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
moveElementToIdx(heap[parentIdx], idx);
idx = parentIdx;
parentIdx >>>= 1;
}
moveElementToIdx(currentElement, idx);
}
private boolean isElementPriorityLessThen(T a, T b) {
return elementPriorityComparator.comparePriority(a, b) < 0;
}
private int increaseSizeByOne() {
final int oldArraySize = queue.length;
final int minRequiredNewSize = ++size;
if (minRequiredNewSize >= oldArraySize) {
final int grow = (oldArraySize < 64) ? oldArraySize + 2 : oldArraySize >> 1;
resizeQueueArray(oldArraySize + grow, minRequiredNewSize);
}
// TODO implement shrinking as well?
return minRequiredNewSize;
}
删除数据
1 |
|
timer on RocksDb
在InternalTimeServiceManager中存在一个工厂PriorityQueueSetFactory,根据选择的状态后端决定Timer是heap还是rocksdb,rocksdb对应的工厂为RocksDBPriorityQueueSetFactory。
对应的队列实现RocksDBCachingPriorityQueueSet中,
1 | /** Cache for the head element in de-serialized form. */ |
1 |
|
1 |
|
1 |
|
Referenece
https://blog.csdn.net/nazeniwaresakini/article/details/104220113
http://aitozi.com/flink-timerservice-based-on-rocksdb.html