AbstractUdfStreamOperator
基本上所有的流式操作,都继承了这个类,如果是单流操作就实现OneInputStreamOperator接口,如果是双流操作就实现TwoInputStreamOperator接口。
AbstractUdfStreamOperator 继承了 AbstractStreamOperator。
AbstractUdfStreamOperator 源码简版:
1 | public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> { |
AbstractUdfStreamOperator 类中有个泛型 F 类型属性 userFunction,用于保存用户定义的 MapFunction、FlatMapFunction。AbstractUdfStreamOperator 的构造器可以将 userFunction 保存起来。
也提供了open和close方法等,可以在调用上层的close方法后调用用户的close方法。
AbstractStreamOperator
所有流操作的基类。
1 | public abstract class AbstractStreamOperator<OUT> |
一个算子,可能是keyed,也可能是非keyed,包含了两者都该有的属性,主要负责生命周期相关的操作。
StreamTask
所有stream task的基本类。一个task 运行一个或者多个StreamOperator(如果成chain)。成chain的算子在同一个线程内同步运行。
每一个 StreamNode 在添加到 StreamGraph 的时候都会有一个关联的 jobVertexClass 属性,这个属性就是该 StreamNode 对应的 StreamTask 类型;对于一个 OperatorChain 而言,它所对应的 StreamTask 就是其 head operator 对应的 StreamTask
生命周期如下
1 | * -- invoke() |
创建状态存储后端,为 OperatorChain 中的所有算子提供状态
加载 OperatorChain 中的所有算子
所有的 operator 调用
setuptask 相关的初始化操作
所有 operator 调用
initializeState初始化状态所有的 operator 调用
openrun方法循环处理数据所有 operator 调用
close所有 operator 调用
dispose通用的 cleanup 操作
task 相关的 cleanup 操作
1 | abstract class StreamTask { |
在beforeInvoke中会做一些初始化工作,包括提取出所有的operator等。
在runMailboxLoop中调用task运行
在afterInvoke中结束
StreamTask With Mailbox

读过Flink源码的都见过checkpointLock,这个主要是用来隔离不同线程对状态的操作,但是这种使用object lock的做法,使得main thread不得不将这个object传递给所有需要操作状态的线程,一来二去,就会发现源代码里出现大量的synchronize(lock)。这样对于开发和调试和源码阅读,都是及其不方便的。目前checkpoint lock主要使用在以下三个地方:
- Event Processing: Operator在初始化时使用lock来隔离TimerService的触发,在process element时隔离异步snapshot线程对状态的干扰,总之,很多地方都使用了这个checkpoint lock。
- Checkpoint: 自然不用说,在performCheckpoint中使用了lock。
- Processing Time Timers: 这个线程触发的callback常常对状态进行操作,所以也需要获取lock。
在重构这里,社区提议使用Mailbox加单线程的方式来替代checkpoint lock。那么Mailbox就成为了StreamTask的信息来源,(大多数情况下)替代了StreamTask#run():
1 |
|
那么对于上面三者,异步的checkpoint和processing timer会将checkpoint lock中的逻辑变成一个Runnable,放入到Mailbox中,这样我们就将并发变成了基于Mailbox的单线程模型,整个StreamTask看起来会更加轻量。
这个是任务运行的核心,即这里会产生action交由MailboxProcessor执行。processInput方法处理输入,是task的默认action,在输入上处理一个事件(event)。
1 | public void runMailboxLoop() throws Exception { |
上面的方法中,最关键的有两个地方:
processMail(): 它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false
runDefaultAction(): 这个最终调用的 StreamTask 的 processInput() 方法,event-processing 的处理就是在这个方法中进行的
process-mail 处理
它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false。
1 | private boolean processMail(TaskMailbox mailbox) throws Exception { |
mail的类型:
Checkpoint Trigger
我们知道 TaskManager 收到 JM 的 triggerCheckpoint 消息后,会调用 SourceStreamTask 的 triggerCheckpointAsync 方法,对于非 ExternallyInducedSource(该类用于外部测试触发 checkpoint 使用),会直接调用 Streamtask 的 triggerCheckpointAsync 方法,实现如下:
1 | // StreamTask.java |
processTimer
1 | public ProcessingTimeService getProcessingTimeService(int operatorIndex) { |
event-processing 处理
event-processing 现在是在 processInput() 方法中实现的
1 |
|
判断当前状态来决定是否要继续这个action:如果当前有更多输入,且输出(recordWriter)就绪,那么直接返回(因为还有更多的输入,因此不结束action);如果输入已经结束,标记一下action为结束状态,直接返回;否则将当前的action暂停,直到有输入且输出(recordWriter)就绪的时候恢复执行(异步等待)
StreamInputProcessor
StreamInputProcessor有三个实现类,分别是:
1 | StreamOneInputProcessor |
这三个实现类都有一个成员变量:
1 | private final OperatorChain<?, ?> operatorChain; |
配套这个成员变量的还有两组成员变量,配套的意思是如果是StreamTwoInputProcessor,那么下面就有两组:
1 | private final StreamTaskInput<IN> input; |
这里的input负责读,读到ouput中,调用ouput的方法,例如emitRecord,这个方法的实现类一般是某个StreamTask子类的实现类,在这里会开始处理这个输入数据,例如OneInputStreamTask的内部类中的一个实现:
1 |
|
再结合 MailboxProcessor 中的 runMailboxLoop() 实现一起看,其操作的流程是:
1.首先通过 processMail() 方法处理 MailBox 中的 mail:
- 如果没有 mail 要处理,这里直接返回;
- 先将 MailBox 中当前现存的 mail 全部处理完;
- 通过 isDefaultActionUnavailable() 做一个状态检查(目的是提供一个接口方便上层控制调用,这里把这个看作一个状态检查方便讲述),如果是 true 的话,会在这里一直处理 mail 事件,不会返回,除非状态改变;
2.然后再调用 StreamTask 的 processInput() 方法来处理 event:
- 先调用 StreamInputProcessor 的 processInput() 方法来处理 event;
- 如果上面处理结果返回的状态是 MORE_AVAILABLE(表示还有可用的数据等待处理)并且 recordWriter 可用(之前的异步操作已经处理完成),就会立马返回;
- 如果上面处理结果返回的状态是 END_OF_INPUT,它表示数据处理完成,这里就会告诉 MailBox 数据已经处理完成了;
- 否则的话,这里会等待,直到有可用的数据到来及 recordWriter 可用。
Reference
http://www.liaojiayi.com/streamtask/
https://www.jishuwen.com/d/27Oh