数据管道ChannelPipeline源码分析
0. ChannelPipeline 的实例
ChannelPipeline 的使用实例
以下是 Netty 客户端连接的一个典型示例:
private void connect(String host, int port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline ch = socketChannel.pipeline();
ch.addLast(new TimeClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}- Bootstrap 是 Netty 的启动辅助类。在进行参数设置后,通过
bootstrap.connect()方法正式启动客户端。 connect()方法内部会调用父类AbstractBootstrap的initAndRegister()方法,该方法负责创建ChannelPipeline并初始化 Pipeline 添加 Handler。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = this.channelFactory.newChannel(); // 创建 Channel
this.init(channel); // 初始化 Channel
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
ChannelFuture regFuture = this.config().group().register(channel); // 注册 Channel
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}1. ChannelPipeline 创建
- ChannelPipeline(数据管道)是与 Channel(通道)绑定的,一个 Channel 对应一个 ChannelPipeline。Pipeline 通常在 Channel 初始化时被创建。
- 在 Channel 实例化时,会通过
newInstance()方法调用构造器创建实例。NioServerSocketChannel和NioSocketChannel都继承了AbstractChannel,在创建实例时也会调用AbstractChannel的构造器。 - 在
AbstractChannel构造器中,会创建 Pipeline 管道实例:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}创建
DefaultChannelPipeline类型的对象指向pipeline属性:- Pipeline 内维护着一个以
AbstractChannelHandlerContext为节点的双向链表。 - 创建的
head和tail节点分别指向链表的头尾。 TailContext和HeadContext都继承了AbstractChannelHandlerContext并实现了ChannelHandler接口。AbstractChannelHandlerContext内部维护着next、prev链表指针以及inbound、outbound节点方向等。TailContext实现了ChannelInboundHandler,HeadContext实现了ChannelOutboundHandler。
- Pipeline 内维护着一个以

public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}2. ChannelPipeline 初始化的 Handler 添加过程
bootstrap.connect()方法会在 Pipeline 创建之后,通过newChannel()调用init()方法对其进行初始化。
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelHandler[]{this.config.handler()});
Map options = this.options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
Map attrs = this.attrs0();
synchronized (attrs) {
Iterator var6 = attrs.entrySet().iterator();
while (var6.hasNext()) {
Entry e = (Entry) var6.next();
channel.attr((AttributeKey) e.getKey()).set(e.getValue());
}
}
}- 调用
pipeline.addLast()方法添加 Handler 到 Pipeline 管道中,该 Handler 为初始化时配置的ChannelInitializer对象。 ChannelInitializer继承了ChannelInboundHandlerAdapter,它提供了一个initChannel方法供我们初始化自定义 ChannelHandler。- 在调用
addLast()方法时,会创建一个DefaultChannelHandlerContext节点用来存放ChannelInitializer。因为ChannelInitializer继承了ChannelInboundHandlerAdapter,所以节点的inbound属性为true,outbound属性为false。

public final ChannelHandler handler() {
return this.bootstrap.handler();
}bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline ch = socketChannel.pipeline();
ch.addLast(new TimeClientHandler());
}
});- 自定义 Handler 添加到 Pipeline 管道中发生在 Channel 通道的注册过程中。在调用
register0()方法注册 Channel 过程中,调用pipeline.fireChannelRegistered()方法传递通道注册事件。
public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}- 调用
AbstractChannelHandlerContext的invokeChannelRegistered()方法,调用findContextInbound()方法从头遍历双向链表查找第一个inbound类型的节点(这里就是查找ChannelInitializer节点),调用该节点的channelRegistered()方法添加自定义的 Handler,然后删除ChannelInitializer节点。
public ChannelHandlerContext fireChannelRegistered() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
return this;
}private void invokeChannelRegistered() {
if (this.invokeHandler()) {
try {
((ChannelInboundHandler) this.handler()).channelRegistered(this);
} catch (Throwable var2) {
this.notifyHandlerException(var2);
}
} else {
this.fireChannelRegistered();
}
}调用
ChannelInitializer节点的channelRegistered()方法添加自定义节点并删除初始节点:- 调用
initChannel()方法,通过addLast()向链表尾部添加自定义 Handler。 - 删除
ChannelInitializer节点。
- 调用
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
3. ChannelPipeline 事件传输机制
- 通过
pipeline.addLast()方法添加自定义 Handler,为该 Handler 创建一个对应的DefaultChannelHandlerContext实例,并与之关联起来(Context 中有一个handler属性保存着对应的 Handler 实例)。
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name); // 检查此 handler 是否有重复的名字
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}- 在创建
DefaultChannelHandlerContext时,会通过isInbound()方法和isOutbound()方法判断当前 Handler 是否继承实现了ChannelInboundHandler或者ChannelOutboundHandler接口,进而设置DefaultChannelHandlerContext实例的inbound属性和outbound属性。
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
} else {
this.handler = handler;
}
}- Netty 的事件分为 Inbound 事件 和 Outbound 事件,分别代表管道中两个方向的数据流向。
- Pipeline 管道中维护的双向链表的节点也根据
DefaultChannelHandlerContext实例的inbound属性和outbound属性分为 Inbound 节点和 Outbound 节点。 - 输入事件会依次经过 Inbound 节点的处理,输出事件会依次经过 Outbound 节点的处理。
- 读写数据流依次经过相应节点处理,一个节点处理完后会调用
ChannelHandlerContext.fireChannelRegistered()传递到下一个节点。
read() or write()
Channel or ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+Outbound 事件传播机制
- Outbound 事件是请求事件,Channel 发起具体的事件最终通过
Unsafe底层进行处理,数据传输的方向是 Tail -> Head。 - 在自定义 Handler 中通过
ctx.write()方法向通道中写入数据:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
ctx.write(buf);
}- 在
AbstractChannelHandlerContext中调用write()方法:
public ChannelFuture write(Object msg) {
return this.write(msg, this.newPromise());
}
public ChannelFuture write(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
} else {
try {
if (this.isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
return promise;
}
} catch (RuntimeException var4) {
ReferenceCountUtil.release(msg);
throw var4;
}
this.write(msg, false, promise);
return promise;
}
}- 在
AbstractChannelHandlerContext中调用重载write()方法,查找下一个 Outbound 节点,也就是当前 Handler 后面的一个 Outbound 类型的 Handler:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = this.findContextOutbound();
Object m = this.pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
Object task;
if (flush) {
task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, (Runnable) task, promise, m);
}
}- 调用下一个
AbstractChannelHandlerContext节点的invoke()方法:
private void invokeWrite(Object msg, ChannelPromise promise) {
if (this.invokeHandler()) {
this.invokeWrite0(msg, promise);
} else {
this.write(msg, promise);
}
}- 执行下一个 Handler 的
write()方法,在方法中会再次调用AbstractChannelHandlerContext的write()方法完成一次循环:
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) this.handler()).write(this, msg, promise);
} catch (Throwable var4) {
notifyOutboundHandlerException(var4, promise);
}
}Outbound 事件循环执行流程:
handler.write() -> context.write() -> context.findContextOutbound -> next.invokeWrite -> handler.write -> context.write()Outbound 事件传播机制总结:
- Outbound 事件是请求事件(由
write方法或者connect方法发起一个请求,并最终由unsafe处理这个请求)。Outbound 事件的发起者是 Channel,Outbound 事件的处理者是unsafe。 - Outbound 事件在 Pipeline 中的传输方向是 Tail -> Head。
- 在 ChannelHandler 中处理事件时,如果这个 Handler 不是最后一个 Handler,则需要调用
ctx.xxx(例如ctx.connect)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。如StringDecoder是最后一个 Handler,则将作为事件传播的终点不再向下传播。 - Outbound 事件流:
Context.xxx->Connect.findContextOutbound->nextContext.invokeXxx->nextHandler.xxx->nextContext.xxx
- Outbound 事件是请求事件(由
Inbound 事件传播机制
- Inbound 事件传播的起点是调用
pipeline.fireXxx()方法,在该方法中调用了head链表头结点的fireXxx()方法,因此 Inbound 事件传播方向是 Head -> Tail。
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}head.fireChannelActive()会调用AbstractChannelHandlerContext的fireChannelXxx()方法,在该方法中会查询下一个 Inbound 类型的节点,并通过invokeChannelXxx()方法调用该节点:
public ChannelHandlerContext fireChannelActive() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
...
next.invokeChannelActive();
...
return this;
}- 在 Inbound 节点的
invokeChannelXxx()方法中执行该节点的handler.ChannelXxx()方法:
private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}- 在节点
handler.channelXxx()方法中,如果该节点不是最后一个处理节点,则会调用ctx.fireChannelXxx()方法将数据流传递给下一个 Inbound 节点,完成依次循环:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}Inbound 事件传播的流程:
pipeline.fireChannelXxx() -> ctx.fireChannelXxx() -> findContextInbound() -> ctx.invokeChannelXxx() -> handler.channelXxx() -> ctx.fireChannelXxx()Inbound 事件传播机制总结:
- Inbound 事件是通知型事件,事件由底层程序产生,通知上层应用程序。Inbound 事件在 Pipeline 中传输方向是 Head -> Tail。
- Inbound 事件的处理者是 Channel,如果用户没有实现自定义的处理方法,那么 Inbound 事件默认的处理者是
TailContext,并且其处理方法是空实现。 - 在 ChannelHandler 中处理事件时,如果这个 Handler 不是最后一个 Handler,则需要调用
ctx.fireChannelXxx(例如ctx.fireChannelActive)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。 - Inbound 事件流:
Context.fireChannelXxx->Connect.findContextInbound->nextContext.invokeChannelXxx->nextHandler.ChannelXxx->nextContext.fireChannelXxx
结论
- Pipeline 是事件传播的管道,内部维护着一个双向链表,链表节点分为 Inbound 类型 和 Outbound 类型。
- 输入事件(比如 read 事件),数据流从链表的 Head 到 Tail 依次经过链表 Inbound 类型节点的处理。
- 输出事件(比如 write 事件),数据流从链表的 Tail 到 Head 依次经过链表 Outbound 类型节点的处理。
说明:本文源码分析基于 Netty 4.x 版本(部分代码如 OneTimeTask 见于早期 4.x 版本)。不同版本间内部实现细节(如任务执行机制)可能存在差异,但 ChannelPipeline 的核心双向链表结构与事件传播机制保持一致。 版权声明:本文为原创文章,版权归 戴老师的博客 所有,转载请联系博主获得授权。
本文地址:https://1diff.fun/archives/shu-ju-guan-dao-channelpipeline-yuan-ma-fen-xi.html
如果对本文有什么问题或疑问都可以在评论区留言,我看到后会尽量解答。