diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c
index 61b5af8e29..5976572570 100644
--- a/src/cluster_legacy.c
+++ b/src/cluster_legacy.c
@@ -2870,6 +2870,9 @@ int clusterProcessPacket(clusterLink *link) {
     uint16_t flags = ntohs(hdr->flags);
     uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
     clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
+    int sender_claims_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
+    int sender_was_replica = sender && nodeIsReplica(sender);
+    int sender_was_primary = sender && nodeIsPrimary(sender);
 
     if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
         sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
@@ -2887,8 +2890,7 @@ int clusterProcessPacket(clusterLink *link) {
         senderConfigEpoch = ntohu64(hdr->configEpoch);
         if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch;
         /* Update the sender configEpoch if it is a primary publishing a newer one. */
-        if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) &&
-            senderConfigEpoch > sender->configEpoch) {
+        if (sender_claims_primary && senderConfigEpoch > sender->configEpoch) {
             sender->configEpoch = senderConfigEpoch;
             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
         }
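
Note: the three new flags snapshot the sender's role up front. `sender_claims_primary` is read straight off the wire: in the PING/PONG header an all-zero node ID in `replicaof` means the sender advertises itself as a primary (`CLUSTER_NAMELEN` matches `sizeof(hdr->replicaof)`, so the rewrite is behavior-preserving). `sender_was_replica` and `sender_was_primary` record the role we had on file, so code further down can still detect a role flip after `clusterSetNodeAsPrimary()` has mutated `sender->flags`. A minimal self-contained sketch of the wire-side check, using hypothetical names rather than the real valkey symbols:

#include <string.h>

#define NAMELEN 40 /* stands in for CLUSTER_NAMELEN */

/* An all-zero node ID in the header's replicaof field means "I have no
 * primary", i.e. the sender claims to be a primary itself. */
static int claims_primary(const char replicaof[NAMELEN]) {
    static const char null_name[NAMELEN]; /* zero-initialized */
    return memcmp(replicaof, null_name, NAMELEN) == 0;
}
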
@@ -3051,19 +3053,17 @@ int clusterProcessPacket(clusterLink *link) {
     /* Check for role switch: replica -> primary or primary -> replica. */
     if (sender) {
         serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name,
-                  sender->human_nodename,
-                  !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) ? "primary" : "replica",
-                  sender->shard_id);
-        if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof))) {
+                  sender->human_nodename, sender_claims_primary ? "primary" : "replica", sender->shard_id);
+        if (sender_claims_primary) {
             /* Node is a primary. */
             clusterSetNodeAsPrimary(sender);
         } else {
             /* Node is a replica. */
-            clusterNode *primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
+            clusterNode *new_primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
 
-            if (clusterNodeIsPrimary(sender)) {
+            if (sender_was_primary) {
                 /* Primary turned into a replica! Reconfigure the node. */
-                if (primary && areInSameShard(primary, sender)) {
+                if (new_primary && areInSameShard(new_primary, sender)) {
                     /* `sender` was a primary and was in the same shard as its new primary */
                     if (sender->configEpoch > senderConfigEpoch) {
                         serverLog(LL_NOTICE,
@@ -3074,13 +3074,13 @@ int clusterProcessPacket(clusterLink *link) {
                     } else {
                         /* `primary` is still a `replica` in this observer node's view;
                          * update its role and configEpoch */
-                        clusterSetNodeAsPrimary(primary);
-                        primary->configEpoch = senderConfigEpoch;
+                        clusterSetNodeAsPrimary(new_primary);
+                        new_primary->configEpoch = senderConfigEpoch;
                         serverLog(LL_NOTICE,
                                   "A failover occurred in shard %.40s; node %.40s (%s)"
                                   " failed over to node %.40s (%s) with a config epoch of %llu",
-                                  sender->shard_id, sender->name, sender->human_nodename, primary->name,
-                                  primary->human_nodename, (unsigned long long)primary->configEpoch);
+                                  sender->shard_id, sender->name, sender->human_nodename, new_primary->name,
+                                  new_primary->human_nodename, (unsigned long long)new_primary->configEpoch);
                     }
                 } else {
                     /* `sender` was moved to another shard and has become a replica, remove its slot assignment */
@@ -3089,9 +3089,9 @@ int clusterProcessPacket(clusterLink *link) {
                               "Node %.40s (%s) is no longer primary of shard %.40s;"
                               " removed all %d slot(s) it used to own",
                               sender->name, sender->human_nodename, sender->shard_id, slots);
-                    if (primary != NULL) {
+                    if (new_primary != NULL) {
                         serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", sender->name,
-                                  sender->human_nodename, primary->shard_id);
+                                  sender->human_nodename, new_primary->shard_id);
                     }
                 }
 
@@ -3103,17 +3103,17 @@ int clusterProcessPacket(clusterLink *link) {
             }
 
             /* Primary node changed for this replica? */
-            if (primary && sender->replicaof != primary) {
+            if (new_primary && sender->replicaof != new_primary) {
                 if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender);
                 serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s",
-                          sender->name, sender->human_nodename, primary->name, primary->human_nodename,
+                          sender->name, sender->human_nodename, new_primary->name, new_primary->human_nodename,
                           sender->shard_id);
-                clusterNodeAddReplica(primary, sender);
-                sender->replicaof = primary;
+                clusterNodeAddReplica(new_primary, sender);
+                sender->replicaof = new_primary;
 
                 /* Update the shard_id when a replica is connected to its
                  * primary in the very first time. */
-                updateShardId(sender, primary->shard_id);
+                updateShardId(sender, new_primary->shard_id);
 
                 /* Update config. */
                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
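
Note: when a node we still record as a primary announces itself as a replica of `new_primary` within the same shard, the observer arbitrates by config epoch: a locally recorded epoch higher than the claimed one means the announcement is stale and our view stands; otherwise we accept the failover, promote `new_primary` in our view, and adopt the claimed epoch. A condensed sketch of that rule (hypothetical helper, not the valkey API):

#include <stdint.h>

typedef enum { KEEP_OUR_VIEW, ACCEPT_FAILOVER } role_verdict;

/* Given the epoch we recorded for the demoted node and the epoch its
 * announcement carries, the higher configEpoch wins. */
static role_verdict judge_failover(uint64_t recorded_epoch, uint64_t claimed_epoch) {
    if (recorded_epoch > claimed_epoch) return KEEP_OUR_VIEW; /* stale news */
    return ACCEPT_FAILOVER; /* promote the announced primary, adopt its epoch */
}
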
@@ -3128,63 +3128,38 @@ int clusterProcessPacket(clusterLink *link) {
 
     /* Many checks are only needed if the set of served slots this
      * instance claims is different compared to the set of slots we have
-     * for it. Check this ASAP to avoid other computational expansive
-     * checks later. */
-    clusterNode *sender_primary = NULL; /* Sender or its primary if replica. */
-    int dirty_slots = 0;                /* Sender claimed slots don't match my view? */
-
-    if (sender) {
-        sender_primary = clusterNodeIsPrimary(sender) ? sender : sender->replicaof;
-        if (sender_primary) {
-            dirty_slots = memcmp(sender_primary->slots, hdr->myslots, sizeof(hdr->myslots)) != 0;
-
-            /* Force dirty when the sending shard owns no slots so that
-             * we have a chance to examine and repair slot migrating/importing
-             * states that involve empty shards. */
-            dirty_slots |= sender_primary->numslots == 0;
-        }
-    }
-
-    /* 1) If the sender of the message is a primary, and we detected that
-     * the set of slots it claims changed, scan the slots to see if we
-     * need to update our configuration. */
-    if (sender_primary && dirty_slots)
-        clusterUpdateSlotsConfigWith(sender_primary, senderConfigEpoch, hdr->myslots);
-
-    /* Explicitly check for a replication loop before attempting the replication
-     * chain folding logic. */
-    if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
-        /* Safeguard against sub-replicas. A replica's primary can turn itself
-         * into a replica if its last slot is removed. If no other node takes
-         * over the slot, there is nothing else to trigger replica migration. */
-        serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
-                  myself->replicaof->replicaof->name, myself->replicaof->name);
-        clusterSetPrimary(myself->replicaof->replicaof, 1);
-        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
-    }
-
-    /* 2) We also check for the reverse condition, that is, the sender
-     * claims to serve slots we know are served by a primary with a
-     * greater configEpoch. If this happens we inform the sender.
-     *
-     * This is useful because sometimes after a partition heals, a
-     * reappearing primary may be the last one to claim a given set of
-     * hash slots, but with a configuration that other instances know to
-     * be deprecated. Example:
-     *
-     * A and B are primary and replica for slots 1,2,3.
-     * A is partitioned away, B gets promoted.
-     * B is partitioned away, and A returns available.
-     *
-     * Usually B would PING A publishing its set of served slots and its
-     * configEpoch, but because of the partition B can't inform A of the
-     * new configuration, so other nodes that have an updated table must
-     * do it. In this way A will stop to act as a primary (or can try to
-     * failover if there are the conditions to win the election). */
-    if (sender && dirty_slots) {
-        int j;
-
-        for (j = 0; j < CLUSTER_SLOTS; j++) {
+     * for it or if there was a failover in the sender's shard. Check
+     * this ASAP to avoid other computationally expensive checks later. */
+
+    if (sender && sender_claims_primary &&
+        (sender_was_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
+        /* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */
+        serverAssert(nodeIsPrimary(sender));
+
+        /* 1) If the sender of the message is a primary, and we detected that
+         * the set of slots it claims changed, scan the slots to see if we
+         * need to update our configuration. */
+        clusterUpdateSlotsConfigWith(sender, senderConfigEpoch, hdr->myslots);
+
+        /* 2) We also check for the reverse condition, that is, the sender
+         * claims to serve slots we know are served by a primary with a
+         * greater configEpoch. If this happens we inform the sender.
+         *
+         * This is useful because sometimes after a partition heals, a
+         * reappearing primary may be the last one to claim a given set of
+         * hash slots, but with a configuration that other instances know to
+         * be deprecated. Example:
+         *
+         * A and B are primary and replica for slots 1,2,3.
+         * A is partitioned away, B gets promoted.
+         * B is partitioned away, and A returns available.
+         *
+         * Usually B would PING A publishing its set of served slots and its
+         * configEpoch, but because of the partition B can't inform A of the
+         * new configuration, so other nodes that have an updated table must
+         * do it. In this way A will stop to act as a primary (or can try to
+         * failover if there are the conditions to win the election). */
+        for (int j = 0; j < CLUSTER_SLOTS; j++) {
             if (bitmapTestBit(hdr->myslots, j)) {
                 if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
                 if (server.cluster->slots[j]->configEpoch > senderConfigEpoch) {
@@ -3203,10 +3178,21 @@ int clusterProcessPacket(clusterLink *link) {
         }
     }
 
+    /* Explicitly check for a replication loop before attempting the replication
+     * chain folding logic. */
+    if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
+        /* Safeguard against sub-replicas. A replica's primary can turn itself
+         * into a replica if its last slot is removed. If no other node takes
+         * over the slot, there is nothing else to trigger replica migration. */
+        serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
+                  myself->replicaof->replicaof->name, myself->replicaof->name);
+        clusterSetPrimary(myself->replicaof->replicaof, 1);
+        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
+    }
+
     /* If our config epoch collides with the sender's try to fix
      * the problem. */
-    if (sender && clusterNodeIsPrimary(myself) && clusterNodeIsPrimary(sender) &&
-        senderConfigEpoch == myself->configEpoch) {
+    if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) && senderConfigEpoch == myself->configEpoch) {
         clusterHandleConfigEpochCollision(sender);
     }
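
The core of the change: the old code derived a `sender_primary` (the sender itself, or its primary if it was a replica) and a `dirty_slots` flag, force-dirtying empty shards so migrating/importing states could still be repaired. The new code keys both slot checks off the sender itself and gates them on a single condition: the sender claims to be a primary, and either it was a replica in our last view (a failover just happened, which also covers the empty-shard case) or its claimed slot bitmap differs from ours. A self-contained restatement of that gate, under hypothetical names:

#include <stdint.h>
#include <string.h>

#define SLOTS 16384 /* CLUSTER_SLOTS */
#define SLOT_BYTES (SLOTS / 8)

/* Run the expensive slot reconciliation only when something can have
 * changed: a role flip, or a claimed bitmap that differs from our view. */
static int needs_slot_reconciliation(int claims_primary, int was_replica,
                                     const uint8_t my_view[SLOT_BYTES],
                                     const uint8_t claimed[SLOT_BYTES]) {
    if (!claims_primary) return 0;
    return was_replica || memcmp(my_view, claimed, SLOT_BYTES) != 0;
}

The sub-replica safeguard is relocated below the gated block unchanged, so the replication chain folding still runs even when the slot checks are skipped.
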
diff --git a/tests/unit/cluster/hostnames.tcl b/tests/unit/cluster/hostnames.tcl
index 04b32e380b..232c6cf818 100644
--- a/tests/unit/cluster/hostnames.tcl
+++ b/tests/unit/cluster/hostnames.tcl
@@ -156,18 +156,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
     # to accept our isolated nodes connections. At this point they will
     # start showing up in cluster slots.
     wait_for_condition 50 100 {
-        [llength [R 6 CLUSTER SLOTS]] eq 3
+        [llength [R 6 CLUSTER SLOTS]] eq 2
     } else {
         fail "Node did not learn about the 2 shards it can talk to"
     }
     wait_for_condition 50 100 {
-        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
+        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
     } else {
         fail "hostname for shard-1 didn't reach node 6"
     }
     wait_for_condition 50 100 {
-        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
+        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
     } else {
         fail "hostname for shard-2 didn't reach node 6"
     }
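
On the test side, node 6 is now expected to report two entries in `CLUSTER SLOTS` instead of three, so the per-shard hostname assertions shift down one position (shard-1 is checked at entry 0, shard-2 at entry 1). This lines up with the existing failure message about "the 2 shards it can talk to"; the third entry the old behavior produced appears to be the stale slot view that the gating change above eliminates.
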