
Fix replica unable to trigger migration when it receives CLUSTER SETSLOT in advance #981

Merged
merged 11 commits on Sep 13, 2024
17 changes: 13 additions & 4 deletions src/cluster_legacy.c
@@ -6202,6 +6202,8 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
return 0;
}

+ if (nodeIsReplica(myself)) serverAssert(c == server.primary);

if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 0;

if (!strcasecmp(c->argv[3]->ptr, "migrating") && c->argc >= 5) {
@@ -6390,20 +6392,27 @@ void clusterCommandSetSlot(client *c) {
server.cluster->migrating_slots_to[slot] = NULL;
}

- int slot_was_mine = server.cluster->slots[slot] == myself;
+ clusterNode *my_primary = clusterNodeGetPrimary(myself);
+ int slot_was_mine = server.cluster->slots[slot] == my_primary;
clusterDelSlot(slot);
clusterAddSlot(n, slot);

- /* If we are a primary left without slots, we should turn into a
-  * replica of the new primary. */
- if (slot_was_mine && n != myself && myself->numslots == 0 && server.cluster_allow_replica_migration) {
+ /* If replica migration is allowed, check if the primary of this shard
+  * loses its last slot and the shard becomes empty. In this case, we
+  * should turn into a replica of the new primary. */
+ if (server.cluster_allow_replica_migration && slot_was_mine && my_primary->numslots == 0) {
+     serverAssert(n != my_primary);
serverLog(LL_NOTICE,
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name, n->human_nodename, n->shard_id);
+ /* `c` is the primary client if `myself` is a replica, prevent it
+  * from being freed by clusterSetPrimary. */
+ if (nodeIsReplica(myself)) protectClient(c);
/* We are migrating to a different shard that has a completely different
* replication history, so a full sync is required. */
clusterSetPrimary(n, 1, 1);
+ if (nodeIsReplica(myself)) unprotectClient(c);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

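For readers unfamiliar with the client-protection pattern used above: clusterSetPrimary() frees the old primary client, and on a replica the client `c` executing CLUSTER SETSLOT is that very primary link, so it must be pinned for the duration of the call. Below is a standalone sketch of the idea; the types and helpers here are simplified stand-ins, not the real valkey API.

#include <assert.h>
#include <stdbool.h>

typedef struct Client {
    bool pinned; /* protectClient() analogue: refuse to free while set */
    bool freed;
} Client;

static void protect_client(Client *c)   { c->pinned = true; }
static void unprotect_client(Client *c) { c->pinned = false; }

static void free_client(Client *c) {
    if (c->pinned) return; /* freeing is deferred while protected */
    c->freed = true;
}

/* Analogue of clusterSetPrimary(): tears down the old primary link. */
static void set_primary(Client *old_primary_link) {
    free_client(old_primary_link);
}

int main(void) {
    Client link = {0};
    /* On a replica, the client executing CLUSTER SETSLOT *is* the
     * primary link, so pin it across the reconfiguration. */
    protect_client(&link);
    set_primary(&link);  /* would free `link` if it were not pinned */
    assert(!link.freed); /* still valid: safe to finish the command */
    unprotect_client(&link);
    return 0;
}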
3 changes: 2 additions & 1 deletion src/networking.c
@@ -1671,7 +1671,7 @@ void freeClient(client *c) {
* some unexpected state, by checking its flags. */
if (server.primary && c->flag.primary) {
serverLog(LL_NOTICE, "Connection with primary lost.");
- if (!(c->flag.protocol_error || c->flag.blocked)) {
+ if (!c->flag.dont_cache_primary && !(c->flag.protocol_error || c->flag.blocked)) {
c->flag.close_asap = 0;
c->flag.close_after_reply = 0;
replicationCachePrimary(c);
@@ -2560,6 +2560,7 @@ void freeSharedQueryBuf(void) {
*
* * DEBUG RELOAD and similar.
* * When a Lua script is in -BUSY state.
+ * A cluster replica handling CLUSTER SETSLOT and triggering a replica migration.
*
* So the function will protect the client by doing two things:
*
17 changes: 9 additions & 8 deletions src/replication.c
@@ -3740,6 +3740,15 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) {
sdsfree(server.primary_host);
server.primary_host = NULL;
if (server.primary) {
+ if (full_sync_required) {
+     /* If a full sync is required, we add the dont_cache_primary flag,
+      * so freeClient (or freeClientAsync) won't cache the primary
+      * client. Dropping the cached primary increases this replica
+      * node's election rank (delay) and reduces its chance of winning
+      * the election. If a replica requiring a full sync wins the
+      * election, it will flush valid data in the shard, causing data
+      * loss. */
+     server.primary->flag.dont_cache_primary = 1;
+ }
freeClient(server.primary);
}
disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */
@@ -3768,14 +3777,6 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) {
replicationCachePrimaryUsingMyself();
}

- /* If full sync is required, drop the cached primary. Doing so increases
-  * this replica node's election rank (delay) and reduces its chance of
-  * winning the election. If a replica requiring a full sync wins the
-  * election, it will flush valid data in the shard, causing data loss. */
- if (full_sync_required) {
-     replicationDiscardCachedPrimary();
- }

/* Fire the role change modules event. */
moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICATION_ROLE_CHANGED, VALKEYMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
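The interplay between this change and the freeClient() change in networking.c can be modeled in isolation. Here is a minimal, self-contained sketch; the names mirror the diff, but the types are illustrative, not the real valkey structs.

#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
    bool dont_cache_primary;
} ClientModel;

static ClientModel *cached_primary = NULL;

/* freeClient() analogue: cache the primary link unless told not to. */
static void free_client(ClientModel *c) {
    if (!c->dont_cache_primary) cached_primary = c; /* replicationCachePrimary() */
}

/* replicationSetPrimary() analogue. */
static void replication_set_primary(ClientModel *old_link, bool full_sync_required) {
    if (full_sync_required) old_link->dont_cache_primary = true;
    free_client(old_link);
}

int main(void) {
    ClientModel a = {0}, b = {0};
    replication_set_primary(&a, true);
    assert(cached_primary == NULL); /* no cache: partial resync is pointless anyway */
    replication_set_primary(&b, false);
    assert(cached_primary == &b);   /* cached: a later PSYNC may resume */
    return 0;
}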
5 changes: 4 additions & 1 deletion src/server.h
@@ -1239,7 +1239,10 @@ typedef struct ClientFlags {
* By using this flag, we ensure that the RDB client remains intact until the replica
* has successfully initiated PSYNC. */
uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */
- uint64_t reserved : 7; /* Reserved for future use */
+ uint64_t dont_cache_primary : 1; /* In some cases we don't want to cache the primary. For example, the replica
+                                   * knows that it does not need the cache and requires a full sync. With this
+                                   * flag, we won't cache the primary in freeClient. */
+ uint64_t reserved : 6; /* Reserved for future use */
} ClientFlags;

typedef struct client {
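One detail worth noting in this hunk: the new 1-bit flag is paid for by shrinking `reserved` from 7 to 6 bits, so the bitfield still packs into a single 64-bit word. A compile-time check along these lines would catch a miscount; this is an illustrative model only, since the real ClientFlags declares many more flags and may not carry such an assert.

#include <stdint.h>

typedef struct {
    uint64_t repl_rdb_channel : 1;
    uint64_t dont_cache_primary : 1;
    uint64_t other_flags : 56; /* stand-in for the many real flags */
    uint64_t reserved : 6;
} ClientFlagsModel;

_Static_assert(sizeof(ClientFlagsModel) == sizeof(uint64_t),
               "flag bits must pack into a single 64-bit word");

int main(void) { return 0; }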
25 changes: 23 additions & 2 deletions tests/unit/cluster/replica-migration.tcl
@@ -334,11 +334,17 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
test_sub_replica "sigstop"
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

- start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
- test "valkey-cli make source node ignores NOREPLICAS error when doing the last CLUSTER SETSLOT" {
+ proc test_cluster_setslot {type} {
+     test "valkey-cli make source node ignores NOREPLICAS error when doing the last CLUSTER SETSLOT - $type" {
R 3 config set cluster-allow-replica-migration no
R 7 config set cluster-allow-replica-migration yes

+ if {$type == "setslot"} {
+     # Make R 7 drop the PING message so that we have a higher
+     # chance to trigger the migration from CLUSTER SETSLOT.
+     R 7 DEBUG DROP-CLUSTER-PACKET-FILTER 1
+ }

# Move slot 0 from primary 3 to primary 0.
set addr "[srv 0 host]:[srv 0 port]"
set myid [R 3 CLUSTER MYID]
@@ -349,6 +355,13 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
}

+ # Wait for R 3 to report that it is an empty primary (cluster-allow-replica-migration no)
+ wait_for_log_messages -3 {"*I am now an empty primary*"} 0 1000 50

+ if {$type == "setslot"} {
+     R 7 DEBUG DROP-CLUSTER-PACKET-FILTER -1
+ }

# Make sure server 3 lost its replica (server 7) and server 7 becomes a replica of primary 0.
wait_for_condition 1000 50 {
[s -3 role] eq {master} &&
@@ -361,4 +374,12 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
fail "Server 3 and 7 role response has not changed"
}
}
+ }

+ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
+     test_cluster_setslot "gossip"
+ } my_slot_allocation cluster_allocate_replicas ;# start_cluster
+
+ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
+     test_cluster_setslot "setslot"
+ } my_slot_allocation cluster_allocate_replicas ;# start_cluster
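For reference, the new tests can typically be exercised on their own from the repository root with the standard test runner (assuming the usual tests/ layout; adjust the path if your checkout differs):

./runtest --single unit/cluster/replica-migration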