Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async IO threads #758

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@ jobs:
run: sudo apt-get install tcl8.6 tclx
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
run: ./runtest --config io-threads 4 --config io-threads-do-reads yes --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
run: ./runtest --config io-threads 2 --config events-per-io-thread 0 --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
- name: cluster tests
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: ./runtest-cluster --config io-threads 4 --config io-threads-do-reads yes ${{github.event.inputs.cluster_test_args}}
run: ./runtest-cluster --config io-threads 2 --config events-per-io-thread 0 ${{github.event.inputs.cluster_test_args}}

test-ubuntu-reclaim-cache:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
4 changes: 2 additions & 2 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
}

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop, numevents);

for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
Expand Down Expand Up @@ -489,6 +489,6 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
eventLoop->beforesleep = beforesleep;
}

void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}
5 changes: 3 additions & 2 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData,
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);

/* File event structure */
typedef struct aeFileEvent {
Expand Down Expand Up @@ -107,7 +108,7 @@ typedef struct aeEventLoop {
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
aeAfterSleepProc *aftersleep;
int flags;
} aeEventLoop;

Expand All @@ -130,7 +131,7 @@ int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
Expand Down
2 changes: 1 addition & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void processUnblockedClients(void) {
if (!c->flag.blocked) {
/* If we have a queued command, execute it now. */
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
c = NULL;
continue;
}
}
beforeNextClient(c);
Expand Down
6 changes: 5 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ void loadServerConfigFromString(char *config) {
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;

/* To ensure backward compatibility when io_threads_num is according to the previous maximum of 128. */
if (server.io_threads_num > IO_THREADS_MAX_NUM) server.io_threads_num = IO_THREADS_MAX_NUM;

sdsfreesplitres(lines, totlines);
reading_config_file = 0;
return;
Expand Down Expand Up @@ -3023,7 +3026,7 @@ standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_threads_do_reads, 0, NULL, NULL), /* Read + parse from threads? */
createBoolConfig("io-threads-do-reads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_threads_do_reads, 1, NULL, NULL), /* Read + parse from threads */
createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL),
createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL),
createBoolConfig("rdbcompression", NULL, MODIFIABLE_CONFIG, server.rdb_compression, 1, NULL, NULL),
Expand Down Expand Up @@ -3124,6 +3127,7 @@ standardConfig static_configs[] = {
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
Expand Down
11 changes: 10 additions & 1 deletion src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,15 @@ void setproctitle(const char *fmt, ...);
#error "Undefined or invalid BYTE_ORDER"
#endif

/* Cache line alignment */
#ifndef CACHE_LINE_SIZE
#if defined(__aarch64__) && defined(__APPLE__)
#define CACHE_LINE_SIZE 128
#else
#define CACHE_LINE_SIZE 64
#endif /* __aarch64__ && __APPLE__ */
#endif /* CACHE_LINE_SIZE */

#if (__i386 || __amd64 || __powerpc__) && __GNUC__
#define GNUC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__)
#if defined(__clang__)
Expand Down Expand Up @@ -329,7 +338,7 @@ void setcpuaffinity(const char *cpulist);
#define HAVE_FADVISE
#endif

#define IO_THREADS_MAX_NUM 128
#define IO_THREADS_MAX_NUM 16

#ifndef CACHE_LINE_SIZE
#if defined(__aarch64__) && defined(__APPLE__)
Expand Down
18 changes: 18 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ typedef struct ConnectionType {
int (*has_pending_data)(void);
int (*process_pending_data)(void);

/* Postpone update state - with IO threads & TLS we don't want the IO threads to update the event loop events - let
* the main-thread do it */
void (*postpone_update_state)(struct connection *conn, int);
/* Called by the main-thread */
void (*update_state)(struct connection *conn);

/* TLS specified methods */
sds (*get_peer_cert)(struct connection *conn);
} ConnectionType;
Expand Down Expand Up @@ -456,4 +462,16 @@ static inline int connIsTLS(connection *conn) {
return conn && conn->type == connectionTypeTls();
}

static inline void connUpdateState(connection *conn) {
if (conn->type->update_state) {
conn->type->update_state(conn);
}
}

static inline void connSetPostponeUpdateState(connection *conn, int on) {
if (conn->type->postpone_update_state) {
conn->type->postpone_update_state(conn, on);
}
}

#endif /* __REDIS_CONNECTION_H */
2 changes: 2 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "fpconv_dtoa.h"
#include "cluster.h"
#include "threads_mngr.h"
#include "io_threads.h"

#include <arpa/inet.h>
#include <signal.h>
Expand Down Expand Up @@ -2159,6 +2160,7 @@ void removeSigSegvHandlers(void) {
}

void printCrashReport(void) {
server.crashed = 1;
/* Log INFO and CLIENT LIST */
logServerInfo();

Expand Down
2 changes: 1 addition & 1 deletion src/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ void ldbEndSession(client *c) {

/* If it's a fork()ed session, we just exit. */
if (ldb.forked) {
writeToClient(c, 0);
writeToClient(c);
serverLog(LL_NOTICE, "Lua debugging session child exiting");
exitFromChild(0);
} else {
Expand Down
Loading
Loading