Skip to content

Commit

Permalink
Merge branch 'valkey-io:unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
PingXie authored Jun 30, 2024
2 parents 4a96e5c + e4c1f6d commit 1ad0993
Show file tree
Hide file tree
Showing 40 changed files with 1,248 additions and 563 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
10 changes: 5 additions & 5 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,11 @@ void ACLFreeUserAndKillClients(user *u) {
* more defensive to set the default user and put
* it in non authenticated mode. */
c->user = DefaultUser;
c->flags &= ~CLIENT_AUTHENTICATED;
c->flag.authenticated = 0;
/* We will write replies to this client later, so we can't
* close it directly even if async. */
if (c == server.current_client) {
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
c->flag.close_after_command = 1;
} else {
freeClientAsync(c);
}
Expand Down Expand Up @@ -1494,13 +1494,13 @@ void addAuthErrReply(client *c, robj *err) {
* The return value is AUTH_OK on success (valid username / password pair) & AUTH_ERR otherwise. */
int checkPasswordBasedAuth(client *c, robj *username, robj *password) {
if (ACLCheckUserCredentials(username, password) == C_OK) {
c->flags |= CLIENT_AUTHENTICATED;
c->flag.authenticated = 1;
c->user = ACLGetUserByName(username->ptr, sdslen(username->ptr));
moduleNotifyUserChanged(c);
return AUTH_OK;
} else {
addACLLogEntry(c, ACL_DENIED_AUTH, (c->flags & CLIENT_MULTI) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, 0,
username->ptr, NULL);
addACLLogEntry(c, ACL_DENIED_AUTH, (c->flag.multi) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, 0, username->ptr,
NULL);
return AUTH_ERR;
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,8 @@ struct client *createAOFClient(void) {
* background processing there is a chance that the
* command execution order will be violated.
*/
c->flags = CLIENT_DENY_BLOCKING;
c->raw_flag = 0;
c->flag.deny_blocking = 1;

/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
Expand Down Expand Up @@ -1536,7 +1537,7 @@ int loadSingleAppendOnlyFile(char *filename) {

/* Run the command in the context of a fake client */
fakeClient->cmd = fakeClient->lastcmd = cmd;
if (fakeClient->flags & CLIENT_MULTI && fakeClient->cmd->proc != execCommand) {
if (fakeClient->flag.multi && fakeClient->cmd->proc != execCommand) {
/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
* anyway.*/
Expand All @@ -1549,7 +1550,7 @@ int loadSingleAppendOnlyFile(char *filename) {
serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);

/* The fake client should never get blocked */
serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
serverAssert(fakeClient->flag.blocked == 0);

/* Clean up. Command code may have changed argv/argc so we use the
* argv/argc of the client instead of the local variables. */
Expand All @@ -1562,7 +1563,7 @@ int loadSingleAppendOnlyFile(char *filename) {
* If the client is in the middle of a MULTI/EXEC, handle it as it was
* a short read, even if technically the protocol is correct: we want
* to remove the unprocessed tail and continue. */
if (fakeClient->flags & CLIENT_MULTI) {
if (fakeClient->flag.multi) {
serverLog(LL_WARNING, "Revert incomplete MULTI/EXEC transaction in AOF file %s", filename);
valid_up_to = valid_before_multi;
goto uxeof;
Expand Down
48 changes: 24 additions & 24 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ void initClientBlockingState(client *c) {
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Primary client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_PRIMARY && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

c->flags |= CLIENT_BLOCKED;
c->flag.blocked = 1;
c->bstate.btype = btype;
if (!(c->flags & CLIENT_MODULE))
if (!c->flag.module)
server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
Expand Down Expand Up @@ -130,10 +130,10 @@ void processUnblockedClients(void) {
serverAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients, ln);
c->flags &= ~CLIENT_UNBLOCKED;
c->flag.unblocked = 0;

if (c->flags & CLIENT_MODULE) {
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->flag.module) {
if (!c->flag.blocked) {
moduleCallCommandUnblockedHandler(c);
}
continue;
Expand All @@ -143,7 +143,7 @@ void processUnblockedClients(void) {
* is blocked again. Actually processInputBuffer() checks that the
* client is not blocked before to proceed, but things may change and
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
if (!c->flag.blocked) {
/* If we have a queued command, execute it now. */
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
c = NULL;
Expand Down Expand Up @@ -172,8 +172,8 @@ void processUnblockedClients(void) {
void queueClientForReprocessing(client *c) {
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
c->flags |= CLIENT_UNBLOCKED;
if (!c->flag.unblocked) {
c->flag.unblocked = 1;
listAddNodeTail(server.unblocked_clients, c);
}
}
Expand All @@ -199,7 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!c->flag.pending_command && c->bstate.btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand All @@ -210,11 +210,11 @@ void unblockClient(client *c, int queue_for_reprocessing) {
}

/* We count blocked client stats on regular clients and not on module clients */
if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--;
if (!c->flag.module) server.blocked_clients--;
server.blocked_clients_by_type[c->bstate.btype]--;
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
c->flags &= ~CLIENT_BLOCKED;
c->flag.blocked = 0;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
removeClientFromTimeoutTable(c);
Expand Down Expand Up @@ -256,7 +256,7 @@ void replyToClientsBlockedOnShutdown(void) {
listRewind(server.clients, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) {
if (c->flag.blocked && c->bstate.btype == BLOCKED_SHUTDOWN) {
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
unblockClient(c, 1);
}
Expand All @@ -278,7 +278,7 @@ void disconnectAllBlockedClients(void) {
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);

if (c->flags & CLIENT_BLOCKED) {
if (c->flag.blocked) {
/* POSTPONEd clients are an exception, when they'll be unblocked, the
* command processing will start from scratch, and the command will
* be either executed or rejected. (unlike LIST blocked clients for
Expand All @@ -287,7 +287,7 @@ void disconnectAllBlockedClients(void) {

unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
c->flag.close_after_reply = 1;
}
}
}
Expand Down Expand Up @@ -368,7 +368,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;

if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
if (!c->flag.reprocessing_command) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
Expand Down Expand Up @@ -411,7 +411,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
/* Currently we assume key blocking will require reprocessing the command.
* However in case of modules, they have a different way to handle the reprocessing
* which does not require setting the pending command flag */
if (btype != BLOCKED_MODULE) c->flags |= CLIENT_PENDING_COMMAND;
if (btype != BLOCKED_MODULE) c->flag.pending_command = 1;
blockClient(c, btype);
}

Expand Down Expand Up @@ -605,7 +605,7 @@ void blockPostponeClient(client *c) {
listAddNodeTail(server.postponed_clients, c);
c->postponed_list_node = listLast(server.postponed_clients);
/* Mark this client to execute its command */
c->flags |= CLIENT_PENDING_COMMAND;
c->flag.pending_command = 1;
}

/* Block client due to shutdown command */
Expand Down Expand Up @@ -633,8 +633,8 @@ static void unblockClientOnKey(client *c, robj *key) {
unblockClient(c, 0);
/* In case this client was blocked on keys during command
* we need to re process the command again */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (c->flag.pending_command) {
c->flag.pending_command = 0;
/* We want the command processing and the unblock handler (see RM_Call 'K' option)
* to run atomically, this is why we must enter the execution unit here before
* running the command, and exit the execution unit after calling the unblock handler (if exists).
Expand All @@ -644,8 +644,8 @@ static void unblockClientOnKey(client *c, robj *key) {
server.current_client = c;
enterExecutionUnit(1, 0);
processCommandAndResetClient(c);
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->flags & CLIENT_MODULE) {
if (!c->flag.blocked) {
if (c->flag.module) {
moduleCallCommandUnblockedHandler(c);
} else {
queueClientForReprocessing(c);
Expand Down Expand Up @@ -690,7 +690,7 @@ void unblockClientOnTimeout(client *c) {
if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;

replyToBlockedClientTimedOut(c);
if (c->flags & CLIENT_PENDING_COMMAND) c->flags &= ~CLIENT_PENDING_COMMAND;
if (c->flag.pending_command) c->flag.pending_command = 0;
unblockClient(c, 1);
}

Expand All @@ -699,7 +699,7 @@ void unblockClientOnTimeout(client *c) {
void unblockClientOnError(client *c, const char *err_str) {
if (err_str) addReplyError(c, err_str);
updateStatsOnUnblock(c, 0, 0, 1);
if (c->flags & CLIENT_PENDING_COMMAND) c->flags &= ~CLIENT_PENDING_COMMAND;
if (c->flag.pending_command) c->flag.pending_command = 0;
unblockClient(c, 1);
}

Expand Down
Loading

0 comments on commit 1ad0993

Please sign in to comment.