Skip to content

Commit

Permalink
optimize iouring
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFreeman committed Sep 30, 2024
1 parent 6b78d88 commit 7ed8deb
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 76 deletions.
19 changes: 3 additions & 16 deletions include/swoole_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <queue>

#ifdef SW_USE_IOURING
#include "linux/version.h"
#include <liburing.h>
#endif

Expand Down Expand Up @@ -127,7 +126,7 @@ class AsyncIouring {
uint64_t task_num = 0;
uint64_t entries = 8192;
struct io_uring ring;
std::queue<AsyncEvent *> waitEvents;
std::queue<AsyncEvent *> waiting_tasks;
network::Socket *iou_socket = nullptr;
Reactor *reactor = nullptr;

Expand All @@ -144,18 +143,6 @@ class AsyncIouring {
io_uring_sqe_set_data(sqe, data);
}

inline void *get_iouring_cqe_data(struct io_uring_cqe *cqe) {
return io_uring_cqe_get_data(cqe);
}

inline int get_iouring_cqes(struct io_uring_cqe **cqe_ptr, unsigned count) {
return io_uring_peek_batch_cqe(&ring, cqe_ptr, count);
}

    // Mark `count` consumed CQEs as seen so the kernel can reuse their
    // completion-queue slots.
    inline void finish_iouring_cqes(unsigned count) {
        io_uring_cq_advance(&ring, count);
    }

inline bool submit_iouring_sqe() {
return io_uring_submit(&ring);
}
Expand Down Expand Up @@ -193,8 +180,8 @@ class AsyncIouring {
bool unlink(AsyncEvent *event);
bool rename(AsyncEvent *event);
bool fsync(AsyncEvent *event);
inline bool is_empty_wait_events() {
return waitEvents.size() == 0;
inline bool is_empty_waiting_tasks() {
return waiting_tasks.size() == 0;
}

inline uint64_t get_task_num() {
Expand Down
116 changes: 56 additions & 60 deletions src/os/iouring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,7 @@ AsyncIouring::AsyncIouring(Reactor *reactor_) {
entries = 1 << i;
}

#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 19, 0)
int ret = io_uring_queue_init(entries, &ring, IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SUBMIT_ALL);
#elif LINUX_VERSION_CODE >= KERNEL_VERSION(6, 0, 0)
int ret = io_uring_queue_init(
entries, &ring, IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SUBMIT_ALL | IORING_SETUP_SINGLE_ISSUER);
#else
int ret = io_uring_queue_init(entries, &ring, 0);
#endif

if (ret < 0) {
swoole_warning("create io_uring failed");
throw swoole::Exception(SW_ERROR_WRONG_OPERATION);
Expand All @@ -67,7 +59,7 @@ AsyncIouring::AsyncIouring(Reactor *reactor_) {

reactor->set_exit_condition(Reactor::EXIT_CONDITION_IOURING, [](Reactor *reactor, size_t &event_num) -> bool {
if (SwooleTG.async_iouring && SwooleTG.async_iouring->get_task_num() == 0 &&
SwooleTG.async_iouring->is_empty_wait_events()) {
SwooleTG.async_iouring->is_empty_waiting_tasks()) {
event_num--;
}
return true;
Expand Down Expand Up @@ -104,74 +96,78 @@ void AsyncIouring::delete_event() {
}

bool AsyncIouring::wakeup() {
unsigned num = 8192;
struct io_uring_cqe *cqes[num];
size_t cqes_size = num * sizeof(struct io_uring_cqe *);
unsigned count = 0;

unsigned i = 0;
unsigned num = 8192;
void *data = nullptr;
AsyncEvent *task = nullptr;
AsyncEvent *waiting_task = nullptr;
struct io_uring_cqe *cqe = nullptr;
AsyncEvent *waitEvent = nullptr;
struct io_uring_cqe *cqes[num];

while (true) {
memset(cqes, 0, cqes_size);
count = get_iouring_cqes(cqes, num);
count = io_uring_peek_batch_cqe(&ring, cqes, num);
if (count == 0) {
return true;
}

for (i = 0; i < count; i++) {
for (unsigned i = 0; i < count; i++) {
cqe = cqes[i];
data = get_iouring_cqe_data(cqe);
data = io_uring_cqe_get_data(cqe);
task = reinterpret_cast<AsyncEvent *>(data);
task->retval = (cqe->res >= 0 ? cqe->res : -1);
task_num--;
if (cqe->res < 0) {
errno = abs(cqe->res);
errno = -(cqe->res);
/**
* If the error code is EAGAIN, it indicates that the resource is temporarily unavailable,
* but it can be retried. However, for the fairness of the tasks, this task should be placed
* at the end of the queue.
*/
if (cqe->res == -EAGAIN) {
io_uring_cq_advance(&ring, 1);
waiting_tasks.push(task);
continue;
}
}

task_num--;

if (is_empty_wait_events()) {
task->callback(task);
continue;
}
task->retval = (cqe->res >= 0 ? cqe->res : -1);
io_uring_cq_advance(&ring, 1);
task->callback(task);

waitEvent = waitEvents.front();
waitEvents.pop();
if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_OPENAT) {
open(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_CLOSE) {
close(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_FSTAT ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_LSTAT) {
statx(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_READ ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_WRITE) {
wr(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_RENAMEAT) {
rename(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_UNLINK_FILE ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_UNLINK_DIR) {
unlink(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_MKDIRAT) {
mkdir(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_FSYNC ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_FDATASYNC) {
fsync(waitEvent);
if (!is_empty_waiting_tasks()) {
waiting_task = waiting_tasks.front();
waiting_tasks.pop();
if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_OPENAT) {
open(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_CLOSE) {
close(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_FSTAT ||
waiting_task->opcode == AsyncIouring::SW_IORING_OP_LSTAT) {
statx(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_READ ||
waiting_task->opcode == AsyncIouring::SW_IORING_OP_WRITE) {
wr(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_RENAMEAT) {
rename(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_UNLINK_FILE ||
waiting_task->opcode == AsyncIouring::SW_IORING_OP_UNLINK_DIR) {
unlink(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_MKDIRAT) {
mkdir(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_FSYNC ||
waiting_task->opcode == AsyncIouring::SW_IORING_OP_FDATASYNC) {
fsync(waiting_task);
}
}

task->callback(task);
}
finish_iouring_cqes(count);
}

return true;
}

bool AsyncIouring::open(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -195,7 +191,7 @@ bool AsyncIouring::open(AsyncEvent *event) {
bool AsyncIouring::close(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -216,7 +212,7 @@ bool AsyncIouring::close(AsyncEvent *event) {
bool AsyncIouring::wr(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -240,7 +236,7 @@ bool AsyncIouring::wr(AsyncEvent *event) {
bool AsyncIouring::statx(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand Down Expand Up @@ -271,7 +267,7 @@ bool AsyncIouring::statx(AsyncEvent *event) {
bool AsyncIouring::mkdir(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -293,7 +289,7 @@ bool AsyncIouring::mkdir(AsyncEvent *event) {
bool AsyncIouring::unlink(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -318,7 +314,7 @@ bool AsyncIouring::unlink(AsyncEvent *event) {
bool AsyncIouring::rename(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -342,7 +338,7 @@ bool AsyncIouring::rename(AsyncEvent *event) {
bool AsyncIouring::fsync(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand Down

0 comments on commit 7ed8deb

Please sign in to comment.