Building the image

Dockerfile:

```dockerfile
FROM flink:1.10.0-scala_2.11
```

Build the image:

```shell
docker build -t k8stest.net:8876/study/flink:1.10.0 .
```

Push the image to the registry:

```shell
docker push k8stest.net:8876/study/flink:1.10.0
```

Create the image-pull secret for the registry:

```shell
kubectl create secret docker-registry harborregsecret \
  --docker-server=k8stest.net:8876 \
  --docker-username=admin \
  --docker-password=123456 \
  -n flink
```
installFlink.sh

```shell
namespace=$1
if [ -z "$namespace" ]; then
    echo "error: namespace is NULL"
    exit 1
fi
echo "namespace is $namespace"
kubectl apply -f flink-configuration-configmap.yaml -n "$namespace"
kubectl apply -f jobmanager-service.yaml -n "$namespace"
kubectl apply -f jobmanager-rest-service.yaml -n "$namespace"
kubectl apply -f jobmanager-deployment.yaml -n "$namespace"
kubectl apply -f taskmanager-deployment.yaml -n "$namespace"
```
deleteFlink.sh

```shell
namespace=$1
if [ -z "$namespace" ]; then
    echo "error: namespace is NULL"
    exit 1
fi
echo "namespace is $namespace"
kubectl delete -f flink-configuration-configmap.yaml -n "$namespace"
kubectl delete -f jobmanager-service.yaml -n "$namespace"
kubectl delete -f jobmanager-deployment.yaml -n "$namespace"
kubectl delete -f jobmanager-rest-service.yaml -n "$namespace"
kubectl delete -f taskmanager-deployment.yaml -n "$namespace"
```
flink-configuration-configmap.yaml

```yaml
apiVersion: v1
```
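Only the first line of this manifest survives. A minimal sketch of what a typical flink-configuration-configmap.yaml looks like, modeled on the upstream Flink 1.10 Kubernetes session-cluster guide; the config values are assumptions and must match your cluster:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    jobmanager.rpc.port: 6123
    blob.server.port: 6124
    taskmanager.numberOfTaskSlots: 2
  log4j.properties: |
    log4j.rootLogger=INFO, file
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

The `flink-conf.yaml` and `log4j.properties` keys are the ones mounted by the deployments below.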
jobmanager-deployment.yaml

```yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: k8stest.net:8876/study/flink:1.10.0
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
```
jobmanager-service.yaml

```yaml
apiVersion: v1
```
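Again only the first line survives. A sketch of the jobmanager service, following the upstream Flink 1.10 Kubernetes guide (port names and numbers mirror the jobmanager deployment above):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```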
taskmanager-deployment.yaml

```yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 5
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: k8stest.net:8876/study/flink:1.10.0
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
```
Exposing the service via nodePort

jobmanager-rest-service.yaml

```yaml
apiVersion: v1
```
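The body of this manifest is missing above. A sketch of a NodePort service for the jobmanager REST endpoint, modeled on the upstream Flink 1.10 guide; the service name `flink-jobmanager-rest` matches the FQDN used in the submission examples later in this post:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
```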
Exposing the service via Ingress

The nodePort approach above is inconvenient to manage, and it opens a port on every node, which is not great either. Using an Ingress solves both problems.

jobmanager-ingress.yaml

```yaml
apiVersion: extensions/v1beta1
```

The UI can then be accessed at app.host.local/app/. Note that the trailing slash after app/ is required.
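Only the apiVersion line of the Ingress survives. A sketch of what such a manifest might look like, assuming the NGINX ingress controller; the host, path, rewrite annotation, and backend service name are assumptions, chosen to match the app.host.local/app/ URL described above:

```yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: flink-jobmanager-ingress
  annotations:
    # Strip the /app/ prefix before proxying to the Flink UI
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - host: app.host.local
    http:
      paths:
      - path: /app/
        backend:
          serviceName: flink-jobmanager-rest
          servicePort: 8081
```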
Submitting jobs under the Ingress setup

Via FQDN

What about submitting jobs, then? They can no longer go through the REST API on the domain name, because the added URL path gets rewritten, while the native API only understands a host and a port. Instead, do the following:

- Get the coreDNS IP:

```shell
kubectl get pods -n kube-system | grep dns
kubectl describe pod coredns-7f9c544f75-p4rvz -n kube-system
```

- Adjust the local machine's DNS resolution (add the coreDNS IP as a nameserver):

```shell
cat /etc/resolv.conf
```

ClusterIP services can then be reached by their FQDN, `<service>.<namespace>.svc.cluster.local`.

For example:

```shell
/usr/local/flink/flink-1.10.0/bin/flink run -p 30 -m flink-jobmanager-rest.flink-project.svc.cluster.local:8081 -d -c test.startClass Porject.jar
```
Submitting jars

A script looks up the service's clusterIP to reach the jobManager.

Passing everything as command-line parameters:

```shell
sh /opt/flink-submit.sh -n flink-project -u /usr/local/flink/flink-1.10.0/ -c study.Starter -j Study.jar -p 200
```

flink-submit.sh

```shell
#!/bin/bash
```
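Only the shebang of flink-submit.sh survives above. The following is a hypothetical reconstruction consistent with the invocation shown; the flag names and the jsonpath lookup mirror /opt/flink-list.sh below, but treat it as a sketch rather than the original script:

```shell
#!/bin/bash
# Sketch of flink-submit.sh: look up the jobmanager service's clusterIP
# in the given namespace and submit a jar through the Flink CLI.

usage() {
    echo "Usage: $0 -n <namespace> -u <flink-home> -c <main-class> -j <jar> -p <parallelism>"
}

# Parse -n/-u/-c/-j/-p into global variables.
parse_args() {
    local OPTIND=1
    while getopts "n:u:c:j:p:" opt; do
        case $opt in
            n) NAMESPACE=$OPTARG ;;
            u) FLINKPATH=$OPTARG ;;
            c) MAINCLASS=$OPTARG ;;
            j) JARFILE=$OPTARG ;;
            p) PARALLELISM=$OPTARG ;;
            *) usage; return 1 ;;
        esac
    done
}

# Only run the submission when arguments were supplied
# (so the file can also be sourced for its functions).
if [ $# -gt 0 ]; then
    parse_args "$@" || exit 1
    if [ -z "$NAMESPACE" ] || [ -z "$FLINKPATH" ] || [ -z "$JARFILE" ]; then
        usage
        exit 1
    fi
    # Resolve the first service's clusterIP in the namespace, as flink-list.sh does.
    CLUSTERIP=$(kubectl get svc -n "$NAMESPACE" -o=jsonpath='{.items[0].spec.clusterIP}')
    "$FLINKPATH/bin/flink" run -p "${PARALLELISM:-1}" -m "$CLUSTERIP:8081" -d -c "$MAINCLASS" "$JARFILE"
fi
```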
Using a configuration file

Submit a job with:

```shell
sh /opt/flink-submit-config.sh
```

List running jobs with:

```shell
sh /opt/flink-list.sh
```

Cancel a job with:

```shell
sh /opt/flink-cancel.sh -j 03e80ff36f00b761eb81400b6ae8328d
```

First, the current directory must contain a flink-config.yaml file:

```yaml
namespace: flink-project
```
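Only the first key of flink-config.yaml is shown. Since the scripts below need both a namespace and a Flink installation path, the file presumably looks something like the following; the `flinkpath` key name and its value are assumptions:

```yaml
namespace: flink-project
flinkpath: /usr/local/flink/flink-1.10.0/
```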
All of the scripts above call /opt/flink-config.sh to read the configuration and expose it as shell variables.

flink-config.sh

```shell
readFromConfig() {
```
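Only the function's opening line survives. Flink's own bin/config.sh defines a `readFromConfig` helper that extracts a `key: value` pair from a YAML-style file with sed; a sketch of this script along those lines (the NAMESPACE/FLINKPATH variable names come from flink-list.sh below, the rest is assumed):

```shell
#!/bin/bash
# Sketch of /opt/flink-config.sh: read "key: value" pairs from
# ./flink-config.yaml and expose them as shell variables.

# Print the value of key $1 from config file $3, or the default $2.
readFromConfig() {
    local key=$1
    local defaultValue=$2
    local configFile=$3
    local value
    # Match "key: value", drop trailing comments, trim whitespace,
    # keep the last occurrence.
    value=$(sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "$configFile" \
        | sed "s/^ *//;s/ *$//" | tail -n 1)
    [ -z "$value" ] && echo "$defaultValue" || echo "$value"
}

CONF_FILE="./flink-config.yaml"
if [ -f "$CONF_FILE" ]; then
    NAMESPACE=$(readFromConfig "namespace" "" "$CONF_FILE")
    FLINKPATH=$(readFromConfig "flinkpath" "" "$CONF_FILE")
fi
```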
/opt/flink-submit-config.sh

```shell
#!/bin/bash
```
/opt/flink-list.sh

```shell
. /opt/flink-config.sh

# Check NAMESPACE
if [ -z "$NAMESPACE" ]; then
    >&2 echo "namespace miss"
    usage
    exit 1
fi

# Check FLINKPATH
if [ -z "$FLINKPATH" ]; then
    >&2 echo "flink path miss"
    usage
    exit 1
fi

CLUSTERIP=$(kubectl get svc -n "$NAMESPACE" -o=jsonpath='{.items[0].spec.clusterIP}')
echo "namespace is $NAMESPACE"
echo "flink path is $FLINKPATH"
echo "$FLINKPATH/bin/flink list -m $CLUSTERIP:8081"
"$FLINKPATH/bin/flink" list -m "$CLUSTERIP:8081"
```
/opt/flink-cancel.sh

```shell
. /opt/flink-config.sh
```
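Only the first line of flink-cancel.sh survives. A hypothetical reconstruction consistent with flink-list.sh and the `-j <job-id>` invocation shown earlier; treat it as a sketch:

```shell
#!/bin/bash
# Sketch of /opt/flink-cancel.sh: cancel a job by id via the
# jobmanager's clusterIP, reusing the shared config script.

if [ -f /opt/flink-config.sh ]; then
    . /opt/flink-config.sh
fi

# Parse -j <job-id> into JOBID.
parse_job_id() {
    local OPTIND=1
    while getopts "j:" opt; do
        case $opt in
            j) JOBID=$OPTARG ;;
            *) return 1 ;;
        esac
    done
}

# Only run the cancellation when arguments were supplied.
if [ $# -gt 0 ]; then
    parse_job_id "$@" || exit 1
    if [ -z "$JOBID" ] || [ -z "$NAMESPACE" ] || [ -z "$FLINKPATH" ]; then
        >&2 echo "job id / namespace / flink path miss"
        exit 1
    fi
    CLUSTERIP=$(kubectl get svc -n "$NAMESPACE" -o=jsonpath='{.items[0].spec.clusterIP}')
    "$FLINKPATH/bin/flink" cancel -m "$CLUSTERIP:8081" "$JOBID"
fi
```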
Setting up the HA version

Add the following to the configuration file:

```yaml
high-availability: zookeeper
```
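Only the first key is shown above. A typical ZooKeeper HA block in flink-conf.yaml looks roughly like this; the quorum address, storage path, and cluster id are assumptions to be replaced with your own:

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181
high-availability.storageDir: hdfs://hdfs-address/flink/ha
high-availability.cluster-id: /flink-cluster
high-availability.jobmanager.port: 6123
```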
Note in particular that the HA setting high-availability.jobmanager.port must be set to the jobmanager's RPC port.
State backend configuration

```yaml
state.checkpoints.dir: hdfs://hdfs-address/flink/flink-checkpoints
```
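Only the checkpoints directory is shown. A fuller state-backend block for flink-conf.yaml might look like this; the RocksDB backend choice, the incremental setting, and the savepoints path are assumptions consistent with the RocksDB section later in this post:

```yaml
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://hdfs-address/flink/flink-checkpoints
state.savepoints.dir: hdfs://hdfs-address/flink/flink-savepoints
```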
A fairly complete version of the config

```yaml
apiVersion: v1
```
Resource limits

As a rule we put limits on resources so that heavily loaded Flink jobs do not interfere with each other. Be aware, though, that a container exceeding its limit gets killed, and such failures are hard to diagnose.

```yaml
resources:
```
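Only the `resources:` key survives. A typical requests/limits block on the container spec might look like this; the CPU and memory values are assumptions to be tuned per cluster:

```yaml
resources:
  requests:
    cpu: "2"
    memory: 4Gi
  limits:
    cpu: "4"
    memory: 8Gi
```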
Writing RocksDB state to disk

The state backend is very demanding on disk performance, so using the system disk directly causes performance problems. Mount a dedicated disk into the pod at a fixed path with a hostPath volume.
jobmanager-deployment.yaml

```yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: k8stest.net:8876/study/flink:1.10.0
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: state-volume
          mountPath: /opt/state
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
      - name: state-volume
        hostPath:
          path: /cache1/flink/state
          type: DirectoryOrCreate
```
taskmanager-deployment.yaml

```yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 5
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: k8stest.net:8876/study/flink:1.10.0
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: state-volume
          mountPath: /opt/state
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
      - name: state-volume
        hostPath:
          path: /cache1/flink/state
          type: DirectoryOrCreate
```
Then set in the configuration file:

```yaml
state.backend.rocksdb.localdir: /opt/state
```
Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html
HA:
http://shzhangji.com/cnblogs/2019/08/25/deploy-flink-job-cluster-on-kubernetes/