Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slot migration improvement #445

Merged
merged 2 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
Expand All @@ -203,7 +204,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand Down Expand Up @@ -241,6 +243,8 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down Expand Up @@ -598,23 +602,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}
}

/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
blockClient(c, btype);
}

/* Block a client for pre-replication, i.e. until replicas have acknowledged
 * a replication offset, using the BLOCKED_WAIT_PREREPL block type.
 *
 * c           - the client to block.
 * timeout     - forwarded to blockClientForReplicaAck(), which stores it in
 *               c->bstate.timeout.
 * offset      - replication offset to wait for; stored in c->bstate.reploffset.
 * numreplicas - number of replica acknowledgements to wait for; stored in
 *               c->bstate.numreplicas.
 *
 * numlocal is always passed as 0 for this block type. */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
/* Mark the client as having a pending command. unblockClient() skips the
 * client reset when this flag is set; presumably this lets the blocked
 * command run once the client is unblocked -- NOTE(review): confirm against
 * the command processing path. */
c->flags |= CLIENT_PENDING_COMMAND;
}

/* Block a client due to the WAIT command.
 *
 * Thin wrapper around blockClientForReplicaAck() that uses the BLOCKED_WAIT
 * block type and passes 0 for numlocal.
 *
 * c           - the client to block.
 * timeout     - stored in c->bstate.timeout.
 * offset      - replication offset to wait for; stored in c->bstate.reploffset.
 * numreplicas - number of replica acknowledgements to wait for; stored in
 *               c->bstate.numreplicas. */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
}

/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAITAOF);
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
}

/* Postpone client from executing a command. For example the server might be busy
Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
Expand Down
949 changes: 672 additions & 277 deletions src/cluster_legacy.c

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"

#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
Expand Down
7 changes: 5 additions & 2 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,9 @@ struct COMMAND_ARG CLUSTER_SET_CONFIG_EPOCH_Args[] = {

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SETSLOT history */
#define CLUSTER_SETSLOT_History NULL
commandHistory CLUSTER_SETSLOT_History[] = {
{"8.0.0","Added the `TIMEOUT` option."},
};
#endif

#ifndef SKIP_CMD_TIPS_TABLE
Expand All @@ -876,6 +878,7 @@ struct COMMAND_ARG CLUSTER_SETSLOT_subcommand_Subargs[] = {
struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = {
{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("subcommand",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,4,NULL),.subargs=CLUSTER_SETSLOT_subcommand_Subargs},
{MAKE_ARG("timeout",ARG_TYPE_INTEGER,-1,"TIMEOUT",NULL,"8.0.0",CMD_ARG_OPTIONAL,0,NULL),.display_text="timeout"},
};

/********** CLUSTER SHARDS ********************/
Expand Down Expand Up @@ -969,7 +972,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("reset","Resets a node.","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,0,CLUSTER_RESET_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE|CMD_NOSCRIPT,0,CLUSTER_RESET_Keyspecs,0,NULL,1),.args=CLUSTER_RESET_Args},
{MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
Expand Down
19 changes: 17 additions & 2 deletions src/commands/cluster-setslot.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
"arity": -4,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"history": [
[
"8.0.0",
"Added the `TIMEOUT` option."
]
],
"command_flags": [
"NO_ASYNC_LOADING",
"ADMIN",
"STALE"
"STALE",
"MAY_REPLICATE"
],
"arguments": [
{
Expand Down Expand Up @@ -45,6 +52,14 @@
"token": "STABLE"
}
]
},
{
"name": "timeout",
"display": "timeout",
"type": "integer",
"token": "TIMEOUT",
"optional": true,
"since": "8.0.0"
}
],
"reply_schema": {
Expand Down
3 changes: 1 addition & 2 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -873,8 +873,7 @@ NULL
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) {
replicationFeedSlaves(server.slaves, -1,
c->argv + 2, c->argc - 2);
replicationFeedSlaves(-1, c->argv + 2, c->argc - 2);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
sds errstr = sdsnewlen("-",1);
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2086,7 +2086,7 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~CLIENT_EXECUTING_COMMAND;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE);

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down
2 changes: 1 addition & 1 deletion src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3310,7 +3310,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
robj *argv[2];
argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
argv[1] = &keyobj;
replicationFeedSlaves(server.slaves,dbid,argv,2);
replicationFeedSlaves(dbid,argv,2);
}
sdsfree(key);
decrRefCount(val);
Expand Down
19 changes: 12 additions & 7 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ void feedReplicationBuffer(char *s, size_t len) {
* received by our clients in order to create the replication stream.
* Instead if the instance is a replica and has sub-replicas attached, we use
* replicationFeedStreamFromMasterStream() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
void replicationFeedSlaves(int dictid, robj **argv, int argc) {
int j, len;
char llstr[LONG_STR_SIZE];

Expand All @@ -451,7 +451,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {

/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) {
if (server.repl_backlog == NULL && listLength(server.slaves) == 0) {
/* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs
* even when there's no replication active. This code will not be reached if AOF
* is also disabled. */
Expand All @@ -460,7 +460,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}

/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
serverAssert(!(listLength(server.slaves) != 0 && server.repl_backlog == NULL));

/* Must install write handler for all replicas first before feeding
* replication stream. */
Expand Down Expand Up @@ -1313,6 +1313,9 @@ int replicaPutOnline(client *slave) {
NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));

/* Replicate slot being migrated/imported to the new replica */
clusterReplicateOpenSlots();
return 1;
}

Expand Down Expand Up @@ -3619,8 +3622,8 @@ void unblockClientWaitingReplicas(client *c) {
updateStatsOnUnblock(c, 0, 0, 0);
}

/* Check if there are clients blocked in WAIT or WAITAOF that can be unblocked
* since we received enough ACKs from slaves. */
/* Check if there are clients blocked in WAIT, WAITAOF, or WAIT_PREREPL
* that can be unblocked since we received enough ACKs from replicas. */
void processClientsWaitingReplicas(void) {
long long last_offset = 0;
long long last_aof_offset = 0;
Expand All @@ -3637,6 +3640,7 @@ void processClientsWaitingReplicas(void) {

client *c = ln->value;
int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL;

if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
Expand Down Expand Up @@ -3686,6 +3690,8 @@ void processClientsWaitingReplicas(void) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, numlocal);
addReplyLongLong(c, numreplicas);
} else if (is_wait_prerepl) {
c->flags |= CLIENT_PREREPL_DONE;
} else {
addReplyLongLong(c, numreplicas);
}
Expand Down Expand Up @@ -3788,8 +3794,7 @@ void replicationCron(void) {

if (!manual_failover_in_progress) {
ping_argv[0] = shared.ping;
replicationFeedSlaves(server.slaves, -1,
ping_argv, 1);
replicationFeedSlaves(-1, ping_argv, 1);
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ static void sendGetackToReplicas(void) {
argv[0] = shared.replconf;
argv[1] = shared.getack;
argv[2] = shared.special_asterick; /* Not used argument. */
replicationFeedSlaves(server.slaves, -1, argv, 3);
replicationFeedSlaves(-1, argv, 3);
}

extern int ProcessingEventsWhileBlocked;
Expand Down Expand Up @@ -1999,6 +1999,10 @@ void createSharedObjects(void) {
shared.special_asterick = createStringObject("*",1);
shared.special_equals = createStringObject("=",1);
shared.redacted = makeObjectShared(createStringObject("(redacted)",10));
shared.cluster = createStringObject("CLUSTER", 7);
shared.setslot = createStringObject("SETSLOT", 7);
shared.importing = createStringObject("IMPORTING", 9);
shared.migrating = createStringObject("MIGRATING", 9);

for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
Expand Down Expand Up @@ -3314,7 +3318,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF)
feedAppendOnlyFile(dbid,argv,argc);
if (target & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
replicationFeedSlaves(dbid,argv,argc);
}

/* Used inside commands to schedule the propagation of additional commands
Expand Down
8 changes: 6 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
#define CLIENT_PREREPL_DONE (1ULL<<51) /* Indicate that pre-replication has been done on the client */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand All @@ -415,6 +416,7 @@ typedef enum blocking_type {
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
} blocking_type;
Expand Down Expand Up @@ -1334,7 +1336,7 @@ struct sharedObjectsStruct {
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk, *cluster, *setslot, *importing, *migrating,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
Expand Down Expand Up @@ -2820,7 +2822,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);

/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlaves(int dictid, robj **argv, int argc);
void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);
void feedReplicationBuffer(char *buf, size_t len);
Expand Down Expand Up @@ -3433,7 +3435,9 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
void blockClientShutdown(client *c);
void blockPostponeClient(client *c);
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void replicationRequestAckFromSlaves(void);
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with);
Expand Down
5 changes: 0 additions & 5 deletions tests/cluster/tests/20-half-migrated-slot.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
# 4. migration is half finished on "migrating" node
# 5. migration is half finished on "importing" node

# TODO: Test is currently disabled until it is stabilized (fixing the test
# itself or real issues in the server).

if {false} {
source "../tests/includes/init-tests.tcl"
source "../tests/includes/utils.tcl"

Expand Down Expand Up @@ -95,4 +91,3 @@ test "Half-finish importing" {
}

config_set_all_nodes cluster-allow-replica-migration yes
}
6 changes: 0 additions & 6 deletions tests/cluster/tests/21-many-slot-migration.tcl
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# Tests for many simultaneous migrations.

# TODO: Test is currently disabled until it is stabilized (fixing the test
# itself or real issues in the server).

if {false} {

source "../tests/includes/init-tests.tcl"
source "../tests/includes/utils.tcl"

Expand Down Expand Up @@ -61,4 +56,3 @@ test "Keys are accessible" {
}

config_set_all_nodes cluster-allow-replica-migration yes
}
11 changes: 4 additions & 7 deletions tests/unit/cluster/cli.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,10 @@ test {Migrate the last slot away from a node using valkey-cli} {
catch { $newnode_r get foo } e
assert_equal "MOVED $slot $owner_host:$owner_port" $e

# Check that the empty node has turned itself into a replica of the new
# owner and that the new owner knows that.
wait_for_condition 1000 50 {
[string match "*slave*" [$owner_r CLUSTER REPLICAS $owner_id]]
} else {
fail "Empty node didn't turn itself into a replica."
}
# Check that the now empty primary node doesn't turn itself into
# a replica of any other nodes
wait_for_cluster_propagation
assert_match *master* [$owner_r role]
}
}

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/cluster/hostnames.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
# to accept our isolated nodes connections. At this point they will
# start showing up in cluster slots.
wait_for_condition 50 100 {
[llength [R 6 CLUSTER SLOTS]] eq 2
[llength [R 6 CLUSTER SLOTS]] eq 3
} else {
fail "Node did not learn about the 2 shards it can talk to"
}
wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
} else {
fail "hostname for shard-1 didn't reach node 6"
}

wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
} else {
fail "hostname for shard-2 didn't reach node 6"
}
Expand Down
Loading
Loading