From 3399c93d39213f084e4012f72b219ea3455c5b07 Mon Sep 17 00:00:00 2001
From: Ping Xie
Date: Sun, 26 May 2024 23:07:31 -0700
Subject: [PATCH] Reformat remaining Valkey source code and set up GitHub checker action

Signed-off-by: Ping Xie
---
 .github/workflows/ci.yml           |  17 --
 .github/workflows/clang-format.yml |  35 ++++
 src/aof.c                          |   9 +-
 src/bio.c                          |   4 +-
 src/lazyfree.c                     |  63 ++++---
 src/networking.c                   |  14 +-
 src/rdb.c                          |   2 +-
 src/replication.c                  |  14 +-
 src/server.c                       |  39 ++---
 src/server.h                       | 267 +++++++++++++++--------------
 src/threads_mngr.c                 |   8 +-
 src/valkey-benchmark.c             |  57 +++---
 src/zmalloc.c                      |   2 +-
 13 files changed, 275 insertions(+), 256 deletions(-)
 create mode 100644 .github/workflows/clang-format.yml

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 6153f41c8f..40376f1628 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -7,23 +7,6 @@ permissions:
 
 jobs:
 
-  checkers:
-    name: Run static checkers
-    runs-on: ubuntu-latest
-    strategy:
-      matrix:
-        path:
-          - check: 'src'
-            exclude: 'src/(lzf.*|crccombine.*|crc16.c|crc16_slottable.h|crc64.c|crcspeed.*|fmtargs.h|mt19937-64.*|pqsort.*|setcpuaffinity.c|setproctitle.c|sha1.*|sha256.*|siphash.c|strl.c|unit/test_files.h)'
-    steps:
-      - uses: actions/checkout@44c2b7a8a4ea60a981eaca3cf939b5f4305c123b # v4.1.5
-      - name: Run clang-format style check (.c and .h)
-        uses: jidicula/clang-format-action@f62da5e3d3a2d88ff364771d9d938773a618ab5e # v4.11.0
-        with:
-          clang-format-version: '13'
-          check-path: ${{ matrix.path['check'] }}
-          exclude-regex: ${{ matrix.path['exclude'] }}
-
   test-ubuntu-latest:
     runs-on: ubuntu-latest
     steps:
diff --git a/.github/workflows/clang-format.yml b/.github/workflows/clang-format.yml
new file mode 100644
index 0000000000..e5135c3c3e
--- /dev/null
+++ b/.github/workflows/clang-format.yml
@@ -0,0 +1,35 @@
+name: Clang Format Check
+
+on:
+  pull_request:
+    paths:
+      - 'src/**'
+
+jobs:
+  clang-format-check:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
+
+      - name: Set up Clang
+        run: |
+          sudo apt-get update -y
+          sudo apt-get upgrade -y
+          sudo apt-get install -y clang-format
+      - name: Run clang-format
+        id: clang-format
+        run: |
+          # Reformat in place; a non-empty git diff below fails the step
+          cd src
+          find . \( -name '*.c' -o -name '*.h' \) -print0 | xargs -0 clang-format -i
+          git diff --exit-code
+        shell: bash
+
+      - name: Check for formatting changes
+        if: ${{ failure() }}
+        run: |
+          echo "Code is not formatted correctly."
+          exit 1
+
diff --git a/src/aof.c b/src/aof.c
index d15ff379e5..f3538f64fa 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -987,8 +987,7 @@ int startAppendOnly(void) {
     /* If AOF fsync error in bio job, we just ignore it and log the event.
*/ int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); if (aof_bio_fsync_status == C_ERR) { - serverLog(LL_WARNING, - "AOF reopen, just ignore the AOF fsync error in bio job"); + serverLog(LL_WARNING, "AOF reopen, just ignore the AOF fsync error in bio job"); atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed); } @@ -1245,8 +1244,7 @@ void flushAppendOnlyFile(int force) { server.aof_last_incr_fsync_offset = server.aof_last_incr_size; server.aof_last_fsync = server.mstime; atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); - } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && - server.mstime - server.aof_last_fsync >= 1000) { + } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) { if (!sync_in_progress) { aof_background_fsync(server.aof_fd); server.aof_last_incr_fsync_offset = server.aof_last_incr_size; @@ -2648,7 +2646,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* Update the fsynced replication offset that just now become valid. * This could either be the one we took in startAppendOnly, or a * newer one set by the bio thread. */ - long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed); + long long fsynced_reploff_pending = + atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed); server.fsynced_reploff = fsynced_reploff_pending; } diff --git a/src/bio.c b/src/bio.c index 31d946e566..11692e77ef 100644 --- a/src/bio.c +++ b/src/bio.c @@ -257,9 +257,7 @@ void *bioProcessBackgroundJobs(void *arg) { /* The fd may be closed by main thread and reused for another * socket, pipe, or file. We just ignore these errno because * aof fsync did not really fail. */ - if (valkey_fsync(job->fd_args.fd) == -1 && - errno != EBADF && errno != EINVAL) - { + if (valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) { int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed); diff --git a/src/lazyfree.c b/src/lazyfree.c index 82e468322e..38ccd913bd 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -13,8 +13,8 @@ static _Atomic size_t lazyfreed_objects = 0; void lazyfreeFreeObject(void *args[]) { robj *o = (robj *)args[0]; decrRefCount(o); - atomic_fetch_sub_explicit(&lazyfree_objects,1,memory_order_relaxed); - atomic_fetch_add_explicit(&lazyfreed_objects,1,memory_order_relaxed); + atomic_fetch_sub_explicit(&lazyfree_objects, 1, memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects, 1, memory_order_relaxed); } /* Release a database from the lazyfree thread. The 'db' pointer is the @@ -27,8 +27,8 @@ void lazyfreeFreeDatabase(void *args[]) { size_t numkeys = kvstoreSize(da1); kvstoreRelease(da1); kvstoreRelease(da2); - atomic_fetch_sub_explicit(&lazyfree_objects,numkeys,memory_order_relaxed); - atomic_fetch_add_explicit(&lazyfreed_objects,numkeys,memory_order_relaxed); + atomic_fetch_sub_explicit(&lazyfree_objects, numkeys, memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects, numkeys, memory_order_relaxed); } /* Release the key tracking table. 
*/ @@ -36,8 +36,8 @@ void lazyFreeTrackingTable(void *args[]) { rax *rt = args[0]; size_t len = rt->numele; freeTrackingRadixTree(rt); - atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); - atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); + atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed); } /* Release the error stats rax tree. */ @@ -45,8 +45,8 @@ void lazyFreeErrors(void *args[]) { rax *errors = args[0]; size_t len = errors->numele; raxFreeWithCallback(errors, zfree); - atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); - atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); + atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed); } /* Release the lua_scripts dict. */ @@ -56,8 +56,8 @@ void lazyFreeLuaScripts(void *args[]) { lua_State *lua = args[2]; long long len = dictSize(lua_scripts); freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua); - atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); - atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); + atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed); } /* Release the functions ctx. */ @@ -65,8 +65,8 @@ void lazyFreeFunctionsCtx(void *args[]) { functionsLibCtx *functions_lib_ctx = args[0]; size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx); functionsLibCtxFree(functions_lib_ctx); - atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); - atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); + atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed); } /* Release replication backlog referencing memory. */ @@ -77,24 +77,24 @@ void lazyFreeReplicationBacklogRefMem(void *args[]) { len += raxSize(index); listRelease(blocks); raxFree(index); - atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); - atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); + atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed); } /* Return the number of currently pending objects to free. */ size_t lazyfreeGetPendingObjectsCount(void) { - size_t aux = atomic_load_explicit(&lazyfree_objects,memory_order_relaxed); + size_t aux = atomic_load_explicit(&lazyfree_objects, memory_order_relaxed); return aux; } /* Return the number of objects that have been freed. */ size_t lazyfreeGetFreedObjectsCount(void) { - size_t aux = atomic_load_explicit(&lazyfreed_objects,memory_order_relaxed); + size_t aux = atomic_load_explicit(&lazyfreed_objects, memory_order_relaxed); return aux; } void lazyfreeResetStats(void) { - atomic_store_explicit(&lazyfreed_objects,0,memory_order_relaxed); + atomic_store_explicit(&lazyfreed_objects, 0, memory_order_relaxed); } /* Return the amount of work needed in order to free an object. @@ -174,8 +174,8 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * of parts of the server core may call incrRefCount() to protect * objects, and then call dbDelete(). 
*/ if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { - atomic_fetch_add_explicit(&lazyfree_objects,1,memory_order_relaxed); - bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); + atomic_fetch_add_explicit(&lazyfree_objects, 1, memory_order_relaxed); + bioCreateLazyFreeJob(lazyfreeFreeObject, 1, obj); } else { decrRefCount(obj); } @@ -203,8 +203,8 @@ void emptyDbAsync(serverDb *db) { void freeTrackingRadixTreeAsync(rax *tracking) { /* Because this rax has only keys and no values so we use numnodes. */ if (tracking->numnodes > LAZYFREE_THRESHOLD) { - atomic_fetch_add_explicit(&lazyfree_objects,tracking->numele,memory_order_relaxed); - bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking); + atomic_fetch_add_explicit(&lazyfree_objects, tracking->numele, memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeTrackingTable, 1, tracking); } else { freeTrackingRadixTree(tracking); } @@ -215,8 +215,8 @@ void freeTrackingRadixTreeAsync(rax *tracking) { void freeErrorsRadixTreeAsync(rax *errors) { /* Because this rax has only keys and no values so we use numnodes. */ if (errors->numnodes > LAZYFREE_THRESHOLD) { - atomic_fetch_add_explicit(&lazyfree_objects,errors->numele,memory_order_relaxed); - bioCreateLazyFreeJob(lazyFreeErrors,1,errors); + atomic_fetch_add_explicit(&lazyfree_objects, errors->numele, memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeErrors, 1, errors); } else { raxFreeWithCallback(errors, zfree); } @@ -226,8 +226,8 @@ void freeErrorsRadixTreeAsync(rax *errors) { * Close lua interpreter, if there are a lot of lua scripts, close it in async way. */ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) { if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) { - atomic_fetch_add_explicit(&lazyfree_objects,dictSize(lua_scripts),memory_order_relaxed); - bioCreateLazyFreeJob(lazyFreeLuaScripts,3,lua_scripts,lua_scripts_lru_list,lua); + atomic_fetch_add_explicit(&lazyfree_objects, dictSize(lua_scripts), memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeLuaScripts, 3, lua_scripts, lua_scripts_lru_list, lua); } else { freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua); } @@ -236,8 +236,9 @@ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_Stat /* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */ void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) { if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) { - atomic_fetch_add_explicit(&lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx),memory_order_relaxed); - bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx); + atomic_fetch_add_explicit(&lazyfree_objects, functionsLibCtxFunctionsLen(functions_lib_ctx), + memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 1, functions_lib_ctx); } else { functionsLibCtxFree(functions_lib_ctx); } @@ -245,11 +246,9 @@ void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) { /* Free replication backlog referencing buffer blocks and rax index. 
*/ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) { - if (listLength(blocks) > LAZYFREE_THRESHOLD || - raxSize(index) > LAZYFREE_THRESHOLD) - { - atomic_fetch_add_explicit(&lazyfree_objects,listLength(blocks)+raxSize(index),memory_order_relaxed); - bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index); + if (listLength(blocks) > LAZYFREE_THRESHOLD || raxSize(index) > LAZYFREE_THRESHOLD) { + atomic_fetch_add_explicit(&lazyfree_objects, listLength(blocks) + raxSize(index), memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem, 2, blocks, index); } else { listRelease(blocks); raxFree(index); diff --git a/src/networking.c b/src/networking.c index e01e76d710..54c90db566 100644 --- a/src/networking.c +++ b/src/networking.c @@ -128,8 +128,8 @@ client *createClient(connection *conn) { connSetPrivateData(conn, c); } c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size); - selectDb(c,0); - uint64_t client_id = atomic_fetch_add_explicit(&server.next_client_id,1,memory_order_relaxed); + selectDb(c, 0); + uint64_t client_id = atomic_fetch_add_explicit(&server.next_client_id, 1, memory_order_relaxed); c->id = client_id; #ifdef LOG_REQ_RES reqresReset(c, 0); @@ -1942,7 +1942,7 @@ int _writeToClient(client *c, ssize_t *nwritten) { * thread safe. */ int writeToClient(client *c, int handler_installed) { /* Update total number of writes on server */ - atomic_fetch_add_explicit(&server.stat_total_writes_processed,1, memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_total_writes_processed, 1, memory_order_relaxed); ssize_t nwritten = 0, totwritten = 0; @@ -2610,7 +2610,7 @@ void readQueryFromClient(connection *conn) { if (postponeClientRead(c)) return; /* Update total number of reads on server */ - atomic_fetch_add_explicit(&server.stat_total_reads_processed,1,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_total_reads_processed, 1, memory_order_relaxed); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply @@ -2676,9 +2676,9 @@ void readQueryFromClient(connection *conn) { c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) { c->read_reploff += nread; - atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes, nread, memory_order_relaxed); } else { - atomic_fetch_add_explicit(&server.stat_net_input_bytes,nread,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_net_input_bytes, nread, memory_order_relaxed); } c->net_input_bytes += nread; @@ -2697,7 +2697,7 @@ void readQueryFromClient(connection *conn) { sdsfree(ci); sdsfree(bytes); freeClientAsync(c); - atomic_fetch_add_explicit(&server.stat_client_qbuf_limit_disconnections,1,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_client_qbuf_limit_disconnections, 1, memory_order_relaxed); goto done; } diff --git a/src/rdb.c b/src/rdb.c index abc86566d0..fe297cb7a9 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2889,7 +2889,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { processModuleLoadingProgressEvent(0); } if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) { - atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,len,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes, len, memory_order_relaxed); } } diff --git a/src/replication.c b/src/replication.c index e2612e75b5..32fa5be376 100644 --- 
a/src/replication.c +++ b/src/replication.c @@ -1364,8 +1364,8 @@ void sendBulkToSlave(connection *conn) { freeClient(slave); return; } - atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); - sdsrange(slave->replpreamble,nwritten,-1); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, nwritten, memory_order_relaxed); + sdsrange(slave->replpreamble, nwritten, -1); if (sdslen(slave->replpreamble) == 0) { sdsfree(slave->replpreamble); slave->replpreamble = NULL; @@ -1392,7 +1392,7 @@ void sendBulkToSlave(connection *conn) { return; } slave->repldboff += nwritten; - atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, nwritten, memory_order_relaxed); if (slave->repldboff == slave->repldbsize) { closeRepldbfd(slave); connSetWriteHandler(slave->conn, NULL); @@ -1434,7 +1434,7 @@ void rdbPipeWriteHandler(struct connection *conn) { return; } else { slave->repldboff += nwritten; - atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, nwritten, memory_order_relaxed); if (slave->repldboff < server.rdb_pipe_bufflen) { slave->repl_last_partial_write = server.unixtime; return; /* more data to write.. */ @@ -1507,7 +1507,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* Note: when use diskless replication, 'repldboff' is the offset * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */ slave->repldboff = nwritten; - atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, nwritten, memory_order_relaxed); } /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ @@ -1815,7 +1815,7 @@ void readSyncBulkPayload(connection *conn) { } else { /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and * convert "\r\n" to '\0' so 1 byte is lost. */ - atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread+1,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes, nread + 1, memory_order_relaxed); } if (buf[0] == '-') { @@ -1884,7 +1884,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(1); return; } - atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread,memory_order_relaxed); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes, nread, memory_order_relaxed); /* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... 
*/ diff --git a/src/server.c b/src/server.c index edf215eac2..e0590706d3 100644 --- a/src/server.c +++ b/src/server.c @@ -1258,10 +1258,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { long long stat_net_input_bytes, stat_net_output_bytes; long long stat_net_repl_input_bytes, stat_net_repl_output_bytes; - stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes,memory_order_relaxed); - stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes,memory_order_relaxed); - stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes,memory_order_relaxed); - stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes,memory_order_relaxed); + stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes, memory_order_relaxed); + stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes, memory_order_relaxed); + stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes, memory_order_relaxed); + stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes, memory_order_relaxed); monotime current_time = getMonotonicUs(); long long factor = 1000000; // us @@ -1737,12 +1737,10 @@ void afterSleep(struct aeEventLoop *eventLoop) { if (moduleCount()) { mstime_t latency; latencyStartMonitor(latency); - atomic_store_explicit(&server.module_gil_acquiring,1,memory_order_relaxed); + atomic_store_explicit(&server.module_gil_acquiring, 1, memory_order_relaxed); moduleAcquireGIL(); - atomic_store_explicit(&server.module_gil_acquiring,0,memory_order_relaxed); - moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, - VALKEYMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP, - NULL); + atomic_store_explicit(&server.module_gil_acquiring, 0, memory_order_relaxed); + moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP, NULL); latencyEndMonitor(latency); latencyAddSampleIfNeeded("module-acquire-GIL", latency); } @@ -1991,7 +1989,7 @@ void initServerConfig(void) { server.aof_flush_sleep = 0; server.aof_last_fsync = time(NULL) * 1000; server.aof_cur_timestamp = 0; - atomic_store_explicit(&server.aof_bio_fsync_status,C_OK,memory_order_relaxed); + atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed); server.aof_rewrite_time_last = -1; server.aof_rewrite_time_start = -1; server.aof_lastbgrewrite_status = C_OK; @@ -2481,10 +2479,10 @@ void resetServerStats(void) { server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; server.stat_io_reads_processed = 0; - atomic_store_explicit(&server.stat_total_reads_processed,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_total_reads_processed, 0, memory_order_relaxed); server.stat_io_writes_processed = 0; - atomic_store_explicit(&server.stat_total_writes_processed,0,memory_order_relaxed); - atomic_store_explicit(&server.stat_client_qbuf_limit_disconnections,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_total_writes_processed, 0, memory_order_relaxed); + atomic_store_explicit(&server.stat_client_qbuf_limit_disconnections, 0, memory_order_relaxed); server.stat_client_outbuf_limit_disconnections = 0; for (j = 0; j < STATS_METRIC_COUNT; j++) { server.inst_metric[j].idx = 0; @@ -2495,10 +2493,10 @@ void resetServerStats(void) { server.stat_aof_rewrites = 0; server.stat_rdb_saves = 0; server.stat_aofrw_consecutive_failures = 0; - atomic_store_explicit(&server.stat_net_input_bytes,0,memory_order_relaxed); - 
atomic_store_explicit(&server.stat_net_output_bytes,0,memory_order_relaxed); - atomic_store_explicit(&server.stat_net_repl_input_bytes,0,memory_order_relaxed); - atomic_store_explicit(&server.stat_net_repl_output_bytes,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_net_input_bytes, 0, memory_order_relaxed); + atomic_store_explicit(&server.stat_net_output_bytes, 0, memory_order_relaxed); + atomic_store_explicit(&server.stat_net_repl_input_bytes, 0, memory_order_relaxed); + atomic_store_explicit(&server.stat_net_repl_output_bytes, 0, memory_order_relaxed); server.stat_unexpected_error_replies = 0; server.stat_total_error_replies = 0; server.stat_dump_payload_sanitizations = 0; @@ -5530,7 +5528,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { } else if (server.stat_current_save_keys_total) { fork_perc = ((double)server.stat_current_save_keys_processed / server.stat_current_save_keys_total) * 100; } - int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status,memory_order_relaxed); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); /* clang-format off */ info = sdscatprintf(info, "# Persistence\r\n" FMTARGS( @@ -5629,14 +5627,15 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { long long current_active_defrag_time = server.stat_last_active_defrag_time ? (long long)elapsedUs(server.stat_last_active_defrag_time) : 0; long long stat_client_qbuf_limit_disconnections; - + stat_total_reads_processed = atomic_load_explicit(&server.stat_total_reads_processed, memory_order_relaxed); stat_total_writes_processed = atomic_load_explicit(&server.stat_total_writes_processed, memory_order_relaxed); stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes, memory_order_relaxed); stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes, memory_order_relaxed); stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes, memory_order_relaxed); stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes, memory_order_relaxed); - stat_client_qbuf_limit_disconnections = atomic_load_explicit(&server.stat_client_qbuf_limit_disconnections, memory_order_relaxed); + stat_client_qbuf_limit_disconnections = + atomic_load_explicit(&server.stat_client_qbuf_limit_disconnections, memory_order_relaxed); if (sections++) info = sdscat(info, "\r\n"); /* clang-format off */ diff --git a/src/server.h b/src/server.h index 2c7eebae54..a50ea2eed6 100644 --- a/src/server.h +++ b/src/server.h @@ -1669,13 +1669,13 @@ struct valkeyServer { uint32_t paused_actions; /* Bitmask of actions that are currently paused */ list *postponed_clients; /* List of postponed clients */ pause_event client_pause_per_purpose[NUM_PAUSE_PURPOSES]; - char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ - dict *migrate_cached_sockets;/* MIGRATE cached sockets */ - _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ - int protected_mode; /* Don't accept external connections. */ - int io_threads_num; /* Number of IO threads to use. */ - int io_threads_do_reads; /* Read and parse from IO threads? */ - int io_threads_active; /* Is IO threads currently active? */ + char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + dict *migrate_cached_sockets; /* MIGRATE cached sockets */ + _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. 
*/ + int protected_mode; /* Don't accept external connections. */ + int io_threads_num; /* Number of IO threads to use. */ + int io_threads_do_reads; /* Read and parse from IO threads? */ + int io_threads_active; /* Is IO threads currently active? */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ @@ -1696,61 +1696,64 @@ struct valkeyServer { long long stat_expiredkeys; /* Number of expired keys */ double stat_expired_stale_perc; /* Percentage of keys probably expired */ long long stat_expired_time_cap_reached_count; /* Early expire cycle stops.*/ - long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */ - long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ - long long stat_evictedclients; /* Number of evicted clients */ - long long stat_evictedscripts; /* Number of evicted lua scripts. */ - long long stat_total_eviction_exceeded_time; /* Total time over the memory limit, unit us */ - monotime stat_last_eviction_exceeded_time; /* Timestamp of current eviction start, unit us */ - long long stat_keyspace_hits; /* Number of successful lookups of keys */ - long long stat_keyspace_misses; /* Number of failed lookups of keys */ - long long stat_active_defrag_hits; /* number of allocations moved */ - long long stat_active_defrag_misses; /* number of allocations scanned but not moved */ - long long stat_active_defrag_key_hits; /* number of keys with moved allocations */ - long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */ - long long stat_active_defrag_scanned; /* number of dictEntries scanned */ - long long stat_total_active_defrag_time; /* Total time memory fragmentation over the limit, unit us */ - monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */ - size_t stat_peak_memory; /* Max used memory record */ - long long stat_aof_rewrites; /* number of aof file rewrites performed */ - long long stat_aofrw_consecutive_failures; /* The number of consecutive failures of aofrw */ - long long stat_rdb_saves; /* number of rdb saves performed */ - long long stat_fork_time; /* Time needed to perform latest fork() */ - double stat_fork_rate; /* Fork rate in GB/sec. */ - long long stat_total_forks; /* Total count of fork. */ - long long stat_rejected_conn; /* Clients rejected because of maxclients */ - long long stat_sync_full; /* Number of full resyncs with slaves. */ - long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ - long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */ - list *slowlog; /* SLOWLOG list of commands */ - long long slowlog_entry_id; /* SLOWLOG current entry ID */ - long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ - unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ - struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ - _Atomic long long stat_net_input_bytes; /* Bytes read from network. */ - _Atomic long long stat_net_output_bytes; /* Bytes written to network. */ - _Atomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */ - _Atomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. 
*/ - size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */ - size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ - monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */ - size_t stat_current_save_keys_processed; /* Processed keys while child is active. */ - size_t stat_current_save_keys_total; /* Number of keys when child started. */ - size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ - size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ - size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ - double stat_module_progress; /* Module save progress. */ - size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ - size_t stat_cluster_links_memory; /* Mem usage by cluster links */ - long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ + long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */ + long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ + long long stat_evictedclients; /* Number of evicted clients */ + long long stat_evictedscripts; /* Number of evicted lua scripts. */ + long long stat_total_eviction_exceeded_time; /* Total time over the memory limit, unit us */ + monotime stat_last_eviction_exceeded_time; /* Timestamp of current eviction start, unit us */ + long long stat_keyspace_hits; /* Number of successful lookups of keys */ + long long stat_keyspace_misses; /* Number of failed lookups of keys */ + long long stat_active_defrag_hits; /* number of allocations moved */ + long long stat_active_defrag_misses; /* number of allocations scanned but not moved */ + long long stat_active_defrag_key_hits; /* number of keys with moved allocations */ + long long stat_active_defrag_key_misses; /* number of keys scanned and not moved */ + long long stat_active_defrag_scanned; /* number of dictEntries scanned */ + long long stat_total_active_defrag_time; /* Total time memory fragmentation over the limit, unit us */ + monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */ + size_t stat_peak_memory; /* Max used memory record */ + long long stat_aof_rewrites; /* number of aof file rewrites performed */ + long long stat_aofrw_consecutive_failures; /* The number of consecutive failures of aofrw */ + long long stat_rdb_saves; /* number of rdb saves performed */ + long long stat_fork_time; /* Time needed to perform latest fork() */ + double stat_fork_rate; /* Fork rate in GB/sec. */ + long long stat_total_forks; /* Total count of fork. */ + long long stat_rejected_conn; /* Clients rejected because of maxclients */ + long long stat_sync_full; /* Number of full resyncs with slaves. */ + long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ + long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */ + list *slowlog; /* SLOWLOG list of commands */ + long long slowlog_entry_id; /* SLOWLOG current entry ID */ + long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ + unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ + struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ + _Atomic long long stat_net_input_bytes; /* Bytes read from network. */ + _Atomic long long stat_net_output_bytes; /* Bytes written to network. 
*/ + _Atomic long long + stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */ + _Atomic long long + stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */ + size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */ + size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ + monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */ + size_t stat_current_save_keys_processed; /* Processed keys while child is active. */ + size_t stat_current_save_keys_total; /* Number of keys when child started. */ + size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ + size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ + size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ + double stat_module_progress; /* Module save progress. */ + size_t stat_clients_type_memory[CLIENT_TYPE_COUNT]; /* Mem usage by type */ + size_t stat_cluster_links_memory; /* Mem usage by cluster links */ + long long + stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ - long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ - long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ - long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ - _Atomic long long stat_total_reads_processed; /* Total number of read events processed */ - _Atomic long long stat_total_writes_processed; /* Total number of write events processed */ - _Atomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ - long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ + long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ + long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ + long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ + _Atomic long long stat_total_reads_processed; /* Total number of read events processed */ + _Atomic long long stat_total_writes_processed; /* Total number of write events processed */ + _Atomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ + long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct { @@ -1810,43 +1813,43 @@ struct valkeyServer { unsigned int max_new_conns_per_cycle; /* The maximum number of tcp connections that will be accepted during each invocation of the event loop. */ /* AOF persistence */ - int aof_enabled; /* AOF configuration */ - int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ - int aof_fsync; /* Kind of fsync() policy */ - char *aof_filename; /* Basename of the AOF file and manifest file */ - char *aof_dirname; /* Name of the AOF directory */ - int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ - int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... 
*/ - off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ - off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ - off_t aof_current_size; /* AOF current size (Including BASE + INCRs). */ - off_t aof_last_incr_size; /* The size of the latest incr AOF. */ - off_t aof_last_incr_fsync_offset; /* AOF offset which is already requested to be synced to disk. - * Compare with the aof_last_incr_size. */ - int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */ - int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ - sds aof_buf; /* AOF buffer, written before entering the event loop */ - int aof_fd; /* File descriptor of currently selected AOF file */ - int aof_selected_db; /* Currently selected DB in AOF */ + int aof_enabled; /* AOF configuration */ + int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ + int aof_fsync; /* Kind of fsync() policy */ + char *aof_filename; /* Basename of the AOF file and manifest file */ + char *aof_dirname; /* Name of the AOF directory */ + int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ + int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ + off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ + off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ + off_t aof_current_size; /* AOF current size (Including BASE + INCRs). */ + off_t aof_last_incr_size; /* The size of the latest incr AOF. */ + off_t aof_last_incr_fsync_offset; /* AOF offset which is already requested to be synced to disk. + * Compare with the aof_last_incr_size. */ + int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */ + int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ + sds aof_buf; /* AOF buffer, written before entering the event loop */ + int aof_fd; /* File descriptor of currently selected AOF file */ + int aof_selected_db; /* Currently selected DB in AOF */ mstime_t aof_flush_postponed_start; /* mstime of postponed AOF flush */ mstime_t aof_last_fsync; /* mstime of last fsync() */ - time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ - time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ - time_t aof_cur_timestamp; /* Current record timestamp in AOF */ - int aof_timestamp_enabled; /* Enable record timestamp in AOF */ - int aof_lastbgrewrite_status; /* C_OK or C_ERR */ - unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ - int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */ - int rdb_save_incremental_fsync; /* fsync incrementally while rdb saving? */ - int aof_last_write_status; /* C_OK or C_ERR */ - int aof_last_write_errno; /* Valid if aof write/fsync status is ERR */ - int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ - int aof_use_rdb_preamble; /* Specify base AOF to use RDB encoding on AOF rewrites. */ - _Atomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. */ - _Atomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */ - aofManifest *aof_manifest; /* Used to track AOFs. */ - int aof_disable_auto_gc; /* If disable automatically deleting HISTORY type AOFs? - default no. (for testings). */ + time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ + time_t aof_rewrite_time_start; /* Current AOF rewrite start time. 
*/ + time_t aof_cur_timestamp; /* Current record timestamp in AOF */ + int aof_timestamp_enabled; /* Enable record timestamp in AOF */ + int aof_lastbgrewrite_status; /* C_OK or C_ERR */ + unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ + int aof_rewrite_incremental_fsync; /* fsync incrementally while aof rewriting? */ + int rdb_save_incremental_fsync; /* fsync incrementally while rdb saving? */ + int aof_last_write_status; /* C_OK or C_ERR */ + int aof_last_write_errno; /* Valid if aof write/fsync status is ERR */ + int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ + int aof_use_rdb_preamble; /* Specify base AOF to use RDB encoding on AOF rewrites. */ + _Atomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. */ + _Atomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */ + aofManifest *aof_manifest; /* Used to track AOFs. */ + int aof_disable_auto_gc; /* If disable automatically deleting HISTORY type AOFs? + default no. (for testings). */ /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ @@ -1904,35 +1907,35 @@ struct valkeyServer { int shutdown_on_sigterm; /* Shutdown flags configured for SIGTERM. */ /* Replication (master) */ - char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ - char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ - long long master_repl_offset; /* My current replication offset */ - long long second_replid_offset; /* Accept offsets up to this for replid2. */ - _Atomic long long fsynced_reploff_pending;/* Largest replication offset to - * potentially have been fsynced, applied to - fsynced_reploff only when AOF state is AOF_ON - (not during the initial rewrite) */ - long long fsynced_reploff; /* Largest replication offset that has been confirmed to be fsynced */ - int slaveseldb; /* Last SELECTed DB in replication output */ - int repl_ping_slave_period; /* Master pings the slave every N seconds */ - replBacklog *repl_backlog; /* Replication backlog for partial syncs */ - long long repl_backlog_size; /* Backlog circular buffer size */ - time_t repl_backlog_time_limit; /* Time without slaves after the backlog - gets released. */ - time_t repl_no_slaves_since; /* We have no slaves since that time. - Only valid if server.slaves len is 0. */ - int repl_min_slaves_to_write; /* Min number of slaves to write. */ - int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ - int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ - int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ - int repl_diskless_load; /* Slave parse RDB directly from the socket. - * see REPL_DISKLESS_LOAD_* enum */ - int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ - int repl_diskless_sync_max_replicas;/* Max replicas for diskless repl BGSAVE - * delay (start sooner if they all connect). */ - size_t repl_buffer_mem; /* The memory of replication buffer. */ - list *repl_buffer_blocks; /* Replication buffers blocks list - * (serving replica clients and repl backlog) */ + char replid[CONFIG_RUN_ID_SIZE + 1]; /* My current replication ID. */ + char replid2[CONFIG_RUN_ID_SIZE + 1]; /* replid inherited from master*/ + long long master_repl_offset; /* My current replication offset */ + long long second_replid_offset; /* Accept offsets up to this for replid2. 
*/ + _Atomic long long fsynced_reploff_pending; /* Largest replication offset to + * potentially have been fsynced, applied to + fsynced_reploff only when AOF state is AOF_ON + (not during the initial rewrite) */ + long long fsynced_reploff; /* Largest replication offset that has been confirmed to be fsynced */ + int slaveseldb; /* Last SELECTed DB in replication output */ + int repl_ping_slave_period; /* Master pings the slave every N seconds */ + replBacklog *repl_backlog; /* Replication backlog for partial syncs */ + long long repl_backlog_size; /* Backlog circular buffer size */ + time_t repl_backlog_time_limit; /* Time without slaves after the backlog + gets released. */ + time_t repl_no_slaves_since; /* We have no slaves since that time. + Only valid if server.slaves len is 0. */ + int repl_min_slaves_to_write; /* Min number of slaves to write. */ + int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ + int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ + int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ + int repl_diskless_load; /* Slave parse RDB directly from the socket. + * see REPL_DISKLESS_LOAD_* enum */ + int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ + int repl_diskless_sync_max_replicas; /* Max replicas for diskless repl BGSAVE + * delay (start sooner if they all connect). */ + size_t repl_buffer_mem; /* The memory of replication buffer. */ + list *repl_buffer_blocks; /* Replication buffers blocks list + * (serving replica clients and repl backlog) */ /* Replication (slave) */ char *masteruser; /* AUTH with this user and masterauth with master */ sds masterauth; /* AUTH with this password with master */ @@ -2016,13 +2019,13 @@ struct valkeyServer { int list_max_listpack_size; int list_compress_depth; /* time cache */ - _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ - time_t timezone; /* Cached timezone. As set by tzset(). */ - int daylight_active; /* Currently in daylight saving time. */ - mstime_t mstime; /* 'unixtime' in milliseconds. */ - ustime_t ustime; /* 'unixtime' in microseconds. */ - mstime_t cmd_time_snapshot; /* Time snapshot of the root execution nesting. */ - size_t blocking_op_nesting; /* Nesting level of blocking operation, used to reset blocked_last_cron. */ + _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ + time_t timezone; /* Cached timezone. As set by tzset(). */ + int daylight_active; /* Currently in daylight saving time. */ + mstime_t mstime; /* 'unixtime' in milliseconds. */ + ustime_t ustime; /* 'unixtime' in microseconds. */ + mstime_t cmd_time_snapshot; /* Time snapshot of the root execution nesting. */ + size_t blocking_op_nesting; /* Nesting level of blocking operation, used to reset blocked_last_cron. 
*/ long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */ /* Pubsub */ kvstore *pubsub_channels; /* Map channels to list of subscribed clients */ diff --git a/src/threads_mngr.c b/src/threads_mngr.c index e1f1d7e7b4..f18bc79b92 100644 --- a/src/threads_mngr.c +++ b/src/threads_mngr.c @@ -112,7 +112,7 @@ __attribute__((noinline)) int ThreadsManager_runOnThreads(pid_t *tids, size_t ti static int test_and_start(void) { /* atomic_exchange_explicit sets the variable to 1 and returns the previous value */ - int prev_state = atomic_exchange_explicit(&g_in_progress,1,memory_order_relaxed); + int prev_state = atomic_exchange_explicit(&g_in_progress, 1, memory_order_relaxed); /* If prev_state is 1, g_in_progress was on. */ return prev_state; @@ -123,7 +123,7 @@ __attribute__((noinline)) static void invoke_callback(int sig) { run_on_thread_cb callback = g_callback; if (callback) { callback(); - atomic_fetch_add_explicit(&g_num_threads_done,1,memory_order_relaxed); + atomic_fetch_add_explicit(&g_num_threads_done, 1, memory_order_relaxed); } else { serverLogFromHandler(LL_WARNING, "tid %ld: ThreadsManager g_callback is NULL", syscall(SYS_gettid)); } @@ -145,7 +145,7 @@ static void wait_threads(void) { /* Sleep a bit to yield to other threads. */ /* usleep isn't listed as signal safe, so we use select instead */ select(0, NULL, NULL, NULL, &tv); - curr_done_count = atomic_load_explicit(&g_num_threads_done,memory_order_relaxed); + curr_done_count = atomic_load_explicit(&g_num_threads_done, memory_order_relaxed); clock_gettime(CLOCK_REALTIME, &curr_time); } while (curr_done_count < g_tids_len && curr_time.tv_sec <= timeout_time.tv_sec); @@ -160,7 +160,7 @@ static void ThreadsManager_cleanups(void) { g_num_threads_done = 0; /* Lastly, turn off g_in_progress */ - atomic_store_explicit(&g_in_progress,0,memory_order_relaxed); + atomic_store_explicit(&g_in_progress, 0, memory_order_relaxed); } #else diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 2eec75a10a..cb944fac02 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -115,8 +115,8 @@ static struct config { int cluster_node_count; struct clusterNode **cluster_nodes; struct serverConfig *redis_config; - struct hdr_histogram* latency_histogram; - struct hdr_histogram* current_sec_latency_histogram; + struct hdr_histogram *latency_histogram; + struct hdr_histogram *current_sec_latency_histogram; _Atomic int is_fetching_slots; _Atomic int is_updating_slots; _Atomic int slots_last_update; @@ -344,7 +344,7 @@ static void freeClient(client c) { aeDeleteFileEvent(el, c->context->fd, AE_WRITABLE); aeDeleteFileEvent(el, c->context->fd, AE_READABLE); if (c->thread_id >= 0) { - int requests_finished = atomic_load_explicit(&config.requests_finished,memory_order_relaxed); + int requests_finished = atomic_load_explicit(&config.requests_finished, memory_order_relaxed); if (requests_finished >= config.requests) { aeStop(el); } @@ -402,7 +402,7 @@ static void setClusterKeyHashTag(client c) { assert(c->thread_id >= 0); clusterNode *node = c->cluster_node; assert(node); - int is_updating_slots = atomic_load_explicit(&config.is_updating_slots,memory_order_relaxed); + int is_updating_slots = atomic_load_explicit(&config.is_updating_slots, memory_order_relaxed); /* If updateClusterSlotsConfiguration is updating the slots array, * call updateClusterSlotsConfiguration is order to block the thread * since the mutex is locked. 
When the slots will be updated by the @@ -423,7 +423,7 @@ static void setClusterKeyHashTag(client c) { } static void clientDone(client c) { - int requests_finished = atomic_load_explicit(&config.requests_finished,memory_order_relaxed); + int requests_finished = atomic_load_explicit(&config.requests_finished, memory_order_relaxed); if (requests_finished >= config.requests) { freeClient(c); if (!config.num_threads && config.el) aeStop(config.el); @@ -517,23 +517,27 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } continue; } - int requests_finished = atomic_fetch_add_explicit(&config.requests_finished,1,memory_order_relaxed); - if (requests_finished < config.requests){ - if (config.num_threads == 0) { - hdr_record_value( - config.latency_histogram, // Histogram to record to - (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record - hdr_record_value( - config.current_sec_latency_histogram, // Histogram to record to - (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record - } else { - hdr_record_value_atomic( - config.latency_histogram, // Histogram to record to - (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record - hdr_record_value_atomic( - config.current_sec_latency_histogram, // Histogram to record to - (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record - } + int requests_finished = atomic_fetch_add_explicit(&config.requests_finished, 1, memory_order_relaxed); + if (requests_finished < config.requests) { + if (config.num_threads == 0) { + hdr_record_value(config.latency_histogram, // Histogram to record to + (long)c->latency <= CONFIG_LATENCY_HISTOGRAM_MAX_VALUE + ? (long)c->latency + : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record + hdr_record_value(config.current_sec_latency_histogram, // Histogram to record to + (long)c->latency <= CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE + ? (long)c->latency + : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record + } else { + hdr_record_value_atomic(config.latency_histogram, // Histogram to record to + (long)c->latency <= CONFIG_LATENCY_HISTOGRAM_MAX_VALUE + ? (long)c->latency + : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record + hdr_record_value_atomic(config.current_sec_latency_histogram, // Histogram to record to + (long)c->latency <= CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE + ? (long)c->latency + : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record + } } c->pending--; if (c->pending == 0) { @@ -556,7 +560,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Initialize request when nothing was written. */ if (c->written == 0) { /* Enforce upper bound to number of requests. 
*/ - int requests_issued = atomic_fetch_add_explicit(&config.requests_issued,config.pipeline,memory_order_relaxed); + int requests_issued = atomic_fetch_add_explicit(&config.requests_issued, config.pipeline, memory_order_relaxed); if (requests_issued >= config.requests) { return; } @@ -794,7 +798,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { /* In idle mode, clients still need to register readHandler for catching errors */ aeCreateFileEvent(el, c->context->fd, AE_READABLE, readHandler, c); - listAddNodeTail(config.clients,c); + listAddNodeTail(config.clients, c); atomic_fetch_add_explicit(&config.liveclients, 1, memory_order_relaxed); c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); @@ -1232,10 +1236,9 @@ static int fetchClusterSlotsConfiguration(client c) { redisReply *reply = NULL; is_fetching_slots = atomic_fetch_add_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); - if (is_fetching_slots) return -1; //TODO: use other codes || errno ? + if (is_fetching_slots) return -1; // TODO: use other codes || errno ? atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); - fprintf(stderr, - "WARNING: Cluster slots configuration changed, fetching new one...\n"); + fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); const char *errmsg = "Failed to update cluster slots configuration"; static dictType dtype = { dictSdsHash, /* hash function */ diff --git a/src/zmalloc.c b/src/zmalloc.c index 8819f0c518..0117d8d91a 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -388,7 +388,7 @@ char *zstrdup(const char *s) { } size_t zmalloc_used_memory(void) { - size_t um = atomic_load_explicit(&used_memory,memory_order_relaxed); + size_t um = atomic_load_explicit(&used_memory, memory_order_relaxed); return um; }
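
A note on reproducing the clang-format-check job locally: the workflow above
amounts to "reformat src/ in place, then fail if git sees a diff". The
following is a minimal sketch of the equivalent local run, assuming you are in
a Valkey checkout that carries the project's .clang-format (clang-format
discovers it automatically) and that your clang-format version roughly matches
whatever apt installs on ubuntu-latest, since different releases can format
code slightly differently:

    # From the repository root: rewrite every .c/.h under src/ in place,
    # then let git report any drift from the committed formatting.
    cd src
    find . \( -name '*.c' -o -name '*.h' \) -print0 | xargs -0 clang-format -i
    git diff --exit-code   # exit status 1 plus a diff means unformatted files

If the diff is non-empty, committing it brings the tree back in line with what
the CI check expects.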