Skip to content

Commit

Permalink
Reintroduce server-initiated wait and remove the REPLICAONLY flag for
Browse files Browse the repository at this point in the history
`CLUSTER SETSLOT NODE`

Signed-off-by: Ping Xie <pingxie@google.com>
  • Loading branch information
PingXie committed Apr 29, 2024
1 parent 10944e6 commit 907de8c
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 205 deletions.
35 changes: 23 additions & 12 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
Expand All @@ -202,7 +203,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand Down Expand Up @@ -240,6 +242,8 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down Expand Up @@ -597,23 +601,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}
}

/* Block a client until replicas acknowledge a replication offset.
 *
 * c           - the client to block.
 * timeout     - stored in c->bstate.timeout.
 * offset      - stored in c->bstate.reploffset.
 * numreplicas - stored in c->bstate.numreplicas.
 * blockType   - block type handed to blockClient(); the wrappers in this
 *               file pass BLOCKED_WAIT, BLOCKED_WAITAOF or
 *               BLOCKED_WAIT_PREREPL.
 * numlocal    - stored in c->bstate.numlocal; only the WAITAOF wrapper
 *               passes a caller-supplied value, the others pass 0.
 *
 * The client is added to server.clients_waiting_acks exactly once and is
 * blocked exactly once, with the requested block type. */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int blockType, int numlocal) {
    c->bstate.timeout = timeout;
    c->bstate.reploffset = offset;
    c->bstate.numreplicas = numreplicas;
    c->bstate.numlocal = numlocal;
    listAddNodeHead(server.clients_waiting_acks, c);
    blockClient(c, blockType);
}

/* Block a client due to pre-replication.
 *
 * Delegates to blockClientForReplicaAck() with block type
 * BLOCKED_WAIT_PREREPL and numlocal 0, then sets CLIENT_PENDING_COMMAND
 * on the client. */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
/* NOTE(review): presumably set so the blocked command is still treated as
 * pending once the client is unblocked -- confirm against unblockClient(). */
c->flags |= CLIENT_PENDING_COMMAND;
}

/* Block a client due to the WAIT command.
 *
 * Thin wrapper: delegates to blockClientForReplicaAck() with block type
 * BLOCKED_WAIT and numlocal 0. */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
}

/* Block a client due to the WAITAOF command.
 *
 * Delegates to blockClientForReplicaAck() with block type BLOCKED_WAITAOF.
 * Note the argument order: this function takes numlocal before numreplicas,
 * while the shared helper takes numreplicas first and numlocal last. All
 * bstate bookkeeping, the insertion into the waiting-acks list and the call
 * to blockClient() happen once, inside the helper. */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
    blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
}

/* Postpone client from executing a command. For example the server might be busy
Expand Down
252 changes: 120 additions & 132 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -5229,6 +5229,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);
server.cluster->slots[slot] = n;
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
return C_OK;
}

Expand Down Expand Up @@ -6333,8 +6334,10 @@ int clusterCommandSpecial(client *c) {
/* SETSLOT 10 NODE <node ID> */
int slot;
clusterNode *n;
int replSetSlot = nodeIsMaster(myself);
int reply = 1;

/* Allow primaries to replicate "CLUSTER SETSLOT" */
/* Allow primaries to replicate "CLUSTER SETSLOT" */
if (!(c->flags & CLIENT_MASTER) && nodeIsSlave(myself)) {
addReplyError(c,"Please use SETSLOT only with masters.");
return 1;
Expand All @@ -6358,6 +6361,11 @@ int clusterCommandSpecial(client *c) {
addReplyError(c,"Target node is not a master");
return 1;
}
/* Log the target of the migration. The human-readable node name is printed
 * with a plain %s, as in the "Importing slot" message; a bare "%.s" would
 * mean precision zero and print nothing. */
serverLog(LL_NOTICE,
          "Migrating slot %d to node %.40s (%s)",
          slot,
          n->name,
          n->human_nodename);
server.cluster->migrating_slots_to[slot] = n;
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
if (server.cluster->slots[slot] == myself) {
Expand All @@ -6375,9 +6383,15 @@ int clusterCommandSpecial(client *c) {
addReplyError(c,"Target node is not a master");
return 1;
}
serverLog(LL_NOTICE,
"Importing slot %d from node %.40s (%s)",
slot,
n->name,
n->human_nodename);
server.cluster->importing_slots_from[slot] = n;
} else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
/* CLUSTER SETSLOT <SLOT> STABLE */
serverLog(LL_NOTICE, "Marking slot %d stable", slot);
server.cluster->importing_slots_from[slot] = NULL;
server.cluster->migrating_slots_to[slot] = NULL;
} else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
Expand All @@ -6403,154 +6417,127 @@ int clusterCommandSpecial(client *c) {
}
}

serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s",
slot,
n->name,
n->human_nodename,
n->shard_id);

/* If this slot is in migrating status but we have no keys
* for it assigning the slot to another node will clear
* the migrating status. */
if (countKeysInSlot(slot) == 0 &&
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;

int slot_was_mine = server.cluster->slots[slot] == myself;
clusterDelSlot(slot);
clusterAddSlot(n,slot);

/* If we are a master left without slots, we should turn into a
* replica of the new master. */
if (slot_was_mine &&
n != myself &&
myself->numslots == 0 &&
server.cluster_allow_replica_migration) {
serverLog(LL_NOTICE,
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name,
n->human_nodename,
n->shard_id);
clusterSetMaster(n, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
CLUSTER_TODO_UPDATE_STATE |
CLUSTER_TODO_FSYNC_CONFIG);
}

/* If this node or this node's primary was importing this slot,
* assigning the slot to itself also clears the importing status. */
if ((n == myself || n == myself->slaveof) &&
server.cluster->importing_slots_from[slot])
{
server.cluster->importing_slots_from[slot] = NULL;

/* Only primary broadcasts the updates */
if (n == myself) {
/* This slot was manually migrated, set this node configEpoch
* to a new epoch so that the new version can be propagated
* by the cluster.
*
* Note that if this ever results in a collision with another
* node getting the same configEpoch, for example because a
* failover happens at the same time we close the slot, the
* configEpoch collision resolution will fix it assigning
* a different epoch to each node. */
if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
serverLog(LL_NOTICE,
"ConfigEpoch updated after importing slot %d",
slot);
}
/* After importing this slot, let the other nodes know as
* soon as possible. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
}
}
} else if (!strcasecmp(c->argv[3]->ptr,"node") &&
c->argc == 6 &&
!strcasecmp(c->argv[5]->ptr,"replicaonly"))
{
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> REPLICAONLY */

/* When finalizing the slot, there is a possibility that the
* target node B sends a cluster PONG to the source node A
* destination node B sends cluster PONG to the source node A
* before SETSLOT has been replicated to B'. If B crashes here,
* B' will be in an importing state and the slot will have no
* owner. To help mitigate this issue, we added a new SETSLOT
* command variant that takes a special marker token called
* "REPLICAONLY". This command is a no-op on the primary. It
* simply replicates asynchronously the command without the
* "REPLICAONLY" marker to the replicas, if there exist any.
* The caller is expected to wait for this asynchronous
* replication to complete using the "WAIT" command.
*
* With the help of this command, we finalize the slots
* on the replicas before the primary in the following
* sequence, where A is the source primary and B is the target
* primary:
* B' will be in importing state and the slot will have no owner.
* To help mitigate this issue, we enforce the following order
* for slot migration finalization such that the replicas will
* finalize the slot ownership before this primary:
*
* 1. Client C issues SETSLOT n NODE B REPLICAONLY against
* node B
* 2. Node B replicates SETSLOT n NODE B to all of its replicas,
* such as B', B'', etc
* 3. Client C then issues WAIT <num_replicas> <timeout> for
* a number of B's replicas of C's choice to complete the
* finalization
* 4. On successful WAIT completion, Client C executes SETSLOT
* n NODE B against node B but without the "REPLICAONLY"
* marker this time, which completes the slot finalization
* on node B
*
* The following steps can happen in parallel:
* 1. Client C issues SETSLOT n NODE B against node B
* 2. Node B replicates SETSLOT n NODE B to all of its
* replicas, such as B', B'', etc
* 3. On replication completion, node B executes SETSLOT
* n NODE B and returns control back to client C
* 4. The following steps can happen in parallel
* a. Client C issues SETSLOT n NODE B against node A
* b. Node B gossips its new slot ownership to the cluster,
* including A, A', etc */
* b. node B gossips its new slot ownership to the cluster
* including A, A', etc
*
* Where A is the source primary and B is the destination primary. */
int replDone = (c->flags & CLIENT_INTERNAL_PREREPL_DONE) != 0;
int syncReplRequired = (server.cluster->importing_slots_from[slot] != NULL) &&
(nodeIsMaster(myself) != 0) &&
(myself->numslaves != 0);


/* Slot states must be updated on replicas first before being
* applied to the primary. This is controlled via the retry
* flag */
if (replDone || !syncReplRequired) {
serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s",
slot,
n->name,
n->human_nodename,
n->shard_id);

n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
/* If this slot is in migrating status but we have no keys
* for it assigning the slot to another node will clear
* the migrating status. */
if (countKeysInSlot(slot) == 0 &&
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;

int slot_was_mine = server.cluster->slots[slot] == myself;
clusterDelSlot(slot);
clusterAddSlot(n, slot);

/* If we are a master left without slots, we should turn into a
* replica of the new master. */
if (slot_was_mine &&
n != myself &&
myself->numslots == 0 &&
server.cluster_allow_replica_migration) {
serverLog(LL_NOTICE,
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name,
n->human_nodename,
n->shard_id);
clusterSetMaster(n, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
CLUSTER_TODO_UPDATE_STATE |
CLUSTER_TODO_FSYNC_CONFIG);
}

if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr);
return 1;
}
if (nodeIsSlave(n)) {
addReplyError(c,"Target node is not a master");
return 1;
}
/* If this hash slot was served by 'myself' before to switch
* make sure there are no longer local keys for this hash slot. */
if (server.cluster->slots[slot] == myself && n != myself) {
if (countKeysInSlot(slot) != 0) {
addReplyErrorFormat(c,
"Can't assign hashslot %d to a different node "
"while I still hold keys for this hash slot.", slot);
return 1;
/* If this node or this node's primary was importing this slot,
* assigning the slot to itself also clears the importing status. */
if ((n == myself || n == myself->slaveof) &&
server.cluster->importing_slots_from[slot])
{
server.cluster->importing_slots_from[slot] = NULL;

/* Only primary broadcasts the updates */
if (n == myself) {
/* This slot was manually migrated, set this node configEpoch
* to a new epoch so that the new version can be propagated
* by the cluster.
*
* Note that if this ever results in a collision with another
* node getting the same configEpoch, for example because a
* failover happens at the same time we close the slot, the
* configEpoch collision resolution will fix it assigning
* a different epoch to each node. */
if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
serverLog(LL_NOTICE,
"ConfigEpoch updated after importing slot %d",
slot);
}
/* After importing this slot, let the other nodes know as
* soon as possible. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
}
}
}
if (server.cluster->importing_slots_from[slot] == NULL) {
addReplyError(c,"Slot is not open for importing");
return 1;
}
if (myself->numslaves == 0) {
addReplyError(c,"Target node has no replicas");
return 1;
}

/* Remove the last "REPLICAONLY" token so the command
* can be applied as the real "SETSLOT" command on the
* replicas. */
serverAssert(c->argc == 6);
rewriteClientCommandVector(c, 5, c->argv[0], c->argv[1], c->argv[2], c->argv[3], c->argv[4]);
/* Don't replicate again if setslot has been replicated */
replSetSlot &= !replDone;
} else {
/* We are a primary and this is the first time we see this "setslot"
* command. Force-replicate the setslot command to all of our replicas
* first and only on success will we handle the command.
* Note that
* 1. All replicas are expected to ack the replication within 1000ms
* 2. The repl offset target is set to the master's current repl offset + 1.
* There is no concern of partial replication because replicas always
* ack the repl offset at the command boundary. */
blockForPreReplication(c, mstime()+1000, server.master_repl_offset+1, myself->numslaves);
replicationRequestAckFromSlaves();

/* Don't reply to the client yet */
reply = 0;
}
} else {
addReplyError(c,
"Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
return 1;
}

/* Force-replicate "CLUSTER SETSLOT" */
if (nodeIsMaster(myself)) forceCommandPropagation(c, PROPAGATE_REPL);
if (replSetSlot) forceCommandPropagation(c, PROPAGATE_REPL);

clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
addReply(c,shared.ok);
if (reply) addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
/* CLUSTER BUMPEPOCH */
int retval = clusterBumpConfigEpochWithoutConsensus();
Expand Down Expand Up @@ -6623,6 +6610,7 @@ int clusterCommandSpecial(client *c) {

/* Set the master. */
clusterSetMaster(n, 1);
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2082,7 +2082,7 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~CLIENT_EXECUTING_COMMAND;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_INTERNAL_PREREPL_DONE);

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down
Loading

0 comments on commit 907de8c

Please sign in to comment.