Skip to content

Commit

Permalink
Merge remote-tracking branch 'valkey/unstable' into 80ga-all
Browse files Browse the repository at this point in the history
  • Loading branch information
PingXie committed Sep 14, 2024
2 parents 85a5847 + 09def3c commit 1755c92
Show file tree
Hide file tree
Showing 34 changed files with 2,376 additions and 1,557 deletions.
14 changes: 14 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This is a file that can be used by git-blame to ignore some revisions.
# (git 2.23+, released in August 2019)
#
# Can be configured as follow:
#
# $ git config blame.ignoreRevsFile .git-blame-ignore-revs
#
# For more information you can look at git-blame(1) man page.

# Applied clang-format (#323)
c41dd77a3e93e02be3c4bc75d8c76b7b4169a4ce

# Removed terms `master` and `slave` from the source code (#591)
54c97479356ecf41b4b63733494a1be2ab919e17
1 change: 1 addition & 0 deletions runtest-moduleapi
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/moduleauth \
--single unit/moduleapi/rdbloadsave \
--single unit/moduleapi/crash \
--single unit/moduleapi/getchannels \
"${@}"
1 change: 1 addition & 0 deletions src/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ SortIncludes: false
AllowAllParametersOfDeclarationOnNextLine: false
BinPackParameters: false
AlignAfterOpenBracket: Align
InsertNewlineAtEOF: true
9 changes: 5 additions & 4 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,15 +423,15 @@ void aofManifestFreeAndUpdate(aofManifest *am) {
* appendonly.aof.1.base.aof (server.aof_use_rdb_preamble is no)
* appendonly.aof.1.base.rdb (server.aof_use_rdb_preamble is yes)
*/
sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) {
sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am, int aof_use_rdb_preamble) {
serverAssert(am != NULL);
if (am->base_aof_info) {
serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
am->base_aof_info->file_type = AOF_FILE_TYPE_HIST;
listAddNodeHead(am->history_aof_list, am->base_aof_info);
}

char *format_suffix = server.aof_use_rdb_preamble ? RDB_FORMAT_SUFFIX : AOF_FORMAT_SUFFIX;
char *format_suffix = aof_use_rdb_preamble ? RDB_FORMAT_SUFFIX : AOF_FORMAT_SUFFIX;

aofInfo *ai = aofInfoCreate();
ai->file_name = sdscatprintf(sdsempty(), "%s.%lld%s%s", server.aof_filename, ++am->curr_base_file_seq,
Expand Down Expand Up @@ -712,7 +712,7 @@ void aofOpenIfNeededOnServerStart(void) {
/* If we start with an empty dataset, we will force create a BASE file. */
size_t incr_aof_len = listLength(server.aof_manifest->incr_aof_list);
if (!server.aof_manifest->base_aof_info && !incr_aof_len) {
sds base_name = getNewBaseFileNameAndMarkPreAsHistory(server.aof_manifest);
sds base_name = getNewBaseFileNameAndMarkPreAsHistory(server.aof_manifest, server.aof_use_rdb_preamble);
sds base_filepath = makePath(server.aof_dirname, base_name);
if (rewriteAppendOnlyFile(base_filepath) != C_OK) {
exit(1);
Expand Down Expand Up @@ -2445,6 +2445,7 @@ int rewriteAppendOnlyFileBackground(void) {
serverLog(LL_NOTICE, "Background append only file rewriting started by pid %ld", (long)childpid);
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_rewrite_use_rdb_preamble = server.aof_use_rdb_preamble;
return C_OK;
}
return C_OK; /* unreached */
Expand Down Expand Up @@ -2557,7 +2558,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {

/* Get a new BASE file name and mark the previous (if we have)
* as the HISTORY type. */
sds new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am);
sds new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am, server.aof_rewrite_use_rdb_preamble);
serverAssert(new_base_filename != NULL);
new_base_filepath = makePath(server.aof_dirname, new_base_filename);

Expand Down
2 changes: 2 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,8 @@ void clusterCommandHelp(client *c) {
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, primary and replicas IP addresses, ports and ids",
"SLOT-STATS",
" Return an array of slot usage statistics for slots assigned to the current node.",
"SHARDS",
" Return information about slot range mappings and the nodes associated with them.",
NULL};
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void clusterInitLast(void);
void clusterCron(void);
void clusterBeforeSleep(void);
int verifyClusterConfigWithData(void);
void clusterHandleServerShutdown(void);

int clusterSendModuleMessageToTarget(const char *target,
uint64_t module_id,
Expand Down Expand Up @@ -83,7 +84,6 @@ int getNodeDefaultClientPort(clusterNode *n);
clusterNode *getMyClusterNode(void);
int getClusterSize(void);
int getMyShardSlotCount(void);
int handleDebugClusterCommand(client *c);
int clusterNodePending(clusterNode *node);
int clusterNodeIsPrimary(clusterNode *n);
char **getClusterNodesList(size_t *numnodes);
Expand Down
74 changes: 66 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,28 @@ void clusterInitLast(void) {
}
}

/* Called when a cluster node receives SHUTDOWN. */
void clusterHandleServerShutdown(void) {
/* The error logs have been logged in the save function if the save fails. */
serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting.");
clusterSaveConfig(1);

#if !defined(__sun)
/* Unlock the cluster config file before shutdown, see clusterLockConfig.
*
* This is needed if you shutdown a very large server process, it will take
* a while for the OS to release resources and unlock the cluster configuration
* file. Therefore, if we immediately try to restart the server process, it
* may not be able to acquire the lock on the cluster configuration file and
* fail to start. We explicitly releases the lock on the cluster configuration
* file on shutdown, rather than relying on the OS to release the lock, which
* is a cleaner and safer way to release acquired resources. */
if (server.cluster_config_file_lock_fd != -1) {
flock(server.cluster_config_file_lock_fd, LOCK_UN | LOCK_NB);
}
#endif /* __sun */
}

/* Reset a node performing a soft or hard reset:
*
* 1) All other nodes are forgotten.
Expand Down Expand Up @@ -2275,6 +2297,23 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
node->tls_port = msg_tls_port;
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR;

serverLog(LL_NOTICE, "Address updated for node %.40s (%s), now %s:%d", node->name, node->human_nodename,
node->ip, getNodeDefaultClientPort(node));

/* Check if this is our primary and we have to change the
* replication target as well.
*
* This is needed in case the check in nodeUpdateAddressIfNeeded
* failed due to a race condition. For example, if the replica just
* received a packet from another node that contains new address
* about the primary, we will update primary node address in here,
* when the replica receive the packet from the primary, the check
* in nodeUpdateAddressIfNeeded will fail since the address has been
* updated correctly, and we will not have the opportunity to call
* replicationSetPrimary and update the primary host. */
if (nodeIsReplica(myself) && myself->replicaof == node)
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 0);
}
} else if (!node) {
/* If it's not in NOADDR state and we don't have it, we
Expand Down Expand Up @@ -3139,14 +3178,21 @@ int clusterProcessPacket(clusterLink *link) {
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. */
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
Expand Down Expand Up @@ -6178,6 +6224,9 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
return 0;
}

/* If 'myself' is a replica, 'c' must be the primary client. */
serverAssert(!nodeIsReplica(myself) || c == server.primary);

if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 0;

if (!strcasecmp(c->argv[3]->ptr, "migrating") && c->argc >= 5) {
Expand Down Expand Up @@ -6366,20 +6415,27 @@ void clusterCommandSetSlot(client *c) {
server.cluster->migrating_slots_to[slot] = NULL;
}

int slot_was_mine = server.cluster->slots[slot] == myself;
clusterNode *my_primary = clusterNodeGetPrimary(myself);
int slot_was_mine = server.cluster->slots[slot] == my_primary;
clusterDelSlot(slot);
clusterAddSlot(n, slot);

/* If we are a primary left without slots, we should turn into a
* replica of the new primary. */
if (slot_was_mine && n != myself && myself->numslots == 0 && server.cluster_allow_replica_migration) {
/* If replica migration is allowed, check if the primary of this shard
* loses its last slot and the shard becomes empty. In this case, we
* should turn into a replica of the new primary. */
if (server.cluster_allow_replica_migration && slot_was_mine && my_primary->numslots == 0) {
serverAssert(n != my_primary);
serverLog(LL_NOTICE,
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name, n->human_nodename, n->shard_id);
/* `c` is the primary client if `myself` is a replica, prevent it
* from being freed by clusterSetPrimary. */
if (nodeIsReplica(myself)) protectClient(c);
/* We are migrating to a different shard that has a completely different
* replication history, so a full sync is required. */
clusterSetPrimary(n, 1, 1);
if (nodeIsReplica(myself)) unprotectClient(c);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -6639,25 +6695,27 @@ int clusterCommandSpecial(client *c) {
}
resetManualFailover();
server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
sds client = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);

if (takeover) {
/* A takeover does not perform any initial check. It just
* generates a new configuration epoch for this node without
* consensus, claims the primary's slots, and broadcast the new
* configuration. */
serverLog(LL_NOTICE, "Taking over the primary (user request).");
serverLog(LL_NOTICE, "Taking over the primary (user request from '%s').", client);
clusterBumpConfigEpochWithoutConsensus();
clusterFailoverReplaceYourPrimary();
} else if (force) {
/* If this is a forced failover, we don't need to talk with our
* primary to agree about the offset. We just failover taking over
* it without coordination. */
serverLog(LL_NOTICE, "Forced failover user request accepted.");
serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client);
server.cluster->mf_can_start = 1;
} else {
serverLog(LL_NOTICE, "Manual failover user request accepted.");
serverLog(LL_NOTICE, "Manual failover user request accepted (user request from '%s').", client);
clusterSendMFStart(myself->replicaof);
}
sdsfree(client);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "set-config-epoch") && c->argc == 3) {
/* CLUSTER SET-CONFIG-EPOCH <epoch>
Expand Down
Loading

0 comments on commit 1755c92

Please sign in to comment.