Skip to content

Commit

Permalink
Cached cluster slots output as sds
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
  • Loading branch information
roshkhatri committed Mar 27, 2024
1 parent 340ab6d commit 1836d9f
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 23 deletions.
53 changes: 39 additions & 14 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1402,20 +1402,11 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
serverAssert(nested_elements == 3); /* Original 3 elements */
}

void clusterCommandSlots(client * c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
clusterNode *n = NULL;
int num_masters = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(c);
void *slot_replylen = addReplyDeferredLen(recording_client);

for (int i = 0; i <= CLUSTER_SLOTS; i++) {
/* Find start node and slot id. */
Expand All @@ -1429,14 +1420,48 @@ void clusterCommandSlots(client * c) {
/* Add cluster slots info when occur different node with start
* or end of slot. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(c, n, start, i-1);
addNodeReplyForClusterSlot(recording_client, n, start, i-1);
num_masters++;
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
}
}
setDeferredArrayLen(c, slot_replylen, num_masters);
setDeferredArrayLen(recording_client, slot_replylen, num_masters);
return stopCaching(recording_client);
}

/* Regenerate the CLUSTER SLOTS reply and compare it with 'cached_response'.
 *
 * Returns 1 when the freshly generated reply is byte-for-byte equal to the
 * cached one, 0 otherwise. On mismatch both replies are logged at NOTICE
 * level. The generated reply is always released before returning; the
 * cached one is left untouched. */
int verifyCachedClusterSlotsResponse(sds cached_response) {
    sds fresh_response = generateClusterSlotResponse();
    int is_same = (sdscmp(fresh_response, cached_response) == 0);

    if (!is_same) {
        serverLog(LL_NOTICE,"\ngenerated_response:\n%s\n\ncached_response:\n%s", fresh_response, cached_response);
    }

    sdsfree(fresh_response);
    return is_same;
}

/* CLUSTER SLOTS implementation backed by a per-connection-type cache.
 *
 * Reply format: 1) 1) start slot
 *                  2) end slot
 *                  3) 1) master IP
 *                     2) master port
 *                     3) node ID
 *                  4) 1) replica IP
 *                     2) replica port
 *                     3) node ID
 *               ... continued until done
 */
void clusterCommandSlots(client * c) {
    /* The result of connIsTLS() is used directly as the cache index. */
    enum connTypeForCaching conn_type = connIsTLS(c->conn);

    /* Refresh node health first: a health change drops the cached reply. */
    updateNodesHealth();

    if (!isClusterSlotsResponseCached(conn_type)) {
        /* Nothing cached for this connection type: build and store it. */
        cacheSlotsResponse(generateClusterSlotResponse(), conn_type);
    } else {
        /* Debug-only sanity check that the cached reply is still current. */
        debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(getClusterSlotReply(conn_type)) == 1);
    }

    /* Either way, the reply sent to the client is the cached buffer. */
    addReplyProto(c, getClusterSlotReply(conn_type), sdslen(getClusterSlotReply(conn_type)));
}

/* -----------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,10 @@ int isValidAuxString(char *s, unsigned int length);
void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);

/* Returns 1 if a non-empty CLUSTER SLOTS reply is cached for 'conn_type', 0 otherwise. */
int isClusterSlotsResponseCached(enum connTypeForCaching conn_type);
/* Returns the cached CLUSTER SLOTS reply for 'conn_type' (may be NULL). */
sds getClusterSlotReply(enum connTypeForCaching conn_type);
/* Frees the cached CLUSTER SLOTS replies of every connection type and resets them to NULL. */
void clearCachedClusterSlotsResp(void);
/* Stores 'response_to_cache' as the cached CLUSTER SLOTS reply for 'conn_type'. */
void cacheSlotsResponse(sds response_to_cache, enum connTypeForCaching conn_type);
/* Recomputes every node's node_health flag; clears the cached replies if any flag changed. */
void updateNodesHealth(void);
#endif /* __CLUSTER_H */
91 changes: 84 additions & 7 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,31 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);

/* Tell whether a usable CLUSTER SLOTS reply is cached for 'conn_type'.
 *
 * Returns 1 only when the cache entry exists and is non-empty, 0 otherwise. */
int isClusterSlotsResponseCached(enum connTypeForCaching conn_type) {
    if (server.cluster->cached_cluster_slot_info[conn_type] == NULL) return 0;
    return sdslen(server.cluster->cached_cluster_slot_info[conn_type]) != 0;
}

/* Return the cached CLUSTER SLOTS reply for 'conn_type'.
 * The cache entry itself is returned (not a copy) and may be NULL. */
sds getClusterSlotReply(enum connTypeForCaching conn_type) {
    sds cached_reply = server.cluster->cached_cluster_slot_info[conn_type];
    return cached_reply;
}

/* Drop the cached CLUSTER SLOTS replies of every connection type.
 *
 * Each cache entry is freed and reset to NULL so the next CLUSTER SLOTS call
 * regenerates it. The function is a no-op when the cluster state has not
 * been allocated: it is also reachable from a config callback
 * (invalidateClusterSlotsResp), so it must not assume server.cluster is
 * valid. */
void clearCachedClusterSlotsResp(void) {
    if (server.cluster == NULL) return;

    for (enum connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
        if (server.cluster->cached_cluster_slot_info[conn_type]) {
            sdsfree(server.cluster->cached_cluster_slot_info[conn_type]);
            server.cluster->cached_cluster_slot_info[conn_type] = NULL;
        }
    }
}

/* Store 'response_to_cache' as the cached CLUSTER SLOTS reply for 'conn_type'.
 *
 * The cache takes ownership of 'response_to_cache'. Any reply previously
 * stored in the same entry is released first so that overwriting an entry
 * (for example an empty one, which isClusterSlotsResponseCached() reports as
 * "not cached") does not leak it. Re-storing the very same sds is allowed. */
void cacheSlotsResponse(sds response_to_cache, enum connTypeForCaching conn_type) {
    sds previous = server.cluster->cached_cluster_slot_info[conn_type];

    /* sdsfree() accepts NULL, so an empty entry needs no special case. */
    if (previous != response_to_cache) sdsfree(previous);
    server.cluster->cached_cluster_slot_info[conn_type] = response_to_cache;
}

/* Return the client port of node 'n' selected by server.tls_cluster:
 * the TLS port when it is set, the TCP port otherwise. */
int getNodeDefaultClientPort(clusterNode *n) {
    if (server.tls_cluster) return n->tls_port;
    return n->tcp_port;
}
Expand Down Expand Up @@ -492,6 +517,7 @@ int clusterLoadConfig(char *filename) {
}
*p = '\0';
memcpy(n->ip,aux_argv[0],strlen(aux_argv[0])+1);
clearCachedClusterSlotsResp();
char *port = p+1;
char *busp = strchr(port,'@');
if (busp) {
Expand Down Expand Up @@ -887,6 +913,7 @@ void clusterUpdateMyselfIp(void) {
} else {
myself->ip[0] = '\0'; /* Force autodetection. */
}
clearCachedClusterSlotsResp();
}
}

Expand All @@ -904,6 +931,7 @@ static void updateAnnouncedHostname(clusterNode *node, char *new) {
} else if (sdslen(node->hostname) != 0) {
sdsclear(node->hostname);
}
clearCachedClusterSlotsResp();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

Expand Down Expand Up @@ -1028,6 +1056,9 @@ void clusterInit(void) {

server.cluster->mf_end = 0;
server.cluster->mf_slave = NULL;
for (enum connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
server.cluster->cached_cluster_slot_info[conn_type] = NULL;
}
resetManualFailover();
clusterUpdateMyselfFlags();
clusterUpdateMyselfIp();
Expand Down Expand Up @@ -1352,6 +1383,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->repl_offset_time = 0;
node->repl_offset = 0;
listSetFreeMethod(node->fail_reports,zfree);
node->node_health = 0;
return node;
}

Expand Down Expand Up @@ -1480,6 +1512,7 @@ int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
master->slaves[master->numslaves] = slave;
master->numslaves++;
master->flags |= CLUSTER_NODE_MIGRATE_TO;
clearCachedClusterSlotsResp();
return C_OK;
}

Expand Down Expand Up @@ -1526,6 +1559,7 @@ void clusterAddNode(clusterNode *node) {
retval = dictAdd(server.cluster->nodes,
sdsnewlen(node->name,CLUSTER_NAMELEN), node);
serverAssert(retval == DICT_OK);
clearCachedClusterSlotsResp();
}

/* Remove a node from the cluster. The function performs the high level
Expand Down Expand Up @@ -1567,6 +1601,7 @@ void clusterDelNode(clusterNode *delnode) {

/* 3) Remove the node from the owning shard */
clusterRemoveNodeFromShard(delnode);
clearCachedClusterSlotsResp();

/* 4) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode);
Expand Down Expand Up @@ -2184,6 +2219,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
node->tls_port = msg_tls_port;
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR;
clearCachedClusterSlotsResp();
}
} else if (!node) {
/* If it's not in NOADDR state and we don't have it, we
Expand Down Expand Up @@ -2279,6 +2315,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
serverLog(LL_NOTICE,"Address updated for node %.40s (%s), now %s:%d",
node->name, node->human_nodename, node->ip, getNodeDefaultClientPort(node));

clearCachedClusterSlotsResp();
/* Check if this is our master and we have to change the
* replication target as well. */
if (nodeIsSlave(myself) && myself->slaveof == node)
Expand All @@ -2300,6 +2337,7 @@ void clusterSetNodeAsMaster(clusterNode *n) {
n->flags |= CLUSTER_NODE_MASTER;
n->slaveof = NULL;

clearCachedClusterSlotsResp();
/* Update config and state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
Expand Down Expand Up @@ -2344,7 +2382,9 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
serverLog(LL_NOTICE,"Discarding UPDATE message about myself.");
return;
}


clearCachedClusterSlotsResp();

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots,j)) {
sender_slots++;
Expand Down Expand Up @@ -2851,6 +2891,7 @@ int clusterProcessPacket(clusterLink *link) {
strcmp(ip,myself->ip))
{
memcpy(myself->ip,ip,NET_IP_STR_LEN);
clearCachedClusterSlotsResp();
serverLog(LL_NOTICE,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
Expand Down Expand Up @@ -2933,6 +2974,7 @@ int clusterProcessPacket(clusterLink *link) {
link->node->cport = 0;
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
clearCachedClusterSlotsResp();
return 0;
}
}
Expand Down Expand Up @@ -4177,6 +4219,7 @@ void clusterFailoverReplaceYourMaster(void) {
/* 3) Update state and save config. */
clusterUpdateState();
clusterSaveConfigOrDie(1);
clearCachedClusterSlotsResp();

/* 4) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to master role. */
Expand Down Expand Up @@ -4233,6 +4276,7 @@ void clusterHandleSlaveFailover(void) {
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}
clearCachedClusterSlotsResp();

/* Set data_age to the number of milliseconds we are disconnected from
* the master. */
Expand Down Expand Up @@ -4961,6 +5005,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);
server.cluster->slots[slot] = n;
clearCachedClusterSlotsResp();
return C_OK;
}

Expand All @@ -4979,6 +5024,7 @@ int clusterDelSlot(int slot) {
server.cluster->slots[slot] = NULL;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clearCachedClusterSlotsResp();
return C_OK;
}

Expand Down Expand Up @@ -5603,6 +5649,14 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}

/* Return the replication offset to report for 'node'.
 *
 * For any node other than myself this is the stored node->repl_offset.
 * For myself it is computed locally: replicationGetSlaveOffset() when the
 * node is a replica, server.master_repl_offset otherwise. */
long long getNodeOffSet(clusterNode *node) {
    if (!(node->flags & CLUSTER_NODE_MYSELF)) return node->repl_offset;
    if (nodeIsSlave(node)) return replicationGetSlaveOffset();
    return server.master_repl_offset;
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0;
Expand Down Expand Up @@ -5637,12 +5691,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
reply_count++;
}

long long node_offset;
if (node->flags & CLUSTER_NODE_MYSELF) {
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
node_offset = node->repl_offset;
}
long long node_offset = getNodeOffSet(node);

addReplyBulkCString(c, "role");
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
Expand Down Expand Up @@ -6517,3 +6566,31 @@ int clusterAllowFailoverCmd(client *c) {
/* Promote this node to master; delegates entirely to replicationUnsetMaster(). */
void clusterPromoteSelfToMaster(void)
{
    replicationUnsetMaster();
}

/* Recompute the node_health flag of every known node.
 *
 * A node is considered unhealthy (0) when it is flagged as failed, or when it
 * is a replica whose replication offset is 0; otherwise it is healthy (1).
 * If any node's flag differs from its previously stored value, the cached
 * CLUSTER SLOTS replies are dropped. */
void updateNodesHealth(void) {
    int any_change = 0;
    dictEntry *entry;
    dictIterator *iter = dictGetSafeIterator(server.cluster->nodes);

    while ((entry = dictNext(iter)) != NULL) {
        clusterNode *current = dictGetVal(entry);
        long long offset = getNodeOffSet(current);
        int healthy = !(nodeFailed(current) || (nodeIsSlave(current) && offset == 0));

        if (current->node_health != healthy) {
            any_change = 1;
            current->node_health = healthy;
        }
    }
    dictReleaseIterator(iter);

    if (any_change) {
        clearCachedClusterSlotsResp();
    }
}
2 changes: 2 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ struct _clusterNode {
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
int node_health; /* Last updated node health, can be stale. Update by calling updateNodesHealth() */
};

struct clusterState {
Expand Down Expand Up @@ -353,6 +354,7 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
};


Expand Down
8 changes: 7 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2581,6 +2581,12 @@ static int updateOOMScoreAdj(const char **err) {
return 1;
}

/* Config callback that drops the cached CLUSTER SLOTS replies.
 *
 * 'err' is never written. Always returns 1. */
int invalidateClusterSlotsResp(const char **err) {
    clearCachedClusterSlotsResp();
    UNUSED(err);
    return 1;
}

int updateRequirePass(const char **err) {
UNUSED(err);
/* The old "requirepass" directive just translates to setting
Expand Down Expand Up @@ -3156,7 +3162,7 @@ standardConfig static_configs[] = {
createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, invalidateClusterSlotsResp),
createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
Expand Down
6 changes: 6 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ typedef enum {
#define CONN_TYPE_TLS "tls"
#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */

/* Index of a cached reply, one entry per client connection flavour.
 * The values are spelled out because they are used as array indexes:
 * 0 for plain TCP, 1 for TLS. CACHE_CONN_TYPE_MAX is the entry count. */
enum connTypeForCaching {
    CACHE_CONN_TCP = 0,
    CACHE_CONN_TLS = 1,
    CACHE_CONN_TYPE_MAX = 2
};

typedef void (*ConnectionCallbackFunc)(struct connection *conn);

typedef struct ConnectionType {
Expand Down
Loading

0 comments on commit 1836d9f

Please sign in to comment.