From 249585f2fcd8ea5096fe1458619715ccd57aba4e Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 13 May 2021 16:18:38 +1200 Subject: [PATCH] Directly enumerate the completion queue. --- ext/event/backend/uring.c | 81 ++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 39 deletions(-) diff --git a/ext/event/backend/uring.c b/ext/event/backend/uring.c index 5004e26d..17932b2f 100644 --- a/ext/event/backend/uring.c +++ b/ext/event/backend/uring.c @@ -325,8 +325,7 @@ int timeout_nonblocking(struct __kernel_timespec *timespec) { struct select_arguments { struct Event_Backend_URing *data; - int count; - struct io_uring_cqe **cqes; + int result; struct __kernel_timespec storage; struct __kernel_timespec *timeout; @@ -338,17 +337,8 @@ void * select_internal(void *_arguments) { io_uring_submit(&arguments->data->ring); - int result = io_uring_wait_cqes(&arguments->data->ring, arguments->cqes, 1, arguments->timeout, NULL); - - if (result == -ETIME) { - // If waiting resulted in a timeout, there are 0 events. - arguments->count = 0; - } else if (result == 0) { - // Otherwise, there was no error, at least 1 event was reported. So we ask for them all. - arguments->count = io_uring_peek_batch_cqe(&arguments->data->ring, arguments->cqes, URING_MAX_EVENTS); - } else { - arguments->count = result; - } + struct io_uring_cqe *cqe = NULL; + arguments->result = io_uring_wait_cqe_timeout(&arguments->data->ring, &cqe, arguments->timeout); return NULL; } @@ -357,21 +347,52 @@ static int select_internal_without_gvl(struct select_arguments *arguments) { rb_thread_call_without_gvl(select_internal, (void *)arguments, RUBY_UBF_IO, 0); - if (arguments->count < 0) { - rb_syserr_fail(-arguments->count, "select_internal_without_gvl:io_uring_wait_cqes"); + if (arguments->result == -ETIME) { + arguments->result = 0; + } else if (arguments->result < 0) { + rb_syserr_fail(-arguments->result, "select_internal_without_gvl:io_uring_wait_cqes"); + } else { + // At least 1 event is waiting: + arguments->result = 1; + } + + return arguments->result; +} + +static inline +unsigned select_process_completions(struct io_uring *ring) { + unsigned completed = 0; + unsigned head; + struct io_uring_cqe *cqe; + + io_uring_for_each_cqe(ring, head, cqe) { + ++completed; + + // If the operation was cancelled, or the operation has no user data (fiber): + if (cqe->res == -ECANCELED || cqe->user_data == 0) { + continue; + } + + VALUE fiber = (VALUE)cqe->user_data; + VALUE result = INT2NUM(cqe->res); + + // fprintf(stderr, "cqes[i] res=%d user_data=%p\n", cqes[i]->res, (void*)cqes[i]->user_data); + + Event_Backend_resume_safe(fiber, result); + } + + if (completed) { + io_uring_cq_advance(ring, completed); } - return arguments->count; + return completed; } VALUE Event_Backend_URing_select(VALUE self, VALUE duration) { struct Event_Backend_URing *data = NULL; TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data); - struct io_uring_cqe *cqes[URING_MAX_EVENTS]; - - // This is a non-blocking operation: - int result = io_uring_peek_batch_cqe(&data->ring, cqes, URING_MAX_EVENTS); + int result = select_process_completions(&data->ring); if (result < 0) { rb_syserr_fail(-result, strerror(-result)); @@ -379,7 +400,6 @@ VALUE Event_Backend_URing_select(VALUE self, VALUE duration) { // We might need to wait for events: struct select_arguments arguments = { .data = data, - .cqes = cqes, .timeout = NULL, }; @@ -389,27 +409,10 @@ VALUE Event_Backend_URing_select(VALUE self, VALUE duration) { result = select_internal_without_gvl(&arguments); } else { io_uring_submit(&data->ring); - result = io_uring_peek_batch_cqe(&data->ring, cqes, URING_MAX_EVENTS); } } - // fprintf(stderr, "cqes count=%d\n", result); - - for (int i = 0; i < result; i += 1) { - // If the operation was cancelled, or the operation has no user data (fiber): - if (cqes[i]->res == -ECANCELED || cqes[i]->user_data == 0) { - continue; - } - - VALUE fiber = (VALUE)io_uring_cqe_get_data(cqes[i]); - VALUE result = INT2NUM(cqes[i]->res); - - // fprintf(stderr, "cqes[i] res=%d user_data=%p\n", cqes[i]->res, (void*)cqes[i]->user_data); - - io_uring_cqe_seen(&data->ring, cqes[i]); - - Event_Backend_resume_safe(fiber, result); - } + result = select_process_completions(&data->ring); return INT2NUM(result); }