Skip to content

Commit

Permalink
Consolidate cluster topology update to clusterUpdateSlotsConfigWith
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Xie <pingxie@google.com>
  • Loading branch information
PingXie committed Apr 28, 2024
1 parent bab803b commit b11936d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 35 deletions.
46 changes: 12 additions & 34 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ list *clusterGetNodesInMyShard(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n, int closeSlots);
Expand Down Expand Up @@ -2985,8 +2984,9 @@ int clusterProcessPacket(clusterLink *link) {
senderConfigEpoch = ntohu64(hdr->configEpoch);
if (senderCurrentEpoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = senderCurrentEpoch;
/* Update the sender configEpoch if it is publishing a newer one. */
if (senderConfigEpoch > sender->configEpoch) {
/* Update the sender configEpoch if it is a primary publishing a newer one. */
if (!memcpy(hdr->slaveof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->slaveof)) &&
senderConfigEpoch > sender->configEpoch) {
sender->configEpoch = senderConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);
Expand Down Expand Up @@ -3165,9 +3165,7 @@ int clusterProcessPacket(clusterLink *link) {

/* Check for role switch: slave -> master or master -> slave. */
if (sender) {
if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
sizeof(hdr->slaveof)))
{
if (!memcmp(hdr->slaveof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->slaveof))) {
/* Node is a master. */
clusterSetNodeAsMaster(sender);
} else {
Expand All @@ -3176,7 +3174,7 @@ int clusterProcessPacket(clusterLink *link) {

if (clusterNodeIsMaster(sender)) {
/* Master turned into a slave! Reconfigure the node. */
if (master && !memcmp(master->shard_id, sender->shard_id, CLUSTER_NAMELEN)) {
if (master && areInSameShard(master, sender)) {
/* `sender` was a primary and was in the same shard as `master`, its new primary */
if (sender->configEpoch > senderConfigEpoch) {
serverLog(LL_NOTICE,
Expand All @@ -3188,18 +3186,14 @@ int clusterProcessPacket(clusterLink *link) {
(unsigned long long)senderConfigEpoch,
(unsigned long long)sender->configEpoch);
} else {
/* A failover occurred in the shard where `sender` belongs to and `sender` is no longer
* a primary. Update slot assignment to `master`, which is the new primary in the shard */
int slots = clusterMoveNodeSlots(sender, master);
/* `master` is still a `slave` in this observer node's view; update its role and configEpoch */
clusterSetNodeAsMaster(master);
master->configEpoch = senderConfigEpoch;
serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)"
" lost %d slot(s) to node %.40s (%s) with a config epoch of %llu",
" failed over to node %.40s (%s) with a config epoch of %llu",
sender->shard_id,
sender->name,
sender->human_nodename,
slots,
master->name,
master->human_nodename,
(unsigned long long) master->configEpoch);
Expand All @@ -3220,6 +3214,7 @@ int clusterProcessPacket(clusterLink *link) {
master->shard_id);
}
}

sender->flags &= ~(CLUSTER_NODE_MASTER|
CLUSTER_NODE_MIGRATE_TO);
sender->flags |= CLUSTER_NODE_SLAVE;
Expand Down Expand Up @@ -3268,21 +3263,20 @@ int clusterProcessPacket(clusterLink *link) {
if (sender) {
sender_master = clusterNodeIsMaster(sender) ? sender : sender->slaveof;
if (sender_master) {
dirty_slots = memcmp(sender_master->slots,
hdr->myslots,sizeof(hdr->myslots)) != 0;
dirty_slots = memcmp(sender_master->slots, hdr->myslots, sizeof(hdr->myslots)) != 0;

/* Force dirty when sender is primary and owns no slots so that
/* Force dirty when the sending shard owns no slots so that
* we have a chance to examine and repair slot migrating/importing
* states that involve empty shards. */
dirty_slots |= nodeIsMaster(sender) && sender_master->numslots == 0;
dirty_slots |= sender_master->numslots == 0;
}
}

/* 1) If the sender of the message is a master, and we detected that
* the set of slots it claims changed, scan the slots to see if we
* need to update our configuration. */
if (sender && clusterNodeIsMaster(sender) && dirty_slots)
clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
if (sender_master && dirty_slots)
clusterUpdateSlotsConfigWith(sender_master, senderConfigEpoch, hdr->myslots);

/* Explicitly check for a replication loop before attempting the replication
* chain folding logic.
Expand Down Expand Up @@ -5237,22 +5231,6 @@ int clusterDelSlot(int slot) {
return C_OK;
}

/* Transfer slots from `from_node` to `to_node`.
* Iterates over all cluster slots, transferring each slot covered by `from_node` to `to_node`.
* Counts and returns the number of slots transferred. */
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node) {
int processed = 0;

for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (clusterNodeCoversSlot(from_node, j)) {
clusterDelSlot(j);
clusterAddSlot(to_node, j);
processed++;
}
}
return processed;
}

/* Delete all the slots associated with the specified node.
* The number of deleted slots is returned. */
int clusterDelNodeSlots(clusterNode *node) {
Expand Down
1 change: 0 additions & 1 deletion tests/unit/cluster/slot-migration.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ proc wait_for_role {srv_idx role} {
set node_timeout [lindex [R 0 config get cluster-node-timeout] 1]
# wait for a gossip cycle for states to be propagated throughout the cluster
after $node_timeout
wait_for_cluster_propagation
wait_for_condition 100 100 {
[lindex [split [R $srv_idx role] " "] 0] eq $role
} else {
Expand Down

0 comments on commit b11936d

Please sign in to comment.