前言
从Flink 1.6版本开始,社区为状态引入了TTL(time-to-live,生存时间)机制,支持Keyed State的自动过期,有效解决了状态数据在无干预情况下无限增长导致OOM的问题。
1 | StateTtlConfig ttlConfig = StateTtlConfig |
那么State TTL的背后又隐藏着什么样的思路呢?下面就从设置类StateTtlConfig入手开始研究
State
Keyed State和Operator State
Flink中包含两种基础的状态:Keyed State和Operator State。
Keyed State
顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,可能都对应一个state。
Operator State
与Keyed State不同,Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能会有很多个key,从而对应多个keyed state。
举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。
StateTtlConfig
该类中有5个成员属性,它们就是用户需要指定的全部参数了。
1 | private final UpdateType updateType; |
其中,ttl参数表示用户设定的状态生存时间。而UpdateType、StateVisibility和TtlTimeCharacteristic都是枚举,分别代表状态时间戳的更新方式、过期状态数据的可见性,以及对应的时间特征。它们的含义在注释中已经解释得很清楚了。
1 | /** |
1 | /** |
CleanupStrategies内部类则用来规定过期状态的特殊清理策略,用户在构造StateTtlConfig时,可以通过调用以下方法之一指定。
- cleanupFullSnapshot
1
strategies.put(CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT, EMPTY_STRATEGY);
当对状态做全量快照时清理过期数据,对开启了增量检查点(incremental checkpoint)的RocksDB状态后端无效,对应源码中的EmptyCleanupStrategy。
为什么叫做“空的”清理策略呢?因为该选项只能保证状态持久化时不包含过期数据,但TaskManager本地的过期状态则不作任何处理,所以无法从根本上解决OOM的问题,需要定期重启作业。
- cleanupIncrementally(int cleanupSize, boolean runCleanupForEveryRecord)
增量清理过期数据,默认在每次访问状态时进行清理,将runCleanupForEveryRecord设为true可以附加在每次写入/删除时清理。cleanupSize指定每次触发清理时检查的状态条数。
仅对基于堆的状态后端有效,对应源码中的IncrementalCleanupStrategy。
- cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)
当RocksDB做compaction操作时,通过Flink定制的过滤器(FlinkCompactionFilter)过滤掉过期状态数据。参数queryTimeAfterNumEntries用于指定在写入多少条状态数据后,通过状态时间戳来判断是否过期。当RocksDB做compaction操作时,通过Flink定制的过滤器(FlinkCompactionFilter)过滤掉过期状态数据。参数queryTimeAfterNumEntries用于指定在写入多少条状态数据后,通过状态时间戳来判断是否过期。
该策略仅对RocksDB状态后端有效,对应源码中的RocksdbCompactFilterCleanupStrategy。CompactionFilter是RocksDB原生提供的机制。
如果不调用上述方法,则采用默认的后台清理策略,下文有讲。
TtlStateFactory、TtlStateContext
在所有Keyed State状态后端的抽象基类AbstractKeyedStateBackend中,创建并记录一个状态实例的方法如下
1 | /** |
可见是调用了TtlStateFactory.createStateAndWrapWithTtlIfEnabled()方法来真正创建。顾名思义,TtlStateFactory是产生TTL状态的工厂类。
1 | public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled( |
由上可知,如果我们为状态描述符StateDescriptor加入了TTL,那么就会调用TtlStateFactory.createState()方法创建一个带有TTL的状态实例;否则,就调用StateBackend.createInternalState()创建一个普通的状态实例。
TtlStateFactory.createState()的代码如下。
1 | private IS createState() throws Exception { |
stateFactories是一个Map结构,维护了各种状态描述符与对应产生该种状态对象的工厂方法映射。所有的工厂方法都被包装成了Supplier(Java 8提供的函数式接口),所以在上述createState()方法中,可以通过Supplier.get()方法来实际执行createTtl*State()工厂方法,并获得新的状态实例。
1 | Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories; |
带有TTL的状态类名其实就是普通状态类名加上Ttl前缀,只是没有公开给用户而已。并且在生成Ttl*State时,还会通过createTtlStateContext()方法生成TTL状态的上下文。来看下TtlStateFactory.createTtlStateContext方法
1 | private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V> |
TtlStateContext的本质是对以下几个实例做了封装。
T original 原始State(通过StateBackend.createInternalState()方法创建)及其序列化器(通过StateDescriptor.getSerializer()方法取得), 对应不同状态后台的map,如果是rocksdb状态后台,得到的是RocksDBMapState;
StateTtlConfig,前文已经讲过;
TtlTimeProvider,用来提供判断状态过期标准的时间戳。当前只是简单地代理了System.currentTimeMillis(),没有任何其他代码;
一个Runnable类型的回调方法,通过registerTtlIncrementalCleanupCallback()方法产生,用于状态数据的增量清理,后面会看到它的用途。
接下来就具体看看TTL状态是如何实现的。
所有Ttl*State都是AbstractTtlState的子类,而AbstractTtlState又是装饰器AbstractTtlDecorator的子类。
1 |
|
它的成员属性比较容易理解,例如,updateTsOnRead表示在读取状态值时也更新时间戳(即UpdateType.OnReadAndWrite),returnExpired表示即使状态过期,在被真正删除之前也返回它的值(即StateVisibility.ReturnExpiredIfNotCleanedUp)。
状态值与TTL的包装(成为TtlValue)以及过期检测都由工具类TtlUtils来负责
1 |
|
TtlValue的属性只有两个:状态值和时间戳
1 |
|
AbstractTtlDecorator核心方法是获取状态值的getWrappedWithTtlCheckAndUpdate(),它接受三个参数:
- getter:一个可抛出异常的Supplier,用于获取状态值;
- updater:一个可抛出异常的Consumer,用于更新状态的时间戳;
- stateClear:一个可抛出异常的Runnable,用于异步删除过期状态。
可见,在默认情况下的后台清理策略是:只有状态值被读取时,才会做过期检测,并异步清除过期的状态。这种惰性清理的机制会导致那些实际已经过期但从未被再次访问过的状态无法被删除,需要特别注意。官方文档中也已有提示:
1 | By default, expired values are explicitly removed on read, such as ValueState#value, and periodically garbage collected in the background if supported by the configured state backend. |
不过到了1.10版本,这个已经有了解决方法,在官网也有提到
1 | heap state backend relies on incremental cleanup and RocksDB backend uses compaction filter for background cleanup. |
每次处理一定数量的 StateEntry 之后,会获取当前的 timestamp 然后在 RocksDB 的 compaction
时对所有的 StateEntry 进行 filter,过期的状态 Key就过滤删除掉。
当确认到状态过期时,会调用stateClear的逻辑进行删除;如果需要在读取时顺便更新状态的时间戳,会调用updater的逻辑重新包装一个TtlValue。
AbstractTtlState的代码
1 |
|
其中,accessCallback就是TtlStateContext中注册的增量清理回调。
下面以TtlMapState为例,看看具体的TTL状态如何利用上文所述的这些实现。
TtlMapState
1 | class TtlMapState<K, N, UK, UV> |
可见,TtlMapState的增删改查操作都是在原MapState上进行,只是加上了TTL相关的逻辑,这也是装饰器模式的特点。例如,TtlMapState.get()方法调用了上述AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate()方法,传入的获取(getter)、插入(updater)和删除(stateClear)的逻辑就是原MapState的get()、put()和remove()方法。而TtlMapState.put()只是在调用原MapState的put()方法之前,将状态包装为TtlValue而已。
增量清理策略
另外需要注意,所有增删改查操作之前都需要执行accessCallback.run()方法。如果启用了增量清理策略,该Runnable会通过在状态数据上维护一个全局迭代器向前清理过期数据。如果未启用增量清理策略,accessCallback为空。前文提到过的TtlStateFactory.registerTtlIncrementalCleanupCallback()方法如下
TtlStateFactory.registerTtlIncrementalCleanupCallback
1 |
|
实际清理的代码则位于TtlIncrementalCleanup类中,stateIterator就是状态数据的迭代器。
1 | private void runCleanup() { |
Reference
https://blog.csdn.net/nazeniwaresakini/article/details/106094778