Building the dependency image
FROM flink:1.9.1-scala_2.12
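The rest of the Dockerfile is not shown in this post; only the FROM line survives. The 1.9-2.8 tag used below suggests Flink 1.9 bundled with Hadoop 2.8 dependencies, so a minimal sketch of such a dependency image might look like the following (the uber-jar name is an assumption, not from the original):

FROM flink:1.9.1-scala_2.12
# assumed: bake the shaded Hadoop 2.8 dependency into Flink's lib directory
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/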
Build the image
docker build -t='study/flink:1.9-2.8' .
docker tag study/flink:1.9-2.8 k8stest.net:8876/study/flink
Push the image to the registry
docker push k8stest.net:8876/study/flink
Registry credentials (pull secret)
kubectl create secret docker-registry harborregsecret --docker-server=k8stest.net:8876 --docker-username=admin --docker-password=123456 -n flink
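The secret only takes effect if the pod spec references it. The deployments below keep image: flink:latest from the official example; to pull the image pushed to the private registry instead, the container spec would need roughly the following (a sketch, not part of the original manifests):

      containers:
      - name: jobmanager
        image: k8stest.net:8876/study/flink
      imagePullSecrets:
      - name: harborregsecret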
installFlink.sh
#!/bin/bash
# Apply all Flink manifests into the namespace given as the first argument.
namespace=$1
if [ ! "$namespace" ]; then
  echo "error: namespace is NULL"
  exit 1
fi
echo "namespace is $namespace"
kubectl apply -f flink-configuration-configmap.yaml -n $namespace
kubectl apply -f jobmanager-service.yaml -n $namespace
kubectl apply -f jobmanager-rest-service.yaml -n $namespace
kubectl apply -f jobmanager-deployment.yaml -n $namespace
kubectl apply -f taskmanager-deployment.yaml -n $namespace
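Usage, assuming the flink namespace created for the pull secret above:

chmod +x installFlink.sh
./installFlink.sh flink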
deleteFlink.sh
#!/bin/bash
# Delete all Flink manifests from the namespace given as the first argument.
namespace=$1
if [ ! "$namespace" ]; then
  echo "error: namespace is NULL"
  exit 1
fi
echo "namespace is $namespace"
kubectl delete -f flink-configuration-configmap.yaml -n $namespace
kubectl delete -f jobmanager-service.yaml -n $namespace
kubectl delete -f jobmanager-deployment.yaml -n $namespace
kubectl delete -f jobmanager-rest-service.yaml -n $namespace
kubectl delete -f taskmanager-deployment.yaml -n $namespace
flink-configuration-configmap.yaml
apiVersion: v1
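The ConfigMap is truncated to its first line in this post. A sketch along the lines of the session-cluster example in the official Flink 1.9 documentation; the data keys match the items mounted by the deployments below, while the concrete option values (slots, heap sizes, log settings) are assumptions:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.heap.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n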
jobmanager-deployment.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
jobmanager-rest-service.yaml
apiVersion: v1
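The rest of the file is truncated here. A sketch of the REST service, following the official example; it exposes the UI/REST port 8081 as a NodePort (the "Pinning the Flink UI port" section below then fixes the node port to 30000):

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager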
jobmanager-service.yaml
apiVersion: v1
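Also truncated. A sketch of the internal JobManager service, following the official example; it exposes the RPC, blob and UI ports declared by the JobManager deployment above:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager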
taskmanager-deployment.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 5
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
Pinning the JobManager to a specific node
If the JobManager is allowed to float to whatever node the scheduler picks, job submission runs into problems, so pin it to a fixed machine with a nodeSelector. The node itself must carry the matching label; see the command after the manifest.
jobmanager-deployment.yaml with a nodeSelector
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      nodeSelector:
        flink: jm
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
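For the nodeSelector above to match, the chosen node has to carry the flink=jm label; something like the following, where the node name is a placeholder:

kubectl label node <node-name> flink=jm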
Pinning the Flink UI port
jobmanager-rest-service.yaml
Adding nodePort: 30000 pins the Flink UI to that port; a sketch of the resulting service follows the fragment below.
apiVersion: v1
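The rest of the manifest is truncated here; a sketch, identical to the REST service shown earlier except for the added nodePort line:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30000
  selector:
    app: flink
    component: jobmanager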
High-availability setup
Add the following to the configuration file (flink-conf.yaml):
high-availability: zookeeper
Note in particular that high-availability.jobmanager.port must be set to the JobManager's RPC port.
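Only the first key survives in this post. A sketch of the ZooKeeper HA block in flink-conf.yaml; the ZooKeeper quorum and HDFS address are placeholders, and the HA JobManager port is pinned to the RPC port (6123) as the note above requires:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
high-availability.storageDir: hdfs://hdfs-address/flink/ha/
high-availability.cluster-id: /flink-cluster
# must match the JobManager RPC port (6123) as noted above
high-availability.jobmanager.port: 6123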
State backend configuration
state.checkpoints.dir: hdfs://hdfs-address/flink/flink-checkpoints
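A sketch of the related state backend keys in flink-conf.yaml, assuming RocksDB is the backend (the original only shows the checkpoint directory; the other keys and values are assumptions):

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://hdfs-address/flink/flink-checkpoints
state.savepoints.dir: hdfs://hdfs-address/flink/flink-savepoints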
Resource limits
In general we set resource limits so that Flink pods running hot do not interfere with each other, but a pod that exceeds its memory limit gets killed, and such failures are hard to diagnose.
resources:
If we limit the pod to 5 CPU cores and 42720M of memory and also set taskmanager.heap.size to 42720m, then under heavy data volume the TaskManager gets killed for running out of memory.
The reason is that the pod limit is about 40G and the TaskManager heap is also 40G, so the heap alone already fills the whole pod and there is nowhere left to get off-heap memory from, which leads to OOM kills. Network shuffle buffers, the RocksDB state backend and so on all need off-heap memory, so the TaskManager heap must be smaller than the limit; around 30G is enough.
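Putting the numbers above together, a sketch of the container resources block in taskmanager-deployment.yaml (the 5-core / 42720M figures come from the text; requests equal to limits is an assumption):

        resources:
          requests:
            cpu: "5"
            memory: "42720Mi"
          limits:
            cpu: "5"
            memory: "42720Mi"

And the matching flink-conf.yaml setting, keeping the heap well below the pod limit so off-heap memory (network shuffle buffers, RocksDB) still fits:

taskmanager.heap.size: 30g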
Exposing the service through an Ingress
The approach above uses NodePort, which is awkward to manage and opens a port on every node, which is also not ideal. An Ingress solves both problems.
jobmanager-ingress.yaml
apiVersion: extensions/v1beta1
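Only the apiVersion line survives here. A sketch assuming the NGINX ingress controller, with the host, path, and rewrite annotation inferred from the description below (all placeholders/assumptions):

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: flink-jobmanager-ingress
  annotations:
    # rewrite /app/... to /... before it reaches the Flink REST service
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - host: app.host.local
    http:
      paths:
      - path: /app/
        backend:
          serviceName: flink-jobmanager-rest
          servicePort: 8081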
The web UI can then be reached at app.host.local/app/. Be sure to include the trailing slash after app/.
Job submission, however, can no longer go through the REST API using that domain name, because the added URL path gets rewritten, while the native API only takes a host and a port. Work around it as follows:
- Get the CoreDNS pod IP
kubectl get pods -n kube-system | grep dns
kubectl describe pod coredns-7f9c544f75-p4rvz -n kube-system
- Modify the local machine's DNS resolution
cat /etc/resolv.conf
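After adding the CoreDNS IP found above as a nameserver, /etc/resolv.conf would look roughly like this (the IP is a placeholder for whatever kubectl describe reported):

# /etc/resolv.conf
# the CoreDNS IP found with kubectl describe above (placeholder)
nameserver 10.244.0.10
# keep the original upstream resolver as a fallback
nameserver 8.8.8.8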
ClusterIP services can then be reached by their in-cluster DNS name, <service-name>.<namespace>.svc.cluster.local.
For example, submitting the job jar:
/usr/local/flink/flink-1.10.0/bin/flink run -p 30 -m flink-jobmanager-rest.flink-project.svc.cluster.local:8081 -d -c test.startClass Porject.jar
Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html
HA: http://shzhangji.com/cnblogs/2019/08/25/deploy-flink-job-cluster-on-kubernetes/