diff --git a/src/db.c b/src/db.c index 5a6562a1e2..7dbad28597 100644 --- a/src/db.c +++ b/src/db.c @@ -32,6 +32,7 @@ #include "latency.h" #include "script.h" #include "functions.h" +#include "io_threads.h" #include #include @@ -297,7 +298,10 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn old = dictGetVal(de); } kvstoreDictSetVal(db->keys, slot, de, val); - if (server.lazyfree_lazy_server_del) { + /* For efficiency, let the I/O thread that allocated an object also deallocate it. */ + if (tryOffloadFreeObjToIOThreads(old) == C_OK) { + return; + } else if (server.lazyfree_lazy_server_del) { freeObjAsync(key, old, db->id); } else { decrRefCount(old); diff --git a/src/io_threads.c b/src/io_threads.c index 89e2686cd9..ac60ce85b0 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -124,6 +124,10 @@ int inMainThread(void) { return thread_id == 0; } +int getCurThreadID(void) { + return thread_id; +} + /* Drains the I/O threads queue by waiting for all jobs to be processed. * This function must be called from the main thread. */ void drainIOThreadsQueue(void) { @@ -387,3 +391,101 @@ int trySendWriteToIOThreads(client *c) { IOJobQueue_push(jq, ioThreadWriteToClient, c); return C_OK; } + +/* Internal function to free the client's argv in an IO thread. */ +void IOThreadFreeArgv(void *data) { + robj **argv = (robj **)data; + + for (int i = 0;; i++) { + robj *o = argv[i]; + if (o == NULL) { + continue; + } + + int allocator_id = o->allocator_id; + decrRefCount(o); + + /* The allocator_id is set to 0 to indicate that this is the last argument to free */ + if (allocator_id == 0) { + break; + } + } + + zfree(argv); +} + +/* This function attempts to offload the client's argv to an IO thread. + * Returns C_OK if the client's argv were successfully offloaded to an IO thread, + * C_ERR otherwise. */ +int tryOffloadFreeArgvToIOThreads(client *c) { + if (server.active_io_threads_num <= 1 || c->argc == 0) { + return C_ERR; + } + + size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1; + + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) { + return C_ERR; + } + + int last_arg_to_free = -1; + + /* Prepare the argv */ + for (int j = 0; j < c->argc; j++) { + if (c->argv[j]->refcount > 1 || tid != c->argv[j]->allocator_id) { + decrRefCount(c->argv[j]); + /* Set argv[j] to NULL to avoid double free */ + c->argv[j] = NULL; + } else { + last_arg_to_free = j; + } + } + + /* If no argv to free, free the argv array at the main thread */ + if (last_arg_to_free == -1) { + zfree(c->argv); + return C_OK; + } + + /* We set the allocator_id of the last arg to free to 0 to indicate that + * this is the last argument to free. With this approach, we don't need to + * send the argc to the IO thread and we can send just the argv ptr. */ + c->argv[last_arg_to_free]->allocator_id = 0; + + /* Must succeed as we checked the free space before. */ + IOJobQueue_push(jq, IOThreadFreeArgv, c->argv); + + return C_OK; +} + +/* This function attempts to offload the free of an object to an IO thread. + * Returns C_OK if the object was successfully offloaded to an IO thread, + * C_ERR otherwise.*/ +int tryOffloadFreeObjToIOThreads(robj *obj) { + if (server.active_io_threads_num <= 1) { + return C_ERR; + } + /* the object was not allocated by the IO threads */ + if (obj->allocator_id == 0) return C_ERR; + + if (obj->refcount > 1) return C_ERR; + + int tid = obj->allocator_id; + /* If the thread that allocated the object is no longer active, + * reassign the object to a different thread. This is necessary + * because we dynamically adjust the number of active threads + * based on I/O load.*/ + if (tid >= server.active_io_threads_num) { + tid = (tid % (server.active_io_threads_num - 1)) + 1; + } + + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) { + return C_ERR; + } + + IOJobQueue_push(jq, decrRefCountVoid, obj); + server.stat_io_freed_objects++; + return C_OK; +} diff --git a/src/io_threads.h b/src/io_threads.h index 9fb23c190e..3442e423f0 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -6,8 +6,11 @@ void initIOThreads(void); void killIOThreads(void); int inMainThread(void); +int getCurThreadID(void); int trySendReadToIOThreads(client *c); int trySendWriteToIOThreads(client *c); +int tryOffloadFreeObjToIOThreads(robj *o); +int tryOffloadFreeArgvToIOThreads(client *c); void adjustIOThreadsByEventLoad(int numevents, int increase_only); void drainIOThreadsQueue(void); diff --git a/src/module.c b/src/module.c index be5f197921..ed53a80141 100644 --- a/src/module.c +++ b/src/module.c @@ -10692,8 +10692,8 @@ void moduleCallCommandFilters(client *c) { c->argv = filter.argv; c->argv_len = filter.argv_len; c->argc = filter.argc; - /* With I/O thread command-lookup offload, we set c->cmd to the command corresponding to c->argv[0]. - * Since the command filter may change it, we need to reset c->cmd to null. */ + /* With I/O thread command-lookup offload, we set c->realcmd to the command corresponding to c->argv[0]. + * Since the command filter may change it, we need to reset c->realcmd to null to force a new command lookup. */ c->realcmd = NULL; } diff --git a/src/networking.c b/src/networking.c index 35fb8635fb..11df3d7c6c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1424,13 +1424,14 @@ void freeClientOriginalArgv(client *c) { } void freeClientArgv(client *c) { - int j; - for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + if (tryOffloadFreeArgvToIOThreads(c) == C_ERR) { + for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + zfree(c->argv); + } c->argc = 0; c->cmd = NULL; c->argv_len_sum = 0; c->argv_len = 0; - zfree(c->argv); c->argv = NULL; } @@ -2581,7 +2582,9 @@ void processInlineBuffer(client *c) { /* Create an Object for all arguments. */ for (c->argc = 0, j = 0; j < argc; j++) { - c->argv[c->argc] = createObject(OBJ_STRING, argv[j]); + robj *obj = createObject(OBJ_STRING, argv[j]); + obj->allocator_id = getCurThreadID(); + c->argv[c->argc] = obj; c->argc++; c->argv_len_sum += sdslen(argv[j]); } @@ -2761,7 +2764,9 @@ void processMultibulkBuffer(client *c) { * just use the current sds string. */ if (!is_primary && c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen + 2)) { - c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf); + robj *o = createObject(OBJ_STRING, c->querybuf); + o->allocator_id = getCurThreadID(); + c->argv[c->argc++] = o; c->argv_len_sum += c->bulklen; sdsIncrLen(c->querybuf, -2); /* remove CRLF */ /* Assume that if we saw a fat argument we'll see another one @@ -2769,7 +2774,9 @@ void processMultibulkBuffer(client *c) { c->querybuf = sdsnewlen(SDS_NOINIT, c->bulklen + 2); sdsclear(c->querybuf); } else { - c->argv[c->argc++] = createStringObject(c->querybuf + c->qb_pos, c->bulklen); + robj *o = createStringObject(c->querybuf + c->qb_pos, c->bulklen); + o->allocator_id = getCurThreadID(); + c->argv[c->argc++] = o; c->argv_len_sum += c->bulklen; c->qb_pos += c->bulklen + 2; } diff --git a/src/object.c b/src/object.c index 6e5d1f460b..521557fb0c 100644 --- a/src/object.c +++ b/src/object.c @@ -46,6 +46,7 @@ robj *createObject(int type, void *ptr) { o->encoding = OBJ_ENCODING_RAW; o->ptr = ptr; o->refcount = 1; + o->allocator_id = 0; o->lru = 0; return o; } @@ -96,6 +97,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { o->encoding = OBJ_ENCODING_EMBSTR; o->ptr = sh + 1; o->refcount = 1; + o->allocator_id = 0; o->lru = 0; sh->len = len; diff --git a/src/server.c b/src/server.c index 427351c7a0..ac327328f2 100644 --- a/src/server.c +++ b/src/server.c @@ -2492,6 +2492,7 @@ void resetServerStats(void) { server.stat_io_reads_processed = 0; server.stat_total_reads_processed = 0; server.stat_io_writes_processed = 0; + server.stat_io_freed_objects = 0; server.stat_total_writes_processed = 0; server.stat_client_qbuf_limit_disconnections = 0; server.stat_client_outbuf_limit_disconnections = 0; @@ -5702,6 +5703,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "total_writes_processed:%lld\r\n", server.stat_total_writes_processed, "io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, + "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index 8069e01224..dcb4a9e63a 100644 --- a/src/server.h +++ b/src/server.h @@ -853,8 +853,9 @@ struct ValkeyModuleDigest { #define LRU_CLOCK_MAX ((1 << LRU_BITS) - 1) /* Max value of obj->lru */ #define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ -#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */ -#define OBJ_STATIC_REFCOUNT (INT_MAX - 1) /* Object allocated in the stack. */ +#define UINT28_MAX 0x0FFFFFFF +#define OBJ_SHARED_REFCOUNT UINT28_MAX /* Global object never destroyed. */ +#define OBJ_STATIC_REFCOUNT (UINT28_MAX - 1) /* Object allocated in the stack. */ #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT struct serverObject { unsigned type : 4; @@ -862,7 +863,8 @@ struct serverObject { unsigned lru : LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ - int refcount; + unsigned refcount : 28; + unsigned allocator_id : 4; void *ptr; }; @@ -1215,7 +1217,8 @@ typedef struct client { struct serverCommand *cmd, *lastcmd; /* Last command executed. */ struct serverCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified - during the command invocation (like on GEOADD for example). */ + during the command invocation (like on GEOADD for example). + Also stores the command parsed by the IO threads*/ user *user; /* User associated with this connection. If the user is set to NULL the connection can do anything (admin). */ @@ -1750,6 +1753,7 @@ struct valkeyServer { long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ long long stat_io_reads_processed; /* Number of read events processed by IO threads */ long long stat_io_writes_processed; /* Number of write events processed by IO threads */ + long long stat_io_freed_objects; /* Number of objects freed by IO threads */ long long stat_total_reads_processed; /* Total number of read events processed */ long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */