SocketChannel 读取 ByteBuf 的过程

本文深入分析 Netty 中 SocketChannel 读取数据并封装为 ByteBuf 的底层流程。整个过程主要涉及 NioEventLoop 的事件轮询、Unsafe 类的读取操作以及 ByteBuf 的内存分配策略。

1. 事件轮询与入口

流程始于 NioEventLoopprocessSelectedKey 方法。该方法处理 SelectionKey 就绪的事件,包括连接、写、读和接受连接等。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // 获取 Channel 中的 Unsafe 对象
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    
    // 如果 Key 不合法,说明 Channel 可能存在问题
    if (!k.isValid()) {
        // 代码省略
    }
    
    try {
        // 获取就绪的 IO 事件
        int readyOps = k.readyOps();
        
        // 处理连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        
        // 处理写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        
        // 处理读事件和接受连接事件
        // 如果是 Worker 线程,通常处理 OP_READ 事件
        // 如果是 Boss 线程,通常处理 OP_ACCEPT 事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

判断条件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 表示轮询到了读事件或接受连接事件。

  • Boss 线程:通常处理 OP_ACCEPT,对应 NioServerSocketChannel
  • Worker 线程:通常处理 OP_READ,对应 NioSocketChannel,表示客户端发来了数据流。

此时会调用 unsafe.read() 方法。对于 NioSocketChannel,其绑定的 Unsafe 实现类为 NioByteUnsafe,因此流程进入 NioByteUnsafe.read() 方法。

2. 读取流程核心逻辑

NioByteUnsafe.read() 方法是读取数据的核心入口,主要负责分配 ByteBuf、读取数据以及触发后续事件。

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 1. 分配 ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            // 2. 读取数据到 ByteBuf,并记录读取字节数
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            
            // 3. 如果没有读取到数据,释放缓冲区并判断是否关闭
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                break;
            }

            // 4. 更新读取统计
            allocHandle.incMessagesRead(1);
            readPending = false;
            // 5. 触发 ChannelRead 事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());
        
        // 6. 读取完成回调
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

3. ByteBuf 内存分配器

read() 方法伊始,代码获取了内存分配器:final ByteBufAllocator allocator = config.getAllocator()

  1. 获取分配器:调用 DefaultChannelConfiggetAllocator() 方法,返回成员变量 allocator
  2. 默认配置:该成员变量初始化为 ByteBufAllocator.DEFAULT
  3. 默认实现ByteBufAllocator.DEFAULT 指向 ByteBufUtil.DEFAULT_ALLOCATOR
  4. 初始化逻辑DEFAULT_ALLOCATOR 在静态代码块中初始化,根据系统属性 io.netty.allocator.type 决定使用池化(pooled)还是非池化(unpooled)分配器。默认情况下(非 Android 环境),使用 PooledByteBufAllocator.DEFAULT
// ByteBufUtil 静态初始化块片段
static {
    String allocType = SystemPropertyUtil.get(
            "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
    allocType = allocType.toLowerCase(Locale.US).trim();

    ByteBufAllocator alloc;
    if ("unpooled".equals(allocType)) {
        alloc = UnpooledByteBufAllocator.DEFAULT;
    } else if ("pooled".equals(allocType)) {
        alloc = PooledByteBufAllocator.DEFAULT;
    } else {
        alloc = PooledByteBufAllocator.DEFAULT;
    }
    DEFAULT_ALLOCATOR = alloc;
    // ... 其他配置初始化
}

PooledByteBufAllocator 基于内存池技术,通过申请一块连续内存进行缓冲区分配,能有效减少内存碎片和系统调用。

4. 分配与读取细节

回到 NioByteUnsafe.read() 方法的循环中:

  1. 分配缓冲区byteBuf = allocHandle.allocate(allocator)

    • 这里传入的是 PooledByteBufAllocator
    • 实际进入 DefaultMaxMessagesRecvByteBufAllocator.allocate(),调用 alloc.ioBuffer(guess())
    • guess() 方法由 AdaptiveRecvByteBufAllocator 实现,返回 nextReceiveBufferSize。首次分配默认大小为 1024 字节。
    • ioBuffer() 方法会根据是否支持 Unsafe 操作,最终分配一个 PooledUnsafeDirectByteBuf 对象(默认优先使用堆外内存)。
  2. 读取数据allocHandle.lastBytesRead(doReadBytes(byteBuf))

    • doReadBytes(byteBuf) 负责将 JDK 底层 SocketChannel 中的数据读取到分配的 ByteBuf 中。
    • 内部调用 NioSocketChannel.doReadBytes()

      protected int doReadBytes(ByteBuf byteBuf) throws Exception {
          final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
          allocHandle.attemptedBytesRead(byteBuf.writableBytes());
          return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
      }
    • 该方法记录尝试读取的字节数,并实际执行写入操作,返回实际写入的字节数。
  3. 记录读取结果lastBytesRead() 方法更新统计信息。

    public final void lastBytesRead(int bytes) {
        lastBytesRead = bytes;
        totalBytesRead += bytes;
        // 处理溢出等边界情况
        if (totalBytesRead < 0) {
            totalBytesRead = Integer.MAX_VALUE;
        }
    }
  4. 循环控制

    • allocHandle.incMessagesRead(1):增加消息读取计数。默认最大循环次数为 16 次,达到上限则结束循环。
    • pipeline.fireChannelRead(byteBuf):将读取到的数据向下传递,触发业务逻辑。
    • 注意:如果一次读取不完,Netty 会多次触发 channelRead 事件。关于数据完整性(半包处理),通常需要在业务层或通过 LengthFieldBasedFrameDecoder 等解码器处理。

5. 自适应缓冲区调整

循环结束后,执行 allocHandle.readComplete()。Netty 会根据本次读取的实际字节数,动态调整下一次分配 ByteBuf 的大小,以适配业务场景。

逻辑位于 AdaptiveRecvByteBufAllocator.readComplete()

public void readComplete() {
    record(totalBytesRead());
}

核心调整逻辑在 record() 方法中:

private void record(int actualReadBytes) {
    // 缩容判断:如果实际读取字节数小于当前索引前一位的大小
    if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            // 执行缩容
            index = Math.max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            // 标记下一次需要缩容
            decreaseNow = true;
        }
    } 
    // 扩容判断:如果实际读取字节数大于等于当前分配大小
    else if (actualReadBytes >= nextReceiveBufferSize) {
        // 执行扩容
        index = Math.min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}
  • 缩容:当连续两次读取的数据量都较小,且小于预设阈值时,减小 index,从而减小 nextReceiveBufferSize,避免内存浪费。
  • 扩容:当读取的数据量填满或超过了当前分配的缓冲区大小,增大 index,从而增大下一次分配的大小,减少分配次数。

调整完成后,通过 pipeline.fireChannelReadComplete() 传播读取完成事件,至此一次完整的读取流程结束。


说明:本文基于 Netty 4.x 版本源码分析。Netty 内部实现细节可能随版本迭代有所调整,具体请以实际使用的版本源码为准。