0. 内存数据结构

  • 内存层级结构自上而下主要分为:ArenaChunkListChunkPageSubPage 五级。

20171124195052572.png

  • PoolArena 是一块连续的内存块。为了优化并发性能,Netty 内存池中存在一个由多个 Arena 组成的数组。当多个线程进行内存分配时,会按照轮询策略选择一个 Arena 进行分配。

    • 一个 PoolArena 内存块由两个 SubPagePools(用来存储零碎内存)和多个 ChunkList 组成。两个 SubPagePools 数组分别为 tinySubpagePoolssmallSubpagePools
    • 每个 ChunkList 里包含多个 Chunk,按照双向链表排列。每个 Chunk 里包含多个 Page(默认 2048 个),每个 Page(默认大小为 8KB)由多个 SubPage 组成。
    • 每个 ChunkList 里包含的 Chunk 数量会动态变化。例如,当该 Chunk 的内存利用率发生变化时,它会向其它 ChunkList 里移动。
final PooledByteBufAllocator parent;

private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;

private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;

private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
  • 内存池分配规则

    • 小于 PageSize:在 tinySubPagePoolssmallSubPagePools 中分配。

      • tinySubPagePools:用来分配小于 512 字节的内存。
      • smallSubPagePools:用来分配大于 512 字节且小于 PageSize 的内存。
    • 大于 PageSize 且小于 ChunkSize:在 PoolChunkList 中的 Chunk 中分配。
    • 大于 ChunkSize:直接创建非池化的 Chunk 来分配,并且该 Chunk 不会放在内存池中重用。

1. 内存池的入口:PooledByteBufAllocator

  • 内存池进行内存分配是通过 PooledByteBufAllocator 类的 buffer() 方法实现的。
public static void main(String[] args) {
    // 默认直接内存
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(1024);    
    buf.writeBytes("hello".getBytes());

    // 堆内存 (false) 或者直接内存
    PooledByteBufAllocator p = new PooledByteBufAllocator(false);    
    ByteBuf buf1 = p.buffer(1024);
    buf1.writeBytes("world".getBytes());
}
  • 判断创建的缓冲区类型(直接缓冲区或者堆缓冲区)。如果在创建 PooledByteBufAllocator 实例时参数是 false,则为堆缓冲区。
public ByteBuf buffer(int initialCapacity) {
    if (directByDefault) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}
  • 通过 newHeapBuffer() 方法创建堆缓冲区。
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
}
  • newHeapBuffer() 方法首先从 PoolThreadLocalCache 中获取与线程绑定的缓存池 PoolThreadCache,缓存池中保存着回收的内存。

    • PoolThreadLocalCache 继承了 FastThreadLocal,保存线程与内存缓冲池 (PoolThreadCache) 的映射。
    • 在进行内存分配时,先从映射中取出缓存内存块 Arena,再将内存分配委托给内存块 Arenaallocate() 方法。
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    final ByteBuf buf;
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}
  • 如果不存在与线程对应的缓存,则轮询分配一个 Arena 数组中的 Arena 内存块,创建一个新的 PoolThreadCache 作为内存缓存。
protected synchronized PoolThreadCache initialValue() {
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

    if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
        return new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }
    // No caching for non FastThreadLocalThreads.
    return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}

2. 内存分配块:PoolArena

  • 在应用层通过设置 PooledByteBufAllocator 来执行 ByteBuf 的分配,但是最终的内存分配工作被委托给 PoolArena
  • 由于 Netty 常用于高并发系统,各个线程进行内存分配时竞争不可避免,这可能会极大地影响内存分配的效率。为了缓解高并发时的线程竞争,Netty 允许使用者创建多个分配器(Arena)来分离锁,提高内存分配效率,当然这是以内存作为代价的。
  • PooledByteBufAllocator 将内存分配的任务委托给 Arena 进行,主要包括两步:

    1. Recycler 对象池中获取复用的 Buf 对象。
    2. Buf 对象分配内存。
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);    // 获取复用对象
    allocate(cache, buf, reqCapacity);    // 分配内存
    return buf;
}
  • 调用 allocate() 方法从 Arena 内存块中分配内存:

    • 小于 PageSize:分配 tiny 内存或者 small 内存。

      • 如果需要分配的内存小于 512 字节,调用 allocateTiny() 方法进行 tiny 内存分配。
      • 否则,调用 allocateSmall() 方法进行 small 内存分配。
    • 大于 PageSize 且小于 ChunkSize:调用 allocateNormal() 方法进行 normal 内存分配。
    • 大于 ChunkSize:内存池无法分配,需要 JVM 分配,则调用 allocateHuge() 方法在池外进行分配。
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        /**
         * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
         * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
         */
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                incTinySmallAllocation(tiny);
                return;
            }
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }

        incTinySmallAllocation(tiny);
        return;
    }
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}
  • 内存池的初始阶段,线程是没有内存缓存的,所以最开始的内存分配都需要在 Chunk 分配区进行。也就是说,无论是 tinySubpagePools 还是 smallSubpagePools 成员,在内存池初始化时是不会预置内存的,所以最开始的内存分配都会进入 PoolArenaallocateNormal 方法:

    • 调用 allocateNormal() 方法从 Chunk 级别上分配内存,从 PoolChunkList 中查找可用 PoolChunk 并进行内存分配。
    • 如果没有可用的 PoolChunk,则创建一个并加入到 PoolChunkList 中,完成此次内存分配。
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }

    // Add a new chunk.
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    long handle = c.allocate(normCapacity);
    assert handle > 0;
    c.initBuf(buf, handle, reqCapacity);
    qInit.add(c);
}
  • Arena 中创建新的 PoolChunk 后,根据其内存占用率放入相应的 ChunkList 中。
void add(PoolChunk<T> chunk) {
    if (chunk.usage() >= maxUsage) {
        nextList.add(chunk);
        return;
    }

    chunk.parent = this;

    if (head == null) {
        head = chunk;
        chunk.prev = null;
        chunk.next = null;
    } else {
        chunk.prev = null;
        chunk.next = head;
        head.prev = chunk;
        head = chunk;
    }
}

3. 内存分配基本单元:PoolChunk

  • PoolChunk 的几个重要参数

    • memory:物理内存,内存请求者的最终目标。在 HeapArena 中它就是一个 chunkSize 大小的 byte 数组。默认 PoolChunk 是由 11 层二叉树构成,也就是大小为 ChunkSize = 2048 * PageSize
    • memoryMap 数组:内存分配控制信息,数组元素是一个 32 位的整数。
    • subpages 数组:页分配信息,数组元素的个数等于 Chunk 中 Page 的数量。
  • Arena 中创建 PoolChunk 后,通过调用 PoolChunk.allocate() 方法真正进行内存分配。

    • 在 Chunk 中的内存分配是根据需要分配的内存大小将 Page 内存页划分为 SubPage,并将多余的 SubPage 加入到 SubPagePools 缓存中,将被分配的 Page 和 SubPage 在控制数组中进行标记。
private long allocateSubpage(int normCapacity) {
    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
    synchronized (head) {
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }

        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;

        freeBytes -= pageSize;

        int subpageIdx = subpageIdx(id);
        PoolSubpage<T> subpage = subpages[subpageIdx];
        if (subpage == null) {
            subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
            subpages[subpageIdx] = subpage;
        } else {
            subpage.init(head, normCapacity);
        }
        return subpage.allocate();
    }
}

总结

  • 内存池主要是将内存分配管理起来,不经过 JVM 的内存分配,有效减小内存碎片避免内存浪费,同时也能减少频繁 GC 带来的性能影响。
  • 内存池内存分配入口是 PooledByteBufAllocator 类,该类最终将内存分配委托给 PoolArena 进行。为了减少高并发下多线程内存分配碰撞带来的性能影响,PooledByteBufAllocator 维护着一个 PoolArena 数组,线程通过轮询获取其中一个进行内存分配,进而实现锁分离。
  • 内存分配的基本单元是 PoolChunk。从 PoolArena 中分配获取一个 PoolChunk,一个 PoolChunk 包含多个 Page 内存页,通过完全二叉树维护多个内存页用于内存分配。
说明:本文代码示例基于 Netty 4.1 早期版本(如 4.1.15 之前),后续版本中 PoolArena 内部字段(如 q050 等)已重构为数组结构,但核心原理一致。