Merge remote-tracking branch 'valkey/unstable' into dict-refactoring
PingXie committed Sep 13, 2024
2 parents 8d1d84e + dcc7678 commit 229a473
Showing 5 changed files with 53 additions and 16 deletions.
18 changes: 14 additions & 4 deletions src/cluster_legacy.c
@@ -6224,6 +6224,9 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
         return 0;
     }
 
+    /* If 'myself' is a replica, 'c' must be the primary client. */
+    serverAssert(!nodeIsReplica(myself) || c == server.primary);
+
     if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 0;
 
     if (!strcasecmp(c->argv[3]->ptr, "migrating") && c->argc >= 5) {
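The new assertion encodes an invariant worth spelling out: on a replica, CLUSTER SETSLOT can only arrive over the replication stream from its primary. A minimal sketch of the same check as a standalone predicate; the helper name is hypothetical and not part of this change:

/* Hypothetical helper (not in the patch) restating the invariant asserted
 * above: a replica only processes CLUSTER SETSLOT when the caller is the
 * primary client, i.e. the command arrived over the replication link. */
static int setSlotCallerAllowed(const client *c) {
    if (!nodeIsReplica(myself)) return 1; /* a primary accepts any caller */
    return c == server.primary;           /* a replica: replication link only */
}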
@@ -6412,20 +6415,27 @@ void clusterCommandSetSlot(client *c) {
         server.cluster->migrating_slots_to[slot] = NULL;
     }
 
-    int slot_was_mine = server.cluster->slots[slot] == myself;
+    clusterNode *my_primary = clusterNodeGetPrimary(myself);
+    int slot_was_mine = server.cluster->slots[slot] == my_primary;
     clusterDelSlot(slot);
     clusterAddSlot(n, slot);
 
-    /* If we are a primary left without slots, we should turn into a
-     * replica of the new primary. */
-    if (slot_was_mine && n != myself && myself->numslots == 0 && server.cluster_allow_replica_migration) {
+    /* If replica migration is allowed, check if the primary of this shard
+     * loses its last slot and the shard becomes empty. In this case, we
+     * should turn into a replica of the new primary. */
+    if (server.cluster_allow_replica_migration && slot_was_mine && my_primary->numslots == 0) {
+        serverAssert(n != my_primary);
         serverLog(LL_NOTICE,
                   "Lost my last slot during slot migration. Reconfiguring myself "
                   "as a replica of %.40s (%s) in shard %.40s",
                   n->name, n->human_nodename, n->shard_id);
+        /* `c` is the primary client if `myself` is a replica, prevent it
+         * from being freed by clusterSetPrimary. */
+        if (nodeIsReplica(myself)) protectClient(c);
         /* We are migrating to a different shard that has a completely different
          * replication history, so a full sync is required. */
         clusterSetPrimary(n, 1, 1);
+        if (nodeIsReplica(myself)) unprotectClient(c);
         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
     }
 
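The protect/unprotect pair guards a lifetime hazard: clusterSetPrimary() frees the old primary client, and when 'myself' is a replica that client is the very 'c' executing this command. A condensed sketch of the pattern, with an invented wrapper name:

/* Illustrative only: pin a client across an operation that may free it.
 * Mirrors the protectClient/unprotectClient calls added in this hunk. */
static void reconfigureAsReplicaOf(client *c, clusterNode *n) {
    if (nodeIsReplica(myself)) protectClient(c);   /* 'c' may be server.primary */
    clusterSetPrimary(n, 1, 1);                    /* unrelated history => full sync */
    if (nodeIsReplica(myself)) unprotectClient(c); /* safe to release again */
}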
7 changes: 6 additions & 1 deletion src/networking.c
@@ -1671,7 +1671,7 @@ void freeClient(client *c) {
      * some unexpected state, by checking its flags. */
     if (server.primary && c->flag.primary) {
         serverLog(LL_NOTICE, "Connection with primary lost.");
-        if (!(c->flag.protocol_error || c->flag.blocked)) {
+        if (!c->flag.dont_cache_primary && !(c->flag.protocol_error || c->flag.blocked)) {
             c->flag.close_asap = 0;
             c->flag.close_after_reply = 0;
             replicationCachePrimary(c);
@@ -2553,6 +2553,7 @@ void freeSharedQueryBuf(void) {
  *
  * * DEBUG RELOAD and similar.
  * * When a Lua script is in -BUSY state.
+ * * A cluster replica executing CLUSTER SETSLOT during slot migration.
  *
  * So the function will protect the client by doing two things:
  *
@@ -3485,6 +3486,10 @@ void clientCommand(client *c) {
     const char *help[] = {
         "CACHING (YES|NO)",
         "    Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes.",
+        "CAPA <option> [options...]",
+        "    The client claims some capability options. Options are:",
+        "    * REDIRECT",
+        "    The client can handle redirection during primary and replica failover in standalone mode.",
         "GETREDIR",
         "    Return the client ID we are redirecting to when tracking is enabled.",
         "GETNAME",
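For context, the handshake that the new help entry documents looks roughly like this from a client's perspective (the OK reply is assumed here, not shown in this diff):

> CLIENT CAPA redirect
OK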
14 changes: 6 additions & 8 deletions src/replication.c
@@ -3740,6 +3740,12 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) {
     sdsfree(server.primary_host);
     server.primary_host = NULL;
     if (server.primary) {
+        /* When joining 'myself' to a new primary, set the dont_cache_primary flag
+         * if a full sync is required. This happens when 'myself' was previously
+         * part of a different shard from the new primary. Since 'myself' does not
+         * have the replication history of the shard it is joining, clearing the
+         * cached primary is necessary to ensure proper replication behavior. */
+        server.primary->flag.dont_cache_primary = full_sync_required;
         freeClient(server.primary);
     }
     disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */
@@ -3768,14 +3774,6 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) {
         replicationCachePrimaryUsingMyself();
     }
 
-    /* If full sync is required, drop the cached primary. Doing so increases
-     * this replica node's election rank (delay) and reduces its chance of
-     * winning the election. If a replica requiring a full sync wins the
-     * election, it will flush valid data in the shard, causing data loss. */
-    if (full_sync_required) {
-        replicationDiscardCachedPrimary();
-    }
-
     /* Fire the role change modules event. */
     moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICATION_ROLE_CHANGED, VALKEYMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
                           NULL);
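Taken together with the freeClient() hunk in src/networking.c above, the question "may we cache this primary for a later partial resync?" now reduces to a single check at the moment the client is freed, instead of a cache-then-discard round trip. A hedged restatement (the helper below is illustrative, not code from the patch):

/* Illustrative predicate (not in the patch): freeClient() caches the
 * outgoing primary only when a partial resync could ever be valid. */
static int shouldCachePrimary(const client *c) {
    return !c->flag.dont_cache_primary &&                /* full sync forced */
           !(c->flag.protocol_error || c->flag.blocked); /* unexpected state */
}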
5 changes: 4 additions & 1 deletion src/server.h
@@ -1239,7 +1239,10 @@ typedef struct ClientFlags {
      * By using this flag, we ensure that the RDB client remains intact until the replica
      * has successfully initiated PSYNC. */
     uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */
-    uint64_t reserved : 7; /* Reserved for future use */
+    uint64_t dont_cache_primary : 1; /* In some cases we don't want to cache the primary. For example, the replica
+                                      * knows that it does not need the cache and requires a full sync. With this
+                                      * flag, we won't cache the primary in freeClient. */
+    uint64_t reserved : 6; /* Reserved for future use */
 } ClientFlags;
 
 typedef struct client {
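Since dont_cache_primary takes one bit from reserved (7 down to 6), the ClientFlags bitfields still total exactly 64 bits. A compile-time guard along these lines can catch the struct growing past one word when future flags are added; it is a sketch, not part of the patch:

#include <stdint.h>
/* Sketch: if the bitfields ever exceed 64 bits, the struct grows beyond
 * one machine word and this assertion fails at compile time. */
_Static_assert(sizeof(ClientFlags) == sizeof(uint64_t),
               "ClientFlags must pack into a single 64-bit word");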
25 changes: 23 additions & 2 deletions tests/unit/cluster/replica-migration.tcl
@@ -351,11 +351,17 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
     test_sub_replica "sigstop"
 } my_slot_allocation cluster_allocate_replicas ;# start_cluster
 
-start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
-    test "valkey-cli make source node ignores NOREPLICAS error when doing the last CLUSTER SETSLOT" {
+proc test_cluster_setslot {type} {
+    test "valkey-cli make source node ignores NOREPLICAS error when doing the last CLUSTER SETSLOT - $type" {
         R 3 config set cluster-allow-replica-migration no
         R 7 config set cluster-allow-replica-migration yes
 
+        if {$type == "setslot"} {
+            # Make R 7 drop the PING message so that we have a higher
+            # chance to trigger the migration from CLUSTER SETSLOT.
+            R 7 DEBUG DROP-CLUSTER-PACKET-FILTER 1
+        }
+
         # Move slot 0 from primary 3 to primary 0.
         set addr "[srv 0 host]:[srv 0 port]"
         set myid [R 3 CLUSTER MYID]
@@ -366,6 +372,13 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
             fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
         }
 
+        # Wait for R 3 to report that it is an empty primary (cluster-allow-replica-migration no)
+        wait_for_log_messages -3 {"*I am now an empty primary*"} 0 1000 50
+
+        if {$type == "setslot"} {
+            R 7 DEBUG DROP-CLUSTER-PACKET-FILTER -1
+        }
+
         # Make sure server 3 lost its replica (server 7) and server 7 becomes a replica of primary 0.
         wait_for_condition 1000 50 {
             [s -3 role] eq {master} &&
@@ -378,4 +391,12 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
             fail "Server 3 and 7 role response has not changed"
         }
     }
+}
 
+start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
+    test_cluster_setslot "gossip"
+} my_slot_allocation cluster_allocate_replicas ;# start_cluster
+
+start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
+    test_cluster_setslot "setslot"
+} my_slot_allocation cluster_allocate_replicas ;# start_cluster