Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache CLUSTER SLOTS response for improving throughput and reduced latency. #53

Merged
merged 18 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 72 additions & 34 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1312,24 +1312,6 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
return 0;
}

/* Returns an indication if the replica node is fully available
* and should be listed in CLUSTER SLOTS response.
* Returns 1 for available nodes, 0 for nodes that have
* not finished their initial sync, in failed state, or are
* otherwise considered not available to serve read commands. */
static int isReplicaAvailable(clusterNode *node) {
if (clusterNodeIsFailing(node)) {
return 0;
}
long long repl_offset = clusterNodeReplOffset(node);
if (clusterNodeIsMyself(node)) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = replicationGetSlaveOffset();
}
return (repl_offset != 0);
}

void addNodeToNodeReply(client *c, clusterNode *node) {
char* hostname = clusterNodeHostname(node);
addReplyArrayLen(c, 4);
Expand Down Expand Up @@ -1381,10 +1363,28 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
serverAssert(length == 0);
}

/* Returns an indication if the node is fully available
* and should be listed in CLUSTER SLOTS response.
* Returns 1 for available nodes, 0 for nodes that have
* not finished their initial sync, in failed state, or are
* otherwise considered not available to serve read commands. */
int isNodeAvailable(clusterNode *node) {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
if (clusterNodeIsFailing(node)) {
return 0;
}
long long repl_offset = clusterNodeReplOffset(node);
if (clusterNodeIsMyself(node)) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = getNodeReplicationOffset(node);
}
return (repl_offset != 0);
}

void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
nested_elements++;
}
addReplyArrayLen(c, nested_elements);
Expand All @@ -1396,27 +1396,27 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
addNodeToNodeReply(c, clusterNodeGetSlave(node, i));
nested_elements--;
}
serverAssert(nested_elements == 3); /* Original 3 elements */
}

void clusterCommandSlots(client * c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
void clearCachedClusterSlotsResponse(void) {
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
if (server.cached_cluster_slot_info[conn_type]) {
sdsfree(server.cached_cluster_slot_info[conn_type]);
server.cached_cluster_slot_info[conn_type] = NULL;
}
}
}

sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
clusterNode *n = NULL;
int num_masters = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(c);
void *slot_replylen = addReplyDeferredLen(recording_client);

for (int i = 0; i <= CLUSTER_SLOTS; i++) {
/* Find start node and slot id. */
Expand All @@ -1430,14 +1430,52 @@ void clusterCommandSlots(client * c) {
/* Add cluster slots info when occur different node with start
* or end of slot. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(c, n, start, i-1);
addNodeReplyForClusterSlot(recording_client, n, start, i-1);
num_masters++;
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
}
}
setDeferredArrayLen(c, slot_replylen, num_masters);
setDeferredArrayLen(recording_client, slot_replylen, num_masters);
sds cluster_slot_response = aggregateClientOutputBuffer(recording_client);
deleteCachedResponseClient(recording_client);
return cluster_slot_response;
}

int verifyCachedClusterSlotsResponse(sds cached_response) {
sds generated_response = generateClusterSlotResponse();
int is_equal = !sdscmp(generated_response, cached_response);
/* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */
if (!is_equal) serverLog(LL_WARNING,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
sdsfree(generated_response);
return is_equal;
}

void clusterCommandSlots(client *c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
connTypeForCaching conn_type = connIsTLS(c->conn);

if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();

sds cached_reply = server.cached_cluster_slot_info[conn_type];
if (!cached_reply) {
cached_reply = generateClusterSlotResponse();
server.cached_cluster_slot_info[conn_type] = cached_reply;
} else {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1);
madolson marked this conversation as resolved.
Show resolved Hide resolved
}

addReplyProto(c, cached_reply, sdslen(cached_reply));
}

/* -----------------------------------------------------------------------------
Expand Down
7 changes: 7 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
Expand All @@ -116,4 +120,7 @@ int isValidAuxString(char *s, unsigned int length);
void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);
int isNodeAvailable(clusterNode *node);
long long getNodeReplicationOffset(clusterNode *node);
sds aggregateClientOutputBuffer(client *c);
#endif /* __CLUSTER_H */
43 changes: 34 additions & 9 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ void clusterSaveConfigOrDie(int do_fsync) {
serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
exit(1);
}
clearCachedClusterSlotsResponse();
}

/* Lock the cluster config using flock(), and retain the file descriptor used to
Expand Down Expand Up @@ -1039,6 +1040,9 @@ void clusterInit(void) {

server.cluster->mf_end = 0;
server.cluster->mf_slave = NULL;
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
server.cached_cluster_slot_info[conn_type] = NULL;
}
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
resetManualFailover();
clusterUpdateMyselfFlags();
clusterUpdateMyselfIp();
Expand Down Expand Up @@ -1363,6 +1367,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->repl_offset_time = 0;
node->repl_offset = 0;
listSetFreeMethod(node->fail_reports,zfree);
node->is_node_healthy = 0;
return node;
}

Expand Down Expand Up @@ -2362,7 +2367,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
serverLog(LL_NOTICE,"Discarding UPDATE message about myself.");
return;
}

madolson marked this conversation as resolved.
Show resolved Hide resolved
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots,j)) {
sender_slots++;
Expand Down Expand Up @@ -5862,6 +5867,14 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}

long long getNodeReplicationOffset(clusterNode *node) {
if (node->flags & CLUSTER_NODE_MYSELF) {
return nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
return node->repl_offset;
}
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0;
Expand Down Expand Up @@ -5896,12 +5909,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
reply_count++;
}

long long node_offset;
if (node->flags & CLUSTER_NODE_MYSELF) {
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
node_offset = node->repl_offset;
}
long long node_offset = getNodeReplicationOffset(node);

addReplyBulkCString(c, "role");
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
Expand Down Expand Up @@ -6890,9 +6898,26 @@ void clusterPromoteSelfToMaster(void) {
replicationUnsetMaster();
}

int detectAndUpdateCachedNodeHealth(void) {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
dictIterator di;
dictInitSafeIterator(&di, server.cluster->nodes);
dictEntry *de;
clusterNode *node;
int overall_health_changed = 0;
while((de = dictNext(&di)) != NULL) {
node = dictGetVal(de);
int present_is_node_healthy = isNodeAvailable(node);
if (present_is_node_healthy != node->is_node_healthy) {
overall_health_changed = 1;
madolson marked this conversation as resolved.
Show resolved Hide resolved
node->is_node_healthy = present_is_node_healthy;
}
}

return overall_health_changed;
}

/* Replicate migrating and importing slot states to all replicas */
void clusterReplicateOpenSlots(void)
{
void clusterReplicateOpenSlots(void) {
if (!server.cluster_enabled) return;

int argc = 5;
Expand Down
2 changes: 2 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ struct _clusterNode {
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
int is_node_healthy; /* Boolean indicating the cached node health.
Update with updateAndCountChangedNodeHealth(). */
};

struct clusterState {
Expand Down
9 changes: 8 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2586,6 +2586,12 @@ static int updateOOMScoreAdj(const char **err) {
return 1;
}

int invalidateClusterSlotsResp(const char **err) {
UNUSED(err);
clearCachedClusterSlotsResponse();
return 1;
}

int updateRequirePass(const char **err) {
UNUSED(err);
/* The old "requirepass" directive just translates to setting
Expand Down Expand Up @@ -2649,6 +2655,7 @@ int updateClusterFlags(const char **err) {
static int updateClusterAnnouncedPort(const char **err) {
UNUSED(err);
clusterUpdateMyselfAnnouncedPorts();
clearCachedClusterSlotsResponse();
return 1;
}

Expand Down Expand Up @@ -3162,7 +3169,7 @@ standardConfig static_configs[] = {
createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, invalidateClusterSlotsResp),
madolson marked this conversation as resolved.
Show resolved Hide resolved
createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
Expand Down
6 changes: 6 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ typedef enum {
#define CONN_TYPE_TLS "tls"
#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */

typedef enum connTypeForCaching {
CACHE_CONN_TCP,
CACHE_CONN_TLS,
CACHE_CONN_TYPE_MAX
} connTypeForCaching;

typedef void (*ConnectionCallbackFunc)(struct connection *conn);

typedef struct ConnectionType {
Expand Down
38 changes: 38 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,44 @@ int prepareClientToWrite(client *c) {
return C_OK;
}

/* Returns everything in the client reply linked list in a SDS format.
* This should only be used only with a caching client. */
madolson marked this conversation as resolved.
Show resolved Hide resolved
sds aggregateClientOutputBuffer(client *c) {
sds cmd_response = sdsempty();
listIter li;
listNode *ln;
clientReplyBlock *val_block;
listRewind(c->reply,&li);

/* Here, c->buf is not used, thus we confirm c->bufpos remains 0. */
serverAssert(c->bufpos == 0);
while ((ln = listNext(&li)) != NULL) {
val_block = (clientReplyBlock *)listNodeValue(ln);
cmd_response = sdscatlen(cmd_response, val_block->buf,val_block->used);
}
return cmd_response;
}

/* This function creates and returns a fake client for recording the command response
* to initiate caching of any command response.
*
* It needs be paired with `deleteCachedResponseClient` function to stop caching. */
client *createCachedResponseClient(void) {
struct client *recording_client = createClient(NULL);
/* Allocating the `conn` allows to prepare the caching client before adding
* data to the clients output buffer by `prepareClientToWrite`. */
recording_client->conn = zcalloc(sizeof(connection));
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
return recording_client;
}

/* This function is used to stop caching of any command response after `createCachedResponseClient` is called.
* It returns the command response as SDS from the recording_client's reply buffer. */
void deleteCachedResponseClient(client *recording_client) {
zfree(recording_client->conn);
recording_client->conn = NULL;
freeClient(recording_client);
}

/* -----------------------------------------------------------------------------
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
Expand Down
3 changes: 3 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,7 @@ struct valkeyServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
madolson marked this conversation as resolved.
Show resolved Hide resolved
/* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
int pre_command_oom_state; /* OOM before command (script?) was started */
Expand Down Expand Up @@ -2680,6 +2681,8 @@ void initThreadedIO(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);

/* logreqres.c - logging of requests and responses */
void reqresReset(client *c, int free_buf);
Expand Down
2 changes: 1 addition & 1 deletion tests/cluster/tests/04-resharding.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ test "Cluster consistency during live resharding" {
} else {
fail "Resharding is not terminating after some time."
}

wait_for_cluster_propagation
}

test "Verify $numkeys keys for consistency with logical content" {
Expand Down
Loading
Loading