Building the dependency image
FROM flink:1.9.1-scala_2.12
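Only the first line of the Dockerfile survives above. As a rough sketch, a dependency image usually starts from the official Flink image and copies the extra jars the jobs need into /opt/flink/lib; the jar name below is a placeholder, not taken from the original post:

FROM flink:1.9.1-scala_2.12
# Add job dependencies (Hadoop/connector jars) to Flink's classpath.
# The jar below is only an example; replace it with your own dependencies.
COPY lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/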
Build the image
docker build -t study/flink:1.9-2.8 .
docker tag study/flink:1.9-2.8 k8stest.net:8876/study/flink
Push the image to the registry
docker push k8stest.net:8876/study/flink
Registry credentials (image pull secret)
kubectl create secret docker-registry harborregsecret --docker-server=k8stest.net:8876 --docker-username=admin --docker-password=123456 -n flink
installFlink.sh
namespace=$1
if [ -z "$namespace" ]; then
  echo "error: namespace is empty"
  exit 1
fi
echo "namespace is $namespace"
kubectl apply -f flink-configuration-configmap.yaml  -n $namespace
kubectl apply -f jobmanager-service.yaml -n $namespace
kubectl apply -f jobmanager-rest-service.yaml -n $namespace
kubectl apply -f jobmanager-deployment.yaml -n $namespace
kubectl apply -f taskmanager-deployment.yaml -n $namespace
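The target namespace has to exist before the script is run; as a usage sketch (the namespace name flink matches the secret command above):

kubectl create namespace flink
sh installFlink.sh flink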
deleteFlink.sh
namespace=$1
if [ -z "$namespace" ]; then
  echo "error: namespace is empty"
  exit 1
fi
echo "namespace is $namespace"
kubectl delete -f flink-configuration-configmap.yaml  -n $namespace
kubectl delete -f jobmanager-service.yaml -n $namespace
kubectl delete -f jobmanager-deployment.yaml -n $namespace
kubectl delete -f jobmanager-rest-service.yaml -n $namespace
kubectl delete -f taskmanager-deployment.yaml -n $namespace
flink-configuration-configmap.yaml
apiVersion: v1
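Only the first line of the ConfigMap is shown above. For reference, a sketch modeled on the session-cluster ConfigMap in the Flink documentation linked at the end; the flink-conf.yaml values (slots, heap sizes) are illustrative, and the name and keys match what the deployments below mount:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.heap.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n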
jobmanager-deployment.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
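The manifest above keeps the stock flink:latest image from the Flink docs. If the image pushed to the private registry is used instead, the pod spec presumably needs the image reference changed and the pull secret created earlier attached, roughly like this (same idea for the TaskManager deployment):

    spec:
      imagePullSecrets:
      - name: harborregsecret                   # secret created with kubectl create secret above
      containers:
      - name: jobmanager
        image: k8stest.net:8876/study/flink     # image pushed to the private registry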
jobmanager-rest-service.yaml
apiVersion: v1
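Only the first line survives here as well. A sketch modeled on the NodePort rest service from the referenced Flink documentation; the service name matches the flink run example at the end of the post, but the exact values may differ from the original:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager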
jobmanager-service.yaml
apiVersion: v1
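Again truncated to its first line; a sketch of the internal JobManager service exposing the RPC, blob and UI ports used by the deployments, with the name and layout following the referenced Flink docs:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager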
taskmanager-deployment.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 5
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
Pinning the JobManager to a specific node
If the JobManager is allowed to float to any node, job submission runs into problems.
jobmanager-deployment.yaml with nodeSelector
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      nodeSelector:
        flink: jm
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
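The nodeSelector above only matches nodes carrying the flink=jm label, so the chosen node has to be labelled first; something along these lines (the node name is a placeholder):

kubectl label node <node-name> flink=jm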
Fixing the Flink UI port
jobmanager-rest-service.yaml
Pin the Flink UI port by adding nodePort: 30000 to the rest service.
apiVersion: v1
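The relevant fragment of the NodePort service would then look roughly like this (port numbers other than 30000 follow the earlier service sketch):

spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30000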
Setting up the HA version
Add the following to the configuration file (flink-conf.yaml):
high-availability: zookeeper
Note in particular that high-availability.jobmanager.port should be set to the JobManager's RPC port.
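Beyond that single line, a ZooKeeper-based HA setup needs a few more flink-conf.yaml entries; a sketch with placeholder ZooKeeper and HDFS addresses (the last line reflects the note above):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs://hdfs-address/flink/ha/
high-availability.cluster-id: /flink-session
high-availability.jobmanager.port: 6123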
State backend configuration
state.checkpoints.dir: hdfs://hdfs-address/flink/flink-checkpoints
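A sketch of the surrounding state-backend entries; apart from the checkpoint path above, the values are illustrative (RocksDB is the backend mentioned in the resource discussion below):

state.backend: rocksdb
state.checkpoints.dir: hdfs://hdfs-address/flink/flink-checkpoints
state.savepoints.dir: hdfs://hdfs-address/flink/flink-savepoints
state.backend.incremental: true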
Resource limits
As a rule we put resource limits on the pods so that a busy Flink job does not interfere with others, but once a pod exceeds its limit it gets killed, and that kind of failure is hard to troubleshoot.
resources:
If we limit a TaskManager pod to 5 CPU cores and 42720M of memory, and also set taskmanager.heap.size to 42720m, things break when the data volume grows: the TaskManager gets killed for running out of memory.
The reason is that both the heap and the pod limit are about 40G, so the TaskManager heap alone takes up the entire pod and there is nowhere left to get off-heap memory from, which leads to the OOM kill. Network shuffle buffers, the RocksDB state backend and so on all need off-heap memory, so the TM heap must be smaller than the limit; around 30G is enough.
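Putting the numbers above together, the TaskManager container might be constrained like this (a sketch; the indentation assumes it sits under the taskmanager container in taskmanager-deployment.yaml):

        resources:
          requests:
            cpu: "5"
            memory: 42720Mi
          limits:
            cpu: "5"
            memory: 42720Mi

with taskmanager.heap.size set to roughly 30g in flink-conf.yaml, leaving head-room for off-heap memory.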
Exposing the service through an Ingress
The NodePort approach above is awkward to manage, and it opens a port on every node, which is not great either. An Ingress solves both problems.
jobmanager-ingress.yaml
apiVersion: extensions/v1beta1
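Only the first line of the Ingress is shown. A sketch that matches the access pattern described next, assuming the NGINX ingress controller and the rest service from above (the host and path come from the text; the annotation and backend are assumptions):

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: flink-jobmanager-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - host: app.host.local
    http:
      paths:
      - path: /app/
        backend:
          serviceName: flink-jobmanager-rest
          servicePort: 8081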
The UI can then be reached at app.host.local/app/. Be sure to include the trailing slash after app/.
Job submission, however, can no longer go through the REST API via that domain, because the added URL path gets rewritten while the native API only understands a host and a port. It can be done as follows instead:
- Get the CoreDNS IP
kubectl get pods -n kube-system | grep dns
kubectl describe pod coredns-7f9c544f75-p4rvz -n kube-system
- Modify the DNS resolution on the local machine
cat /etc/resolv.conf
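For illustration, the resolver on the submitting machine might end up with entries like these; the nameserver IP is whatever the describe command above reports for CoreDNS, so both values are placeholders:

# /etc/resolv.conf
nameserver 10.96.0.10        # CoreDNS/kube-dns IP (placeholder)
search svc.cluster.local cluster.local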
ClusterIP services can then be reached by their in-cluster DNS names, i.e. <service-name>.<namespace>.svc.cluster.local.
For example:
/usr/local/flink/flink-1.10.0/bin/flink run -p 30 -m flink-jobmanager-rest.flink-project.svc.cluster.local:8081 -d -c test.startClass Porject.jar
This submits the job jar.
Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html
HA:
http://shzhangji.com/cnblogs/2019/08/25/deploy-flink-job-cluster-on-kubernetes/