diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 204d0d3fcf..4fa2926d5e 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -2925,18 +2925,18 @@ int clusterProcessPacket(clusterLink *link) { return 1; } - clusterMsg *sender_message = (clusterMsg *)link->rcvbuf; - uint16_t type = ntohs(sender_message->type); + clusterMsg *hdr = (clusterMsg *)link->rcvbuf; + uint16_t type = ntohs(hdr->type); mstime_t now = mstime(); - uint16_t flags = ntohs(sender_message->flags); + uint16_t flags = ntohs(hdr->flags); uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0; - clusterNode *sender = getNodeFromLinkAndMsg(link, sender_message); - int sender_claims_to_be_primary = !memcmp(sender_message->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN); + clusterNode *sender = getNodeFromLinkAndMsg(link, hdr); + int sender_claims_to_be_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN); int sender_was_replica = sender && nodeIsReplica(sender); int sender_was_primary = sender && nodeIsPrimary(sender); - if (sender && (sender_message->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) { + if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) { sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; } @@ -2948,8 +2948,8 @@ int clusterProcessPacket(clusterLink *link) { if (sender && !nodeInHandshake(sender)) { /* Update our currentEpoch if we see a newer epoch in the cluster. */ - sender_claimed_current_epoch = ntohu64(sender_message->currentEpoch); - sender_claimed_config_epoch = ntohu64(sender_message->configEpoch); + sender_claimed_current_epoch = ntohu64(hdr->currentEpoch); + sender_claimed_config_epoch = ntohu64(hdr->configEpoch); if (sender_claimed_current_epoch > server.cluster->currentEpoch) server.cluster->currentEpoch = sender_claimed_current_epoch; /* Update the sender configEpoch if it is a primary publishing a newer one. */ @@ -2958,12 +2958,12 @@ int clusterProcessPacket(clusterLink *link) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG); } /* Update the replication offset info for this node. */ - sender->repl_offset = ntohu64(sender_message->offset); + sender->repl_offset = ntohu64(hdr->offset); sender->repl_offset_time = now; /* If we are a replica performing a manual failover and our primary * sent its offset while already paused, populate the MF state. */ if (server.cluster->mf_end && nodeIsReplica(myself) && myself->replicaof == sender && - sender_message->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_primary_offset == -1) { + hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_primary_offset == -1) { server.cluster->mf_primary_offset = sender->repl_offset; clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER); serverLog(LL_NOTICE, @@ -3004,9 +3004,9 @@ int clusterProcessPacket(clusterLink *link) { clusterNode *new_sender_node; new_sender_node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE); - serverAssert(nodeIp2String(new_sender_node->ip, link, sender_message->myip) == C_OK); - getClientPortFromClusterMsg(sender_message, &new_sender_node->tls_port, &new_sender_node->tcp_port); - new_sender_node->cport = ntohs(sender_message->cport); + serverAssert(nodeIp2String(new_sender_node->ip, link, hdr->myip) == C_OK); + getClientPortFromClusterMsg(hdr, &new_sender_node->tls_port, &new_sender_node->tcp_port); + new_sender_node->cport = ntohs(hdr->cport); clusterAddNode(new_sender_node); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } @@ -3014,7 +3014,7 @@ int clusterProcessPacket(clusterLink *link) { /* If this is a MEET packet from an unknown node, we still process * the gossip section here since we have to trust the sender because * of the message type. */ - if (!sender && type == CLUSTERMSG_TYPE_MEET) clusterProcessGossipSection(sender_message, link); + if (!sender && type == CLUSTERMSG_TYPE_MEET) clusterProcessGossipSection(hdr, link); /* Anyway reply with a PONG */ clusterSendPing(link, CLUSTERMSG_TYPE_PONG); @@ -3040,7 +3040,7 @@ int clusterProcessPacket(clusterLink *link) { "Handshake: we already know node %.40s (%s), " "updating the address if needed.", sender->name, sender->human_nodename); - if (nodeUpdateAddressIfNeeded(sender, link, sender_message)) { + if (nodeUpdateAddressIfNeeded(sender, link, hdr)) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } /* Free this node as we already have it. This will @@ -3051,12 +3051,12 @@ int clusterProcessPacket(clusterLink *link) { /* First thing to do is replacing the random name with the * right node name if this was a handshake stage. */ - clusterRenameNode(link->node, sender_message->sender); + clusterRenameNode(link->node, hdr->sender); serverLog(LL_DEBUG, "Handshake with node %.40s completed.", link->node->name); link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; link->node->flags |= flags & (CLUSTER_NODE_PRIMARY | CLUSTER_NODE_REPLICA); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); - } else if (memcmp(link->node->name, sender_message->sender, CLUSTER_NAMELEN) != 0) { + } else if (memcmp(link->node->name, hdr->sender, CLUSTER_NAMELEN) != 0) { /* If the reply has a non matching node ID we * disconnect this node and set it as not having an associated * address. */ @@ -3090,7 +3090,7 @@ int clusterProcessPacket(clusterLink *link) { /* Update the node address if it changed. */ if (sender && type == CLUSTERMSG_TYPE_PING && !nodeInHandshake(sender) && - nodeUpdateAddressIfNeeded(sender, link, sender_message)) { + nodeUpdateAddressIfNeeded(sender, link, hdr)) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } @@ -3124,7 +3124,7 @@ int clusterProcessPacket(clusterLink *link) { } else { /* Node is a replica. */ clusterNode *sender_claimed_primary_node = - clusterLookupNode(sender_message->replicaof, CLUSTER_NAMELEN); + clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN); if (sender_was_primary) { /* Primary turned into a replica! Reconfigure the node. */ @@ -3200,14 +3200,14 @@ int clusterProcessPacket(clusterLink *link) { if (sender && sender_claims_to_be_primary && (sender_was_replica || - memcmp(sender->slots, sender_message->myslots, sizeof(sender_message->myslots)))) { + memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) { /* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */ serverAssert(nodeIsPrimary(sender)); /* 1) If the sender of the message is a primary, and we detected that * the set of slots it claims changed, scan the slots to see if we * need to update our configuration. */ - clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, sender_message->myslots); + clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, hdr->myslots); /* 2) We also check for the reverse condition, that is, the sender * claims to serve slots we know are served by a primary with a @@ -3228,7 +3228,7 @@ int clusterProcessPacket(clusterLink *link) { * do it. In this way A will stop to act as a primary (or can try to * failover if there are the conditions to win the election). */ for (int j = 0; j < CLUSTER_SLOTS; j++) { - if (bitmapTestBit(sender_message->myslots, j)) { + if (bitmapTestBit(hdr->myslots, j)) { if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue; if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) { serverLog(LL_VERBOSE, @@ -3267,17 +3267,17 @@ int clusterProcessPacket(clusterLink *link) { /* Get info from the gossip section */ if (sender) { - clusterProcessGossipSection(sender_message, link); - clusterProcessPingExtensions(sender_message, link); + clusterProcessGossipSection(hdr, link); + clusterProcessPingExtensions(hdr, link); } } else if (type == CLUSTERMSG_TYPE_FAIL) { clusterNode *failing; if (sender) { - failing = clusterLookupNode(sender_message->data.fail.about.nodename, CLUSTER_NAMELEN); + failing = clusterLookupNode(hdr->data.fail.about.nodename, CLUSTER_NAMELEN); if (failing && !(failing->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_MYSELF))) { - serverLog(LL_NOTICE, "FAIL message received from %.40s (%s) about %.40s (%s)", sender_message->sender, - sender->human_nodename, sender_message->data.fail.about.nodename, + serverLog(LL_NOTICE, "FAIL message received from %.40s (%s) about %.40s (%s)", hdr->sender, + sender->human_nodename, hdr->data.fail.about.nodename, failing->human_nodename); failing->flags |= CLUSTER_NODE_FAIL; failing->fail_time = now; @@ -3285,8 +3285,8 @@ int clusterProcessPacket(clusterLink *link) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } } else { - serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", sender_message->sender, - sender_message->data.fail.about.nodename); + serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender, + hdr->data.fail.about.nodename); } } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { if (!sender) return 1; /* We don't know that node. */ @@ -3298,17 +3298,17 @@ int clusterProcessPacket(clusterLink *link) { * Pub/Sub subscribers. */ if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) || (type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) { - channel_len = ntohl(sender_message->data.publish.msg.channel_len); - message_len = ntohl(sender_message->data.publish.msg.message_len); - channel = createStringObject((char *)sender_message->data.publish.msg.bulk_data, channel_len); - message = createStringObject((char *)sender_message->data.publish.msg.bulk_data + channel_len, message_len); + channel_len = ntohl(hdr->data.publish.msg.channel_len); + message_len = ntohl(hdr->data.publish.msg.message_len); + channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len); + message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len); pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD); decrRefCount(channel); decrRefCount(message); } } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ - clusterSendFailoverAuthIfNeeded(sender, sender_message); + clusterSendFailoverAuthIfNeeded(sender, hdr); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. */ /* We consider this vote only if the sender is a primary serving @@ -3342,10 +3342,10 @@ int clusterProcessPacket(clusterLink *link) { clusterSendPing(link, CLUSTERMSG_TYPE_PING); } else if (type == CLUSTERMSG_TYPE_UPDATE) { clusterNode *n; /* The node the update is about. */ - uint64_t reportedConfigEpoch = ntohu64(sender_message->data.update.nodecfg.configEpoch); + uint64_t reportedConfigEpoch = ntohu64(hdr->data.update.nodecfg.configEpoch); if (!sender) return 1; /* We don't know the sender. */ - n = clusterLookupNode(sender_message->data.update.nodecfg.nodename, CLUSTER_NAMELEN); + n = clusterLookupNode(hdr->data.update.nodecfg.nodename, CLUSTER_NAMELEN); if (!n) return 1; /* We don't know the reported node. */ if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */ @@ -3358,15 +3358,15 @@ int clusterProcessPacket(clusterLink *link) { /* Check the bitmap of served slots and update our * config accordingly. */ - clusterUpdateSlotsConfigWith(n, reportedConfigEpoch, sender_message->data.update.nodecfg.slots); + clusterUpdateSlotsConfigWith(n, reportedConfigEpoch, hdr->data.update.nodecfg.slots); } else if (type == CLUSTERMSG_TYPE_MODULE) { if (!sender) return 1; /* Protect the module from unknown nodes. */ /* We need to route this message back to the right module subscribed * for the right message type. */ - uint64_t module_id = sender_message->data.module.msg.module_id; /* Endian-safe ID */ - uint32_t len = ntohl(sender_message->data.module.msg.len); - uint8_t type = sender_message->data.module.msg.type; - unsigned char *payload = sender_message->data.module.msg.bulk_data; + uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */ + uint32_t len = ntohl(hdr->data.module.msg.len); + uint8_t type = hdr->data.module.msg.type; + unsigned char *payload = hdr->data.module.msg.bulk_data; moduleCallClusterReceivers(sender->name, module_id, type, payload, len); } else { serverLog(LL_WARNING, "Received unknown packet type: %d", type);