背景
经常会遇到这样的需求,统计1小时的PV或者UV,但是想要每分钟都可以看到当前的数据,对接到实时大屏上,可以动态看到数据变化过程。
Flink DataStream
需要用到ContinuousProcessTimeTrigger或者ContinuousEventTimeTrigger。
使用示例:
假如我们定义一个5分钟的基于 EventTime 的滚动窗口,定义一个每2分触发计算的 Trigger,有4条数据事件时间分别是20:01、20:02、20:03、20:04,对应的值分别是1、2、3、2,我们要对值做 Sum 操作。
初始时,State 和 Result 中的值都为0。
当第一条数据在20:01进入窗口时,State 的值为1,此时还没有到达 Trigger 的触发时间。
第二条数据在20:02进入窗口,State 中的值为1+2=3,此时达到2分钟满足 Trigger 的触发条件,所以 Result 输出结果为3。
第三条数据在20:03进入窗口,State 中的值为3+3 = 6,此时未达到 Trigger 触发条件,没有结果输出。
第四条数据在20:04进入窗口,State中的值更新为6+2=8,此时又到了2分钟达到了 Trigger 触发时间,所以输出结果为8。如果我们把结果输出到支持 update 的存储,比如 MySQL,那么结果值就由之前的3更新成了8。
问题:如果 Result 只能 append?
如果 Result 不支持 update 操作,只能 append 的话,则会输出2条记录,在此基础上再做计算处理就会引起错误。
这样就需要 PurgingTrigger 来处理上面的问题。
PurgingTrigger 的应用
和上面的示例一样,唯一的不同是在 ContinuousEventTimeTrigger 外面包装了一个 PurgingTrigger,其作用是在 ContinuousEventTimeTrigger 触发窗口计算之后将窗口的 State 中的数据清除。
再看下流程:
前两条数据先后于20:01和20:02进入窗口,此时 State 中的值更新为3,同时到了Trigger的触发时间,输出结果为3。
由于 PurgingTrigger 的作用,State 中的数据会被清除。
Flink SQL
每分钟统计当日0时到当前的累计UV数.
可以有两种方式,
- 可以直接用group by + mini batch
- window聚合 + fast emit
方法一
group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, ‘yyyy-MM-dd’)。
这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
用参数[2] 来打开。
在 Flink 1.11 中,你可以尝试这样:1
2
3
4
5
6
7
8
9
10
11
12
13CREATE TABLE mysql (
time_str STRING,
uv BIGINT,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'myuv'
);
INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
FROM user_behavior;
方法二
这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
fast emit这个配置现在还是一个experimental的feature
1 | table.exec.emit.early-fire.enabled = true |
在 Flink 1.11 中,你可以尝试这样:1
2
3
4
5
6
7
8
9
10
11
12
13CREATE TABLE mysql (
time_str STRING,
uv BIGINT,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'myuv'
);
INSERT INTO mysql
SELECT DATE_FORMAT(TUMBLE_START(t, INTERVAL '1' MINUTE) , 'yyyy-MM-dd HH:mm:00'), COUNT(DISTINCT user_id)
FROM user_behavior group by TUMBLE(ts, INTERVAL '1' MINUTE);
Emit的原理是这样子的:
- 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器;
- 当定时器到了的时候,
- 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
- 如果有变化,就发送-[old], +[new] 两条结果到下游;
- 如果是没有变化,则不做任何处理;
- 再次注册一个新的emit delay之后的处理时间定时器。
- 检查当前的key下的聚合结果跟上次输出的结果是否有变化,