lettuce-core 版本: 5.1.7.RELEASE

引言

在上一篇文章中,我们介绍了 Lettuce 如何基于 Netty 与 Redis 建立连接,其中提到了一个核心的 CommandHandler 类。本文将深入探讨 CommandHandler 在发送 Command 到 Redis 过程中的作用,以及 Lettuce 如何实现多线程共享同一个物理连接。

我们将通过跟踪 sync.get 方法的调用链路,分析 Lettuce 是如何发送 GET 命令到 Redis 以及如何读取 Redis 返回结果的。

示例代码

首先回顾一下示例代码,主要关注 sync.getasync.get 的使用方式:

/**
 * @author xiaobing
 * @date 2019/12/20
 */
public class LettuceSimpleUse {
    private void testLettuce() throws ExecutionException, InterruptedException {
        // 构建 RedisClient 对象,RedisClient 包含了 Redis 的基本配置信息,可以基于 RedisClient 创建 RedisConnection
        RedisClient client = RedisClient.create("redis://localhost");

        // 创建一个线程安全的 StatefulRedisConnection,可以多线程并发对该 connection 操作,底层只有一个物理连接
        StatefulRedisConnection<String, String> connection = client.connect();

        // 获取 SyncCommand。Lettuce 支持 SyncCommand、AsyncCommands、ReactiveCommands 三种命令模式
        RedisStringCommands<String, String> sync = connection.sync();
        String value = sync.get("key");
        System.out.println("get redis value with lettuce sync command, value is :" + value);

        // 获取 AsyncCommand
        RedisAsyncCommands<String, String> async = connection.async();
        RedisFuture<String> getFuture = async.get("key");
        value = getFuture.get();
        System.out.println("get redis value with lettuce async command, value is :" + value);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new LettuceSimpleUse().testLettuce();
    }
}

同步与异步命令的转换

在分析 sync.get 方法之前,先了解一下 RedisStringCommands 是如何生成的。从源码可以看出,RedisStringCommands 其实是对 RedisAsyncCommands 方法调用的同步阻塞版本。

// 创建一个 sync 版本的 RedisCommand
protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
    // async() 方法返回的就是该 Connection 对应的 RedisAsyncCommand
    return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
}

// 返回一个动态代理类,代理类的实现在 FutureSyncInvocationHandler 类中
protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
    FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
    // 基于 FutureSyncInvocationHandler 生成动态代理类
    return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
}

// 异步转同步的关键
class FutureSyncInvocationHandler extends AbstractInvocationHandler {

    // ...

    @Override
    @SuppressWarnings("unchecked")
    protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
        try {
            Method targetMethod = this.translator.get(method);
            Object result = targetMethod.invoke(asyncApi, args);

            // RedisAsyncCommand 返回的大部分对象类型都是 RedisFuture 类型的
            if (result instanceof RedisFuture<?>) {
                RedisFuture<?> command = (RedisFuture<?>) result;

                if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
                    return null;
                }
                // 获取配置的超时时间
                long timeout = getTimeoutNs(command);
                // 阻塞地等待 RedisFuture 返回结果
                return LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);
            }

            return result;
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }
}
// ...

因此,sync.get 操作最终调用的依然是 async.get 操作。接下来我们重点分析 async.get 的执行流程。建议先结合下文时序图建立整体概念。

命令的构建与分发

AbstractRedisAsyncCommands

@Override
public RedisFuture<V> get(K key) {
    return dispatch(commandBuilder.get(key));
}

commandBuilder.get(key)

这一步骤主要是根据用户的输入参数 key、命令类型 GET、序列化方式来生成一个 Command 对象。该 Command 对象会按照 Redis 协议格式将命令序列化成字符串。

Command<K, V, V> get(K key) {
    notNullKey(key);
    // ValueOutput 基于序列化
    return createCommand(GET, new ValueOutput<>(codec), key);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key) {
    CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
    return createCommand(type, output, args);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
    return new Command<K, V, T>(type, output, args);
}

AbstractRedisAsyncCommands.dispatch

public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    // 用 AsyncCommand 对 RedisCommand 做一个包装处理,这个 AsyncCommand 实现了 RedisFuture 接口
    // 最后返回给调用方的就是这个对象。当 Lettuce 收到 Redis 的返回结果时会调用 AsyncCommand 的 complete 方法,异步返回数据
    AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
    
    // 调用 connection 的 dispatch 方法把 Command 发送给 Redis
    // 这个 connection 就是上一篇中提到的 StatefulRedisConnectionImpl
    RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
    
    if (dispatched instanceof AsyncCommand) {
        return (AsyncCommand<K, V, T>) dispatched;
    }
    return asyncCommand;
}

StatefulRedisConnectionImpl.dispatch

@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
    // 对 command 做预处理,当前主要是根据不同的命令配置一些异步处理
    // 如:AUTH 命令成功之后把 password 写入到相应变量中,SELECT DB 操作成功之后把 db 值写入到相应变量中等
    RedisCommand<K, V, T> toSend = preProcessCommand(command);

    try {
        // 真正的 dispatch 是在父类实现的
        return super.dispatch(toSend);
    } finally {
        if (command.getType().name().equals(MULTI.name())) {
            multi = (multi == null ? new MultiOutput<>(codec) : multi);
        }
    }
}

// 父类 RedisChannelHandler 的 dispatch 方法
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
    if (debugEnabled) {
        logger.debug("dispatching command {}", cmd);
    }
    
    // tracingEnable 的代码先不用看
    if (tracingEnabled) {
        RedisCommand<K, V, T> commandToSend = cmd;
        TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);

        if (provider == null) {
            commandToSend = new TracedCommand<>(cmd, clientResources.tracing()
                    .initialTraceContextProvider().getTraceContext());
        }
        return channelWriter.write(commandToSend);
    }
    
    // 其实就是直接调用 channelWriter.write 方法
    // 而这个 channelWriter 就是上一节说的那个屏蔽底层 channel 实现的 DefaultEndpoint 类
    return channelWriter.write(cmd);
}

写入 Channel 与 Flush

DefaultEndpoint.write

@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
    LettuceAssert.notNull(command, "Command must not be null");

    try {
        // sharedLock 是 Lettuce 自己实现的一个共享排他锁
        // incrementWriters 相当于获取一个共享锁,当 channel 状态发生变化的时候(如断开连接)会获取排他锁执行一些清理操作
        sharedLock.incrementWriters();
        
        // validateWrite 是验证当前操作是否可以执行
        // Lettuce 内部维护了一个保存已经发送但是还没有收到 Redis 消息的 Command 的 stack
        // 可以配置这个 stack 的长度,防止 Redis 不可用时 stack 太长导致内存溢出
        // 如果这个 stack 已经满了,validateWrite 会抛出异常
        validateWrite(1);
        
        // autoFlushCommands 默认为 true,即每执行一个 Redis 命令就执行 Flush 操作发送给 Redis
        // 如果设置为 false,则需要手动 flush
        // 由于 flush 操作相对较重,在某些场景下需要继续提升 Lettuce 的吞吐量可以考虑设置为 false
        if (autoFlushCommands) {
            if (isConnected()) {
                // 写入 channel 并执行 flush 操作,核心在这个方法的实现中
                writeToChannelAndFlush(command);
            } else {
                // 如果当前 channel 连接已经断开就先放入 Buffer 中,直接返回 AsyncCommand
                // 重连之后会把 Buffer 中的 Command 再次尝试通过 channel 发送到 Redis 中
                writeToDisconnectedBuffer(command);
            }
        } else {
            writeToBuffer(command);
        }
    } finally {
        // 释放共享锁
        sharedLock.decrementWriters();
        if (debugEnabled) {
            logger.debug("{} write() done", logPrefix());
        }
    }

    return command;
}

DefaultEndpoint.writeToChannelAndFlush

private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
    // queueSize 字段做 cas 1 操作
    QUEUE_SIZE.incrementAndGet(this);
    
    ChannelFuture channelFuture = channelWriteAndFlush(command);
    
    // Lettuce 的可靠性:保证最多一次(At Most Once)
    // 由于 Lettuce 的保证是基于内存的,所以并不可靠(系统 crash 时内存数据会丢失)
    if (reliability == Reliability.AT_MOST_ONCE) {
        // cancel on exceptions and remove from queue, because there is no housekeeping
        channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
    }
    
    // Lettuce 的可靠性:保证最少一次(At Least Once)
    // 由于 Lettuce 的保证是基于内存的,所以并不可靠(系统 crash 时内存数据会丢失)
    if (reliability == Reliability.AT_LEAST_ONCE) {
        // commands are ok to stay within the queue, reconnect will retrigger them
        channelFuture.addListener(RetryListener.newInstance(this, command));
    }
}

// 可以看到最终还是调用了 channel 的 writeAndFlush 操作,这个 Channel 就是 Netty 中的 NioSocketChannel
private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
    if (debugEnabled) {
        logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
    }
    return channel.writeAndFlush(command);
}

Netty 中的命令写入

到这里其实就牵扯到 Netty 的 ChannelEventLoop 相关概念了。简单来说,Channel 会把需要 write 的对象放入 Channel 对应的 EventLoop 的队列中就返回了。EventLoop 是一个 SingleThreadEventExecutor,它会回调 Bootstrap 时配置的 CommandHandlerwrite 方法。

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (debugEnabled) {
        logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
    }

    if (msg instanceof RedisCommand) {
        // 如果是单个的 RedisCommand 就直接调用 writeSingleCommand 返回
        writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
        return;
    }

    if (msg instanceof List) {
        List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;

        if (batch.size() == 1) {
            writeSingleCommand(ctx, batch.get(0), promise);
            return;
        }
        // 批量写操作,暂不关心
        writeBatch(ctx, batch, promise);
        return;
    }

    if (msg instanceof Collection) {
        writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
    }
}

请求与响应的匹配机制

writeSingleCommand 核心逻辑

Lettuce 使用单一连接支持多线程并发向 Redis 发送 Command,那么 Lettuce 是怎么把请求 Command 与 Redis 返回的结果对应起来的呢?秘密就在这里。

private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {
    if (!isWriteable(command)) {
        promise.trySuccess();
        return;
    }
    
    // 把当前 command 放入一个特定的栈中,这一步是关键
    addToStack(command, promise);
    
    // Trace 操作,暂不关心
    if (tracingEnabled && command instanceof CompleteableCommand) {
        // ...
    }
    
    // 调用 ChannelHandlerContext 把命令真正发送给 Redis
    // 当然在发送给 Redis 之前会由 CommandEncoder 类对 RedisCommand 进行编码后写入 ByteBuf
    ctx.write(command, promise);
}

private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {
    try {
        // 再次验证队列是否满了,如果满了就抛出异常
        validateWrite(1);
        
        // command.getOutput() == null 意味着这个 Command 不需要 Redis 返回影响,一般不会走这个分支
        if (command.getOutput() == null) {
            // fire&forget commands are excluded from metrics
            complete(command);
        }
        
        // 这个应该是用来做 metrics 统计用的,暂时先不考虑
        RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);
        
        // 无论 promise 是什么类型的,最终都会把 command 放入到 stack 中
        // stack 是一个基于数组实现的双向队列
        if (promise.isVoid()) {
            // 如果 promise 不是 Future 类型的就直接把当前 command 放入到 stack
            stack.add(redisCommand);
        } else {
            // 如果 promise 是 Future 类型的就等 future 完成后把当前 command 放入到 stack
            // 当前场景下就是走的这个分支
            promise.addListener(AddToStack.newInstance(stack, redisCommand));
        }
    } catch (Exception e) {
        command.completeExceptionally(e);
        throw e;
    }
}

读取 Redis 响应

那么 Lettuce 收到 Redis 的回复消息之后是怎么通知 RedisCommand,并且把结果与 RedisCommand 对应上的呢?Netty 在收到 Redis 服务端返回的消息之后就会回调 CommandHandlerchannelRead 方法。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf input = (ByteBuf) msg;
    // ...
    try {
        // 重点在这里
        decode(ctx, buffer);
    } finally {
        input.release();
    }
}

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
    // 如果 stack 为空,则直接返回,这个时候一般意味着返回的结果找到对应的 RedisCommand 了
    if (pristine && stack.isEmpty() && buffer.isReadable()) {
        // ...
        return;
    }

    while (canDecode(buffer)) {
        // 重点来了。从 stack 的头上取第一个 RedisCommand
        RedisCommand<?, ?, ?> command = stack.peek();
        if (debugEnabled) {
            logger.debug("{} Stack contains: {} commands", logPrefix(), stack.size());
        }

        pristine = false;

        try {
            // 直接把返回的结果 buffer 给了 stack 头上的第一个 RedisCommand
            // decode 操作实际上拿到 RedisCommand 的 commandOutput 对象对 Redis 的返回结果进行反序列化的
            if (!decode(ctx, buffer, command)) {
                return;
            }
        } catch (Exception e) {
            ctx.close();
            throw e;
        }

        if (isProtectedMode(command)) {
            onProtectedMode(command.getOutput().getError());
        } else {
            if (canComplete(command)) {
                stack.poll();
                try {
                    complete(command);
                } catch (Exception e) {
                    logger.warn("{} Unexpected exception during request: {}", logPrefix, e.toString(), e);
                }
            }
        }

        afterDecode(ctx, command);
    }

    if (buffer.refCnt() != 0) {
        buffer.discardReadBytes();
    }
}

从上面的代码可以看出,当 Lettuce 收到 Redis 的回复消息时,就从 stack 的头上取第一个 RedisCommand,这个 RedisCommand 就是与该 Redis 返回结果对应的命令。

为什么这样就能对应上呢?

  1. Lettuce 与 Redis 之间只有一条 TCP 连接。
  2. 在 Lettuce 端放入 stack 时是有序的。
  3. TCP 协议本身是有序的。
  4. Redis 是单线程处理请求的,所以 Redis 返回的消息也是有序的。

这样就能保证 Redis 返回的消息一定对应着 stack 中的第一个 RedisCommand。当然,如果连接断开又重连了,这个肯定就对应不上了,Lettuce 对断线重连也做了特殊处理,防止对应不上。

命令编码

最后看一下 Command 是如何编码成 Redis 协议的。

public void encode(ByteBuf buf) {
    buf.writeByte('*');
    // 写入参数的数量
    CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0));
    // 换行
    buf.writeBytes(CommandArgs.CRLF);
    // 写入命令的类型,即 GET
    CommandArgs.BytesArgument.writeBytes(buf, type.getBytes());

    if (args != null) {
        // 调用 Args 的编码,这里面就会使用我们之前配置的 codec 序列化,当前使用的是 String.UTF8
        args.encode(buf);
    }
}

说明

本文基于 lettuce-core 5.1.7.RELEASE 版本进行分析。该版本发布于 2019 年,较当前主流版本(6.x)可能存在差异。核心原理(如 Netty 集成、命令队列匹配机制)在后续版本中基本保持一致,但部分 API 或内部实现细节可能有所调整。