From a47dbeeab989a4b26766de2644e5c2ffe1017c03 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Mon, 6 May 2024 07:37:58 +0000 Subject: [PATCH] poll offload to io threads Signed-off-by: Uri Yagelnik --- src/ae.c | 48 ++++++++++++++++++++++------------ src/ae.h | 13 +++++++--- src/config.c | 2 +- src/connection.h | 3 ++- src/io_threads.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++ src/io_threads.h | 1 + src/module.c | 4 +-- src/rdb.c | 4 +-- src/replication.c | 6 ++--- src/sentinel.c | 1 + src/server.c | 61 +++++++++++++++++++++++++++++++++++++------ src/server.h | 11 +++++++- src/socket.c | 14 +++++----- src/tls.c | 18 ++++++------- 14 files changed, 197 insertions(+), 55 deletions(-) diff --git a/src/ae.c b/src/ae.c index b6a1ce0b10..7bc61404e2 100644 --- a/src/ae.c +++ b/src/ae.c @@ -41,7 +41,6 @@ #include #include #include -#include #include #include "zmalloc.h" @@ -81,6 +80,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; + eventLoop->custompoll = NULL; eventLoop->flags = 0; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the @@ -345,6 +345,10 @@ static int processTimeEvents(aeEventLoop *eventLoop) { return processed; } +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) { + return aeApiPoll(eventLoop, tvp); +} + /* Process every pending file event, then every pending time event * (that may be registered by file event callbacks just processed). * Without special flags the function sleeps until some file event @@ -377,25 +381,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop); - /* The eventLoop->flags may be changed inside beforesleep. - * So we should check it after beforesleep be called. At the same time, - * the parameter flags always should have the highest priority. - * That is to say, once the parameter flag is set to AE_DONT_WAIT, - * no matter what value eventLoop->flags is set to, we should ignore it. */ - if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { - tv.tv_sec = tv.tv_usec = 0; - tvp = &tv; - } else if (flags & AE_TIME_EVENTS) { - usUntilTimer = usUntilEarliestTimer(eventLoop); - if (usUntilTimer >= 0) { - tv.tv_sec = usUntilTimer / 1000000; - tv.tv_usec = usUntilTimer % 1000000; + if (eventLoop->custompoll != NULL) { + numevents = eventLoop->custompoll(eventLoop); + } else { + /* The eventLoop->flags may be changed inside beforesleep. + * So we should check it after beforesleep be called. At the same time, + * the parameter flags always should have the highest priority. + * That is to say, once the parameter flag is set to AE_DONT_WAIT, + * no matter what value eventLoop->flags is set to, we should ignore it. */ + if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { + tv.tv_sec = tv.tv_usec = 0; tvp = &tv; + } else if (flags & AE_TIME_EVENTS) { + usUntilTimer = usUntilEarliestTimer(eventLoop); + if (usUntilTimer >= 0) { + tv.tv_sec = usUntilTimer / 1000000; + tv.tv_usec = usUntilTimer % 1000000; + tvp = &tv; + } } + /* Call the multiplexing API, will return only on timeout or when + * some event fires. */ + numevents = aeApiPoll(eventLoop, tvp); } - /* Call the multiplexing API, will return only on timeout or when - * some event fires. */ - numevents = aeApiPoll(eventLoop, tvp); /* Don't process file events if not requested. */ if (!(flags & AE_FILE_EVENTS)) { @@ -503,3 +511,9 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) { eventLoop->aftersleep = aftersleep; } + +/*This function allows setting a custom poll procedure to be used by the event loop + * The custom poll procedure, if set, will be called instead of the default aeApiPoll */ +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) { + eventLoop->custompoll = custompoll; +} diff --git a/src/ae.h b/src/ae.h index 3b1c96a01d..d378cc2539 100644 --- a/src/ae.h +++ b/src/ae.h @@ -34,6 +34,7 @@ #define __AE_H__ #include "monotonic.h" +#include #define AE_OK 0 #define AE_ERR -1 @@ -69,6 +70,7 @@ typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *client typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents); +typedef int aeCustomPollProc(struct aeEventLoop *eventLoop); /* File event structure */ typedef struct aeFileEvent { @@ -109,6 +111,7 @@ typedef struct aeEventLoop { void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeAfterSleepProc *aftersleep; + aeCustomPollProc *custompoll; int flags; } aeEventLoop; @@ -116,8 +119,6 @@ typedef struct aeEventLoop { aeEventLoop *aeCreateEventLoop(int setsize); void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeStop(aeEventLoop *eventLoop); -int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); -void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); int aeGetFileEvents(aeEventLoop *eventLoop, int fd); void *aeGetFileClientData(aeEventLoop *eventLoop, int fd); long long aeCreateTimeEvent(aeEventLoop *eventLoop, @@ -132,8 +133,14 @@ void aeMain(aeEventLoop *eventLoop); char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep); +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll); int aeGetSetSize(aeEventLoop *eventLoop); -int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); void aeSetDontWait(aeEventLoop *eventLoop, int noWait); +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp); +#ifndef AE_EXCLUDE_EVENT_MODIFICATION +int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); +void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); +int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); +#endif /* AE_EXCLUDE_EVENT_MODIFICATION */ #endif diff --git a/src/config.c b/src/config.c index 32e6018ff2..bdf4faeff2 100644 --- a/src/config.c +++ b/src/config.c @@ -2527,7 +2527,7 @@ static int updateMaxclients(const char **err) { return 0; } if ((unsigned int)aeGetSetSize(server.el) < server.maxclients + CONFIG_FDSET_INCR) { - if (aeResizeSetSize(server.el, server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) { + if (serverResizeSetSize(server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) { *err = "The event loop API is not able to handle the specified number of clients"; return 0; } diff --git a/src/connection.h b/src/connection.h index c6466c2d4c..d85a51e629 100644 --- a/src/connection.h +++ b/src/connection.h @@ -35,8 +35,9 @@ #include #include #include - +#define AE_EXCLUDE_EVENT_MODIFICATION #include "ae.h" +#undef AE_EXCLUDE_EVENT_MODIFICATION #define CONN_INFO_LEN 32 #define CONN_ADDR_STR_LEN 128 /* Similar to INET6_ADDRSTRLEN, hoping to handle other protocols. */ diff --git a/src/io_threads.c b/src/io_threads.c index ac60ce85b0..afd27c92c5 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -193,6 +193,20 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only) { } } +/* This function performs polling on the given event loop and updates the server's + * IO fired events count and poll state. */ +void IOThreadPoll(void *data) { + aeEventLoop *el = (aeEventLoop *)data; + struct timeval tvp = {0, 0}; + + pthread_mutex_lock(&server.io_poll_mutex); + int num_events = aePoll(el, &tvp); + pthread_mutex_unlock(&server.io_poll_mutex); + + server.io_ae_fired_events = num_events; + atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release); +} + static void *IOThreadMain(void *myid) { /* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */ long id = (long)myid; @@ -284,6 +298,10 @@ void killIOThreads(void) { void initIOThreads(void) { server.active_io_threads_num = 1; /* We start with threads not active. */ + pthread_mutex_init(&server.io_poll_mutex, NULL); + server.io_poll_state = AE_IO_STATE_NONE; + server.io_ae_fired_events = 0; + /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return; @@ -489,3 +507,51 @@ int tryOffloadFreeObjToIOThreads(robj *obj) { server.stat_io_freed_objects++; return C_OK; } + +/* This function retrieves the results of the IO Thread poll. + * returns the number of fired events if the IO thread has finished processing poll events, 0 otherwise. */ +static int getIOThreadPollResults(aeEventLoop *eventLoop) { + int io_state; + io_state = atomic_load_explicit(&server.io_poll_state, memory_order_acquire); + if (io_state == AE_IO_STATE_POLL) { + /* IO thread is still processing poll events. */ + return 0; + } + + /* IO thread is done processing poll events. */ + serverAssert(io_state == AE_IO_STATE_DONE); + server.stat_poll_processed_by_io_threads++; + server.io_poll_state = AE_IO_STATE_NONE; + + /* Remove the custom poll proc. */ + aeSetCustomPollProc(eventLoop, NULL); + + return server.io_ae_fired_events; +} + +void trySendPollJobToIOThreads(void) { + if (server.active_io_threads_num <= 1) { + return; + } + + /* If there are no pending jobs, let the main thread do the poll-wait by itself. */ + if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) { + return; + } + + /* If the IO thread is already processing poll events, don't send another job. */ + if (server.io_poll_state != AE_IO_STATE_NONE) { + return; + } + + /* The poll is sent to the last thread. While a random thread could have been selected, + * the last thread has a slightly better chance of being less loaded compared to other threads, + * As we activate the lowest threads first. */ + int tid = server.active_io_threads_num - 1; + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. */ + + server.io_poll_state = AE_IO_STATE_POLL; + aeSetCustomPollProc(server.el, getIOThreadPollResults); + IOJobQueue_push(jq, IOThreadPoll, server.el); +} diff --git a/src/io_threads.h b/src/io_threads.h index 3442e423f0..1db4581ad2 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -13,5 +13,6 @@ int tryOffloadFreeObjToIOThreads(robj *o); int tryOffloadFreeArgvToIOThreads(client *c); void adjustIOThreadsByEventLoad(int numevents, int increase_only); void drainIOThreadsQueue(void); +void trySendPollJobToIOThreads(void); #endif /* IO_THREADS_H */ diff --git a/src/module.c b/src/module.c index ed53a80141..006eb13493 100644 --- a/src/module.c +++ b/src/module.c @@ -9337,7 +9337,7 @@ int VM_EventLoopAdd(int fd, int mask, ValkeyModuleEventLoopFunc func, void *user int aeMask = eventLoopToAeMask(mask); - if (aeCreateFileEvent(server.el, fd, aeMask, aeProc, data) != AE_OK) { + if (serverCreateFileEvent(fd, aeMask, aeProc, data) != AE_OK) { if (aeGetFileEvents(server.el, fd) == AE_NONE) zfree(data); return VALKEYMODULE_ERR; } @@ -9378,7 +9378,7 @@ int VM_EventLoopDel(int fd, int mask) { /* After deleting the event, if fd does not have any registered event * anymore, we can free the EventLoopData object. */ EventLoopData *data = aeGetFileClientData(server.el, fd); - aeDeleteFileEvent(server.el, fd, eventLoopToAeMask(mask)); + serverDeleteFileEvent(fd, eventLoopToAeMask(mask)); if (aeGetFileEvents(server.el, fd) == AE_NONE) zfree(data); errno = 0; diff --git a/src/rdb.c b/src/rdb.c index f9ccd676fd..0c3a7a9b98 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3455,7 +3455,7 @@ static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { serverLog(LL_WARNING, "Background transfer terminated by signal %d", bysignal); } if (server.rdb_child_exit_pipe != -1) close(server.rdb_child_exit_pipe); - aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); + serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE); close(server.rdb_pipe_read); server.rdb_child_exit_pipe = -1; server.rdb_pipe_read = -1; @@ -3609,7 +3609,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { server.rdb_save_time_start = time(NULL); server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ - if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { + if (serverCreateFileEvent(server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); } } diff --git a/src/replication.c b/src/replication.c index 21ccb0e92d..e677f494a0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1431,7 +1431,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { server.rdb_pipe_numconns_writing--; /* if there are no more writes for now for this conn, or write error: */ if (server.rdb_pipe_numconns_writing == 0) { - if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { + if (serverCreateFileEvent(server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); } } @@ -1488,7 +1488,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, if (server.rdb_pipe_bufflen == 0) { /* EOF - write end was closed. */ int stillUp = 0; - aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); + serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE); for (i = 0; i < server.rdb_pipe_numconns; i++) { connection *conn = server.rdb_pipe_conns[i]; if (!conn) continue; @@ -1542,7 +1542,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, } /* Remove the pipe read handler if at least one write handler was set. */ if (server.rdb_pipe_numconns_writing || stillAlive == 0) { - aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); + serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE); break; } } diff --git a/src/sentinel.c b/src/sentinel.c index a095b8f48a..0d061f17b1 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -28,6 +28,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include "ae.h" #include "server.h" #include "hiredis.h" #if USE_OPENSSL == 1 /* BUILD_YES */ diff --git a/src/server.c b/src/server.c index ac327328f2..d71e668b40 100644 --- a/src/server.c +++ b/src/server.c @@ -98,6 +98,11 @@ int isReadyToShutdown(void); int finishShutdown(void); const char *replstateToString(int replstate); +/*============================ AE event modification functions ============================ */ +int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); +void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); +int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); + /*============================ Utility functions ============================ */ /* This macro tells if we are in the context of loading an AOF. */ @@ -1565,6 +1570,9 @@ extern int ProcessingEventsWhileBlocked; void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* When I/O threads are enabled and there are pending I/O jobs, the poll is offloaded to one of the I/O threads. */ + trySendPollJobToIOThreads(); + size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used; @@ -1596,10 +1604,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */ connTypeProcessPendingData(); - /* If any connection type(typical TLS) still has pending unread data or if there are clients - * with pending IO reads/writes, don't sleep at all. */ - int dont_sleep = connTypeHasPendingData() || listLength(server.clients_pending_io_read) > 0 || - listLength(server.clients_pending_io_write) > 0; + /* If any connection type(typical TLS) still has pending unread data don't sleep at all. */ + int dont_sleep = connTypeHasPendingData(); /* Call the Cluster before sleep function. Note that this function * may change the state of Cluster (from ok to fail or vice versa), @@ -2375,7 +2381,7 @@ void closeListener(connListener *sfd) { for (j = 0; j < sfd->count; j++) { if (sfd->fd[j] == -1) continue; - aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); + serverDeleteFileEvent(sfd->fd[j], AE_READABLE); close(sfd->fd[j]); } @@ -2388,9 +2394,9 @@ int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) { int j; for (j = 0; j < sfd->count; j++) { - if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler, sfd) == AE_ERR) { + if (serverCreateFileEvent(sfd->fd[j], AE_READABLE, accept_handler, sfd) == AE_ERR) { /* Rollback */ - for (j = j - 1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); + for (j = j - 1; j >= 0; j--) serverDeleteFileEvent(sfd->fd[j], AE_READABLE); return C_ERR; } } @@ -2493,6 +2499,7 @@ void resetServerStats(void) { server.stat_total_reads_processed = 0; server.stat_io_writes_processed = 0; server.stat_io_freed_objects = 0; + server.stat_poll_processed_by_io_threads = 0; server.stat_total_writes_processed = 0; server.stat_client_qbuf_limit_disconnections = 0; server.stat_client_outbuf_limit_disconnections = 0; @@ -2698,7 +2705,7 @@ void initServer(void) { /* Register a readable event for the pipe used to awake the event loop * from module threads. */ - if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE, modulePipeReadable, NULL) == AE_ERR) { + if (serverCreateFileEvent(server.module_pipe[0], AE_READABLE, modulePipeReadable, NULL) == AE_ERR) { serverPanic("Error registering the readable event for the module pipe."); } @@ -5704,6 +5711,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, + "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, @@ -6351,6 +6359,43 @@ void dismissClientMemory(client *c) { } } +/* server eventloop functions */ +int serverResizeSetSize(int setsize) { + if (server.io_poll_state != AE_IO_STATE_POLL) { + return aeResizeSetSize(server.el, setsize); + } + /* Take mutex - we need to be sure that the file events are not + * resized while the IO thread is polling for events. */ + pthread_mutex_lock(&server.io_poll_mutex); + int ret = aeResizeSetSize(server.el, setsize); + pthread_mutex_unlock(&server.io_poll_mutex); + return ret; +} + +int serverCreateFileEvent(int fd, int mask, aeFileProc *proc, void *clientData) { + if (server.io_poll_state != AE_IO_STATE_POLL) { + return aeCreateFileEvent(server.el, fd, mask, proc, clientData); + } + /* Take mutex - we need to be sure that the file event is not + * added while the IO thread is polling for events. */ + pthread_mutex_lock(&server.io_poll_mutex); + int ret = aeCreateFileEvent(server.el, fd, mask, proc, clientData); + pthread_mutex_unlock(&server.io_poll_mutex); + return ret; +} + +void serverDeleteFileEvent(int fd, int mask) { + if (server.io_poll_state != AE_IO_STATE_POLL) { + aeDeleteFileEvent(server.el, fd, mask); + } + /* Take mutex - we need to be sure that the file event is not + * deleted from the event loop while the IO thread is + * polling for events. */ + pthread_mutex_lock(&server.io_poll_mutex); + aeDeleteFileEvent(server.el, fd, mask); + pthread_mutex_unlock(&server.io_poll_mutex); +} + /* In the child process, we don't need some buffers anymore, and these are * likely to change in the parent when there's heavy write traffic. * We dismiss them right away, to avoid CoW. diff --git a/src/server.h b/src/server.h index dcb4a9e63a..66e9d791a0 100644 --- a/src/server.h +++ b/src/server.h @@ -64,7 +64,6 @@ typedef long long mstime_t; /* millisecond time type. */ typedef long long ustime_t; /* microsecond time type. */ -#include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ #include "dict.h" /* Hash tables */ #include "kvstore.h" /* Slot-based hash table */ @@ -639,6 +638,9 @@ typedef enum { #define BUSY_MODULE_YIELD_EVENTS (1 << 0) #define BUSY_MODULE_YIELD_CLIENTS (1 << 1) +/* IO poll */ +typedef enum { AE_IO_STATE_NONE, AE_IO_STATE_POLL, AE_IO_STATE_DONE } AeIoState; + /*----------------------------------------------------------------------------- * Data types *----------------------------------------------------------------------------*/ @@ -1599,6 +1601,9 @@ struct valkeyServer { dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; + _Atomic AeIoState io_poll_state; /* Indicates the state of the IO polling. */ + pthread_mutex_t io_poll_mutex; /* Mutex to protect the `server.el` polling access. */ + int io_ae_fired_events; /* Number of poll events received by the IO thread. */ rax *errors; /* Errors table */ int errors_enabled; /* If true, errorstats is enabled, and we will add new errors. */ unsigned int lruclock; /* Clock for LRU eviction */ @@ -1754,6 +1759,7 @@ struct valkeyServer { long long stat_io_reads_processed; /* Number of read events processed by IO threads */ long long stat_io_writes_processed; /* Number of write events processed by IO threads */ long long stat_io_freed_objects; /* Number of objects freed by IO threads */ + long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */ long long stat_total_reads_processed; /* Total number of read events processed */ long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ @@ -3204,6 +3210,9 @@ robj *activeDefragStringOb(robj *ob); void dismissSds(sds s); void dismissMemory(void *ptr, size_t size_hint); void dismissMemoryInChild(void); +void serverDeleteFileEvent(int fd, int mask); +int serverCreateFileEvent(int fd, int mask, aeFileProc *proc, void *clientData); +int serverResizeSetSize(int setsize); #define RESTART_SERVER_NONE 0 #define RESTART_SERVER_GRACEFULLY (1 << 0) /* Do proper shutdown. */ diff --git a/src/socket.c b/src/socket.c index b2f8f1aaec..843cf4b738 100644 --- a/src/socket.c +++ b/src/socket.c @@ -117,7 +117,7 @@ static int connSocketConnect(connection *conn, conn->state = CONN_STATE_CONNECTING; conn->conn_handler = connect_handler; - aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, conn->type->ae_handler, conn); + serverCreateFileEvent(conn->fd, AE_WRITABLE, conn->type->ae_handler, conn); return C_OK; } @@ -137,7 +137,7 @@ static void connSocketShutdown(connection *conn) { /* Close the connection and free resources. */ static void connSocketClose(connection *conn) { if (conn->fd != -1) { - aeDeleteFileEvent(server.el, conn->fd, AE_READABLE | AE_WRITABLE); + serverDeleteFileEvent(conn->fd, AE_READABLE | AE_WRITABLE); close(conn->fd); conn->fd = -1; } @@ -227,8 +227,8 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu else conn->flags &= ~CONN_FLAG_WRITE_BARRIER; if (!conn->write_handler) - aeDeleteFileEvent(server.el, conn->fd, AE_WRITABLE); - else if (aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, conn->type->ae_handler, conn) == AE_ERR) + serverDeleteFileEvent(conn->fd, AE_WRITABLE); + else if (serverCreateFileEvent(conn->fd, AE_WRITABLE, conn->type->ae_handler, conn) == AE_ERR) return C_ERR; return C_OK; } @@ -241,8 +241,8 @@ static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc fun conn->read_handler = func; if (!conn->read_handler) - aeDeleteFileEvent(server.el, conn->fd, AE_READABLE); - else if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) + serverDeleteFileEvent(conn->fd, AE_READABLE); + else if (serverCreateFileEvent(conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) return C_ERR; return C_OK; } @@ -265,7 +265,7 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD conn->state = CONN_STATE_CONNECTED; } - if (!conn->write_handler) aeDeleteFileEvent(server.el, conn->fd, AE_WRITABLE); + if (!conn->write_handler) serverDeleteFileEvent(conn->fd, AE_WRITABLE); if (!callHandler(conn, conn->conn_handler)) return; conn->conn_handler = NULL; diff --git a/src/tls.c b/src/tls.c index 1913d876fa..1aee4224b3 100644 --- a/src/tls.c +++ b/src/tls.c @@ -586,12 +586,12 @@ static void registerSSLEvent(tls_connection *conn, WantIOType want) { switch (want) { case WANT_READ: - if (mask & AE_WRITABLE) aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE); - if (!(mask & AE_READABLE)) aeCreateFileEvent(server.el, conn->c.fd, AE_READABLE, tlsEventHandler, conn); + if (mask & AE_WRITABLE) serverDeleteFileEvent(conn->c.fd, AE_WRITABLE); + if (!(mask & AE_READABLE)) serverCreateFileEvent(conn->c.fd, AE_READABLE, tlsEventHandler, conn); break; case WANT_WRITE: - if (mask & AE_READABLE) aeDeleteFileEvent(server.el, conn->c.fd, AE_READABLE); - if (!(mask & AE_WRITABLE)) aeCreateFileEvent(server.el, conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); + if (mask & AE_READABLE) serverDeleteFileEvent(conn->c.fd, AE_READABLE); + if (!(mask & AE_WRITABLE)) serverCreateFileEvent(conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); break; default: serverAssert(0); break; } @@ -629,13 +629,11 @@ static void updateSSLEvent(tls_connection *conn) { int need_read = conn->c.read_handler || (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ); int need_write = conn->c.write_handler || (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE); - if (need_read && !(mask & AE_READABLE)) - aeCreateFileEvent(server.el, conn->c.fd, AE_READABLE, tlsEventHandler, conn); - if (!need_read && (mask & AE_READABLE)) aeDeleteFileEvent(server.el, conn->c.fd, AE_READABLE); + if (need_read && !(mask & AE_READABLE)) serverCreateFileEvent(conn->c.fd, AE_READABLE, tlsEventHandler, conn); + if (!need_read && (mask & AE_READABLE)) serverDeleteFileEvent(conn->c.fd, AE_READABLE); - if (need_write && !(mask & AE_WRITABLE)) - aeCreateFileEvent(server.el, conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); - if (!need_write && (mask & AE_WRITABLE)) aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE); + if (need_write && !(mask & AE_WRITABLE)) serverCreateFileEvent(conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); + if (!need_write && (mask & AE_WRITABLE)) serverDeleteFileEvent(conn->c.fd, AE_WRITABLE); } static void updateSSLState(connection *conn_) {