SocketChannel 读取ByteBuf 的过程
SocketChannel 读取 ByteBuf 的过程
本文深入分析 Netty 中 SocketChannel 读取数据并封装为 ByteBuf 的底层流程。整个过程主要涉及 NioEventLoop 的事件轮询、Unsafe 类的读取操作以及 ByteBuf 的内存分配策略。
1. 事件轮询与入口
流程始于 NioEventLoop 的 processSelectedKey 方法。该方法处理 SelectionKey 就绪的事件,包括连接、写、读和接受连接等。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 获取 Channel 中的 Unsafe 对象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 如果 Key 不合法,说明 Channel 可能存在问题
if (!k.isValid()) {
// 代码省略
}
try {
// 获取就绪的 IO 事件
int readyOps = k.readyOps();
// 处理连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 处理写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 处理读事件和接受连接事件
// 如果是 Worker 线程,通常处理 OP_READ 事件
// 如果是 Boss 线程,通常处理 OP_ACCEPT 事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}判断条件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 表示轮询到了读事件或接受连接事件。
- Boss 线程:通常处理
OP_ACCEPT,对应NioServerSocketChannel。 - Worker 线程:通常处理
OP_READ,对应NioSocketChannel,表示客户端发来了数据流。
此时会调用 unsafe.read() 方法。对于 NioSocketChannel,其绑定的 Unsafe 实现类为 NioByteUnsafe,因此流程进入 NioByteUnsafe.read() 方法。
2. 读取流程核心逻辑
NioByteUnsafe.read() 方法是读取数据的核心入口,主要负责分配 ByteBuf、读取数据以及触发后续事件。
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 1. 分配 ByteBuf
byteBuf = allocHandle.allocate(allocator);
// 2. 读取数据到 ByteBuf,并记录读取字节数
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// 3. 如果没有读取到数据,释放缓冲区并判断是否关闭
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
// 4. 更新读取统计
allocHandle.incMessagesRead(1);
readPending = false;
// 5. 触发 ChannelRead 事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
// 6. 读取完成回调
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}3. ByteBuf 内存分配器
在 read() 方法伊始,代码获取了内存分配器:final ByteBufAllocator allocator = config.getAllocator()。
- 获取分配器:调用
DefaultChannelConfig的getAllocator()方法,返回成员变量allocator。 - 默认配置:该成员变量初始化为
ByteBufAllocator.DEFAULT。 - 默认实现:
ByteBufAllocator.DEFAULT指向ByteBufUtil.DEFAULT_ALLOCATOR。 - 初始化逻辑:
DEFAULT_ALLOCATOR在静态代码块中初始化,根据系统属性io.netty.allocator.type决定使用池化(pooled)还是非池化(unpooled)分配器。默认情况下(非 Android 环境),使用PooledByteBufAllocator.DEFAULT。
// ByteBufUtil 静态初始化块片段
static {
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
} else {
alloc = PooledByteBufAllocator.DEFAULT;
}
DEFAULT_ALLOCATOR = alloc;
// ... 其他配置初始化
}PooledByteBufAllocator 基于内存池技术,通过申请一块连续内存进行缓冲区分配,能有效减少内存碎片和系统调用。
4. 分配与读取细节
回到 NioByteUnsafe.read() 方法的循环中:
分配缓冲区:
byteBuf = allocHandle.allocate(allocator)。- 这里传入的是
PooledByteBufAllocator。 - 实际进入
DefaultMaxMessagesRecvByteBufAllocator.allocate(),调用alloc.ioBuffer(guess())。 guess()方法由AdaptiveRecvByteBufAllocator实现,返回nextReceiveBufferSize。首次分配默认大小为 1024 字节。ioBuffer()方法会根据是否支持 Unsafe 操作,最终分配一个PooledUnsafeDirectByteBuf对象(默认优先使用堆外内存)。
- 这里传入的是
读取数据:
allocHandle.lastBytesRead(doReadBytes(byteBuf))。doReadBytes(byteBuf)负责将 JDK 底层SocketChannel中的数据读取到分配的ByteBuf中。内部调用
NioSocketChannel.doReadBytes():protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }- 该方法记录尝试读取的字节数,并实际执行写入操作,返回实际写入的字节数。
记录读取结果:
lastBytesRead()方法更新统计信息。public final void lastBytesRead(int bytes) { lastBytesRead = bytes; totalBytesRead += bytes; // 处理溢出等边界情况 if (totalBytesRead < 0) { totalBytesRead = Integer.MAX_VALUE; } }循环控制:
allocHandle.incMessagesRead(1):增加消息读取计数。默认最大循环次数为 16 次,达到上限则结束循环。pipeline.fireChannelRead(byteBuf):将读取到的数据向下传递,触发业务逻辑。- 注意:如果一次读取不完,Netty 会多次触发
channelRead事件。关于数据完整性(半包处理),通常需要在业务层或通过LengthFieldBasedFrameDecoder等解码器处理。
5. 自适应缓冲区调整
循环结束后,执行 allocHandle.readComplete()。Netty 会根据本次读取的实际字节数,动态调整下一次分配 ByteBuf 的大小,以适配业务场景。
逻辑位于 AdaptiveRecvByteBufAllocator.readComplete():
public void readComplete() {
record(totalBytesRead());
}核心调整逻辑在 record() 方法中:
private void record(int actualReadBytes) {
// 缩容判断:如果实际读取字节数小于当前索引前一位的大小
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
// 执行缩容
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
// 标记下一次需要缩容
decreaseNow = true;
}
}
// 扩容判断:如果实际读取字节数大于等于当前分配大小
else if (actualReadBytes >= nextReceiveBufferSize) {
// 执行扩容
index = Math.min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}- 缩容:当连续两次读取的数据量都较小,且小于预设阈值时,减小
index,从而减小nextReceiveBufferSize,避免内存浪费。 - 扩容:当读取的数据量填满或超过了当前分配的缓冲区大小,增大
index,从而增大下一次分配的大小,减少分配次数。
调整完成后,通过 pipeline.fireChannelReadComplete() 传播读取完成事件,至此一次完整的读取流程结束。
说明:本文基于 Netty 4.x 版本源码分析。Netty 内部实现细节可能随版本迭代有所调整,具体请以实际使用的版本源码为准。
版权声明:本文为原创文章,版权归 戴老师的博客 所有,转载请联系博主获得授权。
本文地址:https://1diff.fun/archives/socketchannel-du-qu-bytebuf-de-guo-cheng.html
如果对本文有什么问题或疑问都可以在评论区留言,我看到后会尽量解答。