主要思路
1.ValueState内部包含了计数、key和最后修改时间
2.对于每一个输入的记录,ProcessFunction都会增加计数,然后注册对应的过期检测timer
3.在onTimer中进行检测和输出
上代码
模拟数据source如下:数据以Tuple3的形式, key,无用,时间戳的形式向下游发送,每隔5秒发送一条数据.
1 |
|
KeyprocessFunction
1 |
|
1 |
|
主函数如下:
1 |
|
采用的是Flink 1.11 的新的水印生成策略,可以参考
https://guosmilesmile.github.io/2020/07/23/Flink1-11%E5%8D%87%E7%BA%A7%E5%A1%AB%E5%9D%91/
结果
1 |
|
注意
setStreamTimeCharacteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
这行必须加,不然会以processTime进行处理,一直得不到结果,因为注册的是EventTime(registerEventTimeTimer)
结果如下
1 | 元素a进入事件时间为:2001-09-09 09:47:30 |
ctx.timestamp() 为null
该现象出现于没有加assignTimestampsAndWatermarks
withTimestampAssigner中的extractTimestamp方法是在TimestampsAndWatermarksOperator.processElement中被调用
TimestampsAndWatermarksOperator.processElement
1 |
|
ctx.timestamp()源码见KeyedProcessOperator#ContextImpl.timestamp()
1 |
|
如果没有assignTimestampsAndWatermarks就会以nullPoint的异常出现