节点启动与集群配置

启动节点

节点启动后,进入集群模式等待配置或加入现有集群。

通过命令配置集群

集群初始化通常涉及以下步骤:

  1. 加入节点:使用 CLUSTER MEET 将新节点加入集群。
  2. 分派 Slot:将哈希槽(Slot)分配给主节点。
  3. 添加 Slave:为主节点配置从节点以实现高可用。

命令接受与处理

当集群节点接收到客户端命令时,会根据 Key 进行路由处理:

  • Key 路由:根据 Key 计算哈希槽,将命令转发到对应的节点。
  • Hash Tag 支持:特殊的标记 {...} 可用于将多个 Key 分派到相同的节点。

    • {} 不能出现在 Key 的结尾。
    • 如果存在 {...},哈希算法仅使用 {} 中间的字符串计算槽位。
    • 注意{} 中间必须包含字符,否则所有 Key 将会被分配到一个节点。
  • 多 Key 命令限制:如果命令中包含多个 Key,且这些 Key 不在相同的 Slot(即使在相同的节点也不行),则返回错误 CLUSTER_REDIR_CROSS_SLOT(消息:-CROSSSLOT)。EXEC 命令中的多条子命令包含的所有 Key 也必须在同一个 Slot 中。
  • 节点获取:通过 getNodeByQuery 根据 Key 获取目标节点。

路由逻辑详细流程

  1. 根据 Key 计算 Slot 值(keyHashSlot)。
  2. 如果 Slot 没有对应的 Node,则返回错误 CLUSTER_REDIR_DOWN_UNBOUND,对应返回消息:-CLUSTERDOWN
  3. 如果多个 Key 对应不同的 Slot,则返回错误 CLUSTER_REDIR_CROSS_SLOT,对应返回消息:-CROSSSLOT
  4. 如果命令中没有 Key,则返回当前节点。
  5. 如果集群不是 OK 状态,则返回错误 CLUSTER_REDIR_DOWN_STATE,对应返回消息:-CLUSTERDOWN
  6. 如果是迁移命令(cmd->proc == migrateCommand),也将返回当前节点。
  7. 如果对应 Slot 正往别的节点迁移数据,且当前节点没有包含所有命令中需要的 Key,则返回 ASKCLUSTER_REDIR_ASK),对应返回消息:-ASK hashslot ip port,对应的节点是迁移的目标节点。
  8. 如果对应的 Slot 正从别的节点导入到当前节点,如果包含所有命令中需要的 Key,则返回当前节点;否则返回错误 CLUSTER_REDIR_UNSTABLE,对应返回消息:-TRYAGAIN
  9. 如果客户端设置 CLIENT_READONLY 标记,命令也是只读的,且当前是 Slot 对应的 Slave 节点,也将返回当前节点。(如果命令是 EVAL / EVALSHA 也与只读命令是同样的处理)。
  10. 如果 Slot 对应的节点不是当前节点,则返回 CLUSTER_REDIR_MOVED,对应的消息是:-MOVED hashslot ip port,指向其节点;否则返回当前节点。
  11. 总结

    • 如果是对应 Slot 在迁移中,可能返回 ASK,去访问新的节点。
    • 如果是导入中,可能返回 TRYAGAIN,等会重试连接当前节点。
    • ASK 中有调整 ip/port,而 TRYAGAIN 却没有,注意这个区别。
    • MOVED 消息也是跳转,但是这是正常的 Slot 节点不是当前节点的情况,而 ASK 是代表迁移中的 Slot 的情况。

定时任务与状态检测

通过 Cron 定时任务检测内部变量状态,发送各种消息(这里面将包含非常重要的高可用实现、自动故障转移)。

执行频率与变量更新

  1. 频率:每秒执行 10 次,即 100ms 执行一次。
  2. 更新内核变量

    • myself->ip = server.cluster_announce_ip(因为 cluster-announce-ip 可以在运行时配置,一旦有更新,及时同步)。
    • handshake_timeout = server.cluster_node_timeout(同理,cluster_node_timeout 也是可以在运行时配置,及时同步。如果小于 1s,则强制设置为 1s,也就是节点超时重试间隔大于 1s)。
    • myself->flags:根据 server.cluster_slave_no_failover 的状态情况更新,也就是更新当前节点的 Failover 状态。

节点遍历与连接管理

遍历所有节点 server.cluster->nodes

  1. 统计 PFAIL 节点(server.cluster->stats_pfail_nodes++)。
  2. 移除 HANDSHAKE 状态并且超过 handshake_timeout 的节点。
  3. 如果连接为 NULL,则建立连接,发送 MEET / PING 消息,连接成功则去掉 CLUSTER_NODE_MEET 标记。

    • 如果是之前为活动的连接,但是没有成功收到 PONG 消息,则这里将继续保持 ping_sent 为之前的值。

心跳消息发送

发送消息给最早响应的一个节点:

  1. 大约 1s 执行一次。
  2. 在节点列表中随机取 5 次,每次取一个节点,然后取 pong_received 最小的节点。
  3. 对最小的节点发送 PING 消息。

节点状态深度检测

再次遍历所有节点 server.cluster->nodes

  1. 统计孤立的 Master 节点个数(有分派 Slot,无 Slave 节点)。
  2. 统计最大的单个 Master 对应的 Slave 节点个数。这一步与上一步执行时机相同,都是在当前节点为 Slave 节点,此次遍历的节点为 Master。如果达到条件将会执行 Slave 迁移,这就是为什么只在 Slave 节点执行这个判断的原因。
  3. 对于超时未收到节点的 PONG 消息,关闭连接 (now - node->ping_sent > server.cluster_node_timeout/2)。后续可能有 2 种情况:

    • 第一,走重试连接流程,给几次重试连接机会。
    • 第二,如果重试几次还是不正常,标记为可能失败状态(Possible Failure)。
  4. 对于正常的连接,如果长时间未发送 PING 消息的节点,则发送(node->ping_sent == 0 && (now - node->pong_received) > server.cluster_node_timeout/2)。
  5. 如果在执行手动故障转移(Manual Failover),当前节点是 Master 节点,此次遍历的节点是 mf_slave 节点,则发送 PING 消息。
  6. 对于超时过长的节点,标记为可能失败状态:

    now - node->ping_sent > server.cluster_node_timeout
    node->flags |= CLUSTER_NODE_PFAIL

Slave 节点特定逻辑

如果当前节点是 Slave 节点:

  1. Master 恢复处理:如果其对应的 Master 从故障中恢复正常,重新设置为 Master 信息,开启复制。
  2. 手动故障转移超时检测:如果超时,则重置相关变量。
  3. 故障转移与迁移处理

    • 处理手动故障转移 clusterHandleManualFailover

      • 如果无手动故障转移 (server.cluster->mf_end == 0),则直接返回。如果有手动故障转移,这个值是一个结束超时时间,因此在多处都使用这个值来判断是否在执行手动故障转移。
      • 如果手动故障转移开始标记已设置,则直接返回。
      • 如果当前节点的 Offset 与 Master 上的 Offset 一致,则标记开始故障转移 server.cluster->mf_can_start = 1
    • 处理 Slave 故障转移 clusterHandleSlaveFailover:详见下文「处理 Slave 故障转移」章节。
    • 处理 Slave 迁移 clusterHandleSlaveMigration:如果有孤立的 Master 节点,Slave 个数 >= 2,当前 Slave 节点属于其中,则执行 Slave 迁移。详见下文「处理 Slave 迁移」章节。

集群状态更新 clusterUpdateState

  1. 重启延迟:当重启节点后,如果一切状态看上去都正常也不会立即恢复节点状态,留一段时间让其重新配置这个节点,也就是重新与其他节点通讯确认真的可用了(CLUSTER_WRITABLE_DELAY 2000)。
  2. Slot 覆盖检查:如果要求全部 Slot 覆盖时节点才可用,则检查 Slot 是否已经全部分派,并且 Slot 对应的节点也需要是可用状态,不能是 CLUSTER_NODE_FAIL(配置项为 server.cluster_require_full_coverage)。如果不是全部可用,则标记当前节点状态为不可用(CLUSTER_FAIL)。
  3. 节点统计:计算集群节点总数 server.cluster->size(负责 Slot 的 Master 节点,这里不包括 Slave 节点)与可用节点数(不能是这 2 种状态 CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)。
  4. 少数派保护:如果可用节点数小于总节点数的一半,则认为当前节点与其他节点隔离,属于少数派,标记当前节点状态为不可用(CLUSTER_FAIL)。
  5. 恢复延时:如果节点从不可用中恢复到正常状态,并且之前是属于少数派,则延时恢复,延时时间为 server.cluster_node_timeout(如不在这个范围,则调整到边界值 CLUSTER_MIN_REJOIN_DELAY <= t <= CLUSTER_MAX_REJOIN_DELAY)。这项限制也是为了使其节点持续稳定后再加入集群,以免数据在节点间不停的迁移。

集群消息处理

消息类型

截止到目前为止,消息类型总共有 10 种:

#define CLUSTERMSG_TYPE_PING 0          /* Ping */
#define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */
#define CLUSTERMSG_TYPE_PUBLISH 4       /* Pub/Sub Publish propagation */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* Yes, you have my vote */
#define CLUSTERMSG_TYPE_UPDATE 7        /* Another node slots configuration */
#define CLUSTERMSG_TYPE_MFSTART 8       /* Pause clients for manual failover */
#define CLUSTERMSG_TYPE_MODULE 9        /* Module cluster API message. */
#define CLUSTERMSG_TYPE_COUNT 10        /* Total number of message types. */

消息结构

  1. 消息体使用:这 10 种消息中,使用到消息体(clusterMsgData)的又可以分为 PINGFAILPUBLISHUPDATEMODULE 5 种大类。其中 MEET/PONGPING 归为同一类,其他消息只使用到了消息头。
  2. 结构定义:消息头的类型为 struct clusterMsg,消息体的类型为 union clusterMsgDataclusterMsgData 中包含不同类型消息的消息数据,clusterMsg 的最后一个字段 data 即是 clusterMsgData。在源码中称 clusterMsghdrclusterMsgDatahdr->datahdr 也就是 header 的简写,代表消息头的意思。
  3. Gossip 信息:只有 PING 类型(包括 PING/MEET/PONG)的消息会携带 Gossip 信息,也就是集群其他节点的信息(ip / port / send_ping / pong_received 等),用来传播这些集群节点信息,达到让所有节点都保存了全部节点信息,用于后续的投票选举,故障转移,最终达到高可用。
  4. 消息头字段:每个消息都会带有消息头,其中包含:

    • hdr->ver:集群协议版本。
    • hdr->sig[4]:"RCmb",标记这个消息是属于 Redis Cluster 消息,其中 RCmb 是指 Redis Cluster message bus。
    • hdr->type:消息类型,即上面提到的 10 种消息中的其中一种。
    • hdr->sender:当前节点的名称 Node ID。
    • hdr->myip:如果有指定 server.cluster_announce_ip 则使用,否则设置为 0。
    • hdr->myslots:为 master->slots
    • hdr->slaveof:如果为 Slave 节点,则为对应 Master 的名称 Node ID。
    • hdr->port:如果有设置 server.cluster_announce_port,则使用,否则使用 server.port
    • hdr->cport:如果有设置 server.cluster_announce_bus_port,则使用,否则使用 server.port + 10000
    • hdr->flagshtons(myself->flags) 当前节点的所有标识。
    • hdr->stateserver.cluster->state 集群状态。
    • hdr->currentEpochhtonu64(server.cluster->currentEpoch) 当前纪元。在发送选举请求时,currentEpoch 将会在 configEpoch 的基础上 + 1,标识一个新的纪元开始。纪元也就是周期的意思,也相当于版本号,有变化时递增。收消息方一旦发现收到的 epoch 与自己保存的 epoch 不同,说明发生了变化,进而执行对应的逻辑。也就是通过这种方式来确保集群所有节点间的最终一致性。
    • hdr->configEpochhtonu64(master->configEpoch) 节点纪元。
    • hdr->offset:如果是 Slave 节点则为 server.master->reploff,如果是 Master 节点则为 server.master_repl_offset
    • hdr->mflags:如果是 Master 节点,并且在手动故障转移,则设置标记 hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED
    • hdr->totlen:消息总长度。由于部分消息有消息体,并且消息体的长度与其定义的长度可能不同(比如 PING 类型消息),所以这个消息的总长度将有一个计算的过程。不过当消息中不需要消息体时,这里的 totlen 将会减去 clusterMsgData 的长度;如果需要消息体时,也会减去 clusterMsgData,然后再根据不同的消息类型增加上对应消息体的长度。这里消息体使用了 union 类型,也是为了灵活的控制消息的长度。
    • 注意:以上这些信息都是关于当前节点的信息,而并不是要发送的目标节点的信息,也就是说每个消息中都会携带关于当前节点的信息。如果当前节点为 Slave 节点,在发送消息时为了更加准确,会使用其对应 Master 节点的部分信息(Slots 槽信息,configEpoch 纪元)。

消息发送逻辑

1. 发送 PING (MEET/PING/PONG)

  • 流程:新加入节点时,将发送 MEET 消息,对方节点回复 PONG;建立连接后定期的心跳检测是通过发送 PING 消息,对方节点回复 PONG 消息来实现。
  • Gossip 节点选择

    • 计算 freshnodes 个数,即总节点数 - 2(这 2 个是指自己与要发送消息的目标节点),也就是说,消息中包含的节点数最多也不会超过这个数,否则就是有重复了。
    • 携带的节点分为 2 种:一种是正常节点,一种是 PFAIL 状态的节点(可能失败的节点,即当前节点没有在超时时间内收到 PONG 回复)。
    • 正常节点数(wanted)的计算方式是总节点数的 1/10,PFAIL 节点数(pfail_wanted)是全部的 PFAIL 节点。这 2 种节点最后在消息体中是一个列表,每次在添加时检测是否已经存在其列表来保证最终这个列表中的节点是不重复的。wanted 最小值为 3,最大值为 freshnodes
    • 如果是 PING 类型消息,保存 PING 消息发送时间,用于验证之后收到的 PONG 消息是否会超时(link->node->ping_sent = mstime())。
    • 循环 N 次,取 1/10 个正常节点,因此每次都会随机在列表中取一个,循环最大次数 N = wanted * 3
    • 正常节点列表排除myself(自己)、CLUSTER_NODE_PFAIL 状态的节点、CLUSTER_NODE_HANDSHAKE 状态的节点、CLUSTER_NODE_NOADDR 状态的节点、连接为空且 Slot 数为 0 的节点。
    • 可能失败的节点(PFAIL)列表排除CLUSTER_NODE_HANDSHAKE 状态的节点、CLUSTER_NODE_NOADDR 状态的节点。并且必须有 CLUSTER_NODE_PFAIL 标记。
  • 长度计算

    • 有 2 个计算消息总长度的过程,一个是预分配的,一个是实际发送的消息长度。消息头是固定的长度,消息体根据消息类型及携带的集群节点个数不同而不同。
    • 预分配是提前根据 wanted + pfail_wanted 乘以单个 gossip 的长度计算,但是实际可能没有取到这么多节点,所以长度可能不同。所以在实际发送消息时将根据实际的节点数再进行计算一次。
    • 在初始结构中 gossip 数组长度为 1(clusterMsgDataGossip gossip[1]),所以在上一步计算总长度时,需要把这个长度减掉,然后再加上节点个数乘以 gossip 占用的长度。这里也有一些技巧,定义为数组长度为 1,可以使用它的首地址为指针的特性,再者这个长度在定义时是确定的,后续的计算过程方便运算。
    • 在消息中包括携带的节点个数与消息总长度。
    • 关于为什么选择 1/10 个节点,这个单独另行讨论。

2. 发送 FAIL 消息

  1. 当在 A 节点中设置 B 节点为 FAIL 状态时,将发送消息给其他所有节点。只有在设置 FAIL 状态时会发送,如果状态未改变时,不会再次重复发送。
  2. FAIL 消息,在消息体中包含有 FAIL 节点的名称。

3. 发送 PUBLISH 消息

  1. 当客户端发送 PUBLISH 命令时,接收消息的节点要把这个消息发送到其他所有节点(调用函数 clusterBroadcastMessage)。
  2. 消息体将会包含这么几个信息:

    uint32_t channel_len
    uint32_t message_len
    unsigned char bulk_data[8]
  3. channelmessage 的实际内容会拼接在 bulk_data 中。

4. 发送 FAILOVER_AUTH_REQUEST

  1. 发送选举请求,请求其他节点选举自己执行故障转移。
  2. currentEpoch 将会自增 1。
  3. 这个请求会发送给所有可用的节点,包括 Master 与 Slave 节点,但是只 Master 会回复,所以参与投票的节点只有 Master。
  4. 这个消息只有消息头,没有对应的消息体。
  5. 如果在手动故障转移,将携带 hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK

5. 发送 FAILOVER_AUTH_ACK 消息

  1. 回复选举请求,同意、或者不同意。
  2. 这消息只有消息头,没有对应的消息体。

6. 发送 UPDATE 消息

  1. UPDATE 消息,用来发送更新消息。比如:B 节点收到 A 节点发送的 PING/PONG 时,如果 B 中保存的 Slot 关系与收到的消息中的 Slot(A 负责的 Slot)不同,进而将展开不同情况的处理。

    • 如果 B 节点保存的某个 Slot 对应的 configEpoch 大于消息中携带的 A 节点的 configEpoch,则认为 A 的 Slot 消息过旧,给 A 发送 UPDATE 消息,其中包含对应这个 Slot 的节点信息。
    • 如果认为 A 的 Slot 消息较新,则更新 B 节点对应的 Slot 信息(通过调用函数 clusterUpdateSlotsConfigWith)。
  2. 消息体格式:

    uint64_t configEpoch; /* Config epoch of the specified instance. */
    char nodename[CLUSTER_NAMELEN]; /* Name of the slots owner. */
    unsigned char slots[CLUSTER_SLOTS/8]; /* Slots bitmap. */

7. 发送 MFSTART 消息

  1. MFSTARTmanual start,代表手动故障转移。
  2. 是在执行了命令 CLUSTER FAILOVER 命令,并且不带任何参数的情况下,由当前的 Slave 节点发送给其 Master 节点。
  3. 这个消息也是只要消息头,没有对应的消息体。

8. 发送 MODULE 消息

  • 模块集群消息 API。

消息接收逻辑

接收消息后的处理(这个视角与之前说到的视角不同,这个是接收者的视角,所以其中“当前节点”所指的对象是不同的)。

1. 统一处理函数 clusterProcessPacket

  1. 这么多种消息类型的接收处理都放到了一个统一的函数中 clusterProcessPacket
  2. 这个函数的主要实现流程分为了这么几部分:

    • 第一,处理 MEET/PING/PONG 消息,这也是代码量最大的逻辑块。
    • 第二,CLUSTERMSG_TYPE_FAIL 类型的消息。
    • 第三,CLUSTERMSG_TYPE_PUBLISH 类型的消息。
    • 第四,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 类型的消息。
    • 第五,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 类型的消息。
    • 第六,CLUSTERMSG_TYPE_MFSTART 类型的消息。
    • 第七,CLUSTERMSG_TYPE_UPDATE 类型的消息。
    • 第八,CLUSTERMSG_TYPE_MODULE 类型的消息。
  3. 这里先记录下函数开始通用的一些处理,然后单独分节说明每一部分消息。
  4. 把接收到的数据通过强制类型转换,直接转换为 clusterMsg 类型 (clusterMsg *hdr = (clusterMsg*) link->rcvbuf),这样后续的逻辑就直接操作 hdr 指针,非常方便,同时也易于理解。
  5. 通过 type < CLUSTERMSG_TYPE_COUNT 来验证接收的消息类型是否属于正确的类型,如果是,server.cluster->stats_bus_messages_received[type]++
  6. 消息完整性检测,totlen 小于 16 或者大于实际接收到的 buf 长度 sdslen(link->rcvbuf) 均认为有问题,直接返回。
  7. 通讯协议版本检测,如果不同,直接返回。不能处理不同版本间的消息。
  8. 然后根据不同类型的消息计算期望的消息长度,如果与实际发送的 totlen 的值不同,则也认为消息有问题,直接返回。
  9. 根据 hdr->sender 为发送者的 Node ID,在当前节点已知的节点列表(server.cluster->nodes)中查找,如果找到则认为这个节点已经是集群中的已知节点,则信任其发送过来的数据。当然也是允许新加入节点的,也就是这里会找不到。
  10. 如果是已知节点,握手已经完成,则:

    • 判断 epoch,如果发送者携带的 epoch 大于当前节点的 epoch,则把当前节点的 epoch 值更新为最新的值。currentEpochconfigEpoch 都是。
    • 如果当前节点是 Slave 节点,正在执行手动故障转移,发送者为对应的 Master 节点,消息 mflag 包含 CLUSTERMSG_FLAG0_PAUSED 标记,server.cluster->mf_master_offset == 0,这时更新 mf_master_offset 的值为 sender->repl_offset。(在手动故障转移的过程中其中有一个过程是 Master 会暂停接收客户端请求,等待正在故障转移的 Slave 节点同步完自己的所有数据,这个变量 mf_master_offset 就是用来记录这个偏移量,在手动故障转移逻辑中会根据这个偏移量与当前实际同步的偏移量 server.master->reploff 比较,来验证是否已经完成数据同步)。
  11. 如果当前节点 server.cluster_announce_ip 未设定,并且 myself->ip 为空时,则通过 getsockname 函数来获取当前节点的 ip,如果获取到的值与 myself->ip 不同,则更新 myself->ip 为新值。
  12. 如果是 MEET 消息,并且发送者未找到,则创建新节点,并添加到 server.cluster->nodes 列表中。其中 flag 为 CLUSTER_NODE_HANDSHAKE,也就是设置为握手阶段。
  13. 如果是 MEET 消息,并且发送者未找到,则执行 clusterProcessGossipSection,处理 gossip 部分。这是由于消息 MEET 消息的特殊性。(这部分将单独展开)。
  14. 如果是 PING/MEET 消息,则回复 PONG 消息。

2. 接收 MEET/PING/PONG 消息的处理

  1. link->node 是什么? 目前能理解到的是主动建立连接的一方才会正确设置这个值,监听接收消息的一方这个值为 NULL。也许是用来区分这 2 种情况的吧。
  2. 如果 link->node 有值(主动建立连接方)

    • 如果在握手状态 (nodeInHandshake(link->node)),并且发送者是已知的节点:

      • 先会更新已知节点的信息(如果 ip/port 有变化,link 相同认为无变化)。
      • 然后则删除 link->node 这个节点的所有相关的信息,包括节点自身的信息(包括 slot / importing_slots_from / migrating_slots_to / node->fail_reports 等等)。
      • 清理完之后,此函数就返回,不执行后续逻辑。
      • 关键点sender 是根据收到的消息头中的 hdr->sender(Node ID)在 server.cluster->nodes 列表中查找到的节点,而 link->node 是与当前连接关联的节点,可能在大多数情况是相同的。但是在最初节点 A 向节点 B 发送 MEET 消息时,并不知道节点 B 的 Node ID 所以会随机生成一个,在 A 收到 B 的回复 PONG 时,B 会携带自己真实的 Node ID,这时这 2 个值就不同了。如果相同就表明,这个节点 B 已经在节点 A 的已知列表,相当于实际上的一个节点,存在了 2 份,重复了,就执行删除正在链接的节点。
      • 疑问解答:已经在 A 的已知列表,为啥还要再向 B 发送 MEET 消息?这就要说起 CLUSTER MEET 命令,带有相同 ip / port 的 MEET 命令可以多次执行。但是如果上一个 MEET 命令已经与对应节点建立连接完成,这个 MEET 消息就会重复发送,就是这里的情况。如果上一个 MEET 命令与对应的节点还在握手状态,则会立即返回,忽略这条 MEET 命令,这种情况 MEET 消息就不会重复发送。还有一种情况也可能会出现这样的问题,在执行了 CLUSTER MEET 命令后,与实际发送 MEET 消息给对应节点这中间是有时间间隔的,在这个时间窗口,当前节点可能会收到其他节点已经把这个节点携带过来了,这时也就重复了,所以这里的这个排重逻辑还是少不了。
    • 如果在握手状态,发送者是未知节点:这应该就是正常的收到 PONG 回复的情况了。重命名节点的名称(node->name 也就是 Node ID)。移除 CLUSTER_NODE_HANDSHAKE 标记。增加 CLUSTER_NODE_MASTERCLUSTER_NODE_SLAVE 标记。因为不知道是 Slave 还是 Master,所以都加上,后续再移除。
    • 如果不是握手状态:那么就比较连接的节点名称与消息中包含的节点名称是否相同,如果不同,则断开这个连接,link->node->ip / link->node->port / link->node->cport 设置为 0 值,link->node->flags 增加 CLUSTER_NODE_NOADDR 标记。然后函数就立即返回了。
  3. 如果发送者是已知节点(sender 有值),同步消息中的 CLUSTER_NODE_NOFAILOVER 标记。
  4. 如果发送者是已知节点(sender 有值),不是握手状态,并且是 PING 消息,则如果有必要的话更新节点信息(ip / port / cport)。
  5. 如果 link->node 有值,并且是 PONG 消息时,这是一个正常的 ping <-> pong 流程的回复。

    • 设置 pong 接收时间 link->node->pong_received = mstime(),将会根据这个时间,时间越小,会优先再次发送 ping 消息来检测节点状态。
    • 设置 ping 发送时间 link->node->ping_sent = 0,设置为 0 表明之前发送的 ping 已经收到 pong 回复,说明节点正常,否则将会根据这个 ping_sent 的时间来判断是否超时。
    • 收到 pong 回复,说明节点正常。如果之前有标记 PFAIL(可能失败),则移除 link->node->flags &= ~CLUSTER_NODE_PFAIL。如果之前有标记 FAIL(失败),满足一定条件则移除这个标记,并不总是直接移除,Slave 节点与 Master 节点的处理逻辑不同。
  6. 角色转换处理:如果是已知节点(sender 有值),接下来将处理 Slave -> Master 或者 Master -> Slave 角色转换的情况。

    • 根据 hdr->slaveof 来判断发送者角色是 Master 还是 Slave,如果为空,则认为是 Master 节点。
    • 如果当前节点已经保存的角色与发送者最新的角色不同,则认为是发生了角色变化。如果同样是 Slave,但是对应的 Master 节点不同的话也同样需要更新。因此分为了 4 种情况:a. Slave -> Master, b. Master -> Slave, c. 一直是 Slave,但是对应的 Master 节点不同,d. 无变化。但是在这里代码结构稍有不同,是按照最新的状态的不同而分开处理。如下:

      • 最新是 Master 角色

        • Slave -> Master:需要把原来 Master 的 Slave 列表中删除这个节点,添加 CLUSTER_NODE_MIGRATE_TO 标记。
        • 清除 CLUSTER_NODE_SLAVE 标记。
        • 添加 CLUSTER_NODE_MASTER 标记。
        • 清除 slaveofn->slaveof = NULL
      • 最新是 Slave 角色

        • Master -> Slave:删除 server.cluster->slots 中对应的这个节点。移除 CLUSTER_NODE_MASTER CLUSTER_NODE_MIGRATE_TO 标记。添加 CLUSTER_NODE_SLAVE 标记。
        • 对应的 Master 节点不同:需要把原来 Master 的 Slave 列表中删除这个节点,然后配置新的主从关系。
    • 通过上面 2、3 的 CLUSTER_NODE_SLAVECLUSTER_NODE_MASTER 标记的处理,最后只保留了其中一个标记,也就是要么是 Slave、要么是 Master。
  7. 更新 Slot 信息

    • 当然这一步也需要发送者是已知节点,并且是在设置 Master / Slave 状态之后。这是因为这一步会使用到 Master / Slave 状态,如果在这之前发送者消息中声称的状态与当前节点已经保存的状态不同,则无法正确处理。
    • 发送者消息中声称的 Slot 信息与当前节点已经保存的这个发送者负责的 Slot 不是完全相同。可能是发送者消息中声称的更新,也可能是当前节点已知的更新。两种情况分别处理。
    • 如果发送者是 Master,则使用其下保存的 Slot 信息,如果是 Slave 则使用其对于 Master 的 Slot 信息,这样为的是取到最新的 Slot 信息。
    • 如果消息中包含的 Slot 信息与当前保存的 Slot 信息不同,并且这个 Slot 对应的节点版本更大,则更新当前保存的 Slot 信息(clusterUpdateSlotsConfigWith)。这里只考虑发送者是 Master 的情况,那么 Slave 保存的 Slot 何时更新呢?
    • 如果与上面的情况相反,则发送消息给发送者(UPDATE 类型的消息)。
    • clusterUpdateSlotsConfigWith 函数执行逻辑在其他小节中有提到,这里不再重复。
  8. 解决 Epoch 相同的情况

    • 只有发送者与当前节点都为 Master,并且 configEpoch 相同时才执行此项处理(senderConfigEpoch == myself->configEpoch)。
    • 比较节点名称,节点名称比较大的节点不执行任何处理。因此只有在当前节点的名称小于发送者的名称时才会更新 server.cluster->currentEpoch++myself->configEpoch = server.cluster->currentEpoch
  9. 如果发送者已知,则执行 clusterProcessGossipSection,处理 gossip 部分。(这部分将单独展开)。

3. 接收 FAIL 消息的处理

  1. 发送者未知时,直接返回。
  2. FAIL 消息中会携带一个 FAIL 状态的节点的节点名称,是在消息体中发送过来的。如果这个 FAIL 节点不是已知节点,也将直接返回。
  3. 如果 FAIL 节点不是自己,并且不是 CLUSTER_NODE_FAIL 状态,则:

    • 设置这个节点为失败状态(failing->flags |= CLUSTER_NODE_FAIL)。
    • 设置这个节点的失败时间(failing->fail_time = mstime())。
    • 删除 PFAIL 状态,因为 FAIL 是真正代表失败(failing->flags &= ~CLUSTER_NODE_PFAIL)。

4. 接收 PUBLISH 消息的处理

  1. 接收到 PUBLISH 消息后,解析 channelmessage 字符串,然后使用与普通的 pub/sub 方式相同的处理(调用函数 pubsubPublishMessage),向订阅消息的客户端发送这个最新的消息。
  2. pubsubPublishMessage 函数的实现细节

    • 有 2 个订阅列表,第一个是字典型的 server.pubsub_channels,key 是 channel 名称,values 是订阅这个 channel 的客户端列表。第二个是列表型的 server.pubsub_patterns,每一个条目中包含订阅的 channel 模式及客户端信息。
    • 对于 server.pubsub_channels,先根据 key 找到对应的客户端列表,然后循环发送消息给客户端。
    • 对于 server.pubsub_patterns,只能依次循环遍历,如果模式匹配将发送消息给对应的客户端。
    • 最后返回接收消息的客户端个数。

5. 接收 FAILOVER_AUTH_REQUEST 消息的处理

  1. 如果发送者未知,直接返回(即 senderNULL 时)。
  2. 如果当前节点是 Slave,或者不负责任何 Slot,则不参与投票,直接返回。
  3. 如果发送者当前的 Epoch 小于当前的 Epoch 则认为消息不是最新的,直接拒绝,返回。
  4. 如果最后投票的 Epoch 等于当前的 Epoch,则认为这个 Epoch(纪元、周期)已经投过票,不再重复投票,直接返回。
  5. 提出故障转移的节点必须是 Slave 节点,并且当前节点认为其对应的 Master 节点已经不可用,否则也将直接返回。还有一种是手动故障转移,这时对应的 Master 节点可能还是可用的,所以会通过消息中的 mflag 标记来验证是否是这种情况(force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK),这种情况下也将接着下面的流程,不会直接返回。
  6. 在 2 个集群超时时间范围内,不重复投票。否则直接返回。
  7. 发送者负责的 Slot 要比当前节点保存的这些 Slot 对应的 configEpoch 要大,否则认为消息不是最新的,直接返回。
  8. 然后设置最后投票的 Epoch(server.cluster->lastVoteEpoch = server.cluster->currentEpoch)。
  9. 设置投票时间 node->slaveof->voted_time = mstime()
  10. 最后就是发送 FAILOVER_AUTH_ACK 类型的消息了。发送 ACK 类型消息之前已经有介绍,这里不再重复。

6. 接收 FAILOVER_AUTH_ACK 消息的处理

  • 依然验证发送者是否已知,如果 sender 未知(sender = NULL)则直接返回。
  • 接着验证发送者是否是 Master,并且有负责 Slot(sender->numslots > 0),并且当前的 Epoch 大于等于 auth_epochsenderCurrentEpoch >= server.cluster->failover_auth_epoch)。
  • 如果同时符合这 3 个条件,则增加票数(server.cluster->failover_auth_count++)。
  • 总结:Slave 不参与投票,只有负责的 Slot 数大于 0 的 Master 才参与投票。

7. 接收 MFSTART 消息的处理

  1. MFSTART 消息只有自己的 Slave 才发送消息给其对应的 Master,所有只有 Master 才会接受到这类消息。
  2. 如果发送者不是已知节点(sender = NULL),或者发送者不是自己的 Slave 节点,直接返回。
  3. 重置 MF 相关变量。
  4. 设置 MF 结束时间(server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT)。
  5. 设置 MF Slave(server.cluster->mf_slave = sender)。
  6. 暂停客户端请求,并设置暂停时间(server.clients_paused = 1)。

8. 接收 UPDATE 消息的处理

  1. UPDATE 消息除了统一消息头,还包含 UPDATE 消息体,其中包含一个 update 节点的概要信息(configEpoch 纪元、nodename 节点名称、slots 节点负责的槽信息),消息体中携带的这个节点与发送者是不同的节点,要注意区分。
  2. 涉及到当前节点更新操作,所以发送者必须要是已知节点(sender 不为空,否则直接返回,不执行后续逻辑)。
  3. 根据消息体中携带的 nodename 在当前 server.cluster->nodes 下查找,如果找不到则认为 update 节点还是未知节点,直接返回,不执行后续逻辑。
  4. 再验证这个 update 节点的 epoch,消息体中携带的 configEpoch 与当前已经保存的这个 update 节点的 configEpoch 比较,如果当前保存的 configEpoch 较大,则认为发送的消息中的 update 节点已经不是最新的,不处理后续逻辑,直接返回。
  5. 上面的 3 步检查完之后,我们认为当前节点的信息不是最新的,如果需要就更新。
  6. 消息体中的发送的 update 节点与其对应的 Slot 信息,说明其是 Master 节点。所以如果当前节点保存的是 Slave 状态,则重新设置为 Master 状态。
  7. 更新 Slot 相关信息(clusterUpdateSlotsConfigWith

    • 在这个函数内部 sender 变量就是指 update 节点,这个函数有 2 处被调用,其他一处是发送 update 消息的相反情况被调用,上面有提到过,在这种情况下 sender 变量就是消息发送者节点。
    • 其实涉及到更新 Slot 的情况,就只有 2 种可能:第一,执行过故障转移,有 Slave 提升到 Master,Slot 信息势必要变更;第二,执行过集群 Slot 迁移动作,即 CLUSTER SETSLOT <slot> (IMPORTING | MIGRATION <node ID>) 命令。
    • 获取到当前的 Master 节点,如果当前是 Slave 节点,则取其对应的 Master 节点(curmaster = nodeIsMaster(myself) ? myself : myself->slaveof)。
    • 关于自己负责的 Slot 信息,只有自己有最终发言权,所以对于这个 update 节点等于 myself 节点的情况,统统不理会,也就是说我负责哪些 Slot 自己知道,不需要别人传递给我。
    • 然后就是循环所有的 update 节点负责的 Slot,如果 Slot 对应的节点是 update 节点,则说明没有变化,继续下一个 Slot;如果这个 Slot 正在从别的节点导入,也将继续下一个 Slot。
    • 剩下的情况就是 Slot 对应的节点不同的情况了。如果当前节点保存的这个 Slot 为 NULL,或者当前节点保存的这个 Slot 对应的节点的纪元(configEpoch)比较早,即小于这个 update 节点的纪元,则更新,否则不做任何处理。更新操作包含:删除当前 Slot 信息,把 update 节点设置为当前 Slot 对应的节点。
    • 在上一步的条件之下,我们认为当前节点的 Slot 信息不是最新的,所以对应有别的逻辑处理:

      • 第一,当前 Slot 对应的节点是 myself,也就是自己,并且 Slot 下还有 Key 存在,则认为这个 Slot 是 dirty_slot(有脏数据存在),并且清理其对应的 Key 数据。
      • 第二,当前 Slot 对应的节点是 curmaster,并且 curmaster 负责的 Slot 数为 0 时,修改当前节点的 Master 为 update 节点。
    • 第二点就会处理故障转移中没有完成的步骤。比如,原来的 Master(A 节点)故障,这个 Master 节点下可能有几个 Slave(B 节点、C 节点、D 节点),但是由其下的 Slave(B 节点)晋升为了新的 Master 节点,新的 Master(B 节点)将拥有更大的纪元(configEpoch)。当 B 节点发送 PING/PONG 给 A/C/D 节点时,接收到 B 节点的消息是,这些节点会执行此函数,进而执行此处逻辑,把 A/C/D 节点都设置为 B 节点的 Slave。如果原来的 Master(A 节点)一直没有恢复,也就不会处理它,但是 C/D 节点可能是正常的,然后通过上述逻辑把关于 A 节点的相关 Slot 信息删掉,这时 curmaster 负责的 Slot 数就为 0,这样就顺利的执行了这个过程。这时完整的故障转移将才算是真正完成。

9. 接收 MODULE 消息的处理

  • 模块集群消息 API 处理。

处理 Slave 故障转移 (clusterHandleSlaveFailover)

  1. 选举机制:故障转移是由 Slave 接管其已经产生故障的 Master 节点,这时在这个 Master 下可能有多个可用的 Slave 节点,因此就需要选出其中一个 Slave 来执行这个操作,这个选择的过程就是选举的过程,也就是通过投票来选出。
  2. 法定票数:需要赢得选举,至少需要一半以上的节点同意(负责 Slot 的 Master 节点,这里不包括 Slave 节点),即 needed_quorum = (server.cluster->size / 2) + 1
  3. 超时控制:在发送选举请求,等待投票结果时,有 2 个时间,超时时间 auth_timeout = server.cluster_node_timeout * 2,超时重试时间 auth_retry_time = auth_timeout * 2。也就是说在一次选举超时后,不会立即开始下一次,中间会再间隔一个超时时间。
  4. 执行条件:对于 Master 没有故障,或者此 Slave 异常的节点将不执行这个操作,认为没有理由执行 failover。在手动故障模式下,即使 Master 没有故障也会执行故障转移,相当于把管控权交给了集群管理员,方便在特殊场景下使用。
  5. 数据有效性:如果有设置 server.cluster_slave_validity_factor,则对最后同步时间验证,如果太久,则也认为不能执行 failover。手动故障转移则没有这个限制。默认值是 10。
  6. 选举启动时间:当一个 Master 节点故障,很可能在短时间其对应的好多 Slave 都发起选举请求,因此在初始化一个选举请求发起时间,会先给每个 Slave 节点算出一个不同的开始时间。

    • 大致算法是,按照复制偏移量 reploffset 排名,然后乘以 failover_auth_time,再加一个 500ms 的随机时间,最后加一个固定的 500ms。
    • 最终结果,与 Master 数据最接近的节点将最先开始发起选举请求,相同的偏移量的情况下,根据随机数的情况,避免了同一时间有多个 Slave 节点同时发起。
    • 手动故障转移没有这个限制,会立即开始。
  7. 选举请求发送:如果上述条件满足,将开始一次选举请求,在一次选举结束前,同一个节点不会发出第二次,如果时间太长,到达 auth_timeout,则返回,等待重试,不执行后续逻辑。

    • 如果 server.cluster->failover_auth_sent == 0,也代表没有在执行的选举请求,则发送。

      server.cluster->currentEpoch++;
      server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
      clusterRequestFailoverAuth();
      server.cluster->failover_auth_sent = 1;
  8. 投票验证:在上一步发送完选举请求后,响应结果不会同步返回,所以这次函数调用就返回了。到达下一个时间周期,将会验证所得票数是否大于等于需要的票数,如果满足条件则赢得选举,将执行从 Slave 到 Master 角色的切换。如果选票不足,则继续等待(也许别的节点的结果还没有发送回来),直到到达超时时间 auth_timeout
  9. 消息交互:发送选举请求的详细过程将在上文消息发送部分展开,发送选举请求即发送 FAILOVER_AUTH_REQUEST 类型的消息,消息接收方接着会发送 CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 类型的消息进行回复。等这些参与投票的大多数节点进行了回复之后,就可以执行上面的步骤,计算所支持票数够不够。
  10. 角色切换过程:Slave 到 Master 角色切换过程有这么几步:

    1. 改变角色标记,也就是说从 CLUSTER_NODE_SLAVECLUSTER_NODE_MASTER。删除原来 Master 节点下 Slaves 列表中的当前节点,因为当前节点要变更为 Master 了。
    2. 取消与原来 Master 的复制,重置及释放相关资源。

      • 重置 server.replid,用于 PSYNC
      • 关闭原来的复制连接,并且设置复制状态为关闭 server.repl_state = REPL_STATE_NONE
      • 关闭与 Slave 节点的连接,server.slaves
      • server.slaveseldb = -1,防止在 Master 切换后进行全量同步,而是按照预期使用部分同步。
      • 记录无 Slave 的时间 server.repl_no_slaves_since = server.unixtime(由于一旦 Slave 提升为 Master,在很短时间是没有对应的 Slave,用于统计在复制积压时间)。
    3. Slot 相关的切换,在原来的 Master 删除、在新的节点增加。
    4. 更新集群状态,保存配置,并且强制保存到磁盘 fsync
    5. 发送 PONG 消息给所有节点,广播最新状态。
    6. 重置手动故障转移(Manual Failover)相关变量,清除状态。

处理 Slave 迁移 (Migration)

  1. 只有在 Slave 节点执行。
  2. 不正常的集群状态下,这步将不会执行(server.cluster->state != CLUSTER_OK),直接返回。
  3. 没有对应的 Master 节点时,这步也将不会执行,直接返回。
  4. 计算对应的 Master 节点下可用 Slave 节点个数 okslaves。(可用代表不是这 2 中状态 CLUSTER_NODE_FAIL CLUSTER_NODE_PFAIL)。
  5. 如果 okslaves <= server.cluster_migration_barrier 时,直接返回。迁移界限 cluster_migration_barrier 默认值为 1。
  6. 遍历所有的节点找到孤立的 Master 节点,与候选(Candidate)节点

    • 一个有较多的 Slave 节点的 Master 节点要把超过一定界限的 Slave 节点分派给一些孤立 Master 节点。这时这个 Master 下的多个 Slave 节点可能同时执行这个操作,导致最终自己成了孤立节点。所以会使用一个规则来避免这种情况发生,规则就是这几个 Slave 节点中 Node ID 最小的那个,成为 Candidate 节点。
    • 如果有多个孤立 Master 节点,按顺序优先取到时则把他作为目标(Target)节点,也就是候选节点接下来要跟随的 Master 节点。
    • 如果是孤立 Master 节点就会标记孤立时间 node->orphaned_time = mstime(),如果不是将置 orphaned_time = 0
    • 如果能找到 okslaves 个数与之前统计的 max_slaves 相同的 Master 节点,则取其 Slaves 列表中 Node ID 最小的节点作为 Candidate 节点。否则,myself 节点兜底,也就是当前节点作为 Candidate 节点。
  7. 如果找到 Target 节点,Candidate 节点是当前节点,并且 Target 节点的孤立时长超过 5s,则重新设置当前节点的 Master 为 Target,即完成了一次 Slave 迁移。

处理 Gossip 部分

  1. 在接收到 PING/MEET/PONG 消息时,有 2 种情况会处理:

    • 第一,是 MEET 消息,发送者未知节点。
    • 第二,是 PING/MEET/PONG 消息,但是发送者是已知节点。
  2. Gossip 中包含有 N 个节点的部分信息,然后依次遍历处理,这节点我们就以代号 G 来称呼。
  3. 如果 G 节点不是已知节点时,给这个 G 节点发送握手消息(HANDSHAKE 类型),如果存在继续下面的步骤。
  4. 如果发送者是已知节点,并且是 Master,G 节点不是当前节点,则:

    • 如果是 Gossip 消息中标记的是 CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL 这 2 个中的任一状态,那么:

      • 增加失败汇报记录(clusterNodeAddFailureReport)。
      • 如果需要则标记 G 节点为失败节点(markNodeAsFailingIfNeeded)。
    • 否则,将删除失败汇报记录(clusterNodeDelFailureReport)。
  5. 如果 Gossip 消息中的标记并没有 CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL 中其中任一,当前节点也按时收到了 G 节点的 PONG 回复,并且之前没有节点汇报关于 G 节点的 FAILPFAIL 状态,那么:

    • 如果 Gossip 中包含的 pongtime 大于当前节点保存的关于 G 节点的 PONG 回复时,设置 G 节点的回复时间为 Gossip 中的 pongtime。(也许这是认为既然节点是可用的,就同步最后可用时间,这个在发送 PING 消息时会有用到,pong_received 越小的节点将优先发送 PING 消息)。
  6. 如果当前节点认为是 FAILPFAIL,但是 Gossip 消息中并没有包含 FAIL PFAIL 状态,当前节点保存的 G 节点的 ip / port 与 Gossip 消息中的不是完全相同时,将断开当前节点与 G 节点的连接,并且重新设置 G 节点的 ip/port 为 Gossip 消息中的 ip/port,同时也取消 CLUSTER_NODE_NOADDR 标记。
  7. 在发送握手消息时,也会有一些额外的条件,发送者需要是已知节点,Gossip 中有关于 G 节点的 ip/port 信息,G 节点不在黑名单中。
  8. clusterNodeAddFailureReport 函数内部实现

    • 在节点列表中的每个节点下都有一个 fail_reports,记录哪些节点认为这个节点失败。
    • 遍历 fail_reports 列表,如果上报节点已经存在,则重新设置上报时间为当前节点。
    • 如果不存在时,添加上报节点与上报时间到 fail_reports 列表中。
    • 上面提到的删除失败上报记录与这个函数逻辑正好相反,将不再展开。
  9. markNodeAsFailingIfNeeded 函数内部实现

    • 计算需要的票数(int needed_quorum = (server.cluster->size / 2) + 1)。
    • 如果当前节点还没有把 G 节点标记 PFAIL 状态,则直接返回。(说明当前节点还可以连接 G 节点,不认为是 G 是失败节点)。
    • 如果当前节点已经把 G 节点已经标记为了 FAIL 状态,也将直接返回,认为这个过程已经处理过了,不重复处理。
    • 计算对节点 G 的失败上报次数 failures,也就是计算 fail_reports 列表的长度。
    • 如果当前节点是 Master,则增加一个失败票数(failures++)。
    • 如果失败票数小于需要的票数,则直接返回(failures < needed_quorum)。
    • 到此处时,说明节点是失败节点了,票数也通过,当前节点连接也超时,那么设置这个节点的失败状态:

      node->flags &= ~CLUSTER_NODE_PFAIL
      node->flags |= CLUSTER_NODE_FAIL
      node->fail_time = mstime()
    • 如果当前节点是 Master 节点,则发送 FAIL 类型消息(包含这个失败节点的信息)给其他所有节点。

说明:本文基于 Redis Cluster 内部机制整理,主要适用于 Redis 3.x 至 6.x 版本的核心逻辑。Redis 7.x 及后续版本可能在部分内部实现细节或配置项上有所调整,请以官方源码及文档为准。