Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache CLUSTER SLOTS response for improving throughput and reduced latency. #53

Merged
merged 18 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 69 additions & 34 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1312,24 +1312,6 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
return 0;
}

/* Returns an indication if the replica node is fully available
* and should be listed in CLUSTER SLOTS response.
* Returns 1 for available nodes, 0 for nodes that have
* not finished their initial sync, in failed state, or are
* otherwise considered not available to serve read commands. */
static int isReplicaAvailable(clusterNode *node) {
if (clusterNodeIsFailing(node)) {
return 0;
}
long long repl_offset = clusterNodeReplOffset(node);
if (clusterNodeIsMyself(node)) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = replicationGetSlaveOffset();
}
return (repl_offset != 0);
}

void addNodeToNodeReply(client *c, clusterNode *node) {
char* hostname = clusterNodeHostname(node);
addReplyArrayLen(c, 4);
Expand Down Expand Up @@ -1381,10 +1363,28 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
serverAssert(length == 0);
}

/* Returns an indication if the node is fully available
* and should be listed in CLUSTER SLOTS response.
* Returns 1 for available nodes, 0 for nodes that have
* not finished their initial sync, in failed state, or are
* otherwise considered not available to serve read commands. */
int isNodeAvailable(clusterNode *node) {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
if (clusterNodeIsFailing(node)) {
return 0;
}
long long repl_offset = clusterNodeReplOffset(node);
if (clusterNodeIsMyself(node)) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = replicationGetSlaveOffset();
}
return (repl_offset != 0);
}

void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
nested_elements++;
}
addReplyArrayLen(c, nested_elements);
Expand All @@ -1396,27 +1396,27 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
addNodeToNodeReply(c, clusterNodeGetSlave(node, i));
nested_elements--;
}
serverAssert(nested_elements == 3); /* Original 3 elements */
}

void clusterCommandSlots(client * c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
void clearCachedClusterSlotsResponse(void) {
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
if (server.cached_cluster_slot_info[conn_type]) {
sdsfree(server.cached_cluster_slot_info[conn_type]);
server.cached_cluster_slot_info[conn_type] = NULL;
}
}
}

sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
clusterNode *n = NULL;
int num_masters = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(c);
void *slot_replylen = addReplyDeferredLen(recording_client);

for (int i = 0; i <= CLUSTER_SLOTS; i++) {
/* Find start node and slot id. */
Expand All @@ -1430,14 +1430,49 @@ void clusterCommandSlots(client * c) {
/* Add cluster slots info when occur different node with start
* or end of slot. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(c, n, start, i-1);
addNodeReplyForClusterSlot(recording_client, n, start, i-1);
num_masters++;
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
}
}
setDeferredArrayLen(c, slot_replylen, num_masters);
setDeferredArrayLen(recording_client, slot_replylen, num_masters);
return stopCaching(recording_client);
}

int verifyCachedClusterSlotsResponse(sds cached_response) {
sds generated_response = generateClusterSlotResponse();
int result = !sdscmp(generated_response, cached_response);
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
if (!result) serverLog(LL_WARNING,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
PingXie marked this conversation as resolved.
Show resolved Hide resolved
sdsfree(generated_response);
return result;
}

void clusterCommandSlots(client *c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
connTypeForCaching conn_type = connIsTLS(c->conn);

if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();

sds cached_reply = server.cached_cluster_slot_info[conn_type];
if (!cached_reply) {
cached_reply = generateClusterSlotResponse();
server.cached_cluster_slot_info[conn_type] = cached_reply;
} else {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1);
madolson marked this conversation as resolved.
Show resolved Hide resolved
}

addReplyProto(c, cached_reply, sdslen(cached_reply));
}

/* -----------------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
sds stopCaching(client *recording_client);
void clearCachedClusterSlotsResponse(void);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
Expand All @@ -116,4 +120,5 @@ int isValidAuxString(char *s, unsigned int length);
void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);
int isNodeAvailable(clusterNode *node);
#endif /* __CLUSTER_H */
58 changes: 49 additions & 9 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ void clusterUpdateMyselfIp(void) {
} else {
myself->ip[0] = '\0'; /* Force autodetection. */
}
clearCachedClusterSlotsResponse();
}
}

Expand All @@ -907,6 +908,7 @@ static void updateAnnouncedHostname(clusterNode *node, char *new) {
} else if (sdslen(node->hostname) != 0) {
sdsclear(node->hostname);
}
clearCachedClusterSlotsResponse();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

Expand Down Expand Up @@ -1039,6 +1041,9 @@ void clusterInit(void) {

server.cluster->mf_end = 0;
server.cluster->mf_slave = NULL;
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
server.cached_cluster_slot_info[conn_type] = NULL;
}
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
resetManualFailover();
clusterUpdateMyselfFlags();
clusterUpdateMyselfIp();
Expand Down Expand Up @@ -1363,6 +1368,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->repl_offset_time = 0;
node->repl_offset = 0;
listSetFreeMethod(node->fail_reports,zfree);
node->is_node_healthy = 0;
return node;
}

Expand Down Expand Up @@ -1496,6 +1502,7 @@ int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
master->numslaves++;
qsort(master->slaves, master->numslaves, sizeof(clusterNode *), clusterNodeNameComparator);
master->flags |= CLUSTER_NODE_MIGRATE_TO;
clearCachedClusterSlotsResponse();
return C_OK;
}

Expand Down Expand Up @@ -1542,6 +1549,7 @@ void clusterAddNode(clusterNode *node) {
retval = dictAdd(server.cluster->nodes,
sdsnewlen(node->name,CLUSTER_NAMELEN), node);
serverAssert(retval == DICT_OK);
clearCachedClusterSlotsResponse();
}

/* Remove a node from the cluster. The function performs the high level
Expand Down Expand Up @@ -1583,6 +1591,7 @@ void clusterDelNode(clusterNode *delnode) {

/* 3) Remove the node from the owning shard */
clusterRemoveNodeFromShard(delnode);
clearCachedClusterSlotsResponse();

/* 4) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode);
Expand Down Expand Up @@ -2202,6 +2211,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
node->tls_port = msg_tls_port;
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR;
clearCachedClusterSlotsResponse();
}
} else if (!node) {
/* If it's not in NOADDR state and we don't have it, we
Expand Down Expand Up @@ -2297,6 +2307,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
serverLog(LL_NOTICE,"Address updated for node %.40s (%s), now %s:%d",
node->name, node->human_nodename, node->ip, getNodeDefaultClientPort(node));

clearCachedClusterSlotsResponse();
/* Check if this is our master and we have to change the
* replication target as well. */
if (nodeIsSlave(myself) && myself->slaveof == node)
Expand All @@ -2318,6 +2329,7 @@ void clusterSetNodeAsMaster(clusterNode *n) {
n->flags |= CLUSTER_NODE_MASTER;
n->slaveof = NULL;

clearCachedClusterSlotsResponse();
/* Update config and state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
Expand Down Expand Up @@ -2362,7 +2374,9 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
serverLog(LL_NOTICE,"Discarding UPDATE message about myself.");
return;
}


madolson marked this conversation as resolved.
Show resolved Hide resolved
clearCachedClusterSlotsResponse();

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots,j)) {
sender_slots++;
Expand Down Expand Up @@ -3050,6 +3064,7 @@ int clusterProcessPacket(clusterLink *link) {
strcmp(ip,myself->ip))
{
memcpy(myself->ip,ip,NET_IP_STR_LEN);
clearCachedClusterSlotsResponse();
serverLog(LL_NOTICE,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
Expand Down Expand Up @@ -3135,6 +3150,7 @@ int clusterProcessPacket(clusterLink *link) {
link->node->cport = 0;
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
clearCachedClusterSlotsResponse();
return 0;
}
}
Expand Down Expand Up @@ -4449,6 +4465,7 @@ void clusterFailoverReplaceYourMaster(void) {
/* 3) Update state and save config. */
clusterUpdateState();
clusterSaveConfigOrDie(1);
clearCachedClusterSlotsResponse();

/* 4) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to master role. */
Expand Down Expand Up @@ -4503,6 +4520,7 @@ void clusterHandleSlaveFailover(void) {
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}
clearCachedClusterSlotsResponse();

/* Set data_age to the number of milliseconds we are disconnected from
* the master. */
Expand Down Expand Up @@ -5233,6 +5251,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);
server.cluster->slots[slot] = n;
clearCachedClusterSlotsResponse();
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
return C_OK;
}
Expand All @@ -5252,6 +5271,7 @@ int clusterDelSlot(int slot) {
server.cluster->slots[slot] = NULL;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clearCachedClusterSlotsResponse();
return C_OK;
}

Expand Down Expand Up @@ -5862,6 +5882,14 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}

long long getNodeOffset(clusterNode *node) {
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
if (node->flags & CLUSTER_NODE_MYSELF) {
return nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
return node->repl_offset;
}
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0;
Expand Down Expand Up @@ -5896,12 +5924,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
reply_count++;
}

long long node_offset;
if (node->flags & CLUSTER_NODE_MYSELF) {
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
node_offset = node->repl_offset;
}
long long node_offset = getNodeOffset(node);

addReplyBulkCString(c, "role");
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
Expand Down Expand Up @@ -6890,9 +6913,26 @@ void clusterPromoteSelfToMaster(void) {
replicationUnsetMaster();
}

int detectAndUpdateCachedNodeHealth(void) {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
dictIterator di;
dictInitSafeIterator(&di, server.cluster->nodes);
dictEntry *de;
clusterNode *node;
int overall_health_changed = 0;
while((de = dictNext(&di)) != NULL) {
node = dictGetVal(de);
int present_is_node_healthy = isNodeAvailable(node);
if (present_is_node_healthy != node->is_node_healthy) {
overall_health_changed = 1;
madolson marked this conversation as resolved.
Show resolved Hide resolved
node->is_node_healthy = present_is_node_healthy;
}
}

return overall_health_changed;
}

/* Replicate migrating and importing slot states to all replicas */
void clusterReplicateOpenSlots(void)
{
void clusterReplicateOpenSlots(void) {
if (!server.cluster_enabled) return;

int argc = 5;
Expand Down
2 changes: 2 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ struct _clusterNode {
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
int is_node_healthy; /* Boolean last updated node health used for validating
cached response, can be stale. Update by calling detectAndUpdateCachedNodeHealth() */
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
};

struct clusterState {
Expand Down
8 changes: 7 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2586,6 +2586,12 @@ static int updateOOMScoreAdj(const char **err) {
return 1;
}

int invalidateClusterSlotsResp(const char **err) {
UNUSED(err);
clearCachedClusterSlotsResponse();
return 1;
}

int updateRequirePass(const char **err) {
UNUSED(err);
/* The old "requirepass" directive just translates to setting
Expand Down Expand Up @@ -3162,7 +3168,7 @@ standardConfig static_configs[] = {
createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, invalidateClusterSlotsResp),
madolson marked this conversation as resolved.
Show resolved Hide resolved
createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
Expand Down
Loading
Loading