Ensure only primary sender drives slot ownership updates
Signed-off-by: Ping Xie <pingxie@google.com>
PingXie committed Jul 7, 2024
1 parent f2bbd1f commit 5223ecb
Showing 2 changed files with 68 additions and 82 deletions.
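
In essence, the patch replaces the old sender_primary/dirty_slots bookkeeping with a single gate: slot-ownership reconciliation now runs only when the sender itself claims to be a primary and either its advertised slot bitmap differs from our view or we previously recorded it as a replica (i.e., a failover happened in its shard). The sketch below models just that gate. It is illustrative only — the sender_view/local_view structs and the should_update_slot_config() helper are stand-ins invented for this example, not code from the commit or the server.

/* Illustrative sketch of the gating condition added in clusterProcessPacket().
 * Types and names here are hypothetical stand-ins, not server code. */
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define SLOT_BITMAP_BYTES (16384 / 8) /* 16384 cluster slots, one bit each */

typedef struct {
    bool claims_primary;                    /* hdr->replicaof equals CLUSTER_NODE_NULL_NAME */
    bool was_replica;                       /* we last knew the sender as a replica */
    unsigned char slots[SLOT_BITMAP_BYTES]; /* slot bitmap the sender advertises */
} sender_view;

typedef struct {
    unsigned char slots[SLOT_BITMAP_BYTES]; /* slots we currently attribute to the sender */
} local_view;

/* Mirrors the new condition: only a self-declared primary whose claimed slots
 * differ from our view, or that just switched from replica to primary, drives
 * slot-ownership updates. */
static bool should_update_slot_config(const sender_view *s, const local_view *l) {
    if (!s->claims_primary) return false; /* replicas never drive ownership updates */
    return s->was_replica || memcmp(l->slots, s->slots, SLOT_BITMAP_BYTES) != 0;
}

int main(void) {
    sender_view s = {.claims_primary = true, .was_replica = true}; /* freshly failed-over primary */
    local_view l = {{0}};
    printf("reconcile slots: %s\n", should_update_slot_config(&s, &l) ? "yes" : "no");
    return 0;
}

The checks in the sketch correspond to the new sender_claims_primary / sender_was_replica flags and the memcmp against hdr->myslots in the diff below.
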
144 changes: 65 additions & 79 deletions src/cluster_legacy.c
@@ -2870,6 +2870,9 @@ int clusterProcessPacket(clusterLink *link) {
uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
+ int sender_claims_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
+ int sender_was_replica = sender && nodeIsReplica(sender);
+ int sender_was_primary = sender && nodeIsPrimary(sender);

if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
@@ -2887,8 +2890,7 @@ int clusterProcessPacket(clusterLink *link) {
senderConfigEpoch = ntohu64(hdr->configEpoch);
if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch;
/* Update the sender configEpoch if it is a primary publishing a newer one. */
- if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) &&
-     senderConfigEpoch > sender->configEpoch) {
+ if (sender_claims_primary && senderConfigEpoch > sender->configEpoch) {
sender->configEpoch = senderConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
}
@@ -3051,19 +3053,17 @@ int clusterProcessPacket(clusterLink *link) {
/* Check for role switch: replica -> primary or primary -> replica. */
if (sender) {
serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name,
- sender->human_nodename,
- !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) ? "primary" : "replica",
- sender->shard_id);
- if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof))) {
+ sender->human_nodename, sender_claims_primary ? "primary" : "replica", sender->shard_id);
+ if (sender_claims_primary) {
/* Node is a primary. */
clusterSetNodeAsPrimary(sender);
} else {
/* Node is a replica. */
- clusterNode *primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
+ clusterNode *new_primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);

- if (clusterNodeIsPrimary(sender)) {
+ if (sender_was_primary) {
/* Primary turned into a replica! Reconfigure the node. */
- if (primary && areInSameShard(primary, sender)) {
+ if (new_primary && areInSameShard(new_primary, sender)) {
/* `sender` was a primary and was in the same shard as its new primary */
if (sender->configEpoch > senderConfigEpoch) {
serverLog(LL_NOTICE,
@@ -3074,13 +3074,13 @@ int clusterProcessPacket(clusterLink *link) {
} else {
/* `primary` is still a `replica` in this observer node's view;
* update its role and configEpoch */
- clusterSetNodeAsPrimary(primary);
- primary->configEpoch = senderConfigEpoch;
+ clusterSetNodeAsPrimary(new_primary);
+ new_primary->configEpoch = senderConfigEpoch;
serverLog(LL_NOTICE,
"A failover occurred in shard %.40s; node %.40s (%s)"
" failed over to node %.40s (%s) with a config epoch of %llu",
- sender->shard_id, sender->name, sender->human_nodename, primary->name,
- primary->human_nodename, (unsigned long long)primary->configEpoch);
+ sender->shard_id, sender->name, sender->human_nodename, new_primary->name,
+ new_primary->human_nodename, (unsigned long long)new_primary->configEpoch);
}
} else {
/* `sender` was moved to another shard and has become a replica, remove its slot assignment */
@@ -3089,9 +3089,9 @@ int clusterProcessPacket(clusterLink *link) {
"Node %.40s (%s) is no longer primary of shard %.40s;"
" removed all %d slot(s) it used to own",
sender->name, sender->human_nodename, sender->shard_id, slots);
- if (primary != NULL) {
+ if (new_primary != NULL) {
serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", sender->name,
- sender->human_nodename, primary->shard_id);
+ sender->human_nodename, new_primary->shard_id);
}
}

@@ -3103,17 +3103,17 @@ int clusterProcessPacket(clusterLink *link) {
}

/* Primary node changed for this replica? */
- if (primary && sender->replicaof != primary) {
+ if (new_primary && sender->replicaof != new_primary) {
if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender);
serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s",
- sender->name, sender->human_nodename, primary->name, primary->human_nodename,
+ sender->name, sender->human_nodename, new_primary->name, new_primary->human_nodename,
sender->shard_id);
- clusterNodeAddReplica(primary, sender);
- sender->replicaof = primary;
+ clusterNodeAddReplica(new_primary, sender);
+ sender->replicaof = new_primary;

/* Update the shard_id when a replica is connected to its
* primary in the very first time. */
- updateShardId(sender, primary->shard_id);
+ updateShardId(sender, new_primary->shard_id);

/* Update config. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
@@ -3128,63 +3128,38 @@ int clusterProcessPacket(clusterLink *link) {

/* Many checks are only needed if the set of served slots this
* instance claims is different compared to the set of slots we have
- * for it. Check this ASAP to avoid other computational expansive
- * checks later. */
- clusterNode *sender_primary = NULL; /* Sender or its primary if replica. */
- int dirty_slots = 0; /* Sender claimed slots don't match my view? */
-
- if (sender) {
-     sender_primary = clusterNodeIsPrimary(sender) ? sender : sender->replicaof;
-     if (sender_primary) {
-         dirty_slots = memcmp(sender_primary->slots, hdr->myslots, sizeof(hdr->myslots)) != 0;
-
-         /* Force dirty when the sending shard owns no slots so that
-          * we have a chance to examine and repair slot migrating/importing
-          * states that involve empty shards. */
-         dirty_slots |= sender_primary->numslots == 0;
-     }
- }
-
- /* 1) If the sender of the message is a primary, and we detected that
-  * the set of slots it claims changed, scan the slots to see if we
-  * need to update our configuration. */
- if (sender_primary && dirty_slots)
-     clusterUpdateSlotsConfigWith(sender_primary, senderConfigEpoch, hdr->myslots);
-
- /* Explicitly check for a replication loop before attempting the replication
-  * chain folding logic. */
- if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
-     /* Safeguard against sub-replicas. A replica's primary can turn itself
-      * into a replica if its last slot is removed. If no other node takes
-      * over the slot, there is nothing else to trigger replica migration. */
-     serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
-               myself->replicaof->replicaof->name, myself->replicaof->name);
-     clusterSetPrimary(myself->replicaof->replicaof, 1);
-     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
- }
-
- /* 2) We also check for the reverse condition, that is, the sender
-  * claims to serve slots we know are served by a primary with a
-  * greater configEpoch. If this happens we inform the sender.
-  *
-  * This is useful because sometimes after a partition heals, a
-  * reappearing primary may be the last one to claim a given set of
-  * hash slots, but with a configuration that other instances know to
-  * be deprecated. Example:
-  *
-  * A and B are primary and replica for slots 1,2,3.
-  * A is partitioned away, B gets promoted.
-  * B is partitioned away, and A returns available.
-  *
-  * Usually B would PING A publishing its set of served slots and its
-  * configEpoch, but because of the partition B can't inform A of the
-  * new configuration, so other nodes that have an updated table must
-  * do it. In this way A will stop to act as a primary (or can try to
-  * failover if there are the conditions to win the election). */
- if (sender && dirty_slots) {
-     int j;
-
-     for (j = 0; j < CLUSTER_SLOTS; j++) {
+ * for it or if there was a failover in the sender's shard. Check
+ * this ASAP to avoid other computational expensive checks later.*/
+
+ if (sender && sender_claims_primary &&
+     (sender_was_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
+     /* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */
+     serverAssert(nodeIsPrimary(sender));
+
+     /* 1) If the sender of the message is a primary, and we detected that
+      * the set of slots it claims changed, scan the slots to see if we
+      * need to update our configuration. */
+     clusterUpdateSlotsConfigWith(sender, senderConfigEpoch, hdr->myslots);
+
+     /* 2) We also check for the reverse condition, that is, the sender
+      * claims to serve slots we know are served by a primary with a
+      * greater configEpoch. If this happens we inform the sender.
+      *
+      * This is useful because sometimes after a partition heals, a
+      * reappearing primary may be the last one to claim a given set of
+      * hash slots, but with a configuration that other instances know to
+      * be deprecated. Example:
+      *
+      * A and B are primary and replica for slots 1,2,3.
+      * A is partitioned away, B gets promoted.
+      * B is partitioned away, and A returns available.
+      *
+      * Usually B would PING A publishing its set of served slots and its
+      * configEpoch, but because of the partition B can't inform A of the
+      * new configuration, so other nodes that have an updated table must
+      * do it. In this way A will stop to act as a primary (or can try to
+      * failover if there are the conditions to win the election). */
+     for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(hdr->myslots, j)) {
if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
if (server.cluster->slots[j]->configEpoch > senderConfigEpoch) {
@@ -3203,10 +3178,21 @@ int clusterProcessPacket(clusterLink *link) {
}
}

+ /* Explicitly check for a replication loop before attempting the replication
+  * chain folding logic. */
+ if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
+     /* Safeguard against sub-replicas. A replica's primary can turn itself
+      * into a replica if its last slot is removed. If no other node takes
+      * over the slot, there is nothing else to trigger replica migration. */
+     serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
+               myself->replicaof->replicaof->name, myself->replicaof->name);
+     clusterSetPrimary(myself->replicaof->replicaof, 1);
+     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
+ }
+
/* If our config epoch collides with the sender's try to fix
* the problem. */
- if (sender && clusterNodeIsPrimary(myself) && clusterNodeIsPrimary(sender) &&
-     senderConfigEpoch == myself->configEpoch) {
+ if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) && senderConfigEpoch == myself->configEpoch) {
clusterHandleConfigEpochCollision(sender);
}

6 changes: 3 additions & 3 deletions tests/unit/cluster/hostnames.tcl
@@ -156,18 +156,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
# to accept our isolated nodes connections. At this point they will
# start showing up in cluster slots.
wait_for_condition 50 100 {
- [llength [R 6 CLUSTER SLOTS]] eq 3
+ [llength [R 6 CLUSTER SLOTS]] eq 2
} else {
fail "Node did not learn about the 2 shards it can talk to"
}
wait_for_condition 50 100 {
- [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
+ [lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
} else {
fail "hostname for shard-1 didn't reach node 6"
}

wait_for_condition 50 100 {
- [lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
+ [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
} else {
fail "hostname for shard-2 didn't reach node 6"
}