笔者想要做一个实时计算平台,通过sql来减少开发,如果直接用jar包开发,是可以做精细化管理,但是开发速度就会受到限制。
目前有两种方式,一种是做成一个jar包,sql放入配置文件中,但是这样就没办法很好用上社区的功能,限制太大。另一个是使用flink-sql-client。
集群构建依赖
1 | flink-connector-kafka_2.11-1.10.0.jar |
尝试
修改配置conf/flink-conf.yaml
1 | jobmanager.rpc.address: flink-jobmanager.flink.svc.cluster.local |
配置上对应集群的address和端口6123.
或者修改源码,将集群目标改为可配置。见后面的内容。(由于笔者的集群是flink on kubernetes,社区目前对flink on kubernetes 的native需要在1.11才会有比较好的功能。flink的sql gate way暂时也没有开发,得等社区)
配置对应的yaml文件
sql-client-defaults.yaml
1 | ################################################################################ |
运行sql-client
1 | ./bin/sql-client.sh embedded -d sql-client-defaults.yaml |
1 | Flink SQL> show tables; |
就可以通过各种sql运行了。
flink sql 大状态的清理
1 | min-idle-state-retention |
这里需要注意一点,默认情况下 StreamQueryConfig 的设置并不是全局的。因此当设置了清理周期以后,需要在 StreamTableEnvironment 类调用 toAppendStream 或 toRetractStream 将 Table 转为 DataStream 时,显式传入这个 QueryConfig 对象作为参数,才可以令该功能生效。
1 | DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig); |
直接提交sql,不等待提示
在CLI会话中设置的属性(例如使用set命令)具有最高的优先级:1
CLI commands > session environment file > defaults environment file
1 | ./bin/sql-client.sh embedded -m 10.99.212.160:8081 -e /usr/local/flink/flink-1.10.0-sql/sql-client-defaults.yaml -u "insert into TableSink select * from MyTableSource;" |
1 | [root@sq-hdp-master-1 flink-1.10.0-sql]# ./bin/sql-client.sh embedded -m 10.104.155.82:8081 -e /usr/local/flink/flink-1.10.0-sql/sql-client-defaults.yaml -u "insert into TableSink select * from MyTableSource;" |
修改flink-sql-client达到可以提交到远程集群
CliOptionsParser.java
1 |
|
CliOptions.java
1 |
|
SqlClient.java
1 | private void start() { |
LocalExecutor.java
window调试需要在系统变量增加FLINK_CONF_DIR,重启电脑。
1 | 新增构造函数 |
然后打包flink-sql-client_2.11-1.10-SNAPSHOT.jar,替换opt下的该文件。搞定!
就可以通过-m提交作业。