Skip to content

Commit

Permalink
Revert sender_message to hdr
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Xie <pingxie@google.com>
  • Loading branch information
PingXie committed Jul 11, 2024
1 parent ef8970a commit bb5f48f
Showing 1 changed file with 41 additions and 41 deletions.
82 changes: 41 additions & 41 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2925,18 +2925,18 @@ int clusterProcessPacket(clusterLink *link) {
return 1;
}

clusterMsg *sender_message = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(sender_message->type);
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
mstime_t now = mstime();

uint16_t flags = ntohs(sender_message->flags);
uint16_t flags = ntohs(hdr->flags);
uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
clusterNode *sender = getNodeFromLinkAndMsg(link, sender_message);
int sender_claims_to_be_primary = !memcmp(sender_message->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
int sender_claims_to_be_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
int sender_was_replica = sender && nodeIsReplica(sender);
int sender_was_primary = sender && nodeIsPrimary(sender);

if (sender && (sender_message->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}

Expand All @@ -2948,8 +2948,8 @@ int clusterProcessPacket(clusterLink *link) {

if (sender && !nodeInHandshake(sender)) {
/* Update our currentEpoch if we see a newer epoch in the cluster. */
sender_claimed_current_epoch = ntohu64(sender_message->currentEpoch);
sender_claimed_config_epoch = ntohu64(sender_message->configEpoch);
sender_claimed_current_epoch = ntohu64(hdr->currentEpoch);
sender_claimed_config_epoch = ntohu64(hdr->configEpoch);
if (sender_claimed_current_epoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = sender_claimed_current_epoch;
/* Update the sender configEpoch if it is a primary publishing a newer one. */
Expand All @@ -2958,12 +2958,12 @@ int clusterProcessPacket(clusterLink *link) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
}
/* Update the replication offset info for this node. */
sender->repl_offset = ntohu64(sender_message->offset);
sender->repl_offset = ntohu64(hdr->offset);
sender->repl_offset_time = now;
/* If we are a replica performing a manual failover and our primary
* sent its offset while already paused, populate the MF state. */
if (server.cluster->mf_end && nodeIsReplica(myself) && myself->replicaof == sender &&
sender_message->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_primary_offset == -1) {
hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_primary_offset == -1) {
server.cluster->mf_primary_offset = sender->repl_offset;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
serverLog(LL_NOTICE,
Expand Down Expand Up @@ -3004,17 +3004,17 @@ int clusterProcessPacket(clusterLink *link) {
clusterNode *new_sender_node;

new_sender_node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(new_sender_node->ip, link, sender_message->myip) == C_OK);
getClientPortFromClusterMsg(sender_message, &new_sender_node->tls_port, &new_sender_node->tcp_port);
new_sender_node->cport = ntohs(sender_message->cport);
serverAssert(nodeIp2String(new_sender_node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &new_sender_node->tls_port, &new_sender_node->tcp_port);
new_sender_node->cport = ntohs(hdr->cport);
clusterAddNode(new_sender_node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) clusterProcessGossipSection(sender_message, link);
if (!sender && type == CLUSTERMSG_TYPE_MEET) clusterProcessGossipSection(hdr, link);

/* Anyway reply with a PONG */
clusterSendPing(link, CLUSTERMSG_TYPE_PONG);
Expand All @@ -3040,7 +3040,7 @@ int clusterProcessPacket(clusterLink *link) {
"Handshake: we already know node %.40s (%s), "
"updating the address if needed.",
sender->name, sender->human_nodename);
if (nodeUpdateAddressIfNeeded(sender, link, sender_message)) {
if (nodeUpdateAddressIfNeeded(sender, link, hdr)) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}
/* Free this node as we already have it. This will
Expand All @@ -3051,12 +3051,12 @@ int clusterProcessPacket(clusterLink *link) {

/* First thing to do is replacing the random name with the
* right node name if this was a handshake stage. */
clusterRenameNode(link->node, sender_message->sender);
clusterRenameNode(link->node, hdr->sender);
serverLog(LL_DEBUG, "Handshake with node %.40s completed.", link->node->name);
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
link->node->flags |= flags & (CLUSTER_NODE_PRIMARY | CLUSTER_NODE_REPLICA);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
} else if (memcmp(link->node->name, sender_message->sender, CLUSTER_NAMELEN) != 0) {
} else if (memcmp(link->node->name, hdr->sender, CLUSTER_NAMELEN) != 0) {
/* If the reply has a non matching node ID we
* disconnect this node and set it as not having an associated
* address. */
Expand Down Expand Up @@ -3090,7 +3090,7 @@ int clusterProcessPacket(clusterLink *link) {

/* Update the node address if it changed. */
if (sender && type == CLUSTERMSG_TYPE_PING && !nodeInHandshake(sender) &&
nodeUpdateAddressIfNeeded(sender, link, sender_message)) {
nodeUpdateAddressIfNeeded(sender, link, hdr)) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}

Expand Down Expand Up @@ -3124,7 +3124,7 @@ int clusterProcessPacket(clusterLink *link) {
} else {
/* Node is a replica. */
clusterNode *sender_claimed_primary_node =
clusterLookupNode(sender_message->replicaof, CLUSTER_NAMELEN);
clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);

if (sender_was_primary) {
/* Primary turned into a replica! Reconfigure the node. */
Expand Down Expand Up @@ -3200,14 +3200,14 @@ int clusterProcessPacket(clusterLink *link) {

if (sender && sender_claims_to_be_primary &&
(sender_was_replica ||
memcmp(sender->slots, sender_message->myslots, sizeof(sender_message->myslots)))) {
memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
/* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */
serverAssert(nodeIsPrimary(sender));

/* 1) If the sender of the message is a primary, and we detected that
* the set of slots it claims changed, scan the slots to see if we
* need to update our configuration. */
clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, sender_message->myslots);
clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, hdr->myslots);

/* 2) We also check for the reverse condition, that is, the sender
* claims to serve slots we know are served by a primary with a
Expand All @@ -3228,7 +3228,7 @@ int clusterProcessPacket(clusterLink *link) {
* do it. In this way A will stop to act as a primary (or can try to
* failover if there are the conditions to win the election). */
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(sender_message->myslots, j)) {
if (bitmapTestBit(hdr->myslots, j)) {
if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) {
serverLog(LL_VERBOSE,
Expand Down Expand Up @@ -3267,26 +3267,26 @@ int clusterProcessPacket(clusterLink *link) {

/* Get info from the gossip section */
if (sender) {
clusterProcessGossipSection(sender_message, link);
clusterProcessPingExtensions(sender_message, link);
clusterProcessGossipSection(hdr, link);
clusterProcessPingExtensions(hdr, link);
}
} else if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;

if (sender) {
failing = clusterLookupNode(sender_message->data.fail.about.nodename, CLUSTER_NAMELEN);
failing = clusterLookupNode(hdr->data.fail.about.nodename, CLUSTER_NAMELEN);
if (failing && !(failing->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_MYSELF))) {
serverLog(LL_NOTICE, "FAIL message received from %.40s (%s) about %.40s (%s)", sender_message->sender,
sender->human_nodename, sender_message->data.fail.about.nodename,
serverLog(LL_NOTICE, "FAIL message received from %.40s (%s) about %.40s (%s)", hdr->sender,
sender->human_nodename, hdr->data.fail.about.nodename,
failing->human_nodename);
failing->flags |= CLUSTER_NODE_FAIL;
failing->fail_time = now;
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}
} else {
serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", sender_message->sender,
sender_message->data.fail.about.nodename);
serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender,
hdr->data.fail.about.nodename);
}
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
if (!sender) return 1; /* We don't know that node. */
Expand All @@ -3298,17 +3298,17 @@ int clusterProcessPacket(clusterLink *link) {
* Pub/Sub subscribers. */
if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) ||
(type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) {
channel_len = ntohl(sender_message->data.publish.msg.channel_len);
message_len = ntohl(sender_message->data.publish.msg.message_len);
channel = createStringObject((char *)sender_message->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)sender_message->data.publish.msg.bulk_data + channel_len, message_len);
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len);
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
decrRefCount(channel);
decrRefCount(message);
}
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
if (!sender) return 1; /* We don't know that node. */
clusterSendFailoverAuthIfNeeded(sender, sender_message);
clusterSendFailoverAuthIfNeeded(sender, hdr);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
if (!sender) return 1; /* We don't know that node. */
/* We consider this vote only if the sender is a primary serving
Expand Down Expand Up @@ -3342,10 +3342,10 @@ int clusterProcessPacket(clusterLink *link) {
clusterSendPing(link, CLUSTERMSG_TYPE_PING);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
clusterNode *n; /* The node the update is about. */
uint64_t reportedConfigEpoch = ntohu64(sender_message->data.update.nodecfg.configEpoch);
uint64_t reportedConfigEpoch = ntohu64(hdr->data.update.nodecfg.configEpoch);

if (!sender) return 1; /* We don't know the sender. */
n = clusterLookupNode(sender_message->data.update.nodecfg.nodename, CLUSTER_NAMELEN);
n = clusterLookupNode(hdr->data.update.nodecfg.nodename, CLUSTER_NAMELEN);
if (!n) return 1; /* We don't know the reported node. */
if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

Expand All @@ -3358,15 +3358,15 @@ int clusterProcessPacket(clusterLink *link) {

/* Check the bitmap of served slots and update our
* config accordingly. */
clusterUpdateSlotsConfigWith(n, reportedConfigEpoch, sender_message->data.update.nodecfg.slots);
clusterUpdateSlotsConfigWith(n, reportedConfigEpoch, hdr->data.update.nodecfg.slots);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
if (!sender) return 1; /* Protect the module from unknown nodes. */
/* We need to route this message back to the right module subscribed
* for the right message type. */
uint64_t module_id = sender_message->data.module.msg.module_id; /* Endian-safe ID */
uint32_t len = ntohl(sender_message->data.module.msg.len);
uint8_t type = sender_message->data.module.msg.type;
unsigned char *payload = sender_message->data.module.msg.bulk_data;
uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
uint32_t len = ntohl(hdr->data.module.msg.len);
uint8_t type = hdr->data.module.msg.type;
unsigned char *payload = hdr->data.module.msg.bulk_data;
moduleCallClusterReceivers(sender->name, module_id, type, payload, len);
} else {
serverLog(LL_WARNING, "Received unknown packet type: %d", type);
Expand Down

0 comments on commit bb5f48f

Please sign in to comment.