Skip to content

Commit

Permalink
Free offload to IO threads
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <uriy@amazon.com>
  • Loading branch information
uriyage committed Jul 9, 2024
1 parent 530293c commit d8468b7
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 13 deletions.
6 changes: 5 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "latency.h"
#include "script.h"
#include "functions.h"
#include "io_threads.h"

#include <signal.h>
#include <ctype.h>
Expand Down Expand Up @@ -297,7 +298,10 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
if (server.lazyfree_lazy_server_del) {
/* For efficiency, let the I/O thread that allocated an object also deallocate it. */
if (tryOffloadFreeObjToIOThreads(old) == C_OK) {
return;
} else if (server.lazyfree_lazy_server_del) {
freeObjAsync(key, old, db->id);
} else {
decrRefCount(old);
Expand Down
102 changes: 102 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ int inMainThread(void) {
return thread_id == 0;
}

int getCurThreadID(void) {
return thread_id;
}

/* Drains the I/O threads queue by waiting for all jobs to be processed.
* This function must be called from the main thread. */
void drainIOThreadsQueue(void) {
Expand Down Expand Up @@ -387,3 +391,101 @@ int trySendWriteToIOThreads(client *c) {
IOJobQueue_push(jq, ioThreadWriteToClient, c);
return C_OK;
}

/* Internal function to free the client's argv in an IO thread. */
void IOThreadFreeArgv(void *data) {
robj **argv = (robj **)data;

for (int i = 0;; i++) {
robj *o = argv[i];
if (o == NULL) {
continue;
}

int allocator_id = o->allocator_id;
decrRefCount(o);

/* The allocator_id is set to 0 to indicate that this is the last argument to free */
if (allocator_id == 0) {
break;
}
}

zfree(argv);
}

/* This function attempts to offload the client's argv to an IO thread.
* Returns C_OK if the client's argv were successfully offloaded to an IO thread,
* C_ERR otherwise. */
int tryOffloadFreeArgvToIOThreads(client *c) {
if (server.active_io_threads_num <= 1 || c->argc == 0) {
return C_ERR;
}

size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;

IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) {
return C_ERR;
}

int last_arg_to_free = -1;

/* Prepare the argv */
for (int j = 0; j < c->argc; j++) {
if (c->argv[j]->refcount > 1 || tid != c->argv[j]->allocator_id) {
decrRefCount(c->argv[j]);
/* Set argv[j] to NULL to avoid double free */
c->argv[j] = NULL;
} else {
last_arg_to_free = j;
}
}

/* If no argv to free, free the argv array at the main thread */
if (last_arg_to_free == -1) {
zfree(c->argv);
return C_OK;
}

/* We set the allocator_id of the last arg to free to 0 to indicate that
* this is the last argument to free. With this approach, we don't need to
* send the argc to the IO thread and we can send just the argv ptr. */
c->argv[last_arg_to_free]->allocator_id = 0;

/* Must succeed as we checked the free space before. */
IOJobQueue_push(jq, IOThreadFreeArgv, c->argv);

return C_OK;
}

/* This function attempts to offload the free of an object to an IO thread.
* Returns C_OK if the object was successfully offloaded to an IO thread,
* C_ERR otherwise.*/
int tryOffloadFreeObjToIOThreads(robj *obj) {
if (server.active_io_threads_num <= 1) {
return C_ERR;
}
/* the object was not allocated by the IO threads */
if (obj->allocator_id == 0) return C_ERR;

if (obj->refcount > 1) return C_ERR;

int tid = obj->allocator_id;
/* If the thread that allocated the object is no longer active,
* reassign the object to a different thread. This is necessary
* because we dynamically adjust the number of active threads
* based on I/O load.*/
if (tid >= server.active_io_threads_num) {
tid = (tid % (server.active_io_threads_num - 1)) + 1;
}

IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) {
return C_ERR;
}

IOJobQueue_push(jq, decrRefCountVoid, obj);
server.stat_io_freed_objects++;
return C_OK;
}
3 changes: 3 additions & 0 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
void initIOThreads(void);
void killIOThreads(void);
int inMainThread(void);
int getCurThreadID(void);
int trySendReadToIOThreads(client *c);
int trySendWriteToIOThreads(client *c);
int tryOffloadFreeObjToIOThreads(robj *o);
int tryOffloadFreeArgvToIOThreads(client *c);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);

Expand Down
4 changes: 2 additions & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -10692,8 +10692,8 @@ void moduleCallCommandFilters(client *c) {
c->argv = filter.argv;
c->argv_len = filter.argv_len;
c->argc = filter.argc;
/* With I/O thread command-lookup offload, we set c->cmd to the command corresponding to c->argv[0].
* Since the command filter may change it, we need to reset c->cmd to null. */
/* With I/O thread command-lookup offload, we set c->realcmd to the command corresponding to c->argv[0].
* Since the command filter may change it, we need to reset c->realcmd to null to force a new command lookup. */
c->realcmd = NULL;
}

Expand Down
19 changes: 13 additions & 6 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1424,13 +1424,14 @@ void freeClientOriginalArgv(client *c) {
}

void freeClientArgv(client *c) {
int j;
for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
if (tryOffloadFreeArgvToIOThreads(c) == C_ERR) {
for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
zfree(c->argv);
}
c->argc = 0;
c->cmd = NULL;
c->argv_len_sum = 0;
c->argv_len = 0;
zfree(c->argv);
c->argv = NULL;
}

Expand Down Expand Up @@ -2581,7 +2582,9 @@ void processInlineBuffer(client *c) {

/* Create an Object for all arguments. */
for (c->argc = 0, j = 0; j < argc; j++) {
c->argv[c->argc] = createObject(OBJ_STRING, argv[j]);
robj *obj = createObject(OBJ_STRING, argv[j]);
obj->allocator_id = getCurThreadID();
c->argv[c->argc] = obj;
c->argc++;
c->argv_len_sum += sdslen(argv[j]);
}
Expand Down Expand Up @@ -2761,15 +2764,19 @@ void processMultibulkBuffer(client *c) {
* just use the current sds string. */
if (!is_primary && c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen + 2)) {
c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf);
robj *o = createObject(OBJ_STRING, c->querybuf);
o->allocator_id = getCurThreadID();
c->argv[c->argc++] = o;
c->argv_len_sum += c->bulklen;
sdsIncrLen(c->querybuf, -2); /* remove CRLF */
/* Assume that if we saw a fat argument we'll see another one
* likely... */
c->querybuf = sdsnewlen(SDS_NOINIT, c->bulklen + 2);
sdsclear(c->querybuf);
} else {
c->argv[c->argc++] = createStringObject(c->querybuf + c->qb_pos, c->bulklen);
robj *o = createStringObject(c->querybuf + c->qb_pos, c->bulklen);
o->allocator_id = getCurThreadID();
c->argv[c->argc++] = o;
c->argv_len_sum += c->bulklen;
c->qb_pos += c->bulklen + 2;
}
Expand Down
2 changes: 2 additions & 0 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ robj *createObject(int type, void *ptr) {
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
o->allocator_id = 0;
o->lru = 0;
return o;
}
Expand Down Expand Up @@ -96,6 +97,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
o->encoding = OBJ_ENCODING_EMBSTR;
o->ptr = sh + 1;
o->refcount = 1;
o->allocator_id = 0;
o->lru = 0;

sh->len = len;
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2492,6 +2492,7 @@ void resetServerStats(void) {
server.stat_io_reads_processed = 0;
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_io_freed_objects = 0;
server.stat_total_writes_processed = 0;
server.stat_client_qbuf_limit_disconnections = 0;
server.stat_client_outbuf_limit_disconnections = 0;
Expand Down Expand Up @@ -5702,6 +5703,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"total_writes_processed:%lld\r\n", server.stat_total_writes_processed,
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
"client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections,
"client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
"reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,
Expand Down
12 changes: 8 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -853,16 +853,18 @@ struct ValkeyModuleDigest {
#define LRU_CLOCK_MAX ((1 << LRU_BITS) - 1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */

#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX - 1) /* Object allocated in the stack. */
#define UINT28_MAX 0x0FFFFFFF
#define OBJ_SHARED_REFCOUNT UINT28_MAX /* Global object never destroyed. */
#define OBJ_STATIC_REFCOUNT (UINT28_MAX - 1) /* Object allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
struct serverObject {
unsigned type : 4;
unsigned encoding : 4;
unsigned lru : LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
unsigned refcount : 28;
unsigned allocator_id : 4;
void *ptr;
};

Expand Down Expand Up @@ -1215,7 +1217,8 @@ typedef struct client {
struct serverCommand *cmd, *lastcmd; /* Last command executed. */
struct serverCommand *realcmd; /* The original command that was executed by the client,
Used to update error stats in case the c->cmd was modified
during the command invocation (like on GEOADD for example). */
during the command invocation (like on GEOADD for example).
Also stores the command parsed by the IO threads*/
user *user; /* User associated with this connection. If the
user is set to NULL the connection can do
anything (admin). */
Expand Down Expand Up @@ -1750,6 +1753,7 @@ struct valkeyServer {
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_io_freed_objects; /* Number of objects freed by IO threads */
long long stat_total_reads_processed; /* Total number of read events processed */
long long stat_total_writes_processed; /* Total number of write events processed */
long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
Expand Down

0 comments on commit d8468b7

Please sign in to comment.