AbstractUdfStreamOperator
基本上所有的流式操作,都继承了这个类,如果是单流操作就实现OneInputStreamOperator接口,如果是双流操作就实现TwoInputStreamOperator接口。
AbstractUdfStreamOperator 继承了 AbstractStreamOperator。
AbstractUdfStreamOperator 源码简版:
1 | public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> { |
AbstractUdfStreamOperator 类中有个泛型 F 类型属性 userFunction,用于保存用户定义的 MapFunction、FlatMapFunction。AbstractUdfStreamOperator 的构造器可以将 userFunction 保存起来。
也提供了open和close方法等,可以在调用上层的close方法后调用用户的close方法。
AbstractStreamOperator
所有流操作的基类。
1 | public abstract class AbstractStreamOperator<OUT> |
一个算子,可能是keyed,也可能是非keyed,包含了两者都该有的属性,主要负责生命周期相关的操作。
StreamTask
所有stream task的基本类。一个task 运行一个或者多个StreamOperator(如果成chain)。成chain的算子在同一个线程内同步运行。
每一个 StreamNode
在添加到 StreamGraph
的时候都会有一个关联的 jobVertexClass
属性,这个属性就是该 StreamNode
对应的 StreamTask
类型;对于一个 OperatorChain
而言,它所对应的 StreamTask
就是其 head operator 对应的 StreamTask
生命周期如下
1 | * -- invoke() |
创建状态存储后端,为 OperatorChain 中的所有算子提供状态
加载 OperatorChain 中的所有算子
所有的 operator 调用
setup
task 相关的初始化操作
所有 operator 调用
initializeState
初始化状态所有的 operator 调用
open
run
方法循环处理数据所有 operator 调用
close
所有 operator 调用
dispose
通用的 cleanup 操作
task 相关的 cleanup 操作
1 | abstract class StreamTask { |
在beforeInvoke
中会做一些初始化工作,包括提取出所有的operator等。
在runMailboxLoop
中调用task运行
在afterInvoke
中结束
StreamTask With Mailbox
读过Flink源码的都见过checkpointLock,这个主要是用来隔离不同线程对状态的操作,但是这种使用object lock的做法,使得main thread不得不将这个object传递给所有需要操作状态的线程,一来二去,就会发现源代码里出现大量的synchronize(lock)。这样对于开发和调试和源码阅读,都是及其不方便的。目前checkpoint lock主要使用在以下三个地方:
- Event Processing: Operator在初始化时使用lock来隔离TimerService的触发,在process element时隔离异步snapshot线程对状态的干扰,总之,很多地方都使用了这个checkpoint lock。
- Checkpoint: 自然不用说,在performCheckpoint中使用了lock。
- Processing Time Timers: 这个线程触发的callback常常对状态进行操作,所以也需要获取lock。
在重构这里,社区提议使用Mailbox加单线程的方式来替代checkpoint lock。那么Mailbox就成为了StreamTask的信息来源,(大多数情况下)替代了StreamTask#run():
1 |
|
那么对于上面三者,异步的checkpoint和processing timer会将checkpoint lock中的逻辑变成一个Runnable,放入到Mailbox中,这样我们就将并发变成了基于Mailbox的单线程模型,整个StreamTask看起来会更加轻量。
这个是任务运行的核心,即这里会产生action交由MailboxProcessor
执行。processInput
方法处理输入,是task的默认action,在输入上处理一个事件(event)。
1 | public void runMailboxLoop() throws Exception { |
上面的方法中,最关键的有两个地方:
processMail(): 它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false
runDefaultAction(): 这个最终调用的 StreamTask 的 processInput() 方法,event-processing 的处理就是在这个方法中进行的
process-mail 处理
它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false。
1 | private boolean processMail(TaskMailbox mailbox) throws Exception { |
mail的类型:
Checkpoint Trigger
我们知道 TaskManager 收到 JM 的 triggerCheckpoint 消息后,会调用 SourceStreamTask 的 triggerCheckpointAsync 方法,对于非 ExternallyInducedSource(该类用于外部测试触发 checkpoint 使用),会直接调用 Streamtask 的 triggerCheckpointAsync 方法,实现如下:
1 | // StreamTask.java |
processTimer
1 | public ProcessingTimeService getProcessingTimeService(int operatorIndex) { |
event-processing 处理
event-processing 现在是在 processInput() 方法中实现的
1 |
|
判断当前状态来决定是否要继续这个action:如果当前有更多输入,且输出(recordWriter
)就绪,那么直接返回(因为还有更多的输入,因此不结束action);如果输入已经结束,标记一下action为结束状态,直接返回;否则将当前的action暂停,直到有输入且输出(recordWriter
)就绪的时候恢复执行(异步等待)
StreamInputProcessor
StreamInputProcessor
有三个实现类,分别是:
1 | StreamOneInputProcessor |
这三个实现类都有一个成员变量:
1 | private final OperatorChain<?, ?> operatorChain; |
配套这个成员变量的还有两组成员变量,配套的意思是如果是StreamTwoInputProcessor
,那么下面就有两组:
1 | private final StreamTaskInput<IN> input; |
这里的input负责读,读到ouput中,调用ouput的方法,例如emitRecord
,这个方法的实现类一般是某个StreamTask
子类的实现类,在这里会开始处理这个输入数据,例如OneInputStreamTask
的内部类中的一个实现:
1 |
|
再结合 MailboxProcessor 中的 runMailboxLoop() 实现一起看,其操作的流程是:
1.首先通过 processMail() 方法处理 MailBox 中的 mail:
- 如果没有 mail 要处理,这里直接返回;
- 先将 MailBox 中当前现存的 mail 全部处理完;
- 通过 isDefaultActionUnavailable() 做一个状态检查(目的是提供一个接口方便上层控制调用,这里把这个看作一个状态检查方便讲述),如果是 true 的话,会在这里一直处理 mail 事件,不会返回,除非状态改变;
2.然后再调用 StreamTask 的 processInput() 方法来处理 event:
- 先调用 StreamInputProcessor 的 processInput() 方法来处理 event;
- 如果上面处理结果返回的状态是 MORE_AVAILABLE(表示还有可用的数据等待处理)并且 recordWriter 可用(之前的异步操作已经处理完成),就会立马返回;
- 如果上面处理结果返回的状态是 END_OF_INPUT,它表示数据处理完成,这里就会告诉 MailBox 数据已经处理完成了;
- 否则的话,这里会等待,直到有可用的数据到来及 recordWriter 可用。
Reference
http://www.liaojiayi.com/streamtask/
https://www.jishuwen.com/d/27Oh