Reactor 模式

一、Web 请求处理架构概述

在处理 Web 请求时,主要存在两种体系结构:Thread-based Architecture(基于线程架构)和 Event-driven Architecture(事件驱动架构)。

(一)Thread-based Architecture(基于线程)

10345180faaebf9335592620.jpg

基于线程的体系结构通常采用多线程方式来处理客户端请求。每当接收到一个请求,服务器就会开启一个独立的线程进行处理。这种方式在直观上易于理解,但存在一定局限性:

  1. 每个线程都需要占用一定的内存资源。
  2. 操作系统在线程之间切换时会产生上下文切换开销。
  3. 当并发访问量不大时,这种开销可能不明显;但随着线程数量增多,会显著降低 Web 服务器的性能。
  4. 当线程处理 I/O 操作并处于等待输入状态时,线程处于空闲状态,这期间会造成 CPU 资源的浪费。

以下是其典型设计流程:

  1. 客户端发起请求。
  2. 服务器接收请求后,在单独的线程中进行读取(Read)操作。
  3. 对读取的数据进行解码(Decode)。
  4. 调用相应的处理程序(Handler)进行业务逻辑处理,如计算(Compute)。
  5. 对处理结果进行编码(Encode)。
  6. 最后将响应发送(Send)回客户端。

以 Java 中简单的基于线程处理 Web 请求示例代码来看(此处仅示意,简化了诸多细节):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class ThreadBasedWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            // 阻塞接受连接
            final Socket clientSocket = serverSocket.accept();
            // 为每个连接启动一个新线程
            new Thread(() -> {
                try {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                    // 读取请求
                    String request = reader.readLine();
                    // 解码、处理逻辑(此处简单模拟,实际复杂得多)
                    String response = "Processed: " + request;
                    OutputStream outputStream = clientSocket.getOutputStream();
                    outputStream.write(response.getBytes());
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

上述代码在服务器端监听 8080 端口,每当有客户端连接,就新开一个线程处理请求,先读取输入流中的请求内容,简单加工后写回响应。不过在实际场景中,解码、复杂业务计算、编码等环节会有大量专业逻辑处理。

(二)Event-driven Architecture(事件驱动)

10345180fdaf4d307916cd8f.jpg

事件驱动体系结构是目前广泛使用的一种方式。它通过定义一系列事件处理器来响应事件的发生,并且将服务端接受连接与对事件的处理分离开来。这里的事件可以理解为一种状态的改变,例如在 TCP 中,Socket 的新连接到来(New Incoming Connection)、准备好读取(Ready for Read)、准备好写入(Ready for Write)等都属于事件。

二、Reactor 模式介绍

Reactor 模式是事件驱动体系结构的一种实现方式,主要用于处理多个客户端并发向服务端请求服务的场景。在服务端,每种服务可能由多个方法组成,Reactor 模式能够解耦并发请求的服务,并将其分发给对应的事件处理器进行处理。

目前,许多流行的开源框架都运用了 Reactor 模式,如 Netty、Node.js 等,Java 的 NIO 也采用了该模式。

:此处可插入一张类似文中描述的 Reactor 模式架构图,包含客户端、Reactor、分发(Dispatch)、各种处理操作(Read、Decode、Compute、Encode、Send)以及接受器(Acceptor)等元素,以更直观展示流程。

(一)Reactor 模式的主要角色

  1. Handle(句柄)

    • 在 Linux 中一般称为文件描述符(File Descriptor),在 Windows 中称为句柄(Handle),它们的含义相同。
    • Handle 是事件的发源地,比如一个网络 Socket、磁盘文件等都可以是 Handle。
    • 发生在 Handle 上的事件包括连接(Connection)、准备好读取、准备好写入等。
  2. Synchronous Event Demultiplexer(同步事件分离器)

    • 本质上是系统调用,例如 Linux 中的 selectpollepoll 等。
    • select 方法为例,它会一直阻塞,直到 Handle 上有事件发生时才会返回。
    • 在 Java NIO 中,使用 Selector 类来实现类似功能。示例如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOSelectorExample {
    public static void main(String[] args) throws IOException {
        // 创建 ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 创建 Selector
        Selector selector = Selector.open();
        // 将 ServerSocketChannel 注册到 Selector 上,关注 OP_ACCEPT 事件(新连接事件)
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        while (true) {
            // 阻塞等待事件发生
            selector.select();
            // 获取发生事件的 SelectionKey 集合
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectedKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 处理不同类型事件,此处只是示例框架,具体处理逻辑要细化
                if (key.isAcceptable()) {
                    // 处理新连接事件
                } else if (key.isReadable()) {
                    // 处理可读事件
                } else if (key.isWritable()) {
                    // 处理可写事件
                }
                // 处理完后移除已处理的 SelectionKey,避免重复处理
                iterator.remove();
            }
        }
    }
}

这段 Java NIO 代码中,先打开 ServerSocketChannel 绑定端口并设为非阻塞,通过 Selector 来等待诸如新连接、可读、可写等事件,根据不同事件类型后续会有对应处理分支。这里只是搭建了基础框架展示同步事件分离及初步事件判断逻辑。

  1. EventHandler(事件处理器)

    • 事件处理器会定义一些回调方法(也称为钩子函数)。
    • 当 Handle 上有事件发生时,这些回调方法便会执行,从而实现一种事件处理机制。
    • 比如在一个自定义的网络事件处理框架里(伪代码示意结构):
interface EventHandler {
    void onConnect(Handle handle);
    void onReadable(Handle handle, byte[] data);
    void onWritable(Handle handle);
}

上述接口定义了连接、可读、可写等典型事件对应的回调方法,具体子类实现该接口填充对应业务逻辑。

  1. Concrete Event Handler(具体的事件处理器)

    • 具体的事件处理器实现了 EventHandler 接口。
    • 在其回调方法中会实现具体的业务逻辑,针对不同类型的事件进行相应的处理。
    • 以处理 HTTP 请求的具体事件处理器为例(简化示意,聚焦于处理可读事件部分逻辑):
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

class HttpEventHandler implements EventHandler {
    @Override
    public void onConnect(Handle handle) {
        // 连接建立时处理,比如记录日志、初始化资源等
        System.out.println("New connection: " + handle);
    }

    @Override
    public void onReadable(Handle handle, byte[] data) {
        // 假设 data 是 HTTP 请求数据
        String request = new String(data);
        // 解析请求,提取方法、路径等信息
        String[] parts = request.split(" ");
        String method = parts[0];
        String path = parts[1];
        // 根据请求生成响应内容,这里简单返回固定响应
        String response = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\nHello, World!";
        try {
            // 将响应写回客户端,实际会更复杂涉及编码、状态管理等
            // 注意:此处为了示意将 Handle 强转为 Socket,实际 NIO 中应使用 Channel
            OutputStream outputStream = ((Socket) handle).getOutputStream();
            outputStream.write(response.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onWritable(Handle handle) {
        // 可写事件处理,比如发送缓存数据等,这里暂略具体逻辑细化
    }
}

这个 HttpEventHandler 针对 HTTP 请求中可读事件解析请求、生成并返回简单响应,展示具体业务处理细节在具体事件处理器中的落地。

  1. Initiation Dispatcher(初始分发器)

    • 这也是 Reactor 模式中的一个重要角色,它提供了注册、删除与转发 Event Handler 的方法。
    • 当 Synchronous Event Demultiplexer 检测到 Handle 上有事件发生时,会通知 Initiation Dispatcher 调用特定的 Event Handler 的回调方法。
    • 以简易的 Java 类模拟分发器部分功能(省略部分错误处理、优化逻辑):
import java.util.HashMap;
import java.util.Map;

// 假设 EventType 已定义
enum EventType { CONNECT, READABLE, WRITABLE }

class InitiationDispatcher {
    private Map<Handle, EventHandler> handlerMap = new HashMap<>();
    
    // 注册事件处理器与对应 Handle
    public void register(Handle handle, EventHandler eventHandler) {
        handlerMap.put(handle, eventHandler);
    }
    
    // 根据 Handle 获取对应的事件处理器
    public EventHandler getEventHandler(Handle handle) {
        return handlerMap.get(handle);
    }
    
    // 事件发生时调用此方法,分发处理逻辑
    public void handleEvent(Handle handle, int eventType) {
        EventHandler eventHandler = getEventHandler(handle);
        if (eventHandler != null) {
            if (eventType == EventType.CONNECT.ordinal()) {
                eventHandler.onConnect(handle);
            } else if (eventType == EventType.READABLE.ordinal()) {
                byte[] data = readDataFromHandle(handle);  // 假设已有读取数据方法
                eventHandler.onReadable(handle, data);
            } else if (eventType == EventType.WRITABLE.ordinal()) {
                eventHandler.onWritable(handle);
            }
        }
    }
    
    private byte[] readDataFromHandle(Handle handle) {
        // 模拟读取数据
        return new byte[0];
    }
}

这个 InitiationDispatcher 类内部用 Map 维护 Handle 与事件处理器关联,在事件发生时分发调用对应处理器回调方法,虽简化但体现核心分发逻辑。

(二)Reactor 模式的处理流程

  1. 注册处理器:应用向 Initiation Dispatcher 注册 Concrete Event Handler 时,需要标识出该事件处理器希望 Initiation Dispatcher 在何种类型的事件发生时向其通知,并且事件与 Handle 相关联。比如在上述 InitiationDispatcher 示例里,通过 register 方法将 HttpEventHandler 与对应的 Socket 类型 Handle 关联,并注明关注可读、连接等事件类型。
  2. 标识事件源:Initiation Dispatcher 要求注册的 Concrete Event Handler 传递内部关联的 Handle,该 Handle 会向操作系统标识,以便系统能够识别事件源。对应代码中 register 方法接收 Handle 并存储于 handlerMap 供后续识别事件源调用对应处理器。
  3. 启动事件循环:当所有的 Concrete Event Handler 都注册到 Initiation Dispatcher 上后,应用调用 handle_events 方法来启动 Initiation Dispatcher 的事件循环。此时,Initiation Dispatcher 会将每个 Concrete Event Handler 关联的 Handle 合并,并使用 Synchronous Event Demultiplexer 来等待这些 Handle 上事件的发生。类似之前 NIOSelectorExampleselector.select() 开启阻塞等待事件循环,配合 InitiationDispatcher 关联处理后续流程。
  4. 事件通知:当与某个事件源对应的 Handle 变为 ready 时,例如 TCP 的 Socket 变为 ready for reading,Synchronous Event Demultiplexer 便会通知 Initiation Dispatcher。在 Java NIO 场景下就是 selector 检测到对应 SelectionKey 状态变化(如 isReadabletrue)后通知分发逻辑。
  5. 分发回调:Initiation Dispatcher 会触发事件处理器的回调方法。当事件发生时,Initiation Dispatcher 会根据一个"key"(表示一个激活的 Handle)来定位并分发给特定的 EventHandler 的回调方法,对应 InitiationDispatcher 类里 handleEvent 方法按 Handle 及事件类型调用对应 EventHandler 回调。
  6. 业务处理:Initiation Dispatcher 调用特定的 Concrete Event Handler 的回调方法来响应其关联的 Handle 上发生的事件,从而完成具体的业务逻辑处理,像 HttpEventHandler 里对可读事件解析请求、生成响应返回客户端实现业务闭环。

(三)参考资料


说明:本文示例代码主要用于阐述 Reactor 模式的核心原理,部分实现(如 Handle 与 Socket 的转换)进行了简化处理。在实际生产环境中(如使用 Java NIO 或 Netty),建议遵循框架的最佳实践,注意非阻塞 I/O 的正确使用及线程模型的选择。