Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slot migration improvement #245

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c11b65d
Slot migration improvement
PingXie Apr 6, 2024
03196a4
Add missing changes
PingXie Apr 12, 2024
9b95b1e
Improve test reliability
PingXie Apr 22, 2024
f4e84d0
Update src/valkey-cli.c
PingXie Apr 22, 2024
fac9a06
Update src/valkey-cli.c
PingXie Apr 22, 2024
2d004b3
Consolidate cluster topology update to clusterUpdateSlotsConfigWith
PingXie Apr 28, 2024
4cef2b4
Fix a typo and move gossip packet validation logic into a separate fu…
PingXie Apr 28, 2024
10944e6
Fix build breaks and flaky tests
PingXie Apr 28, 2024
907de8c
Reintroduce server-initiated wait and remove the `REPLICAONLY` flag for
PingXie Apr 29, 2024
58108ea
Merge branch 'valkey-io:unstable' into slot_migration
PingXie Apr 30, 2024
844c3a9
Add a timeout argument to `CLUSTER SETSLOT`
PingXie May 1, 2024
1aa0585
Fix a memory corruption bug
PingXie May 1, 2024
f910041
Restore CMD_MAY_REPLICATE
PingXie May 1, 2024
dc97cc3
Reduce memory allocations
PingXie May 1, 2024
ecfd3b0
Incorporate review feedback
PingXie May 2, 2024
aa54049
Update src/commands/cluster-setslot.json
PingXie May 2, 2024
abcf439
Initial PR outlining the governance for the project (#345)
madolson Apr 30, 2024
f7e7339
Modify mem_freed variable in evict.c and Update debug.c (#376)
Virusuki Apr 30, 2024
b6799bd
Delete unused declaration (#400)
lipzhu May 1, 2024
76f5eff
Don't include config.h from serverassert.h (#404)
zuiderkwast May 1, 2024
2446994
CRC64 perf improvements from Redis patches (#350)
josiahcarlson May 1, 2024
5bd8e05
Rename redis in aof logs and proc title redis-aof-rewrite to valkey-a…
Shivshankar-Reddy May 1, 2024
758ecd8
Changed links and naming to valkey instead of redis (#389)
simkusr May 2, 2024
a51b146
Document the commands JSON files (#403)
zuiderkwast May 2, 2024
f36330a
Fix typo in comment in quicklist.h (#416)
eltociear May 2, 2024
4f55dc2
update commands.def
PingXie May 2, 2024
4ddf884
Merge branch 'valkey-io:unstable' into slot_migration
PingXie May 2, 2024
e42cb94
Improve code readability
PingXie May 3, 2024
ad5c7f8
Incorporate review feedback
PingXie May 3, 2024
11fc143
Reintroduce a proper timeout to help stabilize the cluster
PingXie May 6, 2024
a9fe543
update code comment
PingXie May 6, 2024
8afc34c
Bump the config epoch when taking over "orphaned" slots
PingXie May 6, 2024
6f459da
Merge branch 'valkey-io:unstable' into slot_migration
PingXie May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ Makefile.dep
compile_commands.json
redis.code-workspace
.cache
.cscope.*
PingXie marked this conversation as resolved.
Show resolved Hide resolved
35 changes: 23 additions & 12 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
Expand All @@ -203,7 +204,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand Down Expand Up @@ -241,6 +243,8 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down Expand Up @@ -598,23 +602,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}
}

/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
blockClient(c, btype);
}

/* Block a client due to pre-replication.
 *
 * The client is blocked with btype BLOCKED_WAIT_PREREPL ("WAIT for
 * pre-replication and then run the command") through
 * blockClientForReplicaAck(), passing `timeout`, `offset` and `numreplicas`
 * through unchanged and 0 for numlocal.
 *
 * c           - the client to block.
 * timeout     - stored in c->bstate.timeout by blockClientForReplicaAck().
 * offset      - replication offset stored in c->bstate.reploffset.
 * numreplicas - stored in c->bstate.numreplicas.
 *
 * If the block times out, replyToBlockedClientTimedOut() replies with
 * shared.noreplicaserr. Otherwise processClientsWaitingReplicas() sets
 * CLIENT_PREREPL_DONE on the client instead of sending a reply. */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
/* Mark the command as still pending: unblockClient() does not reset a client
 * that has a pending command to process. */
c->flags |= CLIENT_PENDING_COMMAND;
}

/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need two separate blocked states, can we instead just use the CLIENT_PENDING_COMMAND flags?

Copy link
Member Author

@PingXie PingXie May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CLIENT_PENDING_COMMAND is not a blocked state. That said, I do feel that we have a proliferation of blocked/wait states, like wait, waitaof, and now wait_prerepl. Let me see if there is a way to improve this in the next PR. Let's leave this comment open.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I'm saying we already have the notion that a client is blocked with a pending command, that can get re-executed once the blocking is done. (That is how the normal command blocking works). The blocking mechanic is functionally the same as the other wait (wait for a replack from k replicas then unblock).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. There is definitely room for improvement. Filed #427 to track this improvement.

}

/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAITAOF);
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
}

/* Postpone client from executing a command. For example the server might be busy
Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
Expand Down
949 changes: 672 additions & 277 deletions src/cluster_legacy.c

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"

#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
Expand Down
7 changes: 5 additions & 2 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,9 @@ struct COMMAND_ARG CLUSTER_SET_CONFIG_EPOCH_Args[] = {

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SETSLOT history */
#define CLUSTER_SETSLOT_History NULL
commandHistory CLUSTER_SETSLOT_History[] = {
{"8.0.0","Added the `TIMEOUT` option."},
};
#endif

#ifndef SKIP_CMD_TIPS_TABLE
Expand All @@ -876,6 +878,7 @@ struct COMMAND_ARG CLUSTER_SETSLOT_subcommand_Subargs[] = {
struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = {
{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("subcommand",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,4,NULL),.subargs=CLUSTER_SETSLOT_subcommand_Subargs},
{MAKE_ARG("timeout",ARG_TYPE_INTEGER,-1,"TIMEOUT",NULL,"8.0.0",CMD_ARG_OPTIONAL,0,NULL),.display_text="timeout"},
};

/********** CLUSTER SHARDS ********************/
Expand Down Expand Up @@ -969,7 +972,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("reset","Resets a node.","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,0,CLUSTER_RESET_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE|CMD_NOSCRIPT,0,CLUSTER_RESET_Keyspecs,0,NULL,1),.args=CLUSTER_RESET_Args},
{MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
Expand Down
19 changes: 17 additions & 2 deletions src/commands/cluster-setslot.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
"arity": -4,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"history": [
[
"8.0.0",
"Added the `TIMEOUT` option."
]
],
"command_flags": [
"NO_ASYNC_LOADING",
"ADMIN",
"STALE"
"STALE",
"MAY_REPLICATE"
],
"arguments": [
{
PingXie marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -45,6 +52,14 @@
"token": "STABLE"
}
]
},
{
"name": "timeout",
"display": "timeout",
"type": "integer",
"token": "TIMEOUT",
"optional": true,
"since": "8.0.0"
}
],
"reply_schema": {
Expand Down
3 changes: 1 addition & 2 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -873,8 +873,7 @@ NULL
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) {
replicationFeedSlaves(server.slaves, -1,
c->argv + 2, c->argc - 2);
replicationFeedSlaves(-1, c->argv + 2, c->argc - 2);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
sds errstr = sdsnewlen("-",1);
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2086,7 +2086,7 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~CLIENT_EXECUTING_COMMAND;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE);

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down
2 changes: 1 addition & 1 deletion src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3310,7 +3310,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
robj *argv[2];
argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
argv[1] = &keyobj;
replicationFeedSlaves(server.slaves,dbid,argv,2);
replicationFeedSlaves(dbid,argv,2);
}
sdsfree(key);
decrRefCount(val);
Expand Down
19 changes: 12 additions & 7 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ void feedReplicationBuffer(char *s, size_t len) {
* received by our clients in order to create the replication stream.
* Instead if the instance is a replica and has sub-replicas attached, we use
* replicationFeedStreamFromMasterStream() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
void replicationFeedSlaves(int dictid, robj **argv, int argc) {
int j, len;
char llstr[LONG_STR_SIZE];

Expand All @@ -451,7 +451,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {

/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) {
if (server.repl_backlog == NULL && listLength(server.slaves) == 0) {
/* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs
* even when there's no replication active. This code will not be reached if AOF
* is also disabled. */
Expand All @@ -460,7 +460,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}

/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
serverAssert(!(listLength(server.slaves) != 0 && server.repl_backlog == NULL));

/* Must install write handler for all replicas first before feeding
* replication stream. */
Expand Down Expand Up @@ -1313,6 +1313,9 @@ int replicaPutOnline(client *slave) {
NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));

/* Replicate slot being migrated/imported to the new replica */
clusterReplicateOpenSlots();
return 1;
}

Expand Down Expand Up @@ -3619,8 +3622,8 @@ void unblockClientWaitingReplicas(client *c) {
updateStatsOnUnblock(c, 0, 0, 0);
}

/* Check if there are clients blocked in WAIT or WAITAOF that can be unblocked
* since we received enough ACKs from slaves. */
/* Check if there are clients blocked in WAIT, WAITAOF, or WAIT_PREREPL
* that can be unblocked since we received enough ACKs from replicas. */
void processClientsWaitingReplicas(void) {
long long last_offset = 0;
long long last_aof_offset = 0;
Expand All @@ -3637,6 +3640,7 @@ void processClientsWaitingReplicas(void) {

client *c = ln->value;
int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL;
PingXie marked this conversation as resolved.
Show resolved Hide resolved

if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
Expand Down Expand Up @@ -3686,6 +3690,8 @@ void processClientsWaitingReplicas(void) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, numlocal);
addReplyLongLong(c, numreplicas);
} else if (is_wait_prerepl) {
c->flags |= CLIENT_PREREPL_DONE;
} else {
addReplyLongLong(c, numreplicas);
}
Expand Down Expand Up @@ -3788,8 +3794,7 @@ void replicationCron(void) {

if (!manual_failover_in_progress) {
ping_argv[0] = shared.ping;
replicationFeedSlaves(server.slaves, -1,
ping_argv, 1);
replicationFeedSlaves(-1, ping_argv, 1);
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ static void sendGetackToReplicas(void) {
argv[0] = shared.replconf;
argv[1] = shared.getack;
argv[2] = shared.special_asterick; /* Not used argument. */
replicationFeedSlaves(server.slaves, -1, argv, 3);
replicationFeedSlaves(-1, argv, 3);
}

extern int ProcessingEventsWhileBlocked;
Expand Down Expand Up @@ -1999,6 +1999,10 @@ void createSharedObjects(void) {
shared.special_asterick = createStringObject("*",1);
shared.special_equals = createStringObject("=",1);
shared.redacted = makeObjectShared(createStringObject("(redacted)",10));
shared.cluster = createStringObject("CLUSTER", 7);
shared.setslot = createStringObject("SETSLOT", 7);
shared.importing = createStringObject("IMPORTING", 9);
shared.migrating = createStringObject("MIGRATING", 9);

for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
Expand Down Expand Up @@ -3314,7 +3318,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF)
feedAppendOnlyFile(dbid,argv,argc);
if (target & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
replicationFeedSlaves(dbid,argv,argc);
}

/* Used inside commands to schedule the propagation of additional commands
Expand Down
8 changes: 6 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
#define CLIENT_PREREPL_DONE (1ULL<<51) /* Indicate that pre-replication has been done on the client */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand All @@ -415,6 +416,7 @@ typedef enum blocking_type {
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the term "pre-replication" used in several places, but I do not understand what it means (sorry, I missed your previous PR). Can you add a more detailed explanation here so that everyone can understand its meaning later on? Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will address in the follow-up PR.

BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
} blocking_type;
Expand Down Expand Up @@ -1334,7 +1336,7 @@ struct sharedObjectsStruct {
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk, *cluster, *setslot, *importing, *migrating,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
Expand Down Expand Up @@ -2820,7 +2822,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);

/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlaves(int dictid, robj **argv, int argc);
void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);
void feedReplicationBuffer(char *buf, size_t len);
Expand Down Expand Up @@ -3433,7 +3435,9 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
void blockClientShutdown(client *c);
void blockPostponeClient(client *c);
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void replicationRequestAckFromSlaves(void);
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with);
Expand Down
5 changes: 0 additions & 5 deletions tests/cluster/tests/20-half-migrated-slot.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
# 4. migration is half finished on "migrating" node
# 5. migration is half finished on "importing" node

# TODO: Test is currently disabled until it is stabilized (fixing the test
# itself or real issues in the server).

if {false} {
source "../tests/includes/init-tests.tcl"
source "../tests/includes/utils.tcl"

Expand Down Expand Up @@ -95,4 +91,3 @@ test "Half-finish importing" {
}

config_set_all_nodes cluster-allow-replica-migration yes
}
6 changes: 0 additions & 6 deletions tests/cluster/tests/21-many-slot-migration.tcl
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# Tests for many simultaneous migrations.

# TODO: Test is currently disabled until it is stabilized (fixing the test
# itself or real issues in the server).

if {false} {
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved

source "../tests/includes/init-tests.tcl"
source "../tests/includes/utils.tcl"

Expand Down Expand Up @@ -61,4 +56,3 @@ test "Keys are accessible" {
}

config_set_all_nodes cluster-allow-replica-migration yes
}
11 changes: 4 additions & 7 deletions tests/unit/cluster/cli.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,10 @@ test {Migrate the last slot away from a node using valkey-cli} {
catch { $newnode_r get foo } e
assert_equal "MOVED $slot $owner_host:$owner_port" $e

# Check that the empty node has turned itself into a replica of the new
# owner and that the new owner knows that.
wait_for_condition 1000 50 {
[string match "*slave*" [$owner_r CLUSTER REPLICAS $owner_id]]
} else {
fail "Empty node didn't turn itself into a replica."
}
# Check that the now empty primary node doesn't turn itself into
# a replica of any other nodes
wait_for_cluster_propagation
assert_match *master* [$owner_r role]
}
}

Expand Down
Loading
Loading