Requesting resources
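Resource allocation starts with ejv.allocateResourcesForAll, i.e. the allocateResourcesForAll method of ExecutionJobVertex. This method iterates over all tasks of the ExecutionJobVertex and requests one slot for each task. The core logic lives in Execution.allocateAndAssignSlotForExecution.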
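The per-task loop can be pictured with the minimal sketch below, which uses plain JDK futures. TaskVertex, AllocateForAllSketch and the allocateSlotFor callback are hypothetical stand-ins for ExecutionVertex and Execution.allocateAndAssignSlotForExecution. Slots are modeled as plain strings. The real method also receives the slot provider, the location preference constraint and a timeout.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for an ExecutionVertex: one parallel subtask of a job vertex.
final class TaskVertex {
    final String name;
    final int subtaskIndex;

    TaskVertex(String name, int subtaskIndex) {
        this.name = name;
        this.subtaskIndex = subtaskIndex;
    }
}

final class AllocateForAllSketch {

    // Issues one slot request per subtask and returns one future per request.
    static List<CompletableFuture<String>> allocateResourcesForAll(
            Collection<TaskVertex> taskVertices,
            Function<TaskVertex, CompletableFuture<String>> allocateSlotFor) {
        List<CompletableFuture<String>> futures = new ArrayList<>(taskVertices.size());
        for (TaskVertex vertex : taskVertices) {
            futures.add(allocateSlotFor.apply(vertex));
        }
        return futures;
    }

    // Completes when every request has completed; fails if any request fails.
    static CompletableFuture<Void> waitForAll(List<CompletableFuture<String>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }
}
```

Returning one future per subtask leaves the waiting strategy to the caller. Combining the futures with allOf, as waitForAll does, is one way to wait until every task has its slot.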
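By default LocationPreferenceConstraint.ALL is used. The slot is only allocated after all of this task's inputs have been deployed, so that their locations are known.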
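The sketch below contrasts ALL with the alternative value ANY using plain CompletableFutures. LocationConstraint and PreferredLocationsSketch are hypothetical, and locations are strings instead of TaskManagerLocation. The ANY branch uses only the input locations that are already known; this reflects my understanding of Flink's behavior, not its exact code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical mirror of the two LocationPreferenceConstraint values.
enum LocationConstraint { ALL, ANY }

final class PreferredLocationsSketch {

    static CompletableFuture<List<String>> calculatePreferredLocations(
            Collection<CompletableFuture<String>> inputLocationFutures,
            LocationConstraint constraint) {
        switch (constraint) {
            case ALL:
                // Wait until every input location is known, then collect them all.
                return CompletableFuture
                        .allOf(inputLocationFutures.toArray(new CompletableFuture<?>[0]))
                        .thenApply(ignored -> {
                            List<String> locations = new ArrayList<>();
                            for (CompletableFuture<String> future : inputLocationFutures) {
                                locations.add(future.join());
                            }
                            return locations;
                        });
            case ANY:
                // Use only the input locations that are already known right now.
                List<String> known = new ArrayList<>();
                for (CompletableFuture<String> future : inputLocationFutures) {
                    if (future.isDone() && !future.isCompletedExceptionally()) {
                        known.add(future.join());
                    }
                }
                return CompletableFuture.completedFuture(known);
            default:
                throw new IllegalArgumentException("Unknown constraint " + constraint);
        }
    }
}
```

With ALL, the returned future stays incomplete while any input location is still unknown. With ANY, the result is available immediately but may be partial.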
The overall allocation method is shown below:
/**
 * Allocates and assigns a slot obtained from the slot provider to the execution.
 *
 * @param slotProvider to obtain a new slot from
 * @param queued if the allocation can be queued
 * @param locationPreferenceConstraint constraint for the location preferences
 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
 *                                               Can be empty if the allocation ids are not required for scheduling.
 * @param allocationTimeout rpcTimeout for allocating a new slot
 * @return Future which is completed with this execution once the slot has been assigned
 *         or with an exception if an error occurred.
 * @throws IllegalExecutionStateException if this method has been called while not being in the CREATED state
 */
public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
        SlotProvider slotProvider,
        boolean queued,
        LocationPreferenceConstraint locationPreferenceConstraint,
        @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds,
        Time allocationTimeout) throws IllegalExecutionStateException {
    checkNotNull(slotProvider);
    assertRunningInJobMasterMainThread();
    // get the slot sharing group of the job vertex
    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
    // sanity check
    if (locationConstraint != null && sharingGroup == null) {
        throw new IllegalStateException(
                "Trying to schedule with co-location constraint but without slot sharing allowed.");
    }
    // this method only works if the execution is in the state 'CREATED'
    if (transitionState(CREATED, SCHEDULED)) {
        final SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;
        ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, slotSharingGroupId) :
                new ScheduledUnit(this, slotSharingGroupId, locationConstraint);
        // try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
        ExecutionVertex executionVertex = getVertex();
        AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();
        // the allocation id used by the previous attempt, if any
        Collection<AllocationID> previousAllocationIDs =
                lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();
        // calculate the preferred locations from the locations of the inputs;
        // an input with more than 8 upstream sources is ignored
        final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
                calculatePreferredLocations(locationPreferenceConstraint);
        final SlotRequestId slotRequestId = new SlotRequestId();
        final CompletableFuture<LogicalSlot> logicalSlotFuture =
                preferredLocationsFuture.thenCompose(
                        (Collection<TaskManagerLocation> preferredLocations) ->
                                slotProvider.allocateSlot(
                                        slotRequestId,
                                        toSchedule,
                                        new SlotProfile(
                                                ResourceProfile.UNKNOWN,
                                                preferredLocations,
                                                previousAllocationIDs,
                                                allPreviousExecutionGraphAllocationIds),
                                        queued,
                                        allocationTimeout));
        // register call back to cancel slot request in case that the execution gets canceled
        releaseFuture.whenComplete(
                (Object ignored, Throwable throwable) -> {
                    if (logicalSlotFuture.cancel(false)) {
                        slotProvider.cancelSlotRequest(
                                slotRequestId,
                                slotSharingGroupId,
                                new FlinkException("Execution " + this + " was released."));
                    }
                });
        // This forces calls to the slot pool back into the main thread, for normal and exceptional completion
        return logicalSlotFuture.handle(
                (LogicalSlot logicalSlot, Throwable failure) -> {
                    if (failure != null) {
                        throw new CompletionException(failure);
                    }
                    if (tryAssignResource(logicalSlot)) {
                        return this;
                    } else {
                        // release the slot
                        logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
                        throw new CompletionException(
                                new FlinkException(
                                        "Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
                    }
                });
    } else {
        // call race, already deployed, or already done
        throw new IllegalExecutionStateException(this, CREATED, state);
    }
}
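Two control-flow ideas in this method are worth isolating:

- The CREATED to SCHEDULED transition acts as a one-shot guard.
- The pending slot future is cancelled if the execution is released first.

The sketch below reproduces only that flow with plain JDK types. MiniExecution, ExecState and allocateAndAssignSlot are hypothetical, slots are strings, and tryAssignResource is reduced to a simple field assignment.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

enum ExecState { CREATED, SCHEDULED, RELEASED }

// Hypothetical, heavily simplified execution that only models slot assignment.
final class MiniExecution {
    private final AtomicReference<ExecState> state = new AtomicReference<>(ExecState.CREATED);
    // Completed when the execution is released, e.g. because it was cancelled.
    private final CompletableFuture<Void> releaseFuture = new CompletableFuture<>();
    private volatile String assignedSlot;

    CompletableFuture<MiniExecution> allocateAndAssignSlot(
            Supplier<CompletableFuture<String>> requestSlot,
            Runnable cancelSlotRequest) {
        // Only one caller can win CREATED -> SCHEDULED; any later call fails fast.
        if (!state.compareAndSet(ExecState.CREATED, ExecState.SCHEDULED)) {
            throw new IllegalStateException("Expected CREATED but was " + state.get());
        }
        CompletableFuture<String> slotFuture = requestSlot.get();

        // If the execution is released before a slot arrives, cancel the pending request.
        releaseFuture.whenComplete((ignored, error) -> {
            if (slotFuture.cancel(false)) {
                cancelSlotRequest.run();
            }
        });

        // Assign the slot on success, propagate the failure otherwise.
        return slotFuture.handle((slot, failure) -> {
            if (failure != null) {
                throw new CompletionException(failure);
            }
            assignedSlot = slot;
            return this;
        });
    }

    void release() {
        state.set(ExecState.RELEASED);
        releaseFuture.complete(null);
    }

    String assignedSlot() {
        return assignedSlot;
    }
}
```

Because compareAndSet lets exactly one caller win the transition, a concurrent second call fails instead of issuing a duplicate slot request.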
Computing the preferred locations:
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
        calculatePreferredLocations(locationPreferenceConstraint);
This mainly relies on getVertex().getPreferredLocations().
If an input has more than 8 upstream sources, its locations are not taken into account as preferred locations:
private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;

if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
    inputLocations.clear();
    break;
}
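For context, here is a simplified sketch of the logic around this check. It makes these assumptions:

- Each input is given as the list of its producers' locations, as plain strings rather than Flink's TaskManagerLocation futures, and the sketch counts distinct strings.
- The rule of keeping the input with the fewest locations is my reading of ExecutionVertex.getPreferredLocationsBasedOnInputs.

InputLocationsSketch is a hypothetical name.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class InputLocationsSketch {

    static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;

    // Each inner list holds the locations of the producers feeding one input.
    static Set<String> preferredLocations(List<List<String>> inputs) {
        Set<String> best = new LinkedHashSet<>();
        for (List<String> producerLocations : inputs) {
            Set<String> inputLocations = new LinkedHashSet<>();
            for (String location : producerLocations) {
                inputLocations.add(location);
                // An input spread over too many locations is dropped entirely.
                if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
                    inputLocations.clear();
                    break;
                }
            }
            // Keep the input with the fewest (non-zero) locations seen so far.
            if (best.isEmpty() || (!inputLocations.isEmpty() && inputLocations.size() < best.size())) {
                best = inputLocations;
            }
        }
        return best;
    }
}
```

Preferring the input with the fewest locations gives the most specific placement hint, while ignoring wide inputs avoids a hint so broad it is meaningless.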
Next comes slotProvider.allocateSlot. The SlotProvider implementation here is SchedulerImpl, so the call lands in SchedulerImpl.allocateSlot.
A SlotSharingGroupId is usually present, so in most cases allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout) is called:
private CompletableFuture<LogicalSlot> allocateSharedSlot(...)
Without a co-location constraint, it calls:
multiTaskSlotLocality = allocateMultiTaskSlot(
        scheduledUnit.getJobVertexId(),
        multiTaskSlotManager,
        slotProfile,
        allowQueuedScheduling,
        allocationTimeout);
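The two branching decisions above (a sharing group present or not, a co-location constraint present or not) can be summarized as a small decision function. SlotPath and choosePath are hypothetical names, and the IDs are plain strings with null meaning "absent". The first check mirrors the sanity check in allocateAndAssignSlotForExecution.

```java
// Hypothetical labels for the allocation paths described above.
enum SlotPath { SINGLE_SLOT, SHARED_SLOT, SHARED_CO_LOCATED_SLOT }

final class SchedulerDispatchSketch {

    static SlotPath choosePath(String slotSharingGroupId, String coLocationConstraintId) {
        if (coLocationConstraintId != null && slotSharingGroupId == null) {
            // Same sanity check as in allocateAndAssignSlotForExecution.
            throw new IllegalStateException(
                    "Trying to schedule with co-location constraint but without slot sharing allowed.");
        }
        if (slotSharingGroupId == null) {
            return SlotPath.SINGLE_SLOT; // no sharing: the task gets a slot of its own
        }
        return coLocationConstraintId == null
                ? SlotPath.SHARED_SLOT             // the common case: allocateMultiTaskSlot
                : SlotPath.SHARED_CO_LOCATED_SLOT; // co-location: handled by a separate branch
    }
}
```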
LogicalSlot, MultiTaskSlot, and SingleTaskSlot
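- LogicalSlot: a logical slot represents a resource in a TaskManager on which a single task can be deployed.
- MultiTaskSlot: the parent node of the resource tree; many SingleTaskSlots hang under it.
- SingleTaskSlot: the smallest unit (a leaf node) of the resource tree, holding exactly one task.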
[Example figures not preserved: a left-to-right graph, and the graph it is actually converted into.]
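Since the example figures are lost, here is a stripped-down model of the tree instead. SlotNode, MultiSlotNode and SingleSlotNode are hypothetical simplifications of MultiTaskSlot and SingleTaskSlot. Each node carries a group id (think JobVertexID), and a multi-task node may hold leaves or further multi-task nodes. A group id may not appear twice below the same node.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: every node in the tree is tagged with a group id.
abstract class SlotNode {
    final String groupId;

    SlotNode(String groupId) {
        this.groupId = groupId;
    }
}

// Leaf: exactly one task.
final class SingleSlotNode extends SlotNode {
    SingleSlotNode(String groupId) {
        super(groupId);
    }
}

// Root or inner node: its children share the same physical slot.
final class MultiSlotNode extends SlotNode {
    private final List<SlotNode> children = new ArrayList<>();

    MultiSlotNode(String groupId) {
        super(groupId);
    }

    // True if a node with this group id exists anywhere below this node.
    boolean contains(String id) {
        for (SlotNode child : children) {
            if (child.groupId.equals(id)) {
                return true;
            }
            if (child instanceof MultiSlotNode && ((MultiSlotNode) child).contains(id)) {
                return true;
            }
        }
        return false;
    }

    SingleSlotNode addSingle(String id) {
        checkNotPresent(id);
        SingleSlotNode leaf = new SingleSlotNode(id);
        children.add(leaf);
        return leaf;
    }

    MultiSlotNode addMulti(String id) {
        checkNotPresent(id);
        MultiSlotNode inner = new MultiSlotNode(id);
        children.add(inner);
        return inner;
    }

    // Number of tasks (leaves) sharing this node's slot.
    int taskCount() {
        int count = 0;
        for (SlotNode child : children) {
            count += (child instanceof MultiSlotNode) ? ((MultiSlotNode) child).taskCount() : 1;
        }
        return count;
    }

    private void checkNotPresent(String id) {
        if (contains(id)) {
            throw new IllegalStateException("Group " + id + " already present in this slot");
        }
    }
}
```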
Computing resource weights
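The scoring code itself did not survive. As an assumption, the sketch below scores each candidate slot by two counts: how many preferred locations are on the same TaskManager, and how many are on the same host. A same-TaskManager match is weighted ten times higher. I believe Flink's location-preference strategy uses a score of this shape (localWeigh * 10 + hostLocalWeigh), but treat the factor as an assumption. SlotLocation and SlotScoringSketch are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical location of a slot or of a preferred input: TaskManager id plus host name.
final class SlotLocation {
    final String taskManagerId;
    final String host;

    SlotLocation(String taskManagerId, String host) {
        this.taskManagerId = taskManagerId;
        this.host = host;
    }
}

final class SlotScoringSketch {

    // Assumed weighting: a match on the same TaskManager counts 10x a match on the same host.
    static int score(SlotLocation candidate, Map<String, Integer> perTaskManager, Map<String, Integer> perHost) {
        int localWeight = perTaskManager.getOrDefault(candidate.taskManagerId, 0);
        int hostLocalWeight = perHost.getOrDefault(candidate.host, 0);
        return localWeight * 10 + hostLocalWeight;
    }

    // Index of the highest-scoring candidate, or -1 if no candidate matches any preference.
    static int bestCandidate(List<SlotLocation> candidates, List<SlotLocation> preferred) {
        Map<String, Integer> perTaskManager = new HashMap<>();
        Map<String, Integer> perHost = new HashMap<>();
        for (SlotLocation p : preferred) {
            // Count how many preferred locations point at each TaskManager and each host.
            perTaskManager.merge(p.taskManagerId, 1, Integer::sum);
            perHost.merge(p.host, 1, Integer::sum);
        }
        int bestIndex = -1;
        int bestScore = 0;
        for (int i = 0; i < candidates.size(); i++) {
            int s = score(candidates.get(i), perTaskManager, perHost);
            if (s > bestScore) {
                bestScore = s;
                bestIndex = i;
            }
        }
        return bestIndex;
    }
}
```

With the factor 10, a single same-TaskManager match outweighs up to nine same-host matches. A result of -1 means there is no locality signal, and the caller could fall back to any free slot.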
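Back to allocating the MultiTaskSlot. First, the root slots that have already been allocated are collected, keeping only roots that do not yet contain the same operator. A root that already holds a task of the same JobVertex is filtered out, so two subtasks of a map operator never end up in the same slot. Then the best slot among the remaining resources is computed. If there is one, that root (MultiTaskSlot) is used directly and a SingleTaskSlot is registered under it.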
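If the available resources are not sufficient, a new slot is requested from the ResourceManager via RPC (slotPool.requestNewAllocatedSlot). If the existing TaskManagers do not have enough free slots, the SlotManager asks the ResourceManager for a new TaskManager. This happens, for example, at startup or after a TaskExecutor has died; on YARN, a new TM is created.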
private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(...)
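The filter-then-pick step described above can be sketched as follows. RootSlot and RootSelectionSketch are hypothetical: a root just remembers which job vertex ids already run in it. The score function stands in for whatever locality weighting is used, and adding the vertex id stands in for registering a SingleTaskSlot.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

// Hypothetical resolved root slot: remembers which job vertices already run in it.
final class RootSlot {
    final String name;
    final Set<String> jobVertexIds = new HashSet<>();

    RootSlot(String name) {
        this.name = name;
    }
}

final class RootSelectionSketch {

    // Picks the best-scoring root that does not yet host a subtask of jobVertexId and
    // records the task in it; returns null when no root fits and a new slot is needed.
    static RootSlot allocate(List<RootSlot> resolvedRoots, String jobVertexId, ToIntFunction<RootSlot> score) {
        RootSlot best = null;
        int bestScore = Integer.MIN_VALUE;
        for (RootSlot root : resolvedRoots) {
            if (root.jobVertexIds.contains(jobVertexId)) {
                continue; // two subtasks of the same vertex never share a slot
            }
            int s = score.applyAsInt(root);
            if (s > bestScore) {
                bestScore = s;
                best = root;
            }
        }
        if (best != null) {
            best.jobVertexIds.add(jobVertexId); // stands in for registering a SingleTaskSlot
        }
        return best;
    }
}
```

A null result corresponds to the "not enough resources" case, where a new slot has to be requested.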
slotPoolImpl.requestNewAllocatedSlot -> requestNewAllocatedSlotInternal
private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(...)
requestSlotFromResourceManager calls resourceManagerGateway.requestSlot, which sends the RPC request for the resource:
private void requestSlotFromResourceManager(...)
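The hand-off between an unsatisfied request and a slot that shows up later can be sketched as follows. MiniSlotPool, requestSlot and offerSlot are hypothetical, and the Consumer stands in for the resourceManagerGateway.requestSlot RPC. Pending requests are simply served first come, first served, which simplifies how the real slot pool matches offered slots to requests.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical, single-threaded slot pool illustrating the pending-request hand-off.
final class MiniSlotPool {
    private final Deque<String> freeSlots = new ArrayDeque<>();
    // Requests waiting for a slot, in arrival order: requestId -> future to complete.
    private final Map<String, CompletableFuture<String>> pendingRequests = new LinkedHashMap<>();
    // Stand-in for the RPC to the ResourceManager.
    private final Consumer<String> requestFromResourceManager;

    MiniSlotPool(Consumer<String> requestFromResourceManager) {
        this.requestFromResourceManager = requestFromResourceManager;
    }

    CompletableFuture<String> requestSlot(String requestId) {
        String free = freeSlots.poll();
        if (free != null) {
            return CompletableFuture.completedFuture(free);
        }
        // No free slot: park the request and ask the ResourceManager for a new one.
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
        requestFromResourceManager.accept(requestId);
        return future;
    }

    // A TaskExecutor offers a slot: complete the oldest pending request, or keep the slot as free.
    void offerSlot(String slotId) {
        Iterator<Map.Entry<String, CompletableFuture<String>>> it = pendingRequests.entrySet().iterator();
        if (it.hasNext()) {
            CompletableFuture<String> oldest = it.next().getValue();
            it.remove();
            oldest.complete(slotId);
        } else {
            freeSlots.add(slotId);
        }
    }
}
```

Parking the request as an uncompleted future is what lets allocateAndAssignSlotForExecution return immediately and continue in its handle callback once a slot eventually arrives.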
At the end of the RPC chain, when running on YARN, YarnResourceManager.startNewWorker is called to start a new container.