Setting up Flink 1.9.1 on Kubernetes

Building the image with dependencies

FROM  flink:1.9.1-scala_2.12
ADD ./flink-shaded-hadoop-2-uber-2.8.3-7.0.jar /opt/flink/lib/
RUN /bin/cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo 'Asia/Shanghai' >/etc/timezone

Build the image

docker build -t='study/flink:1.9-2.8' .
docker tag study/flink:1.9-2.8 k8stest.net:8876/study/flink

Push the image to the registry

docker push k8stest.net:8876/study/flink

Registry credentials (image pull secret)

kubectl create secret docker-registry harborregsecret --docker-server=k8stest.net:8876 --docker-username=admin --docker-password=123456 -n flink
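
Two details worth noting: the flink namespace has to exist before the secret can be created in it, and a pod only uses the secret if its spec references it. A minimal sketch, assuming the deployments are switched to the pushed image (the manifests below keep image: flink:latest as in the official example):

# create the target namespace used by the -n flink flag above
kubectl create namespace flink

# pod template fragment that references the pull secret (hypothetical; merge it
# into the deployments below if the image is pulled from k8stest.net:8876)
spec:
  imagePullSecrets:
  - name: harborregsecret
  containers:
  - name: jobmanager
    image: k8stest.net:8876/study/flink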

installFlink.sh

#!/bin/bash
namespace=$1
if [ -z "$namespace" ]; then
  echo "error: namespace is empty"
  exit 1
fi
echo "namespace is $namespace"
kubectl apply -f flink-configuration-configmap.yaml -n "$namespace"
kubectl apply -f jobmanager-service.yaml -n "$namespace"
kubectl apply -f jobmanager-rest-service.yaml -n "$namespace"
kubectl apply -f jobmanager-deployment.yaml -n "$namespace"
kubectl apply -f taskmanager-deployment.yaml -n "$namespace"
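
Usage, assuming the flink namespace created earlier:

sh installFlink.sh flink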

deleteFlink.sh

#!/bin/bash
namespace=$1
if [ -z "$namespace" ]; then
  echo "error: namespace is empty"
  exit 1
fi
echo "namespace is $namespace"
kubectl delete -f flink-configuration-configmap.yaml -n "$namespace"
kubectl delete -f jobmanager-service.yaml -n "$namespace"
kubectl delete -f jobmanager-deployment.yaml -n "$namespace"
kubectl delete -f jobmanager-rest-service.yaml -n "$namespace"
kubectl delete -f taskmanager-deployment.yaml -n "$namespace"

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.heap.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
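
After applying, the rendered ConfigMap can be inspected in the cluster; assuming the flink namespace:

kubectl describe configmap flink-config -n flink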

jobmanager-deployment.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
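
Once applied, the JobManager pod can be located by its labels and its log followed; for example, assuming the flink namespace:

kubectl get pods -n flink -l app=flink,component=jobmanager
kubectl logs -f deployment/flink-jobmanager -n flink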

jobmanager-rest-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
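
No nodePort is set here, so Kubernetes assigns a random port from the NodePort range (30000-32767 by default); look it up with:

kubectl get svc flink-jobmanager-rest -n flink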

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

taskmanager-deployment.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 5
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
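
The TaskManager count (and with one slot per TaskManager, the total number of slots) can be changed later without editing the manifest; assuming the flink namespace:

kubectl scale deployment flink-taskmanager --replicas=10 -n flink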

Pinning the JobManager to a specific node

If the JobManager can drift freely between nodes, job submission runs into problems, so pin it with a nodeSelector and label the target node (see the command after the manifest below).

jobmanager-deployment.yaml with a nodeSelector

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      nodeSelector:
        flink: jm
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
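
The nodeSelector only matches nodes that carry the flink=jm label, so label the chosen node first; a minimal example, assuming the node is named node-1 (hypothetical):

kubectl label node node-1 flink=jm
# verify which nodes now match the selector
kubectl get nodes -l flink=jm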

jobmanager-rest-service.yaml

Fix the Flink REST port by adding nodePort: 30000:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30000
  selector:
    app: flink
    component: jobmanager
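
With the NodePort fixed at 30000, the REST API answers on that port on every node; a quick check, assuming a node reachable as node-1 (hypothetical name):

# returns the cluster overview (TaskManager count, slots, running jobs)
curl http://node-1:30000/overview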

Setting up the HA version

Add the following to the configuration file (flink-conf.yaml):

high-availability: zookeeper
high-availability.storageDir: hdfs://hdfs-address/flink/recovery
high-availability.zookeeper.quorum: zk-address:7072
high-availability.zookeeper.path.root: /flink
high-availability.jobmanager.port: 6123
high-availability.cluster-id: xxxx (pick your own id)

Note in particular: high-availability.jobmanager.port must be set to the JobManager's RPC port (6123 here).

State backend configuration

state.checkpoints.dir: hdfs://hdfs-address/flink/flink-checkpoints
state.savepoints.dir: hdfs://hdfs-address/flink/flink-savepoints
state.backend: rocksdb
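
With RocksDB it is also common to enable incremental checkpoints, so that each checkpoint only uploads changed state; an optional addition (a standard Flink option, not part of the original setup):

state.backend.incremental: true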

Resource limits

Generally we put resource limits on the pods so that heavily loaded Flink jobs do not interfere with each other. The downside is that a pod exceeding its limit gets killed, and such failures are hard to troubleshoot.

resources:
  requests:
    cpu: "5000m"
    memory: "42720Mi"
  limits:
    cpu: "5000m"
    memory: "42720Mi"

If we limit the CPU to 5 cores and the memory to 42720Mi, and also set taskmanager.heap.size to 42720m, then once the data volume grows the TaskManager will be killed for running out of memory.

The reason is that both the pod limit and the TM heap are set to about 40G, so the heap alone consumes the entire pod budget and there is nothing left for off-heap memory, which leads to OOM kills. Network shuffle buffers, the RocksDB state backend and so on all need off-heap memory, so the TM heap must be noticeably smaller than the pod limit; around 30G is enough here.
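
A sketch of the resulting combination, assuming roughly 12G of headroom is left for off-heap memory (the exact split depends on the job):

# flink-conf.yaml: keep the TaskManager heap well below the pod limit
taskmanager.heap.size: 30720m

# pod spec: the limit stays at ~42Gi; the gap is headroom for network buffers,
# RocksDB and other off-heap allocations
resources:
  limits:
    cpu: "5000m"
    memory: "42720Mi"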

Exposing the service through an Ingress

The NodePort approach used above is inconvenient to manage and opens the port on every node. Using an Ingress avoids both problems.

jobmanager-ingress.yaml

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: ingress-flink
  annotations:
    kubernetes.io/ingress.class: "nginx"
    nginx.ingress.kubernetes.io/rewrite-target: /$2
spec:
  rules:
  - host: app.host.local
    http:
      paths:
      - backend:
          serviceName: flink-jobmanager-rest
          servicePort: 8081
        path: /app(/|$)(.*)

The web UI is then reachable at app.host.local/app/. Be sure to include the trailing slash after app.
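
A quick way to confirm the rewrite is working is to call a REST endpoint through the Ingress; assuming app.host.local resolves to the ingress controller:

# /app/overview is rewritten to /overview on the flink-jobmanager-rest service
curl http://app.host.local/app/overview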

Job submission, however, can no longer go through the REST API on this hostname, because the rewrite adds a path segment while the native API only takes a host and port. Instead, you can do the following:

  1. Get the CoreDNS IP
kubectl get pods -n kube-system | grep dns
coredns-7f9c544f75-p4rvz 1/1 Running 1 6d5h
coredns-7f9c544f75-psbnx 1/1 Running 1 6d5h
kubectl describe pod coredns-7f9c544f75-p4rvz -n kube-system

...
Annotations: <none>
Status: Running
IP: 10.32.0.2
...

  2. Point the local machine's DNS resolution at CoreDNS

cat /etc/resolv.conf 
nameserver 10.32.0.2
nameserver 119.29.29.29
nameserver 114.114.114.114
options randomize-case:0
options timeout:2

Services of type ClusterIP can then be reached as <service-name>.<namespace>.svc.cluster.local:8081.

For example:

/usr/local/flink/flink-1.10.0/bin/flink run -p  30 -m flink-jobmanager-rest.flink-project.svc.cluster.local:8081 -d -c test.startClass  Porject.jar

This submits the jar through the JobManager's ClusterIP service.

Reference

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html

HA:
http://shzhangji.com/cnblogs/2019/08/25/deploy-flink-job-cluster-on-kubernetes/