Lettuce是如何发送Command命令到redis的
lettuce-core 版本: 5.1.7.RELEASE
引言
在上一篇文章中,我们介绍了 Lettuce 如何基于 Netty 与 Redis 建立连接,其中提到了一个核心的 CommandHandler 类。本文将深入探讨 CommandHandler 在发送 Command 到 Redis 过程中的作用,以及 Lettuce 如何实现多线程共享同一个物理连接。
我们将通过跟踪 sync.get 方法的调用链路,分析 Lettuce 是如何发送 GET 命令到 Redis 以及如何读取 Redis 返回结果的。
示例代码
首先回顾一下示例代码,主要关注 sync.get 和 async.get 的使用方式:
/**
* @author xiaobing
* @date 2019/12/20
*/
public class LettuceSimpleUse {
private void testLettuce() throws ExecutionException, InterruptedException {
// 构建 RedisClient 对象,RedisClient 包含了 Redis 的基本配置信息,可以基于 RedisClient 创建 RedisConnection
RedisClient client = RedisClient.create("redis://localhost");
// 创建一个线程安全的 StatefulRedisConnection,可以多线程并发对该 connection 操作,底层只有一个物理连接
StatefulRedisConnection<String, String> connection = client.connect();
// 获取 SyncCommand。Lettuce 支持 SyncCommand、AsyncCommands、ReactiveCommands 三种命令模式
RedisStringCommands<String, String> sync = connection.sync();
String value = sync.get("key");
System.out.println("get redis value with lettuce sync command, value is :" + value);
// 获取 AsyncCommand
RedisAsyncCommands<String, String> async = connection.async();
RedisFuture<String> getFuture = async.get("key");
value = getFuture.get();
System.out.println("get redis value with lettuce async command, value is :" + value);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
new LettuceSimpleUse().testLettuce();
}
}同步与异步命令的转换
在分析 sync.get 方法之前,先了解一下 RedisStringCommands 是如何生成的。从源码可以看出,RedisStringCommands 其实是对 RedisAsyncCommands 方法调用的同步阻塞版本。
// 创建一个 sync 版本的 RedisCommand
protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
// async() 方法返回的就是该 Connection 对应的 RedisAsyncCommand
return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
}
// 返回一个动态代理类,代理类的实现在 FutureSyncInvocationHandler 类中
protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
// 基于 FutureSyncInvocationHandler 生成动态代理类
return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
}
// 异步转同步的关键
class FutureSyncInvocationHandler extends AbstractInvocationHandler {
// ...
@Override
@SuppressWarnings("unchecked")
protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
try {
Method targetMethod = this.translator.get(method);
Object result = targetMethod.invoke(asyncApi, args);
// RedisAsyncCommand 返回的大部分对象类型都是 RedisFuture 类型的
if (result instanceof RedisFuture<?>) {
RedisFuture<?> command = (RedisFuture<?>) result;
if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
return null;
}
// 获取配置的超时时间
long timeout = getTimeoutNs(command);
// 阻塞地等待 RedisFuture 返回结果
return LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);
}
return result;
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
}
// ...因此,sync.get 操作最终调用的依然是 async.get 操作。接下来我们重点分析 async.get 的执行流程。建议先结合下文时序图建立整体概念。
命令的构建与分发
AbstractRedisAsyncCommands
@Override
public RedisFuture<V> get(K key) {
return dispatch(commandBuilder.get(key));
}commandBuilder.get(key)
这一步骤主要是根据用户的输入参数 key、命令类型 GET、序列化方式来生成一个 Command 对象。该 Command 对象会按照 Redis 协议格式将命令序列化成字符串。
Command<K, V, V> get(K key) {
notNullKey(key);
// ValueOutput 基于序列化
return createCommand(GET, new ValueOutput<>(codec), key);
}
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
return createCommand(type, output, args);
}
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
return new Command<K, V, T>(type, output, args);
}AbstractRedisAsyncCommands.dispatch
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
// 用 AsyncCommand 对 RedisCommand 做一个包装处理,这个 AsyncCommand 实现了 RedisFuture 接口
// 最后返回给调用方的就是这个对象。当 Lettuce 收到 Redis 的返回结果时会调用 AsyncCommand 的 complete 方法,异步返回数据
AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
// 调用 connection 的 dispatch 方法把 Command 发送给 Redis
// 这个 connection 就是上一篇中提到的 StatefulRedisConnectionImpl
RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
if (dispatched instanceof AsyncCommand) {
return (AsyncCommand<K, V, T>) dispatched;
}
return asyncCommand;
}StatefulRedisConnectionImpl.dispatch
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
// 对 command 做预处理,当前主要是根据不同的命令配置一些异步处理
// 如:AUTH 命令成功之后把 password 写入到相应变量中,SELECT DB 操作成功之后把 db 值写入到相应变量中等
RedisCommand<K, V, T> toSend = preProcessCommand(command);
try {
// 真正的 dispatch 是在父类实现的
return super.dispatch(toSend);
} finally {
if (command.getType().name().equals(MULTI.name())) {
multi = (multi == null ? new MultiOutput<>(codec) : multi);
}
}
}
// 父类 RedisChannelHandler 的 dispatch 方法
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
if (debugEnabled) {
logger.debug("dispatching command {}", cmd);
}
// tracingEnable 的代码先不用看
if (tracingEnabled) {
RedisCommand<K, V, T> commandToSend = cmd;
TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
if (provider == null) {
commandToSend = new TracedCommand<>(cmd, clientResources.tracing()
.initialTraceContextProvider().getTraceContext());
}
return channelWriter.write(commandToSend);
}
// 其实就是直接调用 channelWriter.write 方法
// 而这个 channelWriter 就是上一节说的那个屏蔽底层 channel 实现的 DefaultEndpoint 类
return channelWriter.write(cmd);
}写入 Channel 与 Flush
DefaultEndpoint.write
@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
LettuceAssert.notNull(command, "Command must not be null");
try {
// sharedLock 是 Lettuce 自己实现的一个共享排他锁
// incrementWriters 相当于获取一个共享锁,当 channel 状态发生变化的时候(如断开连接)会获取排他锁执行一些清理操作
sharedLock.incrementWriters();
// validateWrite 是验证当前操作是否可以执行
// Lettuce 内部维护了一个保存已经发送但是还没有收到 Redis 消息的 Command 的 stack
// 可以配置这个 stack 的长度,防止 Redis 不可用时 stack 太长导致内存溢出
// 如果这个 stack 已经满了,validateWrite 会抛出异常
validateWrite(1);
// autoFlushCommands 默认为 true,即每执行一个 Redis 命令就执行 Flush 操作发送给 Redis
// 如果设置为 false,则需要手动 flush
// 由于 flush 操作相对较重,在某些场景下需要继续提升 Lettuce 的吞吐量可以考虑设置为 false
if (autoFlushCommands) {
if (isConnected()) {
// 写入 channel 并执行 flush 操作,核心在这个方法的实现中
writeToChannelAndFlush(command);
} else {
// 如果当前 channel 连接已经断开就先放入 Buffer 中,直接返回 AsyncCommand
// 重连之后会把 Buffer 中的 Command 再次尝试通过 channel 发送到 Redis 中
writeToDisconnectedBuffer(command);
}
} else {
writeToBuffer(command);
}
} finally {
// 释放共享锁
sharedLock.decrementWriters();
if (debugEnabled) {
logger.debug("{} write() done", logPrefix());
}
}
return command;
}DefaultEndpoint.writeToChannelAndFlush
private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
// queueSize 字段做 cas 1 操作
QUEUE_SIZE.incrementAndGet(this);
ChannelFuture channelFuture = channelWriteAndFlush(command);
// Lettuce 的可靠性:保证最多一次(At Most Once)
// 由于 Lettuce 的保证是基于内存的,所以并不可靠(系统 crash 时内存数据会丢失)
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
}
// Lettuce 的可靠性:保证最少一次(At Least Once)
// 由于 Lettuce 的保证是基于内存的,所以并不可靠(系统 crash 时内存数据会丢失)
if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channelFuture.addListener(RetryListener.newInstance(this, command));
}
}
// 可以看到最终还是调用了 channel 的 writeAndFlush 操作,这个 Channel 就是 Netty 中的 NioSocketChannel
private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
if (debugEnabled) {
logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
}
return channel.writeAndFlush(command);
}Netty 中的命令写入
到这里其实就牵扯到 Netty 的 Channel、EventLoop 相关概念了。简单来说,Channel 会把需要 write 的对象放入 Channel 对应的 EventLoop 的队列中就返回了。EventLoop 是一个 SingleThreadEventExecutor,它会回调 Bootstrap 时配置的 CommandHandler 的 write 方法。
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (debugEnabled) {
logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
}
if (msg instanceof RedisCommand) {
// 如果是单个的 RedisCommand 就直接调用 writeSingleCommand 返回
writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
return;
}
if (msg instanceof List) {
List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;
if (batch.size() == 1) {
writeSingleCommand(ctx, batch.get(0), promise);
return;
}
// 批量写操作,暂不关心
writeBatch(ctx, batch, promise);
return;
}
if (msg instanceof Collection) {
writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
}
}请求与响应的匹配机制
writeSingleCommand 核心逻辑
Lettuce 使用单一连接支持多线程并发向 Redis 发送 Command,那么 Lettuce 是怎么把请求 Command 与 Redis 返回的结果对应起来的呢?秘密就在这里。
private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {
if (!isWriteable(command)) {
promise.trySuccess();
return;
}
// 把当前 command 放入一个特定的栈中,这一步是关键
addToStack(command, promise);
// Trace 操作,暂不关心
if (tracingEnabled && command instanceof CompleteableCommand) {
// ...
}
// 调用 ChannelHandlerContext 把命令真正发送给 Redis
// 当然在发送给 Redis 之前会由 CommandEncoder 类对 RedisCommand 进行编码后写入 ByteBuf
ctx.write(command, promise);
}
private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {
try {
// 再次验证队列是否满了,如果满了就抛出异常
validateWrite(1);
// command.getOutput() == null 意味着这个 Command 不需要 Redis 返回影响,一般不会走这个分支
if (command.getOutput() == null) {
// fire&forget commands are excluded from metrics
complete(command);
}
// 这个应该是用来做 metrics 统计用的,暂时先不考虑
RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);
// 无论 promise 是什么类型的,最终都会把 command 放入到 stack 中
// stack 是一个基于数组实现的双向队列
if (promise.isVoid()) {
// 如果 promise 不是 Future 类型的就直接把当前 command 放入到 stack
stack.add(redisCommand);
} else {
// 如果 promise 是 Future 类型的就等 future 完成后把当前 command 放入到 stack
// 当前场景下就是走的这个分支
promise.addListener(AddToStack.newInstance(stack, redisCommand));
}
} catch (Exception e) {
command.completeExceptionally(e);
throw e;
}
}读取 Redis 响应
那么 Lettuce 收到 Redis 的回复消息之后是怎么通知 RedisCommand,并且把结果与 RedisCommand 对应上的呢?Netty 在收到 Redis 服务端返回的消息之后就会回调 CommandHandler 的 channelRead 方法。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf input = (ByteBuf) msg;
// ...
try {
// 重点在这里
decode(ctx, buffer);
} finally {
input.release();
}
}
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
// 如果 stack 为空,则直接返回,这个时候一般意味着返回的结果找到对应的 RedisCommand 了
if (pristine && stack.isEmpty() && buffer.isReadable()) {
// ...
return;
}
while (canDecode(buffer)) {
// 重点来了。从 stack 的头上取第一个 RedisCommand
RedisCommand<?, ?, ?> command = stack.peek();
if (debugEnabled) {
logger.debug("{} Stack contains: {} commands", logPrefix(), stack.size());
}
pristine = false;
try {
// 直接把返回的结果 buffer 给了 stack 头上的第一个 RedisCommand
// decode 操作实际上拿到 RedisCommand 的 commandOutput 对象对 Redis 的返回结果进行反序列化的
if (!decode(ctx, buffer, command)) {
return;
}
} catch (Exception e) {
ctx.close();
throw e;
}
if (isProtectedMode(command)) {
onProtectedMode(command.getOutput().getError());
} else {
if (canComplete(command)) {
stack.poll();
try {
complete(command);
} catch (Exception e) {
logger.warn("{} Unexpected exception during request: {}", logPrefix, e.toString(), e);
}
}
}
afterDecode(ctx, command);
}
if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}从上面的代码可以看出,当 Lettuce 收到 Redis 的回复消息时,就从 stack 的头上取第一个 RedisCommand,这个 RedisCommand 就是与该 Redis 返回结果对应的命令。
为什么这样就能对应上呢?
- Lettuce 与 Redis 之间只有一条 TCP 连接。
- 在 Lettuce 端放入
stack时是有序的。 - TCP 协议本身是有序的。
- Redis 是单线程处理请求的,所以 Redis 返回的消息也是有序的。
这样就能保证 Redis 返回的消息一定对应着 stack 中的第一个 RedisCommand。当然,如果连接断开又重连了,这个肯定就对应不上了,Lettuce 对断线重连也做了特殊处理,防止对应不上。
命令编码
最后看一下 Command 是如何编码成 Redis 协议的。
public void encode(ByteBuf buf) {
buf.writeByte('*');
// 写入参数的数量
CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0));
// 换行
buf.writeBytes(CommandArgs.CRLF);
// 写入命令的类型,即 GET
CommandArgs.BytesArgument.writeBytes(buf, type.getBytes());
if (args != null) {
// 调用 Args 的编码,这里面就会使用我们之前配置的 codec 序列化,当前使用的是 String.UTF8
args.encode(buf);
}
}说明
本文基于 lettuce-core 5.1.7.RELEASE 版本进行分析。该版本发布于 2019 年,较当前主流版本(6.x)可能存在差异。核心原理(如 Netty 集成、命令队列匹配机制)在后续版本中基本保持一致,但部分 API 或内部实现细节可能有所调整。
版权声明:本文为原创文章,版权归 戴老师的博客 所有,转载请联系博主获得授权。
本文地址:https://1diff.fun/archives/lettuce-shi-ru-he-fa-song-command-ming-ling-dao-redis-de.html
如果对本文有什么问题或疑问都可以在评论区留言,我看到后会尽量解答。