diff --git a/src/aof.c b/src/aof.c index a2ed139f8b..d15ff379e5 100644 --- a/src/aof.c +++ b/src/aof.c @@ -946,7 +946,7 @@ void stopAppendOnly(void) { server.aof_last_incr_size = 0; server.aof_last_incr_fsync_offset = 0; server.fsynced_reploff = -1; - atomicSet(server.fsynced_reploff_pending, 0); + atomic_store_explicit(&server.fsynced_reploff_pending, 0, memory_order_relaxed); killAppendOnlyChild(); sdsfree(server.aof_buf); server.aof_buf = sdsempty(); @@ -985,11 +985,11 @@ int startAppendOnly(void) { } server.aof_last_fsync = server.mstime; /* If AOF fsync error in bio job, we just ignore it and log the event. */ - int aof_bio_fsync_status; - atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); if (aof_bio_fsync_status == C_ERR) { - serverLog(LL_WARNING, "AOF reopen, just ignore the AOF fsync error in bio job"); - atomicSet(server.aof_bio_fsync_status, C_OK); + serverLog(LL_WARNING, + "AOF reopen, just ignore the AOF fsync error in bio job"); + atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed); } /* If AOF was in error state, we just ignore it and log the event. 
*/ @@ -1074,7 +1074,7 @@ void flushAppendOnlyFile(int force) { * (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on * the higher offset (which contains data that was only propagated to replicas, and not to AOF) */ if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO) - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); return; } } @@ -1244,8 +1244,9 @@ void flushAppendOnlyFile(int force) { latencyAddSampleIfNeeded("aof-fsync-always", latency); server.aof_last_incr_fsync_offset = server.aof_last_incr_size; server.aof_last_fsync = server.mstime; - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); - } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) { + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); + } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && + server.mstime - server.aof_last_fsync >= 1000) { if (!sync_in_progress) { aof_background_fsync(server.aof_fd); server.aof_last_incr_fsync_offset = server.aof_last_incr_size; @@ -2409,7 +2410,7 @@ int rewriteAppendOnlyFileBackground(void) { /* Set the initial repl_offset, which will be applied to fsynced_reploff * when AOFRW finishes (after possibly being updated by a bio thread) */ - atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed); server.fsynced_reploff = 0; } @@ -2647,8 +2648,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* Update the fsynced replication offset that just now become valid. * This could either be the one we took in startAppendOnly, or a * newer one set by the bio thread. 
*/ - long long fsynced_reploff_pending; - atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending); + long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed); server.fsynced_reploff = fsynced_reploff_pending; } diff --git a/src/bio.c b/src/bio.c index 4d1d268e62..31d946e566 100644 --- a/src/bio.c +++ b/src/bio.c @@ -62,6 +62,7 @@ #include "server.h" #include "bio.h" +#include <stdatomic.h> static char *bio_worker_title[] = { "bio_close_file", @@ -256,17 +257,19 @@ void *bioProcessBackgroundJobs(void *arg) { /* The fd may be closed by main thread and reused for another * socket, pipe, or file. We just ignore these errno because * aof fsync did not really fail. */ - if (valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) { - int last_status; - atomicGet(server.aof_bio_fsync_status, last_status); - atomicSet(server.aof_bio_fsync_status, C_ERR); - atomicSet(server.aof_bio_fsync_errno, errno); + if (valkey_fsync(job->fd_args.fd) == -1 && + errno != EBADF && errno != EINVAL) + { + int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed); + + atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed); + atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_release); if (last_status == C_OK) { serverLog(LL_WARNING, "Fail to fsync the AOF file: %s", strerror(errno)); } } else { - atomicSet(server.aof_bio_fsync_status, C_OK); - atomicSet(server.fsynced_reploff_pending, job->fd_args.offset); + atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed); + atomic_store_explicit(&server.fsynced_reploff_pending, job->fd_args.offset, memory_order_relaxed); } if (job->fd_args.need_reclaim_cache) { diff --git a/src/db.c b/src/db.c index fa07deeb4b..a78c8bad2b 100644 --- a/src/db.c +++ b/src/db.c @@ -29,7 +29,6 @@ #include "server.h" #include "cluster.h" -#include "atomicvar.h" #include "latency.h" #include "script.h" 
#include "functions.h" diff --git a/src/evict.c b/src/evict.c index 4a51974ac6..cf209ff065 100644 --- a/src/evict.c +++ b/src/evict.c @@ -32,7 +32,6 @@ #include "server.h" #include "bio.h" -#include "atomicvar.h" #include "script.h" #include diff --git a/src/functions.c b/src/functions.c index 76e40c5231..3076f3b90a 100644 --- a/src/functions.c +++ b/src/functions.c @@ -31,7 +31,6 @@ #include "sds.h" #include "dict.h" #include "adlist.h" -#include "atomicvar.h" #define LOAD_TIMEOUT_MS 500 diff --git a/src/lazyfree.c b/src/lazyfree.c index f9811f0e64..82e468322e 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -1,19 +1,20 @@ #include "server.h" #include "bio.h" -#include "atomicvar.h" #include "functions.h" #include "cluster.h" -static serverAtomic size_t lazyfree_objects = 0; -static serverAtomic size_t lazyfreed_objects = 0; +#include <stdatomic.h> + +static _Atomic size_t lazyfree_objects = 0; +static _Atomic size_t lazyfreed_objects = 0; /* Release objects from the lazyfree thread. It's just decrRefCount() * updating the count of objects to release. */ void lazyfreeFreeObject(void *args[]) { robj *o = (robj *)args[0]; decrRefCount(o); - atomicDecr(lazyfree_objects, 1); - atomicIncr(lazyfreed_objects, 1); + atomic_fetch_sub_explicit(&lazyfree_objects,1,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,1,memory_order_relaxed); } /* Release a database from the lazyfree thread. The 'db' pointer is the @@ -26,8 +27,8 @@ void lazyfreeFreeDatabase(void *args[]) { size_t numkeys = kvstoreSize(da1); kvstoreRelease(da1); kvstoreRelease(da2); - atomicDecr(lazyfree_objects, numkeys); - atomicIncr(lazyfreed_objects, numkeys); + atomic_fetch_sub_explicit(&lazyfree_objects,numkeys,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,numkeys,memory_order_relaxed); } /* Release the key tracking table. 
*/ @@ -35,8 +36,8 @@ void lazyFreeTrackingTable(void *args[]) { rax *rt = args[0]; size_t len = rt->numele; freeTrackingRadixTree(rt); - atomicDecr(lazyfree_objects, len); - atomicIncr(lazyfreed_objects, len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Release the error stats rax tree. */ @@ -44,8 +45,8 @@ void lazyFreeErrors(void *args[]) { rax *errors = args[0]; size_t len = errors->numele; raxFreeWithCallback(errors, zfree); - atomicDecr(lazyfree_objects, len); - atomicIncr(lazyfreed_objects, len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Release the lua_scripts dict. */ @@ -55,8 +56,8 @@ void lazyFreeLuaScripts(void *args[]) { lua_State *lua = args[2]; long long len = dictSize(lua_scripts); freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua); - atomicDecr(lazyfree_objects, len); - atomicIncr(lazyfreed_objects, len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Release the functions ctx. */ @@ -64,8 +65,8 @@ void lazyFreeFunctionsCtx(void *args[]) { functionsLibCtx *functions_lib_ctx = args[0]; size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx); functionsLibCtxFree(functions_lib_ctx); - atomicDecr(lazyfree_objects, len); - atomicIncr(lazyfreed_objects, len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Release replication backlog referencing memory. 
*/ @@ -76,26 +77,24 @@ void lazyFreeReplicationBacklogRefMem(void *args[]) { len += raxSize(index); listRelease(blocks); raxFree(index); - atomicDecr(lazyfree_objects, len); - atomicIncr(lazyfreed_objects, len); + atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed); + atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed); } /* Return the number of currently pending objects to free. */ size_t lazyfreeGetPendingObjectsCount(void) { - size_t aux; - atomicGet(lazyfree_objects, aux); + size_t aux = atomic_load_explicit(&lazyfree_objects,memory_order_relaxed); return aux; } /* Return the number of objects that have been freed. */ size_t lazyfreeGetFreedObjectsCount(void) { - size_t aux; - atomicGet(lazyfreed_objects, aux); + size_t aux = atomic_load_explicit(&lazyfreed_objects,memory_order_relaxed); return aux; } void lazyfreeResetStats(void) { - atomicSet(lazyfreed_objects, 0); + atomic_store_explicit(&lazyfreed_objects,0,memory_order_relaxed); } /* Return the amount of work needed in order to free an object. @@ -175,8 +174,8 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * of parts of the server core may call incrRefCount() to protect * objects, and then call dbDelete(). 
*/ if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { - atomicIncr(lazyfree_objects, 1); - bioCreateLazyFreeJob(lazyfreeFreeObject, 1, obj); + atomic_fetch_add_explicit(&lazyfree_objects,1,memory_order_relaxed); + bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); } else { decrRefCount(obj); } @@ -195,7 +194,7 @@ void emptyDbAsync(serverDb *db) { kvstore *oldkeys = db->keys, *oldexpires = db->expires; db->keys = kvstoreCreate(&dbDictType, slot_count_bits, flags); db->expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags); - atomicIncr(lazyfree_objects, kvstoreSize(oldkeys)); + atomic_fetch_add_explicit(&lazyfree_objects, kvstoreSize(oldkeys), memory_order_relaxed); bioCreateLazyFreeJob(lazyfreeFreeDatabase, 2, oldkeys, oldexpires); } @@ -204,8 +203,8 @@ void emptyDbAsync(serverDb *db) { void freeTrackingRadixTreeAsync(rax *tracking) { /* Because this rax has only keys and no values so we use numnodes. */ if (tracking->numnodes > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects, tracking->numele); - bioCreateLazyFreeJob(lazyFreeTrackingTable, 1, tracking); + atomic_fetch_add_explicit(&lazyfree_objects,tracking->numele,memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking); } else { freeTrackingRadixTree(tracking); } @@ -216,8 +215,8 @@ void freeTrackingRadixTreeAsync(rax *tracking) { void freeErrorsRadixTreeAsync(rax *errors) { /* Because this rax has only keys and no values so we use numnodes. */ if (errors->numnodes > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects, errors->numele); - bioCreateLazyFreeJob(lazyFreeErrors, 1, errors); + atomic_fetch_add_explicit(&lazyfree_objects,errors->numele,memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeErrors,1,errors); } else { raxFreeWithCallback(errors, zfree); } @@ -227,8 +226,8 @@ void freeErrorsRadixTreeAsync(rax *errors) { * Close lua interpreter, if there are a lot of lua scripts, close it in async way. 
*/ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) { if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects, dictSize(lua_scripts)); - bioCreateLazyFreeJob(lazyFreeLuaScripts, 3, lua_scripts, lua_scripts_lru_list, lua); + atomic_fetch_add_explicit(&lazyfree_objects,dictSize(lua_scripts),memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeLuaScripts,3,lua_scripts,lua_scripts_lru_list,lua); } else { freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua); } @@ -237,8 +236,8 @@ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_Stat /* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */ void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) { if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects, functionsLibCtxFunctionsLen(functions_lib_ctx)); - bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 1, functions_lib_ctx); + atomic_fetch_add_explicit(&lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx),memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx); } else { functionsLibCtxFree(functions_lib_ctx); } @@ -246,9 +245,11 @@ void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) { /* Free replication backlog referencing buffer blocks and rax index. 
*/ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) { - if (listLength(blocks) > LAZYFREE_THRESHOLD || raxSize(index) > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects, listLength(blocks) + raxSize(index)); - bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem, 2, blocks, index); + if (listLength(blocks) > LAZYFREE_THRESHOLD || + raxSize(index) > LAZYFREE_THRESHOLD) + { + atomic_fetch_add_explicit(&lazyfree_objects,listLength(blocks)+raxSize(index),memory_order_relaxed); + bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index); } else { listRelease(blocks); raxFree(index); diff --git a/src/module.c b/src/module.c index f37d3a8302..f82f30326d 100644 --- a/src/module.c +++ b/src/module.c @@ -2413,8 +2413,7 @@ void VM_Yield(ValkeyModuleCtx *ctx, int flags, const char *busy_reply) { * after the main thread enters acquiring GIL state in order to protect the event * loop (ae.c) and avoid potential race conditions. */ - int acquiring; - atomicGet(server.module_gil_acquring, acquiring); + int acquiring = atomic_load_explicit(&server.module_gil_acquiring, memory_order_relaxed); if (!acquiring) { /* If the main thread has not yet entered the acquiring GIL state, * we attempt to wake it up and exit without waiting for it to @@ -11823,7 +11822,7 @@ void moduleInitModulesSystem(void) { moduleUnblockedClients = listCreate(); server.loadmodule_queue = listCreate(); server.module_configs_queue = dictCreate(&sdsKeyValueHashDictType); - server.module_gil_acquring = 0; + server.module_gil_acquiring = 0; modules = dictCreate(&modulesDictType); moduleAuthCallbacks = listCreate(); diff --git a/src/networking.c b/src/networking.c index 39eaf8b17e..121931a111 100644 --- a/src/networking.c +++ b/src/networking.c @@ -28,7 +28,6 @@ */ #include "server.h" -#include "atomicvar.h" #include "cluster.h" #include "script.h" #include "fpconv_dtoa.h" @@ -37,6 +36,7 @@ #include #include #include +#include <stdatomic.h> static void setProtocolError(const char *errstr, client 
*c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); @@ -128,9 +128,8 @@ client *createClient(connection *conn) { connSetPrivateData(conn, c); } c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size); - selectDb(c, 0); - uint64_t client_id; - atomicGetIncr(server.next_client_id, client_id, 1); + selectDb(c,0); + uint64_t client_id = atomic_fetch_add_explicit(&server.next_client_id,1,memory_order_relaxed); c->id = client_id; #ifdef LOG_REQ_RES reqresReset(c, 0); @@ -1943,7 +1942,7 @@ int _writeToClient(client *c, ssize_t *nwritten) { * thread safe. */ int writeToClient(client *c, int handler_installed) { /* Update total number of writes on server */ - atomicIncr(server.stat_total_writes_processed, 1); + atomic_fetch_add_explicit(&server.stat_total_writes_processed,1, memory_order_relaxed); ssize_t nwritten = 0, totwritten = 0; @@ -1969,9 +1968,9 @@ int writeToClient(client *c, int handler_installed) { } if (getClientType(c) == CLIENT_TYPE_SLAVE) { - atomicIncr(server.stat_net_repl_output_bytes, totwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, totwritten, memory_order_relaxed); } else { - atomicIncr(server.stat_net_output_bytes, totwritten); + atomic_fetch_add_explicit(&server.stat_net_output_bytes, totwritten, memory_order_relaxed); } c->net_output_bytes += totwritten; @@ -2611,7 +2610,7 @@ void readQueryFromClient(connection *conn) { if (postponeClientRead(c)) return; /* Update total number of reads on server */ - atomicIncr(server.stat_total_reads_processed, 1); + atomic_fetch_add_explicit(&server.stat_total_reads_processed,1,memory_order_relaxed); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply @@ -2677,9 +2676,9 @@ void readQueryFromClient(connection *conn) { c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) { c->read_reploff += nread; - atomicIncr(server.stat_net_repl_input_bytes, nread); + 
atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread,memory_order_relaxed); } else { - atomicIncr(server.stat_net_input_bytes, nread); + atomic_fetch_add_explicit(&server.stat_net_input_bytes,nread,memory_order_relaxed); } c->net_input_bytes += nread; @@ -2698,7 +2697,7 @@ void readQueryFromClient(connection *conn) { sdsfree(ci); sdsfree(bytes); freeClientAsync(c); - atomicIncr(server.stat_client_qbuf_limit_disconnections, 1); + atomic_fetch_add_explicit(&server.stat_client_qbuf_limit_disconnections,1,memory_order_relaxed); goto done; } @@ -4135,7 +4134,7 @@ void processEventsWhileBlocked(void) { #endif typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending { - serverAtomic unsigned long value; + _Atomic unsigned long value; } threads_pending; pthread_t io_threads[IO_THREADS_MAX_NUM]; @@ -4150,13 +4149,12 @@ int io_threads_op; list *io_threads_list[IO_THREADS_MAX_NUM]; static inline unsigned long getIOPendingCount(int i) { - unsigned long count = 0; - atomicGetWithSync(io_threads_pending[i].value, count); + unsigned long count = atomic_load(&io_threads_pending[i].value); return count; } static inline void setIOPendingCount(int i, unsigned long count) { - atomicSetWithSync(io_threads_pending[i].value, count); + atomic_store(&io_threads_pending[i].value, count); } void *IOThreadMain(void *myid) { diff --git a/src/rdb.c b/src/rdb.c index 5e398dee74..abc86566d0 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -39,6 +39,7 @@ #include #include +#include <stdatomic.h> #include #include #include @@ -2888,7 +2889,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { processModuleLoadingProgressEvent(0); } if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) { - atomicIncr(server.stat_net_repl_input_bytes, len); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,len,memory_order_relaxed); } } diff --git a/src/replication.c b/src/replication.c index a3268f41db..e2612e75b5 100644 --- a/src/replication.c +++ 
b/src/replication.c @@ -39,6 +39,7 @@ #include #include #include +#include <stdatomic.h> #include #include @@ -1363,8 +1364,8 @@ void sendBulkToSlave(connection *conn) { freeClient(slave); return; } - atomicIncr(server.stat_net_repl_output_bytes, nwritten); - sdsrange(slave->replpreamble, nwritten, -1); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); + sdsrange(slave->replpreamble,nwritten,-1); if (sdslen(slave->replpreamble) == 0) { sdsfree(slave->replpreamble); slave->replpreamble = NULL; @@ -1391,7 +1392,7 @@ void sendBulkToSlave(connection *conn) { return; } slave->repldboff += nwritten; - atomicIncr(server.stat_net_repl_output_bytes, nwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); if (slave->repldboff == slave->repldbsize) { closeRepldbfd(slave); connSetWriteHandler(slave->conn, NULL); @@ -1433,7 +1434,7 @@ void rdbPipeWriteHandler(struct connection *conn) { return; } else { slave->repldboff += nwritten; - atomicIncr(server.stat_net_repl_output_bytes, nwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); if (slave->repldboff < server.rdb_pipe_bufflen) { slave->repl_last_partial_write = server.unixtime; return; /* more data to write.. */ @@ -1506,7 +1507,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* Note: when use diskless replication, 'repldboff' is the offset * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. 
*/ slave->repldboff = nwritten; - atomicIncr(server.stat_net_repl_output_bytes, nwritten); + atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes,nwritten,memory_order_relaxed); } /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ @@ -1814,7 +1815,7 @@ void readSyncBulkPayload(connection *conn) { } else { /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and * convert "\r\n" to '\0' so 1 byte is lost. */ - atomicIncr(server.stat_net_repl_input_bytes, nread + 1); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread+1,memory_order_relaxed); } if (buf[0] == '-') { @@ -1883,7 +1884,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(1); return; } - atomicIncr(server.stat_net_repl_input_bytes, nread); + atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes,nread,memory_order_relaxed); /* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... */ diff --git a/src/server.c b/src/server.c index 3e6dc56d6d..edf215eac2 100644 --- a/src/server.c +++ b/src/server.c @@ -33,7 +33,6 @@ #include "slowlog.h" #include "bio.h" #include "latency.h" -#include "atomicvar.h" #include "mt19937-64.h" #include "functions.h" #include "hdr_histogram.h" @@ -1075,7 +1074,7 @@ static inline void updateCachedTimeWithUs(int update_daylight_info, const long l server.ustime = ustime; server.mstime = server.ustime / 1000; time_t unixtime = server.mstime / 1000; - atomicSet(server.unixtime, unixtime); + atomic_store_explicit(&server.unixtime, unixtime, memory_order_relaxed); /* To get information about daylight saving time, we need to call * localtime_r and cache the result. 
However calling localtime_r in this @@ -1258,10 +1257,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { run_with_period(100) { long long stat_net_input_bytes, stat_net_output_bytes; long long stat_net_repl_input_bytes, stat_net_repl_output_bytes; - atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); - atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); - atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes); - atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes); + + stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes,memory_order_relaxed); + stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes,memory_order_relaxed); + stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes,memory_order_relaxed); + stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes,memory_order_relaxed); + monotime current_time = getMonotonicUs(); long long factor = 1000000; // us trackInstantaneousMetric(STATS_METRIC_COMMAND, server.stat_numcommands, current_time, factor); @@ -1667,8 +1668,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * If an initial rewrite is in progress then not all data is guaranteed to have actually been * persisted to disk yet, so we cannot update the field. We will wait for the rewrite to complete. 
*/ if (server.aof_state == AOF_ON && server.fsynced_reploff != -1) { - long long fsynced_reploff_pending; - atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending); + long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed); server.fsynced_reploff = fsynced_reploff_pending; /* If we have blocked [WAIT]AOF clients, and fsynced_reploff changed, we want to try to @@ -1737,11 +1737,12 @@ void afterSleep(struct aeEventLoop *eventLoop) { if (moduleCount()) { mstime_t latency; latencyStartMonitor(latency); - - atomicSet(server.module_gil_acquring, 1); + atomic_store_explicit(&server.module_gil_acquiring,1,memory_order_relaxed); moduleAcquireGIL(); - atomicSet(server.module_gil_acquring, 0); - moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP, NULL); + atomic_store_explicit(&server.module_gil_acquiring,0,memory_order_relaxed); + moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, + VALKEYMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP, + NULL); latencyEndMonitor(latency); latencyAddSampleIfNeeded("module-acquire-GIL", latency); } @@ -1990,7 +1991,7 @@ void initServerConfig(void) { server.aof_flush_sleep = 0; server.aof_last_fsync = time(NULL) * 1000; server.aof_cur_timestamp = 0; - atomicSet(server.aof_bio_fsync_status, C_OK); + atomic_store_explicit(&server.aof_bio_fsync_status,C_OK,memory_order_relaxed); server.aof_rewrite_time_last = -1; server.aof_rewrite_time_start = -1; server.aof_lastbgrewrite_status = C_OK; @@ -2480,10 +2481,10 @@ void resetServerStats(void) { server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; server.stat_io_reads_processed = 0; - atomicSet(server.stat_total_reads_processed, 0); + atomic_store_explicit(&server.stat_total_reads_processed,0,memory_order_relaxed); server.stat_io_writes_processed = 0; - atomicSet(server.stat_total_writes_processed, 0); - atomicSet(server.stat_client_qbuf_limit_disconnections, 0); + 
atomic_store_explicit(&server.stat_total_writes_processed,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_client_qbuf_limit_disconnections,0,memory_order_relaxed); server.stat_client_outbuf_limit_disconnections = 0; for (j = 0; j < STATS_METRIC_COUNT; j++) { server.inst_metric[j].idx = 0; @@ -2494,10 +2495,10 @@ void resetServerStats(void) { server.stat_aof_rewrites = 0; server.stat_rdb_saves = 0; server.stat_aofrw_consecutive_failures = 0; - atomicSet(server.stat_net_input_bytes, 0); - atomicSet(server.stat_net_output_bytes, 0); - atomicSet(server.stat_net_repl_input_bytes, 0); - atomicSet(server.stat_net_repl_output_bytes, 0); + atomic_store_explicit(&server.stat_net_input_bytes,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_net_output_bytes,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_net_repl_input_bytes,0,memory_order_relaxed); + atomic_store_explicit(&server.stat_net_repl_output_bytes,0,memory_order_relaxed); server.stat_unexpected_error_replies = 0; server.stat_total_error_replies = 0; server.stat_dump_payload_sanitizations = 0; @@ -4380,10 +4381,9 @@ int writeCommandsDeniedByDiskError(void) { return DISK_ERROR_TYPE_AOF; } /* AOF fsync error. 
*/ - int aof_bio_fsync_status; - atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_acquire); if (aof_bio_fsync_status == C_ERR) { - atomicGet(server.aof_bio_fsync_errno, server.aof_last_write_errno); + server.aof_last_write_errno = atomic_load_explicit(&server.aof_bio_fsync_errno, memory_order_relaxed); return DISK_ERROR_TYPE_AOF; } } @@ -5374,7 +5374,6 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "arch_bits:%i\r\n", server.arch_bits, "monotonic_clock:%s\r\n", monotonicInfoString(), "multiplexing_api:%s\r\n", aeGetApiName(), - "atomicvar_api:%s\r\n", REDIS_ATOMIC_API, "gcc_version:%s\r\n", GNUC_VERSION_STR, "process_id:%I\r\n", (int64_t) getpid(), "process_supervised:%s\r\n", supervised, @@ -5531,8 +5530,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { } else if (server.stat_current_save_keys_total) { fork_perc = ((double)server.stat_current_save_keys_processed / server.stat_current_save_keys_total) * 100; } - int aof_bio_fsync_status; - atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status); + int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status,memory_order_relaxed); /* clang-format off */ info = sdscatprintf(info, "# Persistence\r\n" FMTARGS( @@ -5631,13 +5629,14 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { long long current_active_defrag_time = server.stat_last_active_defrag_time ? 
(long long)elapsedUs(server.stat_last_active_defrag_time) : 0; long long stat_client_qbuf_limit_disconnections; - atomicGet(server.stat_total_reads_processed, stat_total_reads_processed); - atomicGet(server.stat_total_writes_processed, stat_total_writes_processed); - atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); - atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); - atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes); - atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes); - atomicGet(server.stat_client_qbuf_limit_disconnections, stat_client_qbuf_limit_disconnections); + + stat_total_reads_processed = atomic_load_explicit(&server.stat_total_reads_processed, memory_order_relaxed); + stat_total_writes_processed = atomic_load_explicit(&server.stat_total_writes_processed, memory_order_relaxed); + stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes, memory_order_relaxed); + stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes, memory_order_relaxed); + stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes, memory_order_relaxed); + stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes, memory_order_relaxed); + stat_client_qbuf_limit_disconnections = atomic_load_explicit(&server.stat_client_qbuf_limit_disconnections, memory_order_relaxed); if (sections++) info = sdscat(info, "\r\n"); /* clang-format off */ diff --git a/src/server.h b/src/server.h index abf66fbf5a..70beb54f43 100644 --- a/src/server.h +++ b/src/server.h @@ -34,12 +34,12 @@ #include "config.h" #include "solarisfixes.h" #include "rio.h" -#include "atomicvar.h" #include "commands.h" #include #include #include +#include <stdatomic.h> #include #include #include @@ -1631,7 +1631,7 @@ struct valkeyServer { int module_pipe[2]; /* Pipe used to awake the event loop by module threads. 
*/ pid_t child_pid; /* PID of current child */ int child_type; /* Type of current child */ - serverAtomic int module_gil_acquring; /* Indicates whether the GIL is being acquiring by the main thread. */ + _Atomic int module_gil_acquiring; /* Indicates whether the GIL is being acquiring by the main thread. */ /* Networking */ int port; /* TCP listening port */ int tls_port; /* TLS listening port */ @@ -1669,13 +1669,13 @@ struct valkeyServer { uint32_t paused_actions; /* Bitmask of actions that are currently paused */ list *postponed_clients; /* List of postponed clients */ pause_event client_pause_per_purpose[NUM_PAUSE_PURPOSES]; - char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ - dict *migrate_cached_sockets; /* MIGRATE cached sockets */ - serverAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ - int protected_mode; /* Don't accept external connections. */ - int io_threads_num; /* Number of IO threads to use. */ - int io_threads_do_reads; /* Read and parse from IO threads? */ - int io_threads_active; /* Is IO threads currently active? */ + char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + dict *migrate_cached_sockets;/* MIGRATE cached sockets */ + _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ + int protected_mode; /* Don't accept external connections. */ + int io_threads_num; /* Number of IO threads to use. */ + int io_threads_do_reads; /* Read and parse from IO threads? */ + int io_threads_active; /* Is IO threads currently active? 
*/ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ @@ -1696,65 +1696,61 @@ struct valkeyServer { long long stat_expiredkeys; /* Number of expired keys */ double stat_expired_stale_perc; /* Percentage of keys probably expired */ long long stat_expired_time_cap_reached_count; /* Early expire cycle stops.*/ - long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */ - long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ - long long stat_evictedclients; /* Number of evicted clients */ - long long stat_evictedscripts; /* Number of evicted lua scripts. */ - long long stat_total_eviction_exceeded_time; /* Total time over the memory limit, unit us */ - monotime stat_last_eviction_exceeded_time; /* Timestamp of current eviction start, unit us */ - long long stat_keyspace_hits; /* Number of successful lookups of keys */ - long long stat_keyspace_misses; /* Number of failed lookups of keys */ - long long stat_active_defrag_hits; /* number of allocations moved */ - long long stat_active_defrag_misses; /* number of allocations scanned but not moved */ - long long stat_active_defrag_key_hits; /* number of keys with moved allocations */ - long long stat_active_defrag_key_misses; /* number of keys scanned and not moved */ - long long stat_active_defrag_scanned; /* number of dictEntries scanned */ - long long stat_total_active_defrag_time; /* Total time memory fragmentation over the limit, unit us */ - monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */ - size_t stat_peak_memory; /* Max used memory record */ - long long stat_aof_rewrites; /* number of aof file rewrites performed */ - long long stat_aofrw_consecutive_failures; /* The number of consecutive failures of aofrw */ - long long 
stat_rdb_saves; /* number of rdb saves performed */ - long long stat_fork_time; /* Time needed to perform latest fork() */ - double stat_fork_rate; /* Fork rate in GB/sec. */ - long long stat_total_forks; /* Total count of fork. */ - long long stat_rejected_conn; /* Clients rejected because of maxclients */ - long long stat_sync_full; /* Number of full resyncs with slaves. */ - long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ - long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */ - list *slowlog; /* SLOWLOG list of commands */ - long long slowlog_entry_id; /* SLOWLOG current entry ID */ - long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ - unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ - struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ - serverAtomic long long stat_net_input_bytes; /* Bytes read from network. */ - serverAtomic long long stat_net_output_bytes; /* Bytes written to network. */ - serverAtomic long long - stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */ - serverAtomic long long - stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */ - size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */ - size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ - monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */ - size_t stat_current_save_keys_processed; /* Processed keys while child is active. */ - size_t stat_current_save_keys_total; /* Number of keys when child started. */ - size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ - size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ - size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ - double stat_module_progress; /* Module save progress. 
*/ - size_t stat_clients_type_memory[CLIENT_TYPE_COUNT]; /* Mem usage by type */ - size_t stat_cluster_links_memory; /* Mem usage by cluster links */ - long long - stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ + long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */ + long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ + long long stat_evictedclients; /* Number of evicted clients */ + long long stat_evictedscripts; /* Number of evicted lua scripts. */ + long long stat_total_eviction_exceeded_time; /* Total time over the memory limit, unit us */ + monotime stat_last_eviction_exceeded_time; /* Timestamp of current eviction start, unit us */ + long long stat_keyspace_hits; /* Number of successful lookups of keys */ + long long stat_keyspace_misses; /* Number of failed lookups of keys */ + long long stat_active_defrag_hits; /* number of allocations moved */ + long long stat_active_defrag_misses; /* number of allocations scanned but not moved */ + long long stat_active_defrag_key_hits; /* number of keys with moved allocations */ + long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */ + long long stat_active_defrag_scanned; /* number of dictEntries scanned */ + long long stat_total_active_defrag_time; /* Total time memory fragmentation over the limit, unit us */ + monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */ + size_t stat_peak_memory; /* Max used memory record */ + long long stat_aof_rewrites; /* number of aof file rewrites performed */ + long long stat_aofrw_consecutive_failures; /* The number of consecutive failures of aofrw */ + long long stat_rdb_saves; /* number of rdb saves performed */ + long long stat_fork_time; /* Time needed to perform latest fork() */ + double stat_fork_rate; /* Fork rate in GB/sec. */ + long long stat_total_forks; /* Total count of fork. 
*/ + long long stat_rejected_conn; /* Clients rejected because of maxclients */ + long long stat_sync_full; /* Number of full resyncs with slaves. */ + long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ + long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */ + list *slowlog; /* SLOWLOG list of commands */ + long long slowlog_entry_id; /* SLOWLOG current entry ID */ + long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ + unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ + struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ + _Atomic long long stat_net_input_bytes; /* Bytes read from network. */ + _Atomic long long stat_net_output_bytes; /* Bytes written to network. */ + _Atomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */ + _Atomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */ + size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */ + size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ + monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */ + size_t stat_current_save_keys_processed; /* Processed keys while child is active. */ + size_t stat_current_save_keys_total; /* Number of keys when child started. */ + size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ + size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ + size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ + double stat_module_progress; /* Module save progress. */ + size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ + size_t stat_cluster_links_memory; /* Mem usage by cluster links */ + long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) 
error replies */ long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ - long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ - long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ - long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ - serverAtomic long long stat_total_reads_processed; /* Total number of read events processed */ - serverAtomic long long stat_total_writes_processed; /* Total number of write events processed */ - serverAtomic long long - stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ - long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ + long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ + long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ + long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ + _Atomic long long stat_total_reads_processed; /* Total number of read events processed */ + _Atomic long long stat_total_writes_processed; /* Total number of write events processed */ + _Atomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ + long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct { @@ -1814,43 +1810,43 @@ struct valkeyServer { unsigned int max_new_conns_per_cycle; /* The maximum number of tcp connections that will be accepted during each invocation of the event loop. 
*/ /* AOF persistence */ - int aof_enabled; /* AOF configuration */ - int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ - int aof_fsync; /* Kind of fsync() policy */ - char *aof_filename; /* Basename of the AOF file and manifest file */ - char *aof_dirname; /* Name of the AOF directory */ - int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ - int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ - off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ - off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ - off_t aof_current_size; /* AOF current size (Including BASE + INCRs). */ - off_t aof_last_incr_size; /* The size of the latest incr AOF. */ - off_t aof_last_incr_fsync_offset; /* AOF offset which is already requested to be synced to disk. - * Compare with the aof_last_incr_size. */ - int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */ - int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ - sds aof_buf; /* AOF buffer, written before entering the event loop */ - int aof_fd; /* File descriptor of currently selected AOF file */ - int aof_selected_db; /* Currently selected DB in AOF */ - mstime_t aof_flush_postponed_start; /* mstime of postponed AOF flush */ - mstime_t aof_last_fsync; /* mstime of last fsync() */ - time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ - time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ - time_t aof_cur_timestamp; /* Current record timestamp in AOF */ - int aof_timestamp_enabled; /* Enable record timestamp in AOF */ - int aof_lastbgrewrite_status; /* C_OK or C_ERR */ - unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ - int aof_rewrite_incremental_fsync; /* fsync incrementally while aof rewriting? */ - int rdb_save_incremental_fsync; /* fsync incrementally while rdb saving? 
*/ - int aof_last_write_status; /* C_OK or C_ERR */ - int aof_last_write_errno; /* Valid if aof write/fsync status is ERR */ - int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ - int aof_use_rdb_preamble; /* Specify base AOF to use RDB encoding on AOF rewrites. */ - serverAtomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. */ - serverAtomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */ - aofManifest *aof_manifest; /* Used to track AOFs. */ - int aof_disable_auto_gc; /* If disable automatically deleting HISTORY type AOFs? - default no. (for testings). */ + int aof_enabled; /* AOF configuration */ + int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ + int aof_fsync; /* Kind of fsync() policy */ + char *aof_filename; /* Basename of the AOF file and manifest file */ + char *aof_dirname; /* Name of the AOF directory */ + int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ + int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ + off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ + off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ + off_t aof_current_size; /* AOF current size (Including BASE + INCRs). */ + off_t aof_last_incr_size; /* The size of the latest incr AOF. */ + off_t aof_last_incr_fsync_offset; /* AOF offset which is already requested to be synced to disk. + * Compare with the aof_last_incr_size. */ + int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */ + int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ + sds aof_buf; /* AOF buffer, written before entering the event loop */ + int aof_fd; /* File descriptor of currently selected AOF file */ + int aof_selected_db; /* Currently selected DB in AOF */ + mstime_t aof_flush_postponed_start; /* mstime of postponed AOF flush */ + mstime_t aof_last_fsync; /* mstime of last fsync() */ + time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. 
*/ + time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ + time_t aof_cur_timestamp; /* Current record timestamp in AOF */ + int aof_timestamp_enabled; /* Enable record timestamp in AOF */ + int aof_lastbgrewrite_status; /* C_OK or C_ERR */ + unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ + int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */ + int rdb_save_incremental_fsync; /* fsync incrementally while rdb saving? */ + int aof_last_write_status; /* C_OK or C_ERR */ + int aof_last_write_errno; /* Valid if aof write/fsync status is ERR */ + int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ + int aof_use_rdb_preamble; /* Specify base AOF to use RDB encoding on AOF rewrites. */ + _Atomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. */ + _Atomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */ + aofManifest *aof_manifest; /* Used to track AOFs. */ + int aof_disable_auto_gc; /* If disable automatically deleting HISTORY type AOFs? + default no. (for testings). */ /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ @@ -1908,35 +1904,35 @@ struct valkeyServer { int shutdown_on_sigterm; /* Shutdown flags configured for SIGTERM. */ /* Replication (master) */ - char replid[CONFIG_RUN_ID_SIZE + 1]; /* My current replication ID. */ - char replid2[CONFIG_RUN_ID_SIZE + 1]; /* replid inherited from master*/ - long long master_repl_offset; /* My current replication offset */ - long long second_replid_offset; /* Accept offsets up to this for replid2. 
*/ - serverAtomic long long fsynced_reploff_pending; /* Largest replication offset to - * potentially have been fsynced, applied to - fsynced_reploff only when AOF state is AOF_ON - (not during the initial rewrite) */ - long long fsynced_reploff; /* Largest replication offset that has been confirmed to be fsynced */ - int slaveseldb; /* Last SELECTed DB in replication output */ - int repl_ping_slave_period; /* Master pings the slave every N seconds */ - replBacklog *repl_backlog; /* Replication backlog for partial syncs */ - long long repl_backlog_size; /* Backlog circular buffer size */ - time_t repl_backlog_time_limit; /* Time without slaves after the backlog - gets released. */ - time_t repl_no_slaves_since; /* We have no slaves since that time. - Only valid if server.slaves len is 0. */ - int repl_min_slaves_to_write; /* Min number of slaves to write. */ - int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ - int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ - int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ - int repl_diskless_load; /* Slave parse RDB directly from the socket. - * see REPL_DISKLESS_LOAD_* enum */ - int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ - int repl_diskless_sync_max_replicas; /* Max replicas for diskless repl BGSAVE - * delay (start sooner if they all connect). */ - size_t repl_buffer_mem; /* The memory of replication buffer. */ - list *repl_buffer_blocks; /* Replication buffers blocks list - * (serving replica clients and repl backlog) */ + char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ + char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ + long long master_repl_offset; /* My current replication offset */ + long long second_replid_offset; /* Accept offsets up to this for replid2. 
*/ + _Atomic long long fsynced_reploff_pending;/* Largest replication offset to + * potentially have been fsynced, applied to + fsynced_reploff only when AOF state is AOF_ON + (not during the initial rewrite) */ + long long fsynced_reploff; /* Largest replication offset that has been confirmed to be fsynced */ + int slaveseldb; /* Last SELECTed DB in replication output */ + int repl_ping_slave_period; /* Master pings the slave every N seconds */ + replBacklog *repl_backlog; /* Replication backlog for partial syncs */ + long long repl_backlog_size; /* Backlog circular buffer size */ + time_t repl_backlog_time_limit; /* Time without slaves after the backlog + gets released. */ + time_t repl_no_slaves_since; /* We have no slaves since that time. + Only valid if server.slaves len is 0. */ + int repl_min_slaves_to_write; /* Min number of slaves to write. */ + int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ + int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ + int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ + int repl_diskless_load; /* Slave parse RDB directly from the socket. + * see REPL_DISKLESS_LOAD_* enum */ + int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ + int repl_diskless_sync_max_replicas;/* Max replicas for diskless repl BGSAVE + * delay (start sooner if they all connect). */ + size_t repl_buffer_mem; /* The memory of replication buffer. */ + list *repl_buffer_blocks; /* Replication buffers blocks list + * (serving replica clients and repl backlog) */ /* Replication (slave) */ char *masteruser; /* AUTH with this user and masterauth with master */ sds masterauth; /* AUTH with this password with master */ @@ -2020,14 +2016,14 @@ struct valkeyServer { int list_max_listpack_size; int list_compress_depth; /* time cache */ - serverAtomic time_t unixtime; /* Unix time sampled every cron cycle. */ - time_t timezone; /* Cached timezone. As set by tzset(). 
*/ - int daylight_active; /* Currently in daylight saving time. */ - mstime_t mstime; /* 'unixtime' in milliseconds. */ - ustime_t ustime; /* 'unixtime' in microseconds. */ - mstime_t cmd_time_snapshot; /* Time snapshot of the root execution nesting. */ - size_t blocking_op_nesting; /* Nesting level of blocking operation, used to reset blocked_last_cron. */ - long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */ + _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ + time_t timezone; /* Cached timezone. As set by tzset(). */ + int daylight_active; /* Currently in daylight saving time. */ + mstime_t mstime; /* 'unixtime' in milliseconds. */ + ustime_t ustime; /* 'unixtime' in microseconds. */ + mstime_t cmd_time_snapshot; /* Time snapshot of the root execution nesting. */ + size_t blocking_op_nesting; /* Nesting level of blocking operation, used to reset blocked_last_cron. */ + long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */ /* Pubsub */ kvstore *pubsub_channels; /* Map channels to list of subscribed clients */ dict *pubsub_patterns; /* A dict of pubsub_patterns */ diff --git a/src/threads_mngr.c b/src/threads_mngr.c index 4dbf1ea427..e1f1d7e7b4 100644 --- a/src/threads_mngr.c +++ b/src/threads_mngr.c @@ -32,12 +32,12 @@ #define UNUSED(V) ((void)V) #ifdef __linux__ -#include "atomicvar.h" #include "server.h" #include #include #include +#include #define IN_PROGRESS 1 static const clock_t RUN_ON_THREADS_TIMEOUT = 2; @@ -46,10 +46,10 @@ static const clock_t RUN_ON_THREADS_TIMEOUT = 2; static run_on_thread_cb g_callback = NULL; static volatile size_t g_tids_len = 0; -static serverAtomic size_t g_num_threads_done = 0; +static _Atomic size_t g_num_threads_done = 0; /* This flag is set while ThreadsManager_runOnThreads is running */ -static serverAtomic int g_in_progress = 0; +static _Atomic int g_in_progress = 0; 
/*============================ Internal prototypes ========================== */ @@ -111,9 +111,8 @@ __attribute__((noinline)) int ThreadsManager_runOnThreads(pid_t *tids, size_t ti static int test_and_start(void) { - /* atomicFlagGetSet sets the variable to 1 and returns the previous value */ - int prev_state; - atomicFlagGetSet(g_in_progress, prev_state); + /* atomic_exchange_explicit sets the variable to 1 and returns the previous value */ + int prev_state = atomic_exchange_explicit(&g_in_progress,1,memory_order_relaxed); /* If prev_state is 1, g_in_progress was on. */ return prev_state; @@ -124,7 +123,7 @@ __attribute__((noinline)) static void invoke_callback(int sig) { run_on_thread_cb callback = g_callback; if (callback) { callback(); - atomicIncr(g_num_threads_done, 1); + atomic_fetch_add_explicit(&g_num_threads_done,1,memory_order_relaxed); } else { serverLogFromHandler(LL_WARNING, "tid %ld: ThreadsManager g_callback is NULL", syscall(SYS_gettid)); } @@ -146,7 +145,7 @@ static void wait_threads(void) { /* Sleep a bit to yield to other threads. */ /* usleep isn't listed as signal safe, so we use select instead */ select(0, NULL, NULL, NULL, &tv); - atomicGet(g_num_threads_done, curr_done_count); + curr_done_count = atomic_load_explicit(&g_num_threads_done,memory_order_relaxed); clock_gettime(CLOCK_REALTIME, &curr_time); } while (curr_done_count < g_tids_len && curr_time.tv_sec <= timeout_time.tv_sec); @@ -161,7 +160,7 @@ static void ThreadsManager_cleanups(void) { g_num_threads_done = 0; /* Lastly, turn off g_in_progress */ - atomicSet(g_in_progress, 0); + atomic_store_explicit(&g_in_progress,0,memory_order_relaxed); } #else diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 0232d631b4..2eec75a10a 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -41,6 +41,7 @@ #include #include #include +#include #include /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */ #include /* Use hiredis sds. 
*/ @@ -54,7 +55,6 @@ #include "adlist.h" #include "dict.h" #include "zmalloc.h" -#include "atomicvar.h" #include "crc16_slottable.h" #include "hdr_histogram.h" #include "cli_common.h" @@ -84,11 +84,11 @@ static struct config { int tls; struct cliSSLconfig sslconfig; int numclients; - serverAtomic int liveclients; + _Atomic int liveclients; int requests; - serverAtomic int requests_issued; - serverAtomic int requests_finished; - serverAtomic int previous_requests_finished; + _Atomic int requests_issued; + _Atomic int requests_finished; + _Atomic int previous_requests_finished; int last_printed_bytes; long long previous_tick; int keysize; @@ -115,11 +115,11 @@ static struct config { int cluster_node_count; struct clusterNode **cluster_nodes; struct serverConfig *redis_config; - struct hdr_histogram *latency_histogram; - struct hdr_histogram *current_sec_latency_histogram; - serverAtomic int is_fetching_slots; - serverAtomic int is_updating_slots; - serverAtomic int slots_last_update; + struct hdr_histogram* latency_histogram; + struct hdr_histogram* current_sec_latency_histogram; + _Atomic int is_fetching_slots; + _Atomic int is_updating_slots; + _Atomic int slots_last_update; int enable_tracking; pthread_mutex_t liveclients_mutex; pthread_mutex_t is_updating_slots_mutex; @@ -344,8 +344,7 @@ static void freeClient(client c) { aeDeleteFileEvent(el, c->context->fd, AE_WRITABLE); aeDeleteFileEvent(el, c->context->fd, AE_READABLE); if (c->thread_id >= 0) { - int requests_finished = 0; - atomicGet(config.requests_finished, requests_finished); + int requests_finished = atomic_load_explicit(&config.requests_finished,memory_order_relaxed); if (requests_finished >= config.requests) { aeStop(el); } @@ -403,8 +402,7 @@ static void setClusterKeyHashTag(client c) { assert(c->thread_id >= 0); clusterNode *node = c->cluster_node; assert(node); - int is_updating_slots = 0; - atomicGet(config.is_updating_slots, is_updating_slots); + int is_updating_slots = 
atomic_load_explicit(&config.is_updating_slots,memory_order_relaxed); /* If updateClusterSlotsConfiguration is updating the slots array, * call updateClusterSlotsConfiguration is order to block the thread * since the mutex is locked. When the slots will be updated by the @@ -425,8 +423,7 @@ static void setClusterKeyHashTag(client c) { } static void clientDone(client c) { - int requests_finished = 0; - atomicGet(config.requests_finished, requests_finished); + int requests_finished = atomic_load_explicit(&config.requests_finished,memory_order_relaxed); if (requests_finished >= config.requests) { freeClient(c); if (!config.num_threads && config.el) aeStop(config.el); @@ -520,28 +517,23 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } continue; } - int requests_finished = 0; - atomicGetIncr(config.requests_finished, requests_finished, 1); - if (requests_finished < config.requests) { - if (config.num_threads == 0) { - hdr_record_value(config.latency_histogram, // Histogram to record to - (long)c->latency <= CONFIG_LATENCY_HISTOGRAM_MAX_VALUE - ? (long)c->latency - : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record - hdr_record_value(config.current_sec_latency_histogram, // Histogram to record to - (long)c->latency <= CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE - ? (long)c->latency - : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record - } else { - hdr_record_value_atomic(config.latency_histogram, // Histogram to record to - (long)c->latency <= CONFIG_LATENCY_HISTOGRAM_MAX_VALUE - ? (long)c->latency - : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record - hdr_record_value_atomic(config.current_sec_latency_histogram, // Histogram to record to - (long)c->latency <= CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE - ? 
(long)c->latency - : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record - } + int requests_finished = atomic_fetch_add_explicit(&config.requests_finished,1,memory_order_relaxed); + if (requests_finished < config.requests){ + if (config.num_threads == 0) { + hdr_record_value( + config.latency_histogram, // Histogram to record to + (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record + hdr_record_value( + config.current_sec_latency_histogram, // Histogram to record to + (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record + } else { + hdr_record_value_atomic( + config.latency_histogram, // Histogram to record to + (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record + hdr_record_value_atomic( + config.current_sec_latency_histogram, // Histogram to record to + (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record + } } c->pending--; if (c->pending == 0) { @@ -564,8 +556,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Initialize request when nothing was written. */ if (c->written == 0) { /* Enforce upper bound to number of requests. */ - int requests_issued = 0; - atomicGetIncr(config.requests_issued, requests_issued, config.pipeline); + int requests_issued = atomic_fetch_add_explicit(&config.requests_issued,config.pipeline,memory_order_relaxed); if (requests_issued >= config.requests) { return; } @@ -573,7 +564,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Really initialize: randomize keys and set start time. 
*/ if (config.randomkeys) randomizeClientKey(c); if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); - atomicGet(config.slots_last_update, c->slots_last_update); + c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); c->start = ustime(); c->latency = -1; } @@ -803,9 +794,10 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { /* In idle mode, clients still need to register readHandler for catching errors */ aeCreateFileEvent(el, c->context->fd, AE_READABLE, readHandler, c); - listAddNodeTail(config.clients, c); - atomicIncr(config.liveclients, 1); - atomicGet(config.slots_last_update, c->slots_last_update); + listAddNodeTail(config.clients,c); + atomic_fetch_add_explicit(&config.liveclients, 1, memory_order_relaxed); + + c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); return c; } @@ -1231,16 +1223,19 @@ static int fetchClusterSlotsConfiguration(client c) { UNUSED(c); int success = 1, is_fetching_slots = 0, last_update = 0; size_t i; - atomicGet(config.slots_last_update, last_update); + + last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); if (c->slots_last_update < last_update) { c->slots_last_update = last_update; return -1; } redisReply *reply = NULL; - atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1); - if (is_fetching_slots) return -1; // TODO: use other codes || errno ? - atomicSet(config.is_fetching_slots, 1); - fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); + + is_fetching_slots = atomic_fetch_add_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); + if (is_fetching_slots) return -1; //TODO: use other codes || errno ? 
+ atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); + fprintf(stderr, + "WARNING: Cluster slots configuration changed, fetching new one...\n"); const char *errmsg = "Failed to update cluster slots configuration"; static dictType dtype = { dictSdsHash, /* hash function */ @@ -1310,14 +1305,15 @@ static int fetchClusterSlotsConfiguration(client c) { freeReplyObject(reply); redisFree(ctx); dictRelease(masters); - atomicSet(config.is_fetching_slots, 0); + atomic_store_explicit(&config.is_fetching_slots, 0, memory_order_relaxed); return success; } /* Atomically update the new slots configuration. */ static void updateClusterSlotsConfiguration(void) { pthread_mutex_lock(&config.is_updating_slots_mutex); - atomicSet(config.is_updating_slots, 1); + atomic_store_explicit(&config.is_updating_slots, 1, memory_order_relaxed); + int i; for (i = 0; i < config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; @@ -1330,8 +1326,8 @@ static void updateClusterSlotsConfiguration(void) { zfree(oldslots); } } - atomicSet(config.is_updating_slots, 0); - atomicIncr(config.slots_last_update, 1); + atomic_store_explicit(&config.is_updating_slots, 0, memory_order_relaxed); + atomic_fetch_add_explicit(&config.slots_last_update, 1, memory_order_relaxed); pthread_mutex_unlock(&config.is_updating_slots_mutex); } @@ -1615,13 +1611,10 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData UNUSED(eventLoop); UNUSED(id); benchmarkThread *thread = (benchmarkThread *)clientData; - int liveclients = 0; - int requests_finished = 0; - int previous_requests_finished = 0; + int liveclients = atomic_load_explicit(&config.liveclients, memory_order_relaxed); + int requests_finished = atomic_load_explicit(&config.requests_finished, memory_order_relaxed); + int previous_requests_finished = atomic_load_explicit(&config.previous_requests_finished, memory_order_relaxed); long long current_tick = mstime(); - atomicGet(config.liveclients, 
liveclients); - atomicGet(config.requests_finished, requests_finished); - atomicGet(config.previous_requests_finished, previous_requests_finished); if (liveclients == 0 && requests_finished != config.requests) { fprintf(stderr, "All clients disconnected... aborting.\n"); @@ -1646,7 +1639,7 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData const float instantaneous_dt = (float)(current_tick - config.previous_tick) / 1000.0; const float instantaneous_rps = (float)(requests_finished - previous_requests_finished) / instantaneous_dt; config.previous_tick = current_tick; - atomicSet(config.previous_requests_finished, requests_finished); + atomic_store_explicit(&config.previous_requests_finished, requests_finished, memory_order_relaxed); printf("%*s\r", config.last_printed_bytes, " "); /* ensure there is a clean line */ int printed_bytes = printf("%s: rps=%.1f (overall: %.1f) avg_msec=%.3f (overall: %.3f)\r", config.title, instantaneous_rps, rps, diff --git a/src/zmalloc.c b/src/zmalloc.c index 9819ab23a4..8819f0c518 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -51,7 +51,7 @@ void zlibc_free(void *ptr) { #include #include "zmalloc.h" -#include "atomicvar.h" +#include <stdatomic.h> #define UNUSED(x) ((void)(x)) @@ -87,10 +87,10 @@ void zlibc_free(void *ptr) { #define dallocx(ptr, flags) je_dallocx(ptr, flags) #endif -#define update_zmalloc_stat_alloc(__n) atomicIncr(used_memory, (__n)) -#define update_zmalloc_stat_free(__n) atomicDecr(used_memory, (__n)) +#define update_zmalloc_stat_alloc(__n) atomic_fetch_add_explicit(&used_memory, (__n), memory_order_relaxed) +#define update_zmalloc_stat_free(__n) atomic_fetch_sub_explicit(&used_memory, (__n), memory_order_relaxed) -static serverAtomic size_t used_memory = 0; +static _Atomic size_t used_memory = 0; static void zmalloc_default_oom(size_t size) { fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n", size); @@ -388,8 +388,7 @@ char *zstrdup(const char *s) { } size_t 
zmalloc_used_memory(void) { - size_t um; - atomicGet(used_memory, um); + size_t um = atomic_load_explicit(&used_memory,memory_order_relaxed); return um; }