Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache CLUSTER SLOTS response for improving throughput and reduced latency. #53

Merged
merged 18 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 39 additions & 14 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1402,20 +1402,11 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
serverAssert(nested_elements == 3); /* Original 3 elements */
}

void clusterCommandSlots(client * c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
clusterNode *n = NULL;
int num_masters = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(c);
void *slot_replylen = addReplyDeferredLen(recording_client);

for (int i = 0; i <= CLUSTER_SLOTS; i++) {
/* Find start node and slot id. */
Expand All @@ -1429,14 +1420,48 @@ void clusterCommandSlots(client * c) {
/* Add cluster slots info when occur different node with start
* or end of slot. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(c, n, start, i-1);
addNodeReplyForClusterSlot(recording_client, n, start, i-1);
num_masters++;
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
}
}
setDeferredArrayLen(c, slot_replylen, num_masters);
setDeferredArrayLen(recording_client, slot_replylen, num_masters);
return stopCaching(recording_client);
}

int verifyCachedClusterSlotsResponse(sds cached_response) {
sds generated_response = generateClusterSlotResponse();
int result = !sdscmp(generated_response, cached_response);
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
if (!result) serverLog(LL_NOTICE,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
sdsfree(generated_response);
return result;
}

void clusterCommandSlots(client * c) {
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
enum connTypeForCaching conn_type = connIsTLS(c->conn);

/* Check if we have a response cached for cluster slots for early exit. */
updateNodesHealth();
if (isClusterSlotsResponseCached(conn_type)) {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(getClusterSlotReply(conn_type)) == 1);
addReplyProto(c, getClusterSlotReply(conn_type), sdslen(getClusterSlotReply(conn_type)));
return;
}

cacheSlotsResponse(generateClusterSlotResponse(), conn_type);
addReplyProto(c, getClusterSlotReply(conn_type), sdslen(getClusterSlotReply(conn_type)));
}

/* -----------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,10 @@ int isValidAuxString(char *s, unsigned int length);
void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);

int isClusterSlotsResponseCached(enum connTypeForCaching conn_type);
sds getClusterSlotReply(enum connTypeForCaching conn_type);
void clearCachedClusterSlotsResp(void);
void cacheSlotsResponse(sds response_to_cache, enum connTypeForCaching conn_type);
void updateNodesHealth(void);
#endif /* __CLUSTER_H */
91 changes: 84 additions & 7 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,31 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);

int isClusterSlotsResponseCached(enum connTypeForCaching conn_type) {
if (server.cluster->cached_cluster_slot_info[conn_type] &&
sdslen(server.cluster->cached_cluster_slot_info[conn_type])) {
madolson marked this conversation as resolved.
Show resolved Hide resolved
return 1;
}
return 0;
}

sds getClusterSlotReply(enum connTypeForCaching conn_type) {
return server.cluster->cached_cluster_slot_info[conn_type];
}
madolson marked this conversation as resolved.
Show resolved Hide resolved

void clearCachedClusterSlotsResp(void) {
for (enum connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
if (server.cluster->cached_cluster_slot_info[conn_type]) {
sdsfree(server.cluster->cached_cluster_slot_info[conn_type]);
server.cluster->cached_cluster_slot_info[conn_type] = NULL;
}
}
}

void cacheSlotsResponse(sds response_to_cache, enum connTypeForCaching conn_type) {
server.cluster->cached_cluster_slot_info[conn_type] = response_to_cache;
}

int getNodeDefaultClientPort(clusterNode *n) {
return server.tls_cluster ? n->tls_port : n->tcp_port;
}
Expand Down Expand Up @@ -492,6 +517,7 @@ int clusterLoadConfig(char *filename) {
}
*p = '\0';
memcpy(n->ip,aux_argv[0],strlen(aux_argv[0])+1);
clearCachedClusterSlotsResp();
char *port = p+1;
char *busp = strchr(port,'@');
if (busp) {
Expand Down Expand Up @@ -887,6 +913,7 @@ void clusterUpdateMyselfIp(void) {
} else {
myself->ip[0] = '\0'; /* Force autodetection. */
}
clearCachedClusterSlotsResp();
}
}

Expand All @@ -904,6 +931,7 @@ static void updateAnnouncedHostname(clusterNode *node, char *new) {
} else if (sdslen(node->hostname) != 0) {
sdsclear(node->hostname);
}
clearCachedClusterSlotsResp();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

Expand Down Expand Up @@ -1028,6 +1056,9 @@ void clusterInit(void) {

server.cluster->mf_end = 0;
server.cluster->mf_slave = NULL;
for (enum connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
madolson marked this conversation as resolved.
Show resolved Hide resolved
server.cluster->cached_cluster_slot_info[conn_type] = NULL;
}
resetManualFailover();
clusterUpdateMyselfFlags();
clusterUpdateMyselfIp();
Expand Down Expand Up @@ -1352,6 +1383,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->repl_offset_time = 0;
node->repl_offset = 0;
listSetFreeMethod(node->fail_reports,zfree);
node->node_health = 0;
madolson marked this conversation as resolved.
Show resolved Hide resolved
return node;
}

Expand Down Expand Up @@ -1480,6 +1512,7 @@ int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
master->slaves[master->numslaves] = slave;
master->numslaves++;
master->flags |= CLUSTER_NODE_MIGRATE_TO;
clearCachedClusterSlotsResp();
return C_OK;
}

Expand Down Expand Up @@ -1526,6 +1559,7 @@ void clusterAddNode(clusterNode *node) {
retval = dictAdd(server.cluster->nodes,
sdsnewlen(node->name,CLUSTER_NAMELEN), node);
serverAssert(retval == DICT_OK);
clearCachedClusterSlotsResp();
}

/* Remove a node from the cluster. The function performs the high level
Expand Down Expand Up @@ -1567,6 +1601,7 @@ void clusterDelNode(clusterNode *delnode) {

/* 3) Remove the node from the owning shard */
clusterRemoveNodeFromShard(delnode);
clearCachedClusterSlotsResp();

/* 4) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode);
Expand Down Expand Up @@ -2184,6 +2219,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
node->tls_port = msg_tls_port;
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR;
clearCachedClusterSlotsResp();
}
} else if (!node) {
/* If it's not in NOADDR state and we don't have it, we
Expand Down Expand Up @@ -2279,6 +2315,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
serverLog(LL_NOTICE,"Address updated for node %.40s (%s), now %s:%d",
node->name, node->human_nodename, node->ip, getNodeDefaultClientPort(node));

clearCachedClusterSlotsResp();
/* Check if this is our master and we have to change the
* replication target as well. */
if (nodeIsSlave(myself) && myself->slaveof == node)
Expand All @@ -2300,6 +2337,7 @@ void clusterSetNodeAsMaster(clusterNode *n) {
n->flags |= CLUSTER_NODE_MASTER;
n->slaveof = NULL;

clearCachedClusterSlotsResp();
/* Update config and state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
Expand Down Expand Up @@ -2344,7 +2382,9 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
serverLog(LL_NOTICE,"Discarding UPDATE message about myself.");
return;
}


madolson marked this conversation as resolved.
Show resolved Hide resolved
clearCachedClusterSlotsResp();

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots,j)) {
sender_slots++;
Expand Down Expand Up @@ -2851,6 +2891,7 @@ int clusterProcessPacket(clusterLink *link) {
strcmp(ip,myself->ip))
{
memcpy(myself->ip,ip,NET_IP_STR_LEN);
clearCachedClusterSlotsResp();
serverLog(LL_NOTICE,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
Expand Down Expand Up @@ -2933,6 +2974,7 @@ int clusterProcessPacket(clusterLink *link) {
link->node->cport = 0;
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
clearCachedClusterSlotsResp();
return 0;
}
}
Expand Down Expand Up @@ -4177,6 +4219,7 @@ void clusterFailoverReplaceYourMaster(void) {
/* 3) Update state and save config. */
clusterUpdateState();
clusterSaveConfigOrDie(1);
clearCachedClusterSlotsResp();

/* 4) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to master role. */
Expand Down Expand Up @@ -4233,6 +4276,7 @@ void clusterHandleSlaveFailover(void) {
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}
clearCachedClusterSlotsResp();

/* Set data_age to the number of milliseconds we are disconnected from
* the master. */
Expand Down Expand Up @@ -4961,6 +5005,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);
server.cluster->slots[slot] = n;
clearCachedClusterSlotsResp();
return C_OK;
}

Expand All @@ -4979,6 +5024,7 @@ int clusterDelSlot(int slot) {
server.cluster->slots[slot] = NULL;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clearCachedClusterSlotsResp();
return C_OK;
}

Expand Down Expand Up @@ -5603,6 +5649,14 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}

long long getNodeOffSet(clusterNode *node) {
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
if (node->flags & CLUSTER_NODE_MYSELF) {
return nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
return node->repl_offset;
}
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0;
Expand Down Expand Up @@ -5637,12 +5691,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
reply_count++;
}

long long node_offset;
if (node->flags & CLUSTER_NODE_MYSELF) {
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
node_offset = node->repl_offset;
}
long long node_offset = getNodeOffSet(node);

addReplyBulkCString(c, "role");
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
Expand Down Expand Up @@ -6517,3 +6566,31 @@ int clusterAllowFailoverCmd(client *c) {
void clusterPromoteSelfToMaster(void) {
replicationUnsetMaster();
}

void updateNodesHealth(void) {
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
dictIterator *di;
dictEntry *de;
clusterNode *node;
int overall_health_changed = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
node = dictGetVal(de);
int present_node_health;

long long node_offset = getNodeOffSet(node);

if (nodeFailed(node) || (nodeIsSlave(node) && node_offset == 0)) {
present_node_health = 0;
} else {
present_node_health = 1;
}
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved

if (present_node_health != node->node_health) {
overall_health_changed = 1;
}
node->node_health = present_node_health;
}
dictReleaseIterator(di);

if (overall_health_changed) clearCachedClusterSlotsResp();
}
2 changes: 2 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ struct _clusterNode {
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
int node_health; /* Last updated node health, can be stale. Update by calling updateNodesHealth() */
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved
};

struct clusterState {
Expand Down Expand Up @@ -353,6 +354,7 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
};


Expand Down
8 changes: 7 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2581,6 +2581,12 @@ static int updateOOMScoreAdj(const char **err) {
return 1;
}

int invalidateClusterSlotsResp(const char **err) {
UNUSED(err);
clearCachedClusterSlotsResp();
return 1;
}

int updateRequirePass(const char **err) {
UNUSED(err);
/* The old "requirepass" directive just translates to setting
Expand Down Expand Up @@ -3156,7 +3162,7 @@ standardConfig static_configs[] = {
createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, invalidateClusterSlotsResp),
madolson marked this conversation as resolved.
Show resolved Hide resolved
createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
Expand Down
6 changes: 6 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ typedef enum {
#define CONN_TYPE_TLS "tls"
#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */

enum connTypeForCaching {
CACHE_CONN_TCP,
CACHE_CONN_TLS,
CACHE_CONN_TYPE_MAX
};
roshkhatri marked this conversation as resolved.
Show resolved Hide resolved

typedef void (*ConnectionCallbackFunc)(struct connection *conn);

typedef struct ConnectionType {
Expand Down
Loading
Loading