Flink抽象出的内存类型
- HEAP:JVM堆内存
- OFF_HEAP:非堆内存
这在Flink中被定义为一个枚举类型:MemoryType。
1 |
|
MemorySegment
Flink所管理的内存被抽象为数据结构:MemorySegment。内存管理的最小模块。
HeapMemorySegment(弃用)和HybridMemorySegment是对MemorySegment的实现。
这两个的差别在HybridMemorySegment包含HeapMemorySegment的功能,
但对单个字节的操作效率稍差。
MemorySegment有两个构造函数,分别针对堆内内存和堆外内存。
1 | MemorySegment(byte[] buffer, Object owner) { |
1 | MemorySegment(long offHeapAddress, int size, Object owner) { |
- UNSAFE : 用来对堆/非堆内存进行操作,是JVM的非安全的API
- BYTE_ARRAY_BASE_OFFSET : 二进制字节数组的起始索引,相对于字节数组对象
- LITTLE_ENDIAN : 布尔值,是否为小端对齐(涉及到字节序的问题)
- heapMemory : 如果为堆内存,则指向访问的内存的引用,否则若内存为非堆内存,则为null
- address : 字节数组对应的相对地址(若heapMemory为null,即可能为off-heap内存的绝对地址,后续会详解)
- addressLimit : 标识地址结束位置(address+size)
- size : 内存段的字节数
提供了一大堆get/put方法,这些getXXX/putXXX大都直接或者间接调用了unsafe.getXXX/unsafe.putXXX。
MemorySegment的下面几个方法需要关注一下:
1 |
|
这是一个批量拷贝方法,用于从当前memory segment的offset偏移量开始拷贝numBytes长度的字节到target memory segment中从targetOffset起始的地方。
比较1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public final int compare(MemorySegment seg2, int offset1, int offset2, int len) {
while (len >= 8) {
long l1 = this.getLongBigEndian(offset1);
long l2 = seg2.getLongBigEndian(offset2);
if (l1 != l2) {
return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
}
offset1 += 8;
offset2 += 8;
len -= 8;
}
while (len > 0) {
int b1 = this.get(offset1) & 0xff;
int b2 = seg2.get(offset2) & 0xff;
int cmp = b1 - b2;
if (cmp != 0) {
return cmp;
}
offset1++;
offset2++;
len--;
}
return 0;
}
自实现的比较方法,用于对当前memory segment偏移offset1长度为len的数据与seg2偏移起始位offset2长度为len的数据进行比较。
- 第一个while是逐字节比较,如果len的长度大于8就从各自的起始偏移量开始获取其数据的长整形表示进行对比,如果相等则各自后移8位(一个字节),并且长度减8,以此循环往复。
getLongBigEndian获取一个长整形,判断是否是大端序,如果是小端序,就进行反转1
2
3
4
5
6
7public final long getLongBigEndian(int index) {
if (LITTLE_ENDIAN) {
return Long.reverseBytes(getLong(index));
} else {
return getLong(index);
}
}
0x1234567的大端字节序和小端字节序的写法如下图。
- 第二个循环比较的是最后剩余不到一个字节(八个比特位),因此是按位比较
HybridMemorySegment
它既支持on-heap内存也支持off-heap内存,通过如下实现区分1
unsafe.XXX(Object o, int offset/position, ...)
这些方法有如下特点:
- 如果对象o不为null,并且后面的地址或者位置是相对位置,那么会直接对当前对象(比如数组)的相对位置进行操作,既然这里对象不为null,那么这种情况自然满足on-heap的场景;
- 如果对象o为null,并且后面的地址是某个内存块的绝对地址,那么这些方法的调用也相当于对该内存块进行操作。这里对象o为null,所操作的内存块不是JVM堆内存,这种情况满足了off-heap的场景。
针对堆内内存和堆外内存的构造函数也不一样
堆内内存1
2
3
4HybridMemorySegment(byte[] buffer, Object owner) {
super(buffer, owner);
this.offHeapBuffer = null;
}
堆外内存,使用ByteBuffer,拥有这个实现DirectByteBuffer(直接内存)。1
2
3
4
5
HybridMemorySegment(ByteBuffer buffer, Object owner) {
super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
}
获取特定位置的数据1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* Bulk get method. Copies length memory from the specified position to the
* destination memory, beginning at the given offset.
*
* @param index The position at which the first byte will be read.
* @param dst The memory into which the memory will be copied.
* @param offset The copying offset in the destination memory.
* @param length The number of bytes to be copied.
*
* @throws IndexOutOfBoundsException Thrown, if the index is negative, or too large that the requested number of
* bytes exceed the amount of memory between the index and the memory
* segment's end.
*/
public abstract void get(int index, byte[] dst, int offset, int length);
从第index位置开始读取,获取长度为length的数据,copy到dst中,
1 | public final void get(int index, byte[] dst, int offset, int length) { |
unsafe中copyMemory的解释,从scr中srcOffset位置,复制长度length的内容到dest中的destOffset开始。新数据的offset是由BYTE_ARRAY_BASE_OFFSET + offset; 二进制数组的起止索引加上offset,为新数据的offset。1
2
3public native void copyMemory(Object srcBase, long srcOffset,
Object destBase, long destOffset,
long bytes);
如何获得某个off-heap数据的内存地址
off-heap使用的类是ByteBuffer,继承于Buffer,获取buffer类中的address需要使用反射,因为是一个私有变量1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private static final Field ADDRESS_FIELD;
static {
try {
ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
ADDRESS_FIELD.setAccessible(true);
}
catch (Throwable t) {
throw new RuntimeException(
"Cannot initialize HybridMemorySegment: off-heap memory is incompatible with this JVM.", t);
}
}
private static long getAddress(ByteBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer is null");
}
try {
return (Long) ADDRESS_FIELD.get(buffer);
}
catch (Throwable t) {
throw new RuntimeException("Could not access direct byte buffer address.", t);
}
}
MemorySegmentFactory
MemorySegmentFactory是用来创建MemorySegment,而且Flink严重推荐使用它来创建MemorySegment的实例,而不是手动实例化。为了让运行时只存在某一种MemorySegment的子类实现的实例,而不是MemorySegment的两个子类的实例都同时存在,因为这会让JIT有加载和选择上的开销,导致大幅降低性能
通过allocateUnpooledOffHeapMemory和allocateUnpooledSegment等多个方法来申请和分配堆内内存还是堆外内存。
从源码上来看,Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,该池子不会预分配内存,也不会向该池子请求内存块。也就是说该部分的内存都是可以给用户代码使用的。
MemoryManager
MemoryManager提供了两个内部类HybridHeapMemoryPool和HybridOffHeapMemoryPool,代表堆内内存池和堆外内存池
为了提升memory segment操作效率,MemoryManager鼓励长度相等的memory segment。由此引入了page的概念。其实page跟memory segment没有本质上的区别,只不过是为了体现memory segment被分配为均等大小的内存空间而引入的。可以将这个类比于操作系统的页式内存分配,page这里看着同等大小的block即可。MemoryManager提供的默认page size为32KB,并提供了自定义page size的下界值不得小于4KB。
1 | /** The default memory page size. Currently set to 32 KiBytes. */ |
构造函数有两个
1 | public MemoryManager(long memorySize, int numberOfSlots) { |
1 | public MemoryManager(long memorySize, int numberOfSlots, int pageSize, |
第二个构造器的另一个参数preAllocateMemory,指定memory manager的内存分配策略是预分配还是按需分配。我们后面会看到,对于这两种策略,相关的内存申请和释放操作是不同的。
第二个构造器内就已经根据memory type将特定的memory pool对象初始化好了:
1 | switch (memoryType) { |
通过定位到两个pool对象的构造器,可以看到在实例化构造器的时候就已经将需要预分配的内存分配到位了(当然,这里是针对preAllocateMemory为true的调用情景而言),因为如果该参数为false,那么pool构造器的memToAllocate将会被置为0。
1 | HybridHeapMemoryPool(int numInitialSegments, int segmentSize) { |
1 | HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) { |
两种模式的差别在于堆内内存是直接new byte,堆外内存ByteBuffer.allocateDirect。
allocatePages以及release方法为分配和释放内存
1 | /** |
这两个方法都共同拥有一个参数owner,一个映射关系,谁申请的memory segment,将会挂到谁的名下,释放的时候也从谁的名下删除.
allocatePages中pagenumber代表了要申请多少个segment,如果是预分配模式,调用requestSegmentFromPool方法,如果不是用的是allocateNewSegment方法,差别在于requestSegmentFromPool是从pool中的双端队列ArrayDeque中获取预先分配的,否则直接new出来.
memory segment释放
1 | public void release(MemorySegment segment) { |
基本和allocate是相反的逻辑,如果当前释放的segment是segsForOwner集合中的最后一个,那么将segsForOwner也从allocatedSegments中移除。
DataInput 数据视图
提供了基于page的对view的进一步实现,说得更直白一点就是,它提供了跨越多个memory page的数据访问(input/output)视图。它包含了从page中读取/写入数据的解码/编码方法以及跨越page的边界检查(边界检查主要由实现类来完成)。
AbstractPagedInputView中advance获取下一个memory segment
1 | /** |
nextSegment、getLimitForSegment都是由具体子类自行实现。
读取长度为len的内容,将内容填充到byte[]里头,从offset的位置开始。
如果读取的长度比当前segment的可读长度(int remaining = this.limitInSegment - this.positionInSegment;)小,那么直接读取。
如果要读取的长度比当前segment长,那么会出现读取下一个page的操作。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}.
* It returns the number of read bytes or -1 if there is no more data left.
*
* @param b byte array to store the data to
* @param off offset into byte array
* @param len byte length to read
* @return the number of actually read bytes of -1 if there is no more data left
* @throws IOException
*/
public int read(byte[] b, int off, int len) throws IOException{
if (off < 0 || len < 0 || off + len > b.length) {
throw new IndexOutOfBoundsException();
}
int remaining = this.limitInSegment - this.positionInSegment;
if (remaining >= len) {
this.currentSegment.get(this.positionInSegment, b, off, len);
this.positionInSegment += len;
return len;
}
else {
if (remaining == 0) {
try {
advance();
}
catch (EOFException eof) {
return -1;
}
remaining = this.limitInSegment - this.positionInSegment;
}
int bytesRead = 0;
while (true) {
int toRead = Math.min(remaining, len - bytesRead);
this.currentSegment.get(this.positionInSegment, b, off, toRead);
off += toRead;
bytesRead += toRead;
if (len > bytesRead) {
try {
advance();
}
catch (EOFException eof) {
this.positionInSegment += toRead;
return bytesRead;
}
remaining = this.limitInSegment - this.positionInSegment;
}
else {
this.positionInSegment += toRead;
break;
}
}
return len;
}
}
AbstractPagedOutputView写和读的方法其实差不多,在当前页就直接写,跨页就遍历每一个页,写
1 |
|
Reference
https://blog.csdn.net/yanghua_kobe/article/details/50976124
https://blog.csdn.net/yanghua_kobe/article/details/51079524
http://blog.jrwang.me/2019/flink-source-code-memory-management/
https://www.jianshu.com/p/644d430aaa39
http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/