背景
流式数据需要跟数据库中的数据进行join,也就是join维表。一般维表是不怎么变化的,不变化的维表可以通过全量加载到内存中直接进行关联,也可以通过异步io的形式访问外部存储,如果担心外部存储的并发压力可以选择第一种。
在我们这边的场景中,这份维表是一份1分钟变化一次的数据,需要每分钟去获取。
一种做法是写一个定时job讲数据库的维表发到kafka中,流式作业直接从kafka中获取维表join,带来的是增加一个作业的维护的部署。第二种就是下面的方法,自定义个source,这个source使用quartz定时触发。
例子
1 | public class QuartzSource extends RichSourceFunction { |
需要配置cancel信号量,在job停止的时候断开死循环,不然会出现问题。
具体的job可以按如下方式写
1 | public class QuatzSourceJob implements Job, Serializable { |
可以通过如下方式使用
1 | Map<String,Object> paramter = new HashMap<>(); |