Skip to content

Commit

Permalink
Directly enumerate the completion queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed May 13, 2021
1 parent 84476cb commit 249585f
Showing 1 changed file with 42 additions and 39 deletions.
81 changes: 42 additions & 39 deletions ext/event/backend/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,7 @@ int timeout_nonblocking(struct __kernel_timespec *timespec) {
struct select_arguments {
struct Event_Backend_URing *data;

int count;
struct io_uring_cqe **cqes;
int result;

struct __kernel_timespec storage;
struct __kernel_timespec *timeout;
Expand All @@ -338,17 +337,8 @@ void * select_internal(void *_arguments) {

io_uring_submit(&arguments->data->ring);

int result = io_uring_wait_cqes(&arguments->data->ring, arguments->cqes, 1, arguments->timeout, NULL);

if (result == -ETIME) {
// If waiting resulted in a timeout, there are 0 events.
arguments->count = 0;
} else if (result == 0) {
// Otherwise, there was no error, at least 1 event was reported. So we ask for them all.
arguments->count = io_uring_peek_batch_cqe(&arguments->data->ring, arguments->cqes, URING_MAX_EVENTS);
} else {
arguments->count = result;
}
struct io_uring_cqe *cqe = NULL;
arguments->result = io_uring_wait_cqe_timeout(&arguments->data->ring, &cqe, arguments->timeout);

return NULL;
}
Expand All @@ -357,29 +347,59 @@ static
int select_internal_without_gvl(struct select_arguments *arguments) {
rb_thread_call_without_gvl(select_internal, (void *)arguments, RUBY_UBF_IO, 0);

if (arguments->count < 0) {
rb_syserr_fail(-arguments->count, "select_internal_without_gvl:io_uring_wait_cqes");
if (arguments->result == -ETIME) {
arguments->result = 0;
} else if (arguments->result < 0) {
rb_syserr_fail(-arguments->result, "select_internal_without_gvl:io_uring_wait_cqes");
} else {
// At least 1 event is waiting:
arguments->result = 1;
}

return arguments->result;
}

static inline
unsigned select_process_completions(struct io_uring *ring) {
unsigned completed = 0;
unsigned head;
struct io_uring_cqe *cqe;

io_uring_for_each_cqe(ring, head, cqe) {
++completed;

// If the operation was cancelled, or the operation has no user data (fiber):
if (cqe->res == -ECANCELED || cqe->user_data == 0) {
continue;
}

VALUE fiber = (VALUE)cqe->user_data;
VALUE result = INT2NUM(cqe->res);

// fprintf(stderr, "cqes[i] res=%d user_data=%p\n", cqes[i]->res, (void*)cqes[i]->user_data);

Event_Backend_resume_safe(fiber, result);
}

if (completed) {
io_uring_cq_advance(ring, completed);
}

return arguments->count;
return completed;
}

VALUE Event_Backend_URing_select(VALUE self, VALUE duration) {
struct Event_Backend_URing *data = NULL;
TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data);

struct io_uring_cqe *cqes[URING_MAX_EVENTS];

// This is a non-blocking operation:
int result = io_uring_peek_batch_cqe(&data->ring, cqes, URING_MAX_EVENTS);
int result = select_process_completions(&data->ring);

if (result < 0) {
rb_syserr_fail(-result, strerror(-result));
} else if (result == 0) {
// We might need to wait for events:
struct select_arguments arguments = {
.data = data,
.cqes = cqes,
.timeout = NULL,
};

Expand All @@ -389,27 +409,10 @@ VALUE Event_Backend_URing_select(VALUE self, VALUE duration) {
result = select_internal_without_gvl(&arguments);
} else {
io_uring_submit(&data->ring);
result = io_uring_peek_batch_cqe(&data->ring, cqes, URING_MAX_EVENTS);
}
}

// fprintf(stderr, "cqes count=%d\n", result);

for (int i = 0; i < result; i += 1) {
// If the operation was cancelled, or the operation has no user data (fiber):
if (cqes[i]->res == -ECANCELED || cqes[i]->user_data == 0) {
continue;
}

VALUE fiber = (VALUE)io_uring_cqe_get_data(cqes[i]);
VALUE result = INT2NUM(cqes[i]->res);

// fprintf(stderr, "cqes[i] res=%d user_data=%p\n", cqes[i]->res, (void*)cqes[i]->user_data);

io_uring_cqe_seen(&data->ring, cqes[i]);

Event_Backend_resume_safe(fiber, result);
}
result = select_process_completions(&data->ring);

return INT2NUM(result);
}
Expand Down

0 comments on commit 249585f

Please sign in to comment.