0. ChannelPipeline 的实例

ChannelPipeline 的使用实例

以下是 Netty 客户端连接的一个典型示例:

private void connect(String host, int port) {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline ch = socketChannel.pipeline();
                        ch.addLast(new TimeClientHandler());
                    }
                });

        ChannelFuture f = b.connect(host, port).sync();
        f.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}
  • Bootstrap 是 Netty 的启动辅助类。在进行参数设置后,通过 bootstrap.connect() 方法正式启动客户端。
  • connect() 方法内部会调用父类 AbstractBootstrapinitAndRegister() 方法,该方法负责创建 ChannelPipeline 并初始化 Pipeline 添加 Handler。
final ChannelFuture initAndRegister() {
    Channel channel = null;

    try {
        channel = this.channelFactory.newChannel();    // 创建 Channel
        this.init(channel);    // 初始化 Channel
    } catch (Throwable var3) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
        }
        return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }

    ChannelFuture regFuture = this.config().group().register(channel);    // 注册 Channel
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}

1. ChannelPipeline 创建

  • ChannelPipeline(数据管道)是与 Channel(通道)绑定的,一个 Channel 对应一个 ChannelPipeline。Pipeline 通常在 Channel 初始化时被创建。
  • 在 Channel 实例化时,会通过 newInstance() 方法调用构造器创建实例。NioServerSocketChannelNioSocketChannel 都继承了 AbstractChannel,在创建实例时也会调用 AbstractChannel 的构造器。
  • AbstractChannel 构造器中,会创建 Pipeline 管道实例:
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}
  • 创建 DefaultChannelPipeline 类型的对象指向 pipeline 属性:

    • Pipeline 内维护着一个以 AbstractChannelHandlerContext 为节点的双向链表。
    • 创建的 headtail 节点分别指向链表的头尾。
    • TailContextHeadContext 都继承了 AbstractChannelHandlerContext 并实现了 ChannelHandler 接口。
    • AbstractChannelHandlerContext 内部维护着 nextprev 链表指针以及 inboundoutbound 节点方向等。
    • TailContext 实现了 ChannelInboundHandlerHeadContext 实现了 ChannelOutboundHandler

20171122154052606.png

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

2. ChannelPipeline 初始化的 Handler 添加过程

  • bootstrap.connect() 方法会在 Pipeline 创建之后,通过 newChannel() 调用 init() 方法对其进行初始化。
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(new ChannelHandler[]{this.config.handler()});
    Map options = this.options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    Map attrs = this.attrs0();
    synchronized (attrs) {
        Iterator var6 = attrs.entrySet().iterator();
        while (var6.hasNext()) {
            Entry e = (Entry) var6.next();
            channel.attr((AttributeKey) e.getKey()).set(e.getValue());
        }
    }
}
  • 调用 pipeline.addLast() 方法添加 Handler 到 Pipeline 管道中,该 Handler 为初始化时配置的 ChannelInitializer 对象。
  • ChannelInitializer 继承了 ChannelInboundHandlerAdapter,它提供了一个 initChannel 方法供我们初始化自定义 ChannelHandler。
  • 在调用 addLast() 方法时,会创建一个 DefaultChannelHandlerContext 节点用来存放 ChannelInitializer。因为 ChannelInitializer 继承了 ChannelInboundHandlerAdapter,所以节点的 inbound 属性为 trueoutbound 属性为 false

20171122154207527.png

public final ChannelHandler handler() {
    return this.bootstrap.handler();
}
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline ch = socketChannel.pipeline();
        ch.addLast(new TimeClientHandler());
    }
});
  • 自定义 Handler 添加到 Pipeline 管道中发生在 Channel 通道的注册过程中。在调用 register0() 方法注册 Channel 过程中,调用 pipeline.fireChannelRegistered() 方法传递通道注册事件。
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}
  • 调用 AbstractChannelHandlerContextinvokeChannelRegistered() 方法,调用 findContextInbound() 方法从头遍历双向链表查找第一个 inbound 类型的节点(这里就是查找 ChannelInitializer 节点),调用该节点的 channelRegistered() 方法添加自定义的 Handler,然后删除 ChannelInitializer 节点。
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
    return this;
}
private void invokeChannelRegistered() {
    if (this.invokeHandler()) {
        try {
            ((ChannelInboundHandler) this.handler()).channelRegistered(this);
        } catch (Throwable var2) {
            this.notifyHandlerException(var2);
        }
    } else {
        this.fireChannelRegistered();
    }
}
  • 调用 ChannelInitializer 节点的 channelRegistered() 方法添加自定义节点并删除初始节点:

    • 调用 initChannel() 方法,通过 addLast() 向链表尾部添加自定义 Handler。
    • 删除 ChannelInitializer 节点。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}

20171122154319469.jpeg

3. ChannelPipeline 事件传输机制

  • 通过 pipeline.addLast() 方法添加自定义 Handler,为该 Handler 创建一个对应的 DefaultChannelHandlerContext 实例,并与之关联起来(Context 中有一个 handler 属性保存着对应的 Handler 实例)。
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name); // 检查此 handler 是否有重复的名字
        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }
    return this;
}
  • 在创建 DefaultChannelHandlerContext 时,会通过 isInbound() 方法和 isOutbound() 方法判断当前 Handler 是否继承实现了 ChannelInboundHandler 或者 ChannelOutboundHandler 接口,进而设置 DefaultChannelHandlerContext 实例的 inbound 属性和 outbound 属性。
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    } else {
        this.handler = handler;
    }
}
  • Netty 的事件分为 Inbound 事件Outbound 事件,分别代表管道中两个方向的数据流向。
  • Pipeline 管道中维护的双向链表的节点也根据 DefaultChannelHandlerContext 实例的 inbound 属性和 outbound 属性分为 Inbound 节点和 Outbound 节点。
  • 输入事件会依次经过 Inbound 节点的处理,输出事件会依次经过 Outbound 节点的处理。
  • 读写数据流依次经过相应节点处理,一个节点处理完后会调用 ChannelHandlerContext.fireChannelRegistered() 传递到下一个节点。
                            read() or write()
               Channel or ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

Outbound 事件传播机制

  • Outbound 事件是请求事件,Channel 发起具体的事件最终通过 Unsafe 底层进行处理,数据传输的方向是 Tail -> Head
  • 在自定义 Handler 中通过 ctx.write() 方法向通道中写入数据:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    ctx.write(buf);
}
  • AbstractChannelHandlerContext 中调用 write() 方法:
public ChannelFuture write(Object msg) {
    return this.write(msg, this.newPromise());
}

public ChannelFuture write(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    } else {
        try {
            if (this.isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                return promise;
            }
        } catch (RuntimeException var4) {
            ReferenceCountUtil.release(msg);
            throw var4;
        }
        this.write(msg, false, promise);
        return promise;
    }
}
  • AbstractChannelHandlerContext 中调用重载 write() 方法,查找下一个 Outbound 节点,也就是当前 Handler 后面的一个 Outbound 类型的 Handler:
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = this.findContextOutbound();
    Object m = this.pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        Object task;
        if (flush) {
            task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, (Runnable) task, promise, m);
    }
}
  • 调用下一个 AbstractChannelHandlerContext 节点的 invoke() 方法:
private void invokeWrite(Object msg, ChannelPromise promise) {
    if (this.invokeHandler()) {
        this.invokeWrite0(msg, promise);
    } else {
        this.write(msg, promise);
    }
}
  • 执行下一个 Handler 的 write() 方法,在方法中会再次调用 AbstractChannelHandlerContextwrite() 方法完成一次循环:
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) this.handler()).write(this, msg, promise);
    } catch (Throwable var4) {
        notifyOutboundHandlerException(var4, promise);
    }
}
  • Outbound 事件循环执行流程:

    handler.write() -> context.write() -> context.findContextOutbound -> next.invokeWrite -> handler.write -> context.write()
  • Outbound 事件传播机制总结:

    • Outbound 事件是请求事件(由 write 方法或者 connect 方法发起一个请求,并最终由 unsafe 处理这个请求)。Outbound 事件的发起者是 Channel,Outbound 事件的处理者是 unsafe
    • Outbound 事件在 Pipeline 中的传输方向是 Tail -> Head
    • 在 ChannelHandler 中处理事件时,如果这个 Handler 不是最后一个 Handler,则需要调用 ctx.xxx(例如 ctx.connect)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。如 StringDecoder 是最后一个 Handler,则将作为事件传播的终点不再向下传播。
    • Outbound 事件流: Context.xxx -> Connect.findContextOutbound -> nextContext.invokeXxx -> nextHandler.xxx -> nextContext.xxx

Inbound 事件传播机制

  • Inbound 事件传播的起点是调用 pipeline.fireXxx() 方法,在该方法中调用了 head 链表头结点的 fireXxx() 方法,因此 Inbound 事件传播方向是 Head -> Tail
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}
  • head.fireChannelActive() 会调用 AbstractChannelHandlerContextfireChannelXxx() 方法,在该方法中会查询下一个 Inbound 类型的节点,并通过 invokeChannelXxx() 方法调用该节点:
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    ...
    next.invokeChannelActive();
    ...
    return this;
}
  • 在 Inbound 节点的 invokeChannelXxx() 方法中执行该节点的 handler.ChannelXxx() 方法:
private void invokeChannelActive() {
    try {
        ((ChannelInboundHandler) handler()).channelActive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
  • 在节点 handler.channelXxx() 方法中,如果该节点不是最后一个处理节点,则会调用 ctx.fireChannelXxx() 方法将数据流传递给下一个 Inbound 节点,完成依次循环:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
}
  • Inbound 事件传播的流程:

    pipeline.fireChannelXxx() -> ctx.fireChannelXxx() -> findContextInbound() -> ctx.invokeChannelXxx() -> handler.channelXxx() -> ctx.fireChannelXxx()
  • Inbound 事件传播机制总结:

    • Inbound 事件是通知型事件,事件由底层程序产生,通知上层应用程序。Inbound 事件在 Pipeline 中传输方向是 Head -> Tail
    • Inbound 事件的处理者是 Channel,如果用户没有实现自定义的处理方法,那么 Inbound 事件默认的处理者是 TailContext,并且其处理方法是空实现。
    • 在 ChannelHandler 中处理事件时,如果这个 Handler 不是最后一个 Handler,则需要调用 ctx.fireChannelXxx(例如 ctx.fireChannelActive)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。
    • Inbound 事件流: Context.fireChannelXxx -> Connect.findContextInbound -> nextContext.invokeChannelXxx -> nextHandler.ChannelXxx -> nextContext.fireChannelXxx

结论

  • Pipeline 是事件传播的管道,内部维护着一个双向链表,链表节点分为 Inbound 类型Outbound 类型
  • 输入事件(比如 read 事件),数据流从链表的 Head 到 Tail 依次经过链表 Inbound 类型节点的处理。
  • 输出事件(比如 write 事件),数据流从链表的 Tail 到 Head 依次经过链表 Outbound 类型节点的处理。
说明:本文源码分析基于 Netty 4.x 版本(部分代码如 OneTimeTask 见于早期 4.x 版本)。不同版本间内部实现细节(如任务执行机制)可能存在差异,但 ChannelPipeline 的核心双向链表结构与事件传播机制保持一致。