Submitting a Job via the CLI
Starting the job
./bin/flink run examples/streaming/SocketWindowWordCount.jar
Tracing through Flink's startup scripts shows that the following command is eventually executed:
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "$@"
This actually invokes the CliFrontend class. Its main method mainly processes the arguments it receives and decides which method to execute based on them:
/**
 * Submits the job based on the arguments.
 */
public static void main(final String[] args) {
	EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

	// 1. find the configuration directory
	final String configurationDirectory = getConfigurationDirectoryFromEnv();

	// 2. load the global configuration
	final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

	// 3. load the custom command lines
	final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
		configuration,
		configurationDirectory);

	try {
		final CliFrontend cli = new CliFrontend(
			configuration,
			customCommandLines);

		SecurityUtils.install(new SecurityConfiguration(cli.configuration));
		int retCode = SecurityUtils.getInstalledContext()
			.runSecured(() -> cli.parseParameters(args));
		System.exit(retCode);
	}
	catch (Throwable t) {
		final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
		LOG.error("Fatal error while running command line interface.", strippedThrowable);
		strippedThrowable.printStackTrace();
		System.exit(31);
	}
}
The core of this is the call to cli.parseParameters(args):
/**
 * Parses the command line arguments and starts the requested action.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program
 */
public int parseParameters(String[] args) {

	// check for action
	if (args.length < 1) {
		CliFrontendParser.printHelp();
		System.out.println("Please specify an action.");
		return 1;
	}

	// get action
	String action = args[0];

	// remove action from parameters
	final String[] params = Arrays.copyOfRange(args, 1, args.length);

	// do action
	switch (action) {
		case ACTION_RUN:
			return run(params);
		case ACTION_LIST:
			return list(params);
		case ACTION_INFO:
			return info(params);
		case ACTION_CANCEL:
			return cancel(params);
		case ACTION_STOP:
			return stop(params);
		case ACTION_SAVEPOINT:
			return savepoint(params);
		case "-h":
		case "--help":
			CliFrontendParser.printHelp();
			return 0;
		case "-v":
		case "--version":
			String version = EnvironmentInformation.getVersion();
			String commitID = EnvironmentInformation.getRevisionInformation().commitId;
			System.out.print("Version: " + version);
			System.out.println(!commitID.equals(EnvironmentInformation.UNKNOWN) ? ", Commit ID: " + commitID : "");
			return 0;
		default:
			System.out.printf("\"%s\" is not a valid action.\n", action);
			System.out.println();
			System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
			System.out.println();
			System.out.println("Specify the version option (-v or --version) to print Flink version.");
			System.out.println();
			System.out.println("Specify the help option (-h or --help) to get help on the command.");
			return 1;
	}
}
The first argument is treated as the action, and the matching method is called. Running a jar corresponds to the run action, so let's follow the run method.
protected void run(String[] args) throws Exception { ... }
Its core is the call runProgram(customCommandLine, commandLine, runOptions, program);
runProgram in turn calls executeProgram(program, client, userParallelism);, which handles the interaction with the JobManager.
protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException { ... }
The result of the interaction with the JobManager is wrapped in a JobSubmissionResult object. The main work of client.run happens in its internal run overload:
run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings) calls the submitJob method, which has two implementations: MiniClusterClient.submitJob and RestClusterClient.submitJob.
MiniClusterClient.submitJob submits the job via miniCluster.submitJob(jobGraph) and runs it in local mode.
RestClusterClient.submitJob sends an RPC request to the JobManager.
At this point the job has been submitted through the CLI: the CLI talks to the JobManager via RPC to complete the submission.
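The split between the two clients described above boils down to one interface with two transports. Here is a minimal sketch; all class names below are made-up stand-ins (only the submitJob method name mirrors Flink's real API):

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-ins for Flink's ClusterClient hierarchy: the CLI only
// sees submitJob; which transport is used depends on the concrete client.
interface SketchClusterClient {
    CompletableFuture<String> submitJob(String jobGraphName);
}

// Local mode: hand the JobGraph directly to an embedded cluster.
class SketchMiniClusterClient implements SketchClusterClient {
    public CompletableFuture<String> submitJob(String jobGraphName) {
        return CompletableFuture.completedFuture("local:" + jobGraphName);
    }
}

// Remote mode: serialize the JobGraph and send it over REST to the JobManager.
class SketchRestClusterClient implements SketchClusterClient {
    public CompletableFuture<String> submitJob(String jobGraphName) {
        return CompletableFuture.completedFuture("rest:" + jobGraphName);
    }
}

public class SubmitDispatchSketch {
    // The CLI path is the same either way; only the client instance differs.
    static String submitWith(SketchClusterClient client, String job) {
        return client.submitJob(job).join();
    }

    public static void main(String[] args) {
        System.out.println(submitWith(new SketchMiniClusterClient(), "WordCount"));
        System.out.println(submitWith(new SketchRestClusterClient(), "WordCount"));
    }
}
```

This is why the rest of the submission path is transport-agnostic: everything after client.run depends only on the interface.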
JobMaster receives the job submit request
- In a local environment, the MiniCluster does most of the work and delegates the job directly to a MiniDispatcher.
- In a remote environment, a RestClusterClient is started, which submits the user code to the cluster via HTTP REST.
- In a remote environment, once the request reaches the cluster, some handler must process it; here that is JobSubmitHandler. After taking over the request, it delegates to StandaloneDispatcher to start the job. From this point on, the local and remote submission paths are unified again.
- Once the Dispatcher takes over the job, it instantiates a JobManagerRunner and uses that runner to start the job.
- The JobManagerRunner then hands the job over to the JobMaster.
- The JobMaster starts the whole execution graph via ExecutionGraph's methods and hands the Tasks to the TaskManager, and the job is up and running.
attention
JobManagerRunner is responsible for running the job and managing the JobMaster; it is not the cluster-level JobManager. This is for historical reasons: JobManager is the old runtime framework, while JobMaster is the new runtime framework introduced by the community in FLIP-6. Today it is the JobMaster that does the work, so this class would arguably be better named JobMasterRunner.
Since FLIP-6, the flow is: the Flink Dispatcher dispatches to a JobManagerRunner, which passes the JobGraph to the JobMaster; the JobMaster then converts the JobGraph into an ExecutionGraph and hands it to the TaskManagers to execute.
First, let's look at Flink's graph abstractions.
Flink provides three graph abstractions as data structures: StreamGraph, JobGraph, and ExecutionGraph, the last of which is the basic data structure used for scheduling.
In the transformation from JobGraph to ExecutionGraph, the main changes are:
- The notion of parallelism is added, making the graph truly schedulable.
- ExecutionJobVertex and ExecutionVertex are generated for each JobVertex, and IntermediateResult and IntermediateResultPartition for each IntermediateDataSet; parallelism is realized through these classes.
The ExecutionGraph can then be used to schedule tasks. Flink generates one Task per parallel instance from this graph; each Task corresponds to one Execution in the ExecutionGraph. On the Task side, InputGate, InputChannel, and ResultPartition correspond to the IntermediateResult and ExecutionEdge in the graph above.
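The parallelism expansion described above can be illustrated with a tiny sketch. The two record types below are simplified stand-ins for Flink's ExecutionJobVertex/ExecutionVertex, not the real classes:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: shows how a single JobVertex with parallelism N expands
// into N ExecutionVertex instances when the ExecutionGraph is built.
public class GraphExpansionSketch {
    record SketchJobVertex(String name, int parallelism) {}
    record SketchExecutionVertex(String name, int subtaskIndex) {}

    static List<SketchExecutionVertex> expand(SketchJobVertex jv) {
        List<SketchExecutionVertex> subtasks = new ArrayList<>();
        for (int i = 0; i < jv.parallelism(); i++) {
            // One ExecutionVertex per parallel subtask, e.g. "map (2/3)".
            subtasks.add(new SketchExecutionVertex(
                jv.name() + " (" + (i + 1) + "/" + jv.parallelism() + ")", i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        expand(new SketchJobVertex("map", 3)).forEach(v -> System.out.println(v.name()));
    }
}
```

Each of these expanded subtasks is what later becomes one Execution, and ultimately one deployed Task.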
Dispatcher.submitJob is the main entry point for handling job submission:
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
	log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

	try {
		if (isDuplicateJob(jobGraph.getJobID())) {
			return FutureUtils.completedExceptionally(
				new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted."));
		} else {
			return internalSubmitJob(jobGraph);
		}
	} catch (FlinkException e) {
		return FutureUtils.completedExceptionally(e);
	}
}
The main call chain inside the Dispatcher is submitJob -> internalSubmitJob -> persistAndRunJob -> runJob -> createJobManagerRunner -> startJobManagerRunner.
Here is the runJob method:
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
	// check whether this job is already running, to avoid duplicate submission
	Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

	final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

	jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

	return jobManagerRunnerFuture
		.thenApply(FunctionUtils.nullFn())
		.whenCompleteAsync(
			(ignored, throwable) -> {
				if (throwable != null) {
					jobManagerRunnerFutures.remove(jobGraph.getJobID());
				}
			},
			getMainThreadExecutor());
}
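The bookkeeping pattern in runJob — register the future in a map first, then remove it again if the runner fails to come up — can be reproduced standalone. This sketch replaces JobManagerRunner with a plain string and uses a synchronous whenComplete for determinism:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Standalone demo of the register-then-rollback-on-failure pattern.
public class FutureBookkeepingSketch {
    static final ConcurrentMap<String, CompletableFuture<String>> running = new ConcurrentHashMap<>();

    static CompletableFuture<Void> run(String jobId, boolean fail) {
        CompletableFuture<String> runnerFuture = fail
            ? CompletableFuture.failedFuture(new RuntimeException("runner failed"))
            : CompletableFuture.completedFuture("runner-" + jobId);

        // Register optimistically, like jobManagerRunnerFutures.put(...).
        running.put(jobId, runnerFuture);

        return runnerFuture
            .thenApply(ignored -> (Void) null)     // like FunctionUtils.nullFn()
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    running.remove(jobId);         // undo the bookkeeping on failure
                }
            });
    }

    public static void main(String[] args) {
        run("a", false).join();
        run("b", true).exceptionally(t -> null).join();
        System.out.println(running.keySet());      // only "a" remains registered
    }
}
```

The real code uses whenCompleteAsync with the Dispatcher's main-thread executor so the map is only ever mutated from one thread.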
createJobManagerRunner calls DefaultJobManagerRunnerFactory.createJobManagerRunner to create a JobManagerRunner, then Dispatcher.startJobManagerRunner --> jobManagerRunner.start().
jobManagerRunner.start() in turn calls StandaloneHaServices.start.
public void start() throws Exception { ... }
StandaloneLeaderElectionService then actually invokes JobManagerRunner.grantLeadership.
The core of JobManagerRunner.grantLeadership is verifyJobSchedulingStatusAndStartJobManager:
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) { ... }

private CompletionStage<Void> startJobMaster(UUID leaderSessionId) { ... }
Next comes the startup of the JobMaster; let's continue.
JobMaster.start ===> startJobExecution
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
	// make sure we receive RPC and async calls
	start();

	return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
//-- job starting and stopping -----------------------------------------------------------------
This starts the slot pool inside the JobMaster and communicates with the JM's ResourceManager:
private void startJobMasterServices() throws Exception {
	// start the slot pool to make sure the slot pool now accepts messages for this leader
	slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor()); // the slot pool is an RPC service
	scheduler.start(getMainThreadExecutor());

	//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
	// try to reconnect to previously known leader
	reconnectToResourceManager(new FlinkException("Starting JobMaster component.")); // connect to the ResourceManager

	// job is ready to go, try to establish connection with resource manager
	//   - activate leader retrieval for the resource manager
	//   - on notification of the leader, the connection will be established and
	//     the slot pool will start requesting slots
	resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); // tell the ResourceManager that startup is OK
}
Once the slot pool has finished communicating with the ResourceManager, the job starts executing via resetAndScheduleExecutionGraph(). Here the JobGraph is converted into an ExecutionGraph and executed:
===> scheduleExecutionGraph()
===> ExecutionGraph.scheduleForExecution()
===> scheduleEager(slotProvider, allocationTimeout), the core method that executes the job eagerly and requests the resources:
private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) { ... }
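The shape of eager scheduling — allocate a slot for every subtask first, and deploy only once all allocations have completed — can be sketched as follows. Slots and deployments are reduced to strings here; this shows the control flow, not Flink's real scheduler:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Control-flow sketch of eager scheduling: request everything, wait for
// everything, then deploy everything.
public class EagerScheduleSketch {
    static CompletableFuture<String> allocateSlot(String vertex) {
        // Stands in for an asynchronous slot request to the slot pool.
        return CompletableFuture.supplyAsync(() -> "slot-for-" + vertex);
    }

    static List<String> scheduleEager(List<String> vertices) {
        List<CompletableFuture<String>> allocations = vertices.stream()
            .map(EagerScheduleSketch::allocateSlot)
            .collect(Collectors.toList());

        // allOf = wait for every slot; a single failed allocation fails the job.
        CompletableFuture.allOf(allocations.toArray(new CompletableFuture[0])).join();

        // Now deploy each subtask into its slot (stands in for execution.deploy()).
        return allocations.stream()
            .map(CompletableFuture::join)
            .map(slot -> "deployed-into-" + slot)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        scheduleEager(List.of("source", "map", "sink")).forEach(System.out::println);
    }
}
```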
execution.deploy(); then triggers execution of the task:

public void deploy() throws JobException { ... }

taskManagerGateway.submitTask(deployment, rpcTimeout) // ==> invoked via RPC through AkkaInvocationHandler; when the TM receives the call, RpcTaskManagerGateway.submitTask runs
At this point the JobManager has submitted the task to the assigned TM by calling the RPC service through AkkaInvocationHandler.
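The proxy mechanism behind AkkaInvocationHandler can be shown in miniature with Java's dynamic proxies: the caller holds a gateway interface, but every call is intercepted by an InvocationHandler, which turns it into an RPC message. The gateway interface and "message log" below are made-up stand-ins, not Flink's real RpcGateway machinery:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class RpcProxySketch {
    // Hypothetical gateway interface; only the method name echoes submitTask.
    interface TaskGateway {
        String submitTask(String deployment);
    }

    static final List<String> sentMessages = new ArrayList<>();

    static TaskGateway createGateway() {
        InvocationHandler handler = (Object proxy, Method method, Object[] args) -> {
            // In Flink this is where the call would become an Akka message to the TM.
            String msg = method.getName() + "(" + args[0] + ")";
            sentMessages.add(msg);
            return "ack:" + msg;
        };
        return (TaskGateway) Proxy.newProxyInstance(
            RpcProxySketch.class.getClassLoader(),
            new Class<?>[]{TaskGateway.class},
            handler);
    }

    public static void main(String[] args) {
        TaskGateway gateway = createGateway();
        System.out.println(gateway.submitTask("deployment-1"));
    }
}
```

This is why the JobManager code can simply call taskManagerGateway.submitTask(...) as if it were a local method: the proxy hides the network hop.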
The TM receives the submitTask request and runs the Task
AkkaInvocationHandler makes the RPC call; when the TM receives it, RpcTaskManagerGateway.submitTask is invoked.
TaskExecutor.submitTask then constructs the Task:

Task task = new Task( ... );
Requesting resources
Resource allocation starts from ejv.allocateResourcesForAll, i.e. the allocateResourcesForAll method of ExecutionJobVertex.
To be continued