NetworkEnvironment
网络环境(NetworkEnvironment)是TaskManager进行网络通信的主对象,主要用于跟踪中间结果并负责所有的数据交换。每个TaskManager的实例都包含一个网络环境对象,在TaskManager启动时创建。NetworkEnvironment管理着多个协助通信的关键部件,它们是:
part | 解释 |
---|---|
NetworkBufferPool | 网络缓冲池,负责申请一个TaskManager的所有的内存段用作缓冲池;每个ResultPartition(等价于一个task一个)都有一个localBufferPool,与全局的NetworkBufferPool进行交互申请和释放内存段。 |
ConnectionManager | 连接管理器,用于管理本地(远程)通信连接; |
ResultPartitionManager | 结果分区管理器,用于跟踪一个TaskManager上所有生产/消费相关的ResultPartition;主要就是用于track所有的result partitions。 |
TaskEventDispatcher | 任务事件分发器,从消费者任务分发事件给生产者任务; |
ResultPartitionConsumableNotifier | 结果分区可消费通知器,用于通知消费者生产者生产的结果分区可消费; |
当NetworkEnvironment被初始化时,它首先根据配置创建网络缓冲池(NetworkBufferPool)。
1 | public NetworkEnvironment( |
初始化创建NetworkBufferPool时需要指定Buffer数目、单个Buffer的大小,并且申请的网络buffer为OffHeapMemory。使用array阻塞队列。1
ArrayBlockingQueue<MemorySegment> availableMemorySegments;
申请流程如下1
2
3
4
5
6try {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
}
}
catch (OutOfMemoryError err) {
关于NetworkBufferPool相关的在本章后面介绍。
在任务执行的核心逻辑中,有一个步骤是需要将自身(Task)注册到网络栈(也就是这里的NetworkEnvironment)。
该步骤会调用NetworkEnvironment的实例方法registerTask进行注册,注册之后NetworkEnvironment会对任务的通信进行管理:
1 | public void registerTask(Task task) throws IOException { |
初始化结果分区和初始化输入分区,是两个重要的核心。
初始化结果分区
核心操作是初始化localBufferPool,然后将localBufferPool注册到partition中,再将partition注册到结果分区管理器ResultPartitionManager中
1 | public void setupPartition(ResultPartition partition) throws IOException { |
初始化输入分区
主要是判断是否是基于信道的通信方式,决定要申请多少的buffer,然后将创建好的localBufferPool注册到对应的inputGate中。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void setupInputGate(SingleInputGate gate) throws IOException {
BufferPool bufferPool = null;
int maxNumberOfMemorySegments;
try {
if (enableCreditBased) {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
// assign exclusive buffers to input channels directly and use the rest for floating buffers
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
} else {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
gate.getNumberOfInputChannels() * networkBuffersPerChannel +
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
maxNumberOfMemorySegments);
}
gate.setBufferPool(bufferPool);
} catch (Throwable t) {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
ExceptionUtils.rethrowIOException(t);
}
}
统一的数据交换对象
在Flink的执行引擎中,流动的元素主要有两种:缓冲(Buffer)和事件(Event)。Buffer主要针对用户数据交换,而Event则用于一些特殊的控制标识。但在实现时,为了在通信层统一数据交换,Flink提供了数据交换对象——BufferOrEvent。它是一个既可以表示Buffer又可以表示Event的类。上层使用者只需调用isBuffer和isEvent方法即可判断当前收到的这条数据是Buffer还是Event。
1 | /** |
缓冲 Buffer
缓冲(Buffer)是数据交换的载体,几乎所有的数据(当然事件是特殊的)交换都需要经过Buffer。Buffer底层依赖于Flink自管理内存的内存段(MemorySegment)作为数据的容器。Buffer在内存段上做了一层封装,这一层封装是为了对基于引用计数的Buffer回收机制提供支持。
具体实现NetworkBuffer.java,这个类继承了netty中的AbstractReferenceCountedByteBuf并且实现了自定义的Buffer。
AbstractReferenceCountedByteBuf是netty中已经实现的引用计数的功能。通过这个类,可以判断这个buffer是否需要回收。
1 | public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Buffer { |
它在内部维护着一个计数器referenceCount,初始值为1。内存回收由缓冲回收器(BufferRecycler)来完成,回收的对象就是内存段(MemorySegment)。
1 | /** The recycler for the backing {@link MemorySegment}. */ |
BufferRecycler接口有一个名为FreeingBufferRecycler的简单实现者,它的做法是直接释放内存段。当然通常为了分配和回收的效率,会对Buffer进行预先分配然后加入到Buffer池中。所以,BufferRecycler的常规实现是基于缓冲池的。
BufferRecycler的具体实现
LocalBufferPool
1 | private final NetworkBufferPool networkBufferPool; |
拥有一个NetworkBufferPool的属性,这个NetworkBufferPool是一个全局网络buffer池,一个TaskManger只有一个。
NetWork BufferPool 是 TaskManager 内所有 Task 共享的 BufferPool,TaskManager 初始化时就会向堆外内存申请 NetWork BufferPool。LocalBufferPool 是每个 Task 自己的 BufferPool,假如一个 TaskManager 内运行着 5 个 Task,那么就会有 5 个 LocalBufferPool,但 TaskManager 内永远只有一个 NetWork BufferPool。
1 | ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>(); |
当前可用的内存段。 这些段是从网络缓冲池中请求的,目前尚未作为缓冲区实例分发。
1 | private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>(); |
监听buffer是否可用,如果可用,通过这个队列通知。
requestBuffer()调用requestMemorySegment()将获取到的MemorySegment封装成NetworkBuffer,以自身为recycler的入参。申请内存这一块是像全局NetWork BufferPool申请。
requestMemorySegment()方法是申请内存块的
1 | private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException, IOException { |
deallocate是一个在netty线程执行的函数,调用自定义的recycle函数。
1 |
|
NetworkBufferPool
NetworkBufferPool是网络堆栈的 MemorySegment实例的固定大小的池。申请的是堆外内存。(个人猜想网络是一块需要经常进行替换数据的地方,频繁的替换会发送大量的young gc,在java中,C c = new C, c= new C;的方式旧的实例需要通过回收才可以消除,而通过堆外内存可以直接通过put byte直接在内存空间操作,不需要回收。)
1 | /** |
NetworkBufferPool中申请内存段和释放内存段,都是加入和放回自身的队列中。
每个task对应的localBufferPool需要buffer则去networkBufferPool的队列中获取。NetworkBufferPool有用的buffer数量,在初始化的时候就限定了。(这个数量是由配置计算出来的,具体见TaskManagerServices.calculateNetworkBufferMemory)
redistributeBuffers这个函数可以动态调节每个localPool的size,让手头空着的内存段,分配给更需要的本地pool中。通过1
2/需要申请的数量/总需要申请的数量 求出百分比,去掉小数点。
现有内存段个数 * 百分比 - 已经分配为可以分配的
需要申请的段越多 百分比就越大,优先分配给权重大的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70private void redistributeBuffers() throws IOException {
assert Thread.holdsLock(factoryLock);
// All buffers, which are not among the required ones
final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
//如果可用内存段为0,将所有的localBufferpool中申请的多余的buffer返还给队列
if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
for (LocalBufferPool bufferPool : allBufferPools) {
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
}
return;
}
/*
* With buffer pools being potentially limited, let's distribute the available memory
* segments based on the capacity of each buffer pool, i.e. the maximum number of segments
* an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
* it may be less. Based on this and the sum of all these values (totalCapacity), we build
* a ratio that we use to distribute the buffers.
*/
long totalCapacity = 0; // long to avoid int overflow
//计算每个LocalBufferPool的还可以申请内存段之和
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
}
// no capacity to receive additional buffers?
if (totalCapacity == 0) {
return; // necessary to avoid div by zero when nothing to re-distribute
}
// since one of the arguments of 'min(a,b)' is a positive int, this is actually
// guaranteed to be within the 'int' domain
// (we use a checked downCast to handle possible bugs more gracefully).
final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
Math.min(numAvailableMemorySegment, totalCapacity));
long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
// shortcut
if (excessMax == 0) {
continue;
}
totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
// avoid remaining buffers by looking at the total capacity that should have been
// re-distributed up until here
// the downcast will always succeed, because both arguments of the subtraction are in the 'int' domain
//需要申请的数量/总需要申请的数量 求出百分比,去掉小数点。 * 现有内存段个数 * 百分比 - 已经分配为可以分配的
final int mySize = MathUtils.checkedDownCast(
memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);
numDistributedMemorySegment += mySize;
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
assert (totalPartsUsed == totalCapacity);
assert (numDistributedMemorySegment == memorySegmentsToDistribute);
}
1 |
|
配置天坑
1 | /** |
通过配比内存大小与 最小大小比,取最大值,再与最大值比取较小值。。结果就是哪怕你的配比配的再大,网络buffer也是1gb。。。转为Count为 10241024/32K= 32,768。。。如果需要调整网络参数,一定要把最大值和最小值一起配上。1
2final long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
(long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction)));
事件 Event
Flink的数据流中不仅仅只有用户的数据,还包含了一些特殊的事件,这些事件都是由算子注入到数据流中的。它们在每个流分区里伴随着其他的数据元素而被有序地分发。接收到这些事件的算子会对这些事件给出响应,典型的事件类型有:
- 检查点屏障:用于隔离多个检查点之间的数据,保障快照数据的一致性;
- 迭代屏障:标识流分区已到达了一个超级步的结尾;
- 子分区数据结束标记:当消费任务获取到该事件时,表示其所消费的对应的分区中的数据已被全部消费完成;
所有事件的最终基类都是AbstractEvent。AbstractEvent这一抽象类又派生出另一个抽象类RuntimeEvent,几乎所有预先内置的事件都直接派生于此。除了预定义的事件外,Flink还支持自定义的扩展事件,所有自定义的事件都继承自派生于AbstractEvent的TaskEvent。
ResultPartitionManager
ResultPartitionManager:结果分区管理器,用于跟踪一个TaskManager上所有生产/消费相关的ResultPartition;主要就是用于track所有的result partitions,核心结构为1
Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition> registeredPartitions =HashBasedTable.create();
通过createSubpartitionView创建消费ResultSubpartition的视图ResultSubpartitionView,入参之一为BufferAvailabilityListener,是用来notify这个listener有数据到来,可以消费。
所以ResultSubpartition就是消费分区,ResultSubpartitionView是消费分区与消费者绑定在一起的视图。
TaskEventDispatcher
任务事件分发器,从消费者任务分发事件给生产者任务。
ConnectionManager
连接管理器,用于管理本地(远程)通信连接.存在两种模式,一种是本地,一种是netty远程。线上环境都是netty模式。
1 | NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig(); |
netty模式涉及到基于Netty的网络通信部分。后面再讲。
结果分区消费端
输入网关(InputGate)用于消费中间结果(IntermediateResult)在并行执行时由子任务生产的一个或多个结果分区(ResultPartition)。
Flink当前提供了两个输入网关的实现,分别是:
- SingleInputGate:常规输入网关;
- UnionInputGate:联合输入网关,它允许将多个输入网关联合起来;
我们主要分析SingleInputGate,因为它是消费ResultPartition的实体,而UnionInputGate主要充当InputGate容器的角色。
作为数据的消费者,InputGate最关键的方法自然是获取生产者所生产的缓冲区,提供该功能的方法为getNextBufferOrEvent,它返回的对象是我们后面谈到的统一的数据交换对象BufferOrEvent。
BufferOrEvent的直接消费对象是通信层API中的记录读取器,它会将Buffer中的数据反序列化为记录供上层任务使用。
1 | private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException { |
由于requestPartitions只是起到触发其内部的InputChannel去请求的作用,这个调用可能并不会阻塞等待远程数据被返回。因为不同的InputChannel其请求的机制并不相同,RemoteChannel就是利用Netty异步请求的
SingleInputGate.requestPartitions1
2
3
4
for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.requestSubpartition(consumedSubpartitionIndex);
}
所以SingleInputGate采用阻塞等待以及事件回调的方式来等待InputChannel上的数据可用。具体而言,它在while代码块中循环阻塞等待有可获取数据的InputChannel。而可用的InputChannel则由它们自己通过回调SingleInputGate的onAvailableBuffer添加到阻塞队列inputChannelsWithData中来。当有可获取数据的InputChannel之后,即可获取到Buffer。
UnionInputGate
UnionInputGate,它更像一个包含SingleInputGate的容器,同时可以这些SingleInputGate拥有的InputChannel联合起来。并且多数InputGate约定的接口方法的实现,都被委托给了每个SingleInputGate。
那么它在实现getNextBufferOrEvent方法的时候,到底从哪个InputGate来获得缓冲区呢。它采用的是事件通知机制,所有加入UnionInputGate的InputGate都会将自己注册到InputGateListener。当某个InputGate上有数据可获取,该InputGate将会被加入一个阻塞队列。接着我们再来看getNextBufferOrEvent方法的实现:
1 | public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException { |
输入通道
一个InputGate包含多个输入通道(InputChannel),输入通道用于请求ResultSubpartitionView,并从中消费数据。
1 | 所谓的ResultSubpartitionView是由ResultSubpartition所创建的用于供消费者任务消费数据的视图对象。 |
对于每个InputChannel,消费的生命周期会经历如下的方法调用过程:
- requestSubpartition:请求ResultSubpartition;
- getNextBuffer:获得下一个Buffer;
- releaseAllResources:释放所有的相关资源;
InputChannel根据ResultPartitionLocation提供了三种实现:
- LocalInputChannel:用于请求同实例中生产者任务所生产的ResultSubpartitionView的输入通道;
- RemoteInputChannel:用于请求远程生产者任务所生产的ResultSubpartitionView的输入通道;
- UnknownInputChannel:一种用于占位目的的输入通道,需要占位通道是因为暂未确定相对于生产者任务位置,但最终要么被替换为RemoteInputChannel,要么被替换为LocalInputChannel。
LocalInputChannel会从相同的JVM实例中消费生产者任务所生产的Buffer。因此,这种模式是直接借助于方法调用和对象共享的机制完成消费,无需跨节点网络通信。具体而言,它是通过ResultPartitionManager来直接创建对应的ResultSubpartitionView的实例,这种通道相对简单。
RemoteInputChannel是我们重点关注的输入通道,因为它涉及到远程请求结果子分区。远程数据交换的通信机制建立在Netty框架的基础之上,因此会有一个主交互对象PartitionRequestClient来衔接通信层跟输入通道。
我们以请求子分区的requestSubpartition为入口来进行分析。首先,通过一个ConnectionManager根据连接编号(对应着目的主机)来创建PartitionRequestClient实例。1
2partitionRequestClient = connectionManager
.createPartitionRequestClient(connectionId);
接着具体的请求工作被委托给PartitionRequestClient的实例:
1 | partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0); |
Netty以异步的方式处理请求。因此,上面的代码段中会看到将代表当前RemoteChannel实例的this对象作为参数注入到Netty的特定的ChannelHandler中去,在处理时根据特定的处理逻辑会触发RemoteChannel中相应的回调方法。
在RemoteChannel中定义了多个“onXXX”回调方法来衔接Netty的事件回调。其中,较为关键的自然是接收到数据的onBuffer方法:
RemoteInputChannel.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
boolean recycleBuffer = true;
try {
final boolean wasEmpty;
synchronized (receivedBuffers) {
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after releaseAllResources() released all buffers from receivedBuffers
// (see above for details).
if (isReleased.get()) {
return;
}
//如果实际序列号跟所期待的序列号不一致,则会触发onError回调,并相应以一个特定的异常对象
//该方法调用在成功设置完错误原因后,同样会触发notifyAvailableBuffer方法调用
if (expectedSequenceNumber != sequenceNumber) {
onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
return;
}
wasEmpty = receivedBuffers.isEmpty();
//将数据加入接收队列同时将预期序列号计数器加一
receivedBuffers.add(buffer);
recycleBuffer = false;
}
++expectedSequenceNumber;
if (wasEmpty) {
notifyChannelNonEmpty();
}
if (backlog >= 0) {
onSenderBacklog(backlog);
}
} finally {
if (recycleBuffer) {
buffer.recycleBuffer();
}
}
}
onBuffer方法的执行处于Netty的I/O线程上,但RemoteInputChannel中getNextBuffer却不会在Netty的I/O线程上被调用,所以必须有一个数据共享的容器,这个容器就是receivedBuffers队列。getNextBuffer就是直接从receivedBuffers队列中出队一条数据然后返回。
Reference
https://zhuanlan.zhihu.com/p/35008079
https://blog.csdn.net/yanghua_kobe/article/details/53648748
https://blog.csdn.net/yanghua_kobe/article/details/53946640
https://blog.csdn.net/yanghua_kobe/article/details/54089128
https://www.jianshu.com/p/2779e73abcb8
https://www.jianshu.com/p/c261307757c4
基于netty通信