Reference
解决Flink1.11.0不能指定SQL任务JobName问题
背景
Flink最近刚发布了1.11.0版本,由于加了很多新的功能,对sql的支持更加全面,我就迫不及待的在本地运行了个demo,但是运行的时候报错了:
1 | Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. |
虽然报错,但任务却是正常运行,不过任务却不能指定jobname了。
原因分析
先看下我的代码:
1 | public static void main(String[] args) { |
报错代码在 streamEnv.execute(), 程序找不到算子,所以报错?那问题出在哪?我们先回顾flink1.10.0的版本,看下之前是怎么执行的。之前的版本是通过 sqlUpdate() 方法执行sql的:
1 | public void sqlUpdate(String stmt) { |
从isEagerOperationTranslation 方法注释就很清楚的知道了,任务只有在 调用execute(String)方法的时候才会把算子遍历组装成task,这其实是1.11版本之前flink运行sql任务的逻辑。但是1.11版本后,我们不需要再显示指定 execute(String) 方法执行sql任务了(jar包任务不受影响)。下面我们来看1.11版本的 executeSql方法:
1 |
|
从1.11版本的代码可以看出,INSERT 语句直接执行,并没有把算子加到transformation的List中,所以当调用 execute(String) 方法时会报错,报错并不影响执行,但是却不能指定jobName了,很多时候jobName 能够反映出 job的业务和功能,不能指定jobname是很多场景所不能接受的。
Flink 1.11 改动
1.11 对 StreamTableEnvironment.execute()和 StreamExecutionEnvironment.execute() 的执行方式有所调整
简单概述为:
- StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
- Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
- 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()
详细参考:
如果需要批量执行多条sql,应该通过StatementSet 来执行。
1 |
|
修改源码增加jobname
首先我们追踪代码到executeInternal,如下:
1 |
|
从上面不难看出,默认jobname是 insert-into_ + sink的表名,正如代码所示,我已经把指定jobname的功能加上了,只需要增加一个job.name的TableConfig即可,然后重新编译flink代码: mvn clean install -DskipTests -Dfas, 线上环境替换掉 flink-table_2.11-1.11.0.jar jar包即可,如果是本地Idea运行,把flink编译好就可以了。
主程序修改如下:
1 | public static void main(String[] args) { |