TypeInformation 类是描述一切类型的公共基类,它和它的所有子类必须可序列化(Serializable),因为类型信息将会伴随 Flink 的作业提交,被传递给每个执行节点。
类型信息由 TypeInformation 类表示,TypeInformation 支持以下几种类型:
- BasicTypeInfo: 任意Java 基本类型(装箱的)或 String 类型。
- BasicArrayTypeInfo: 任意Java基本类型数组(装箱的)或 String 数组。
- WritableTypeInfo: 任意 Hadoop Writable 接口的实现类。
- TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现。
- CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
- PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。
- GenericTypeInfo: 任意无法匹配之前几种类型的类。
TypSerializer
所有序列化的基础类。此接口描述Flink运行时处理数据类型所需的方法。 具体来说,此接口包含序列化和复制方法。
其中有一个深度拷贝的方法duplicate,因为这个序列化器是非线程安全的,如果序列化器不具备状态,直接返回自身,如果有状态,调用这个方法返回一个深度拷贝(抽象发发,具体实现子类自行实现。)
1 | /** |
还有序列化和逆序列化的定义,将数据与DataView进行转换,dataView作为MemorySegment 的抽象显示,序列化与逆序列化就是讲自定义的内存结构与实体进行转换的过程。
1 | /** |
选几个具体的序列化器分析一下
分为定长(LongSerializer)和变长(StringSerializer)
LongSerializer
1 |
|
直接调用DataView的方法
AbstractPagedOutputView.writeLong
1 | public void writeLong(long v) throws IOException { |
通过this.positionInSegment < this.segmentSize - 7判断剩余的长度,如果剩余的长度还够8位,那么直接put进去
1 | this.currentSegment.putLongBigEndian(this.positionInSegment, v); |
这里会判断是大端序还是小端序,如果是小端,需要做一次转换1
2
3
4
5
6
7public final void putLongBigEndian(int index, long value) {
if (LITTLE_ENDIAN) {
putLong(index, Long.reverseBytes(value));
} else {
putLong(index, value);
}
}
如果剩余的不足8位,且刚好剩余为0,通过advance进行切换页操作,然后写。如果剩余的还有一些,因为long是8个byte,所以每次写一个byte进去,不够再切页。切页操作是由具体的子类实现的。
StringSerializer
重点在于序列化和逆序列化1
2
3
4
5
6
7
8
9
public void serialize(String record, DataOutputView target) throws IOException {
StringValue.writeString(record, target);
}
public String deserialize(DataInputView source) throws IOException {
return StringValue.readString(source);
}
重点的放在StringValue中。
StringValue
变长的string,是比较复杂的。
- 以一个byte(8位)为一格,小于128的都好办,大于128的需要循环写入
- 大于128的字符读取结束的条件是遇到小于128的byte,所以变长字符的写入要大于128,通过每次写入7位,第八位强制设为1实现。
在这个类中,定义了一个高bit 0x1 << 7(128,ASCII 定义了128个字符)
1 | private static final int HIGH_BIT = 0x1 << 7; |
先研究写入,写入的时候,是通过out.write(byte)写入数据,一次最多只能写入8位数据。flink的做法是每次获取7位的数据,然后第八位通过( | HIGH_BIT[1000 0000]) 强制置为1,在二进制中,置为第八位代表正负,1位负,0位正。然后右移7位,获取下一个高7位的数据,以此类推,写入到流中。
假设写入一个汉字 :测
测 这个字 对应的 (int)s.charAt(0)为27979,
写入的时候,先获取长度+1,第一位为2,将27979与HIGH_BIT进行或操作将第八位置为1,得到28107,其实真实写入的是28107的后八位11001011,右移7位后进行计算得到218,再一次得到1,序列化后的是 2 28107 218 1 . 写入byte中得到2 -53 -38 1.
因为有符号范围在 -128 —- 127之间,无符号是在0-256,由于写是默认按照有符号写,所以写11001011会变成-53.
1 | public static final void writeString(CharSequence cs, DataOutput out) throws IOException { |
逆序列化的时候,逆序列化 2 -53 -38 1.
1 | if (c < HIGH_BIT) { |
默认如果小于128,就会当做是结束条件,直接转换成char,所以在变长的string中,还没结束的部分,一定要大于128,这就是为什么写入的时候,一定要强制将第八位置为1.读取的2-1=1获取数据长度为1, -53 通过in.readUnsignedByte()得到203,对应11001011,取七位1001011,作为低七位,
继续获取218[11011010],取七位[1011010],再进行左移7位操作作为高一阶的七位,拼上低七位,得到10110101001011,最后一个1,进行左移14位操作作为再高一阶的七位,拼上低十四位得到110110101001011对应的十进制为27979,转为ascII为测这个字。
1 | public static String readString(DataInput in) throws IOException { |
PojoSerializer
POJO多个一个字节的header,PojoSerializer只负责将header序列化进去,并委托每个字段对应的serializer对字段进行序列化。
1 | // Flags for the header |
序列化的时候,会先判断是否是null,是的话,置为IS_NULL.
接着进行header的判断。
如果Class<?> actualClass = value.getClass();等于构造函数的class,说明这个class不是一个subClass,置为IS_SUBCLASS。如果在registeredClasses可以获取到子类,说明这个类自身存在子类,他是一个父类,置为IS_TAGGED_SUBCLASS
先将flag写入,target.writeByte(flags);
如果是subclass,将类的全名写入序列化中,如果
如果是NO_SUBCLASS,直接
1 | header{ |
写完header,再委托每个字段对应的serializer对字段进行序列化。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57public void serialize(T value, DataOutputView target) throws IOException {
int flags = 0;
// handle null values
if (value == null) {
flags |= IS_NULL;
target.writeByte(flags);
return;
}
Integer subclassTag = -1;
Class<?> actualClass = value.getClass();
TypeSerializer subclassSerializer = null;
if (clazz != actualClass) {
subclassTag = registeredClasses.get(actualClass);
if (subclassTag != null) {
flags |= IS_TAGGED_SUBCLASS;
subclassSerializer = registeredSerializers[subclassTag];
} else {
flags |= IS_SUBCLASS;
subclassSerializer = getSubclassSerializer(actualClass);
}
} else {
flags |= NO_SUBCLASS;
}
target.writeByte(flags);
// if its a registered subclass, write the class tag id, otherwise write the full classname
if ((flags & IS_SUBCLASS) != 0) {
target.writeUTF(actualClass.getName());
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
target.writeByte(subclassTag);
}
// if its a subclass, use the corresponding subclass serializer,
// otherwise serialize each field with our field serializers
if ((flags & NO_SUBCLASS) != 0) {
try {
for (int i = 0; i < numFields; i++) {
Object o = (fields[i] != null) ? fields[i].get(value) : null;
if (o == null) {
target.writeBoolean(true); // null field handling
} else {
target.writeBoolean(false);
fieldSerializers[i].serialize(o, target);
}
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}
} else {
// subclass
if (subclassSerializer != null) {
subclassSerializer.serialize(value, target);
}
}
}
Flink 如何直接操作二进制数据
排序
Flink 提供了如 group、sort、join 等操作,这些操作都需要访问海量数据。这里,我们以sort为例,这是一个在 Flink 中使用非常频繁的操作。
首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,我们把这批 MemorySegment 称作 sort buffer,用来存放排序的数据。
将实际的数据和指针加定长key分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其他key和pointer。第二,这样做是缓存友好的,因为key都是连续存储在内存中的,可以大大减少 cache miss(后面会详细解释)。
排序的关键是比大小和交换。Flink 中,会先用 key 比大小,这样就可以直接用二进制的key比较而不需要反序列化出整个对象。因为key是定长的,所以如果key相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,然后再做比较。之后,只需要交换key+pointer就可以达到排序的效果,真实的数据不用移动。
最后,访问排序后的数据,可以沿着排好序的key+pointer区域顺序访问,通过pointer找到对应的真实数据,并写到内存或外部
可见NormalizedKeySorter.java
Reference
https://segmentfault.com/a/1190000016350098
http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/