Skip to content

Commit

Permalink
Updated uring implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jun 12, 2021
1 parent 314d994 commit 8483fd2
Showing 1 changed file with 60 additions and 41 deletions.
101 changes: 60 additions & 41 deletions ext/event/backend/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <sys/types.h>
#include <errno.h>

#include "pidfd.c"

static VALUE Event_Backend_URing = Qnil;
static ID id_fileno;

Expand Down Expand Up @@ -114,6 +116,61 @@ VALUE Event_Backend_URing_close(VALUE self) {
return Qnil;
}

struct io_uring_sqe * io_get_sqe(struct Event_Backend_URing *data) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&data->ring);

while (sqe == NULL) {
sqe = io_uring_get_sqe(&data->ring);
}

return sqe;
}

struct process_wait_arguments {
struct Event_Backend_URing *data;
pid_t pid;
int flags;
int descriptor;
};

static
VALUE process_wait_transfer(VALUE _arguments) {
struct process_wait_arguments *arguments = (struct process_wait_arguments *)_arguments;

Event_Backend_transfer(arguments->data->loop);

return Event_Backend_process_status_wait(arguments->pid);
}

static
VALUE process_wait_ensure(VALUE _arguments) {
struct process_wait_arguments *arguments = (struct process_wait_arguments *)_arguments;

close(arguments->descriptor);
}

VALUE Event_Backend_URing_process_wait(VALUE self, VALUE fiber, VALUE pid, VALUE flags) {
struct Event_Backend_URing *data = NULL;
TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data);

struct process_wait_arguments process_wait_arguments = {
.data = data,
.pid = NUM2PIDT(pid),
.flags = NUM2INT(flags),
};

process_wait_arguments.descriptor = pidfd_open(process_wait_arguments.pid, 0);
rb_update_max_fd(process_wait_arguments.descriptor);

struct io_uring_sqe *sqe = io_get_sqe(data);
assert(sqe);

io_uring_prep_poll_add(sqe, descriptor, POLLIN|POLLHUP|POLLERR);
io_uring_sqe_set_data(sqe, (void*)fiber);

return rb_ensure(process_wait_transfer, (VALUE)&process_wait_arguments, process_wait_ensure, (VALUE)&process_wait_arguments)
}

static inline
short poll_flags_from_events(int events) {
short flags = 0;
Expand Down Expand Up @@ -145,22 +202,13 @@ struct io_wait_arguments {
short flags;
};

struct io_uring_sqe * io_get_sqe(struct Event_Backend_URing *data) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&data->ring);

while (sqe == NULL) {
sqe = io_uring_get_sqe(&data->ring);
}

return sqe;
}

static
VALUE io_wait_rescue(VALUE _arguments, VALUE exception) {
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
struct Event_Backend_URing *data = arguments->data;

struct io_uring_sqe *sqe = io_get_sqe(data);
assert(sqe);

// fprintf(stderr, "poll_remove(%p, %p)\n", sqe, (void*)arguments->fiber);

Expand Down Expand Up @@ -190,8 +238,7 @@ VALUE Event_Backend_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE event

int descriptor = NUM2INT(rb_funcall(io, id_fileno, 0));
struct io_uring_sqe *sqe = io_get_sqe(data);

if (!sqe) return INT2NUM(0);
assert(sqe);

short flags = poll_flags_from_events(NUM2INT(events));

Expand Down Expand Up @@ -242,6 +289,7 @@ VALUE Event_Backend_URing_io_read(VALUE self, VALUE fiber, VALUE io, VALUE buffe

int descriptor = NUM2INT(rb_funcall(io, id_fileno, 0));
struct io_uring_sqe *sqe = io_get_sqe(data);
assert(sqe);

struct iovec iovecs[1];
iovecs[0].iov_base = RSTRING_PTR(buffer) + NUM2SIZET(offset);
Expand Down Expand Up @@ -420,35 +468,6 @@ VALUE Event_Backend_URing_select(VALUE self, VALUE duration) {
return INT2NUM(result);
}

VALUE Event_Backend_URing_process_wait(VALUE self, VALUE fiber, VALUE pid, VALUE flags) {
pid_t pidv = NUM2PIDT(pid);
int options = NUM2INT(flags);
int state = 0;
int err = 0;

if ((flags & WNOHANG) > 0) {
// WNOHANG is nonblock by default.
pid_t ret = PIDT2NUM(waitpid(pidv, &state, options));
if (ret == -1) err = errno;
return Event_Backend_process_status(pidv, state, err);
}

struct Event_Backend_URing *data = NULL;
TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data);
struct io_uring_sqe *sqe = io_uring_get_sqe(&data->ring);

int descriptor = pidfd_open(pidv, 0);
short poll_flags = POLLIN | POLLRDNORM;
io_uring_prep_poll_add(sqe, descriptor, poll_flags);
io_uring_sqe_set_data(sqe, (void*)fiber);
io_uring_submit(&data->ring);

Event_Backend_transfer(data->loop);
pid_t ret = PIDT2NUM(waitpid(pidv, &state, options));
if (ret == -1) err = errno;
return Event_Backend_process_status(pidv, state, err);
}

void Init_Event_Backend_URing(VALUE Event_Backend) {
id_fileno = rb_intern("fileno");

Expand Down

0 comments on commit 8483fd2

Please sign in to comment.