Flink调度
Flink调度^1的时候,会尽量将同一个Task的多个并行度,分配到同一个TM中。会使得某一些TM的cpu消耗比较大,资源使用不够均衡。
配置项
cluster.evenly-spread-out-slots
默认是false。可以配置成true使得task分散分配在不同的TM中。
Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available TaskExecutors
.
源码解析
JobMaster初始化的时候,会初始化一个调度工程SchedulerFactory。默认的实现是DefaultSchedulerFactory。
SchedulerFactory调用selectSlotSelectionStrategy通过配置获取是否配置了cluster.evenly-spread-out-slots。
SlotSelectionStrategy用于task选择slot的位置。有两种策略。
LocationPreferenceSlotSelectionStrategy.createDefault()和LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()。
LocationPreferenceSlotSelectionStrategy的两种实现
对应两个实体:
默认调度:DefaultLocationPreferenceSlotSelectionStrategy
分散调度:EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
1 | selectWitLocationPreference |
DefaultLocationPreferenceSlotSelectionStrategy和EvenlySpreadOutLocationPreferenceSlotSelectionStrategy给出了不一样的计算公式
1 | protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(@Nonnull Collection<SlotInfoAndResources> availableSlots, @Nonnull ResourceProfile resourceProfile) { |
EvenlySpreadOut的分发策略是将有相同task的slot的尽量分配在不同TM上
1 | protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(@Nonnull Collection<SlotInfoAndResources> availableSlots, @Nonnull ResourceProfile resourceProfile) { |
taskExecutorUtilization^3是taskExecutor的利用率,每个JobVertex都有一个groupId,会有多个task,taskExecutor中有n个slot,没有被release的个数为m,其中m中没有分配同一个groupid的slot数量为k,
利用率是(m-k)/m。在[0,1]之间。
min(Comparator.comparing(SlotInfoAndResources::getTaskExecutorUtilization)) 获取最小的同groupId占用率。然后将该Solt返回
补充说明
ResourceId
1 | /** |
host源码
1 | /** |
calculateTaskExecutorUtilization
1 | private double calculateTaskExecutorUtilization(Map<AllocationID, MultiTaskSlot> map, AbstractID groupId) { |