数据传递
AbstractStreamOperator$CountingOutput.collect1
2
3
4
5
6
public void collect(StreamRecord<OUT> record) {
numRecordsOut.inc();
output.collect(record);
}
同一个TM和同一线程的Operator的区分在这里。如果是同一个TM
用到的是RecordWriterOutput.collect,如果是同一个线程的operator,用到的是OperatorChain$CopyingChainingOutput.collect
同一线程的Operator数据传递(同一个Task)
allOperators中有算子和out的关系,在调用out(CopyingChainingOutput)的时候,会调用pushToOperator函数,这个函数内部会通过深拷贝复制出一个实体,在这个out中,存在一个属性,这个属性是下游的function,然后调用function函数进行计算并继续out出去。
构建operation chain
这个过程就是将整个operation chain构建出来,然后将CopyingChainingOutput中注入下游operation,形成当前operation包含CopyingChainingOutput,CopyingChainingOutput中有下游的operation。
实现数据从上游处理后,调用out.collect可以直接传递给下游的operation。
operatorChain = new OperatorChain<>(this, recordWriters);获取这个task的整个操作链和headOperator,初始化output.
1 |
|
重点函数
1 | // 创建操作链,并且将 设置链的leads |
OperatorChain1
2
3
4
5
6
7
8private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators,
OutputTag<IN> outputTag) {
先获取同一个操作链中head的outputEdge,先通过递归的方式,创建每个operation对应的output,如下
1 | WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector( |
然后将操作函数和out组成chainedOperator加入allOperators.
1 | currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); |
通过如上方法,将chainedOperator(下一个operation)和CopyingChainingOutput绑定在一起,调用和CopyingChainingOutput的collect方法可以直接调用operation.processElements实现内部调用。。
同一个线程的Operator中数据的复制
OperatorChain1
2StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue())); 数据复制
operator.processElement(copy);
serializer会有多种类型,StringSerializer、TupleSerializer、PojoSerializer等等,String这种类型,copy方法会就是直接返回,对于Tuple、Pojo,Tuple会创建一个Tuple,将原来的数据set到新的Tuple中,如果是Pojo,会通过反射,
先将所有的属性设置为可以访问,屏蔽掉private的影响1
this.fields[i].setAccessible(true);
然后根据字段对应的类型用对应的序列化器深度copy一个值
1 | Object value = fields[i].get(from); |
本地线程数据传递(同一个TM)
如果并行度一致,并且是forward,会被归在同一个operationChain,走上面的情况,如果不是,则会归属在不同的task中。不同task会出现两种情况,一种是task归属于同一个TM,一种是task归属于不同TM
不同个TM通过RecordWriter来发送数据到下游。
1 | graph TB |
RecordWriter初始化
在StreamTask.createRecordWriters. 某一个顶点,以后多个输出那么List
RecordWriter的初始化是在Task创建StreamTask的时候调用如下构建的
Task.java1
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
StreamTask.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
StreamConfig configuration,
Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge edge = outEdgesInOrder.get(i);
recordWriters.add(
createRecordWriter(
edge,
i,
environment,
environment.getTaskInfo().getTaskName(),
chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
}
return recordWriters;
}
如果下游有一个,就创建一RecordWriter,如果下游有两个(重复消费),就创建两个RecordWriter。
1 | private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter( |
重点在于如下代码,可以构建出下游有多少消费者1
ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
environment的Writer是对应到Task中的producedPartitions,producedPartitions的初始化是在Task初始化的时候.
Task.java的构造函数.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// Produced intermediate result partitions
this.producedPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
int counter = 0;
for (ResultPartitionDeploymentDescriptor desc: resultPartitionDeploymentDescriptors) {
ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);
this.producedPartitions[counter] = new ResultPartition(
taskNameWithSubtaskAndId,
this,
jobId,
partitionId,
desc.getPartitionType(),
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
networkEnvironment.getResultPartitionManager(),
resultPartitionConsumableNotifier,
ioManager,
desc.sendScheduleOrUpdateConsumersMessage());
++counter;
}
如果是同一个operationChain中,resultPartitionDeploymentDescriptors为0,如果是出现不同个task的情况,resultPartitionDeploymentDescriptors会是下游的个数(source(1并行度)—>map(3并行度)会有一个resultPartitionDeploymentDescriptors会是下游操作类型的个数)
这个时候就初始化好了ResultPartition,如果下游的并行度为3,那么该数组为3,对应三个ResultPartition。
resultPartitionDeploymentDescriptors怎么来的?
resultPartitionDeploymentDescriptors是TaskDeploymenentDescriptor的一部分信息,TaskDeploymenentDescriptor包含了部署一个task在一个taskManger中的所有信息。
RecordWriter传递数据
RecordWriterOutput.collect
1 |
|
RecordWriter
1 | public void emit(T record) throws IOException, InterruptedException { |
copyFromSerializerToTargetChannel这个会将数据发到resultSubpartition,传递到buffer中,再针对是否是本地数据进行操作,如果是不同TM,就会通过rpc发送,如果是同一个TM,会放在IG中,通知下游来获取
1 | serializer.serializeRecord(record); |
数据序列化会调用SerializationDelegate.write,1
2
3
4
5
public void write(DataOutputView out) throws IOException {
this.serializer.serialize(this.instance, out);
}
`
实际调用了StreamElementSerializer.serialize
1 |
|
针对数据还是waterMark或者其他状态数据进行处理。如果是数据,先判断数据是否有时间,调用这个数据对应类型的序列化器(TupleSerializer)的serialize方法,将数据写成buffer转成DataOutputView实体吗,变成序列化数据.
然后在RecordWriteer.java中的emit方法调用copyFromSerializerToTargetChannel将序列化的数据发送出去1
2
3
4
5
6
7private void emit(T record, int targetChannel) throws IOException, InterruptedException {
serializer.serializeRecord(record);
if (copyFromSerializerToTargetChannel(targetChannel)) {
serializer.prune();
}
}
将序列化数据写到buffer中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
// We should reset the initial position of the intermediate serialization buffer before
// copying, so the serialization results can be copied to multiple target buffers.
serializer.reset();
boolean pruneTriggered = false;
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
numBytesOut.inc(bufferBuilder.finish());
numBuffersOut.inc();
// If this was a full record, we are done. Not breaking out of the loop at this point
// will lead to another buffer request before breaking out (that would not be a
// problem per se, but it can lead to stalls in the pipeline).
if (result.isFullRecord()) {
pruneTriggered = true;
bufferBuilders[targetChannel] = Optional.empty();
break;
}
bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.copyToBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
return pruneTriggered;
}
下游OneInputStreamTask.run()会通过死循环一直获取数据来消费1
2
3while (running && inputProcessor.processInput()) {
// all the work happens in the "processInput" method
}
inputProcessor对应的是StreamInputProcessor,中间有一个变量为inputGate.1
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
上面的代码会从buffer中逆序列化成具体的数据,然后交给userfunction处理..
buffer的获取,是通过如下先获取buffer,再将buffer放入currentRecordDeserializer,调用上面的currentRecordDeserializer.getNextRecord(deserializationDelegate);将buffer中解析到的数据逆序列化成deserializationDelegate。1
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
1 | currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); |
如何从上游中获取buffer
StreamInputProcessor1
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
实际调用BarrierTrack.getNextNonBlocked()
1 | Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent(); |
SingleInputGate1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
return getNextBufferOrEvent(true);
}
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
················
if (blocking) {
inputChannelsWithData.wait();
}
·····················
}
这个地方会wait,等待上游的ResultSubpartition去notify
远程线程数据传递(不同TM)
与同一个TM不一样的地方在于SingleInputGate中的InputChannel,
同一个TM用的是LocalInputChannel,不同TM用的是RemoteInputChannel.
RemoteInputCahnnel获取数据
1 |
|
通过ArrayDeque