Netty中的ByteBuf-Pooled与内存管理
Demo
1 | ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(); |
直接看 PooledByteBufAllocator.newHeapBuffer(int initialCapacity, int maxCapacity)
1 | protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { |
Netty 的内存管理 - Arena
从 heapArena.allocate
中我们可以窥见整个 Netty 的内存管理机制 - Arean
参考文章:
- 深入浅出Netty内存管理:PoolChunk
- 深入浅出Netty内存管理:PoolSubpage
- 深入浅出Netty内存管理:PoolChunkList
- 深入浅出Netty内存管理:PoolArena
- 自顶向下深入分析Netty(十)–JEMalloc分配算法
- 自顶向下深入分析Netty(十)–PoolThreadCache
allocate 流程
保留主流程,略过 cache 的代码:
1 | private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { |
我们通过不同的内存需求将出现的几个存储组件 PoolArena
,PoolSubpage
,PoolChunk
与 PoolChunkList
串联起来说明他们的关系与在 Netty 内存分配中所起到的作用
小空间存储
Netty 在应对小内存(默认小于 8K)空间时候常会选择使用 PoolSubpage,就算在上述描述过程中 PoolArena 已经没有了 PoolSubpage 的空间而转到了 PoolChunk,但 PoolChunk 里实际也在维护着一个 PoolSubpage 数组用于应对这种情景。
在 PoolArena 中有两个 PoolSubpage 数组,一个是 tiny 用于分配内存小于 512 字节的空间,另外则负责 512 - 8192 字节的空间内存分配。
当决定完分配到哪个 PoolSubpage 数组之后需要去决定分配在哪个具体的 PoolSubpage 中,这时候 tiny 对于 512 的所有大小做了一个集体映射到 32 位数组的操作(32 也是 tinySubpagePools 的大小)。而 smallSubpagePools 的默认数组大小为 pageShifts - 9
即 3。2^8 - 2^12 分为三个区域存储。
这个时候对于新分配的 PoolSubpage,它的 head.next == head,会跳入 PoolChunk 分配 PoolSubpage 内存这时候涉及到 allocateNormal
。
1 | private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { |
对于 PoolChunk.allocate 我们直接摘取它对于 PoolSubpage 内存分配的部分来说明
1 | boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { |
这里有必要介绍一下 PoolChunk 的内存分布构造了。
PoolChunk 内部通过 memoryMap 数组维护了一颗平衡二叉树作为管理底层内存分布及回收的标记位,所有的子节点管理的内存也属于其父节点。
poolChunk默认由2048个page组成,一个page默认大小为8k,图中节点的值为在数组memoryMap的下标。
- 如果需要分配大小8k的内存,则只需要在第11层,找到第一个可用节点即可。
- 如果需要分配大小16k的内存,则只需要在第10层,找到第一个可用节点即可。
- 如果节点1024存在一个已经被分配的子节点2048,则该节点不能被分配,如需要分配大小16k的内存,这个时候节点2048已被分配,节点2049未被分配,就不能直接分配节点1024,因为该节点目前只剩下8k内存。
这和 PoolSubpage 有什么关系呢?PoolSubpage 分配的内存即是 PoolChunk 的叶子节点标记。
1 | /** |
subpage.init()
这里要开始说是 PageSubpage 是怎么做的标识了
在创建 PoolSubpage 时候会有个专用于记录是否有内存暂用的字段 bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
,考虑到 pageSize 默认是 8k,则 bitmap 默认长度则为 8,为什么 8 个 long 就足够了呢?因为 subpage 中允许分配的最小内存单元是 16,而 long 有 64 位标志位,则 8196 / 16 / 64 即可描述所有内存段。
接下来就是 init 阶段
1 | void init(PoolSubpage<T> head, int elemSize) { |
下面举两个实例
- 申请 4096 的内存
- maxNumElems 与 numAvali 为 2,说明 1 个 page 被拆分为 2 个
- bitmapLength = maxNumElems >>> 6 = 0 && maxNnmElems & 63 != 0 说明此时 bitmapLength = 1
- 此时则说明只需要一个 long 就能保存两个内存端的状态了
- 申请 32 的内存
- maxNumElems 与 numAvali 为 256,说明 1 个 page 被拆分为 256 个内存段
- bitmapLength = maxNumElems >>> 6 = 4 && maxNnmElems & 63 == 0 说明此时 bitmapLength = 4
- 此时则说明需要 4 个 long 来描述 256 个内存段状态
下面转而看向内存如何分配
subpage.allocate()
1 | /** |
举例说明
若是需要分配 128 的内存,则 bitmap 被分拆为 64 个内存段 (8196 / 128) ,只需要 1 个 long(64位)即可描述,此时 bitmap 只会用到第一个元素, getNextAvail 通过位运算得到 long 中描述内存段状态的值,然后通过 bitmap[q] |= 1L << r 将该位数值置为 1 表示内存已被占用。
初始化 buf
回到 PoolChunk.allocate()
1 | boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { |
initBuf()
initBuf 中判断传入的 handle 表示的内存是否被初始化过,若没有则直接不计算偏移量使用,若初始化过需要根据 bitmap 计算出在数组中的偏移值来进行初始化(这段代码涉及太复杂了[捂脸],而且不是很能理解 tmpNioBuf 在哪个地方使用了)。真正存储使用的是 PoolChunk 中 memory 字段,而在 HeapByteBuf 中创建的就是 byte 数组。
此时我们回到 PoolArena.allocate() 再看若 tiny / small 的 subpage 数组中已经存在可分配的内存的话,就会显得比较简单了。
若 PoolSubpage 其实已经分配过内存了,其实此时它就会进入下面的 if 模块
1 | if (s != head) { |
大于 PageSize 空间
1 | private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { |
这里实际与上述小容量分配区别在于在 PoolChunk.allocate()
中会进入 allocateRun()
分支
1 | boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { |
实际上 allocateRun 里面逻辑就很简单了(在有上文铺垫情况下),在二叉树中找到可分配的节点,记录已分配,返回 handle。余下的流程与之前均相同。
大于 ChunkSize 空间
PoolArena.allocateHuge
简单粗暴了,申请 Unpooled 不入池。
allocate 流程中的 PoolThreadCache
PoolThreadCache 是从 PoolThreadLocalCache 中获取的。
这里先说明一下 PoolThreadLocalCache 的父类 FastThreadLocal。这里我参考了这几篇文章:
简单来说 Fast 是因为 ThrealLocal 是使用线行探查法(下一个一个个查找)来解决 hash 冲突的,而 FastThreadLocal 依托于 InternalThreadLocalMap.nextVariableIndex
(AtomInteger++) 来获取唯一 index 避免 hash 冲突带来的性能损耗。
这里为什么用 ThreadLocal 呢?在防止线程冲突情况下,找到最近使用的 Arena。
在 PoolThreadCache 中,使用到了 MemoryRegionCache 这个新的数据结构用于存储 ByteBuf 的实际内存数据。MemoryRegionCache 本质上是一个用于存储 PoolChunk (PoolThreacCache.Entry 中)的队列,同样也是根据了 Tiny(0-512b),Small(512b - 4k),Normal(8k-32M),以及 Direct/Heap 区别。
PoolThreadCache.Entry 中存储 PoolChunk 的队列使用的是 MpscArrayQueue / MpscAtomicArrayQueue,一种用于多生产者而只有一个消费者的高并发队列.
初始化
初始化入口在 PoolArena$PoolThreadLocalCache.initialValue
中,通过 FastThreadLocal
机制触发。PoolThreadCache 中用于分配的 PoolArena 是使用最近被使用到的 PooledByteBufAllocator 中的 PoolArena 数组。然后再分别初始化每个 MemoryRegionCache 即可。
内存分配
在内存分配期间涉及到 PoolThreadCache 无论是 Tiny/Small/Normal 最终都是找到对应的 MemoryRegionCache 走到 allocate 这个方法中
1 | private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { |
首先,分配内存
MemoryRegionCache.allocate
1 | /** |
然后判断分配内存次数是否达到了阈值,如果达到对齐并释放内存,所有 MemoryRegionCache 都将走到 MemoryRegionCache.trim()
1 | /** |
关于 PoolArena 中 PoolChunkList 逻辑说明
当调用 allocateNormal
的时候会先逐一调用 PoolChunkList.allocate,试图寻找已存在的可以利用的 PoolChunk,那什么作为可以利用的依据呢?
PoolArena 中存在 init-000-025-050-075-100 的 PoolChunkList 链表,分别对应了使用到的 PoolChunk 不同的内存利用率
- qInit:存储内存利用率 0-25% 的chunk
- q000:存储内存利用率 1-50% 的chunk
- q025:存储内存利用率 25-75% 的chunk
- q050:存储内存利用率 50-100% 的chunk
- q075:存储内存利用率 75-100% 的chunk
- q100:存储内存利用率 100% 的chunk
在执行 PoolChunkList.allocate 的时候首先会看当前要分配的内存大小是否符合该 PoolChunkList 的利用率要求,若满足,则 head(PoolChunk) 开始遍历分配直到成功,成功之后判断分配完成后的 PoolChunk 是否满足在此 PoolChunkList 内存利用率要求,若不满足,则将其移动置下个链表中.
q000存在的目的是什么?
q000 是用来保存内存利用率在 1%-50% 的chunk,那么这里为什么不包括 0% 的 chunk?
直接弄清楚这些,才好理解为什么不从 q000 开始分配。q000 中的 chunk,当内存利用率为0时,就从链表中删除,直接释放物理内存,避免越来越多的 chunk 导致内存被占满。
想象一个场景,当应用在实际运行过程中,碰到访问高峰,这时需要分配的内存是平时的好几倍,当然也需要创建好几倍的chunk,如果先从 q000 开始,这些在高峰期创建的 chunk 被回收的概率会大大降低,延缓了内存的回收进度,造成内存使用的浪费。
为什么选择从q050开始?
- q050 保存的是内存利用率 50%~100% 的 chunk,这应该是个折中的选择!这样大部分情况下,chunk 的利用率都会保持在一个较高水平,提高整个应用的内存利用率;
- qinit 的 chunk 利用率低,但不会被回收;
- q075 和 q100 由于内存利用率太高,导致内存分配的成功率大大降低,因此放到最后;
free 流程
通过 ReferenceCountUtil.release
or ByteBuf.release
来触发的 free 流程,其触发的起点是 PooledByteBuf.deallocate
1 | protected final void deallocate() { |
1 | void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) { |
此处产生了两个分支,先看非缓存的部分 PoolArena.freeChunk
1 | void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer) { |
- 调用对应 PoolChunkList 的 free 方法
chunk.parent.free
,这里会区分是否是 Subpage,分别进行 free (memoryMapIdx 判断哪个 page, bitmapIdx 判断哪个内存块)。PoolChunkList 还会根据内存使用率移动 Chunk(如果有需要). 对于 Chunk 而言 free 主要是标记二叉树哪些位置内存需要清理。 - 若 free 成功,则销毁 PoolChunk
PoolArena.destroyChunk
。此处是实际清理内存操作,Heap 等待 GC,Direct 调用 Unsafe.
再说缓存部分,这儿其实是缓存的回收流程,这里 PoolThreadCache 会将被释放的 chunk 放置入缓存中,供下次分配使用。
由于缓存获取使用的是其线程缓存 PoolThreadLocalCache,当其线程生命周期结束的时候会调用 onRemoval
对 PoolThreadCache 进行内存释放清理操作。