Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[6.0][WIP]optimize iouring #5499

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions include/swoole_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <queue>

#ifdef SW_USE_IOURING
#include "linux/version.h"
#include <liburing.h>
#endif

Expand Down Expand Up @@ -127,7 +126,7 @@ class AsyncIouring {
uint64_t task_num = 0;
uint64_t entries = 8192;
struct io_uring ring;
std::queue<AsyncEvent *> waitEvents;
std::queue<AsyncEvent *> waiting_tasks;
network::Socket *iou_socket = nullptr;
Reactor *reactor = nullptr;

Expand All @@ -144,18 +143,6 @@ class AsyncIouring {
io_uring_sqe_set_data(sqe, data);
}

inline void *get_iouring_cqe_data(struct io_uring_cqe *cqe) {
return io_uring_cqe_get_data(cqe);
}

inline int get_iouring_cqes(struct io_uring_cqe **cqe_ptr, unsigned count) {
return io_uring_peek_batch_cqe(&ring, cqe_ptr, count);
}

inline void finish_iouring_cqes(unsigned count) {
io_uring_cq_advance(&ring, count);
}

inline bool submit_iouring_sqe() {
return io_uring_submit(&ring);
}
Expand Down Expand Up @@ -193,8 +180,8 @@ class AsyncIouring {
bool unlink(AsyncEvent *event);
bool rename(AsyncEvent *event);
bool fsync(AsyncEvent *event);
inline bool is_empty_wait_events() {
return waitEvents.size() == 0;
inline bool is_empty_waiting_tasks() {
return waiting_tasks.size() == 0;
}

inline uint64_t get_task_num() {
Expand Down
116 changes: 56 additions & 60 deletions src/os/iouring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,7 @@ AsyncIouring::AsyncIouring(Reactor *reactor_) {
entries = 1 << i;
}

#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 19, 0)
int ret = io_uring_queue_init(entries, &ring, IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SUBMIT_ALL);
#elif LINUX_VERSION_CODE >= KERNEL_VERSION(6, 0, 0)
int ret = io_uring_queue_init(
entries, &ring, IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SUBMIT_ALL | IORING_SETUP_SINGLE_ISSUER);
#else
int ret = io_uring_queue_init(entries, &ring, 0);
#endif

if (ret < 0) {
swoole_warning("create io_uring failed");
throw swoole::Exception(SW_ERROR_WRONG_OPERATION);
Expand All @@ -67,7 +59,7 @@ AsyncIouring::AsyncIouring(Reactor *reactor_) {

reactor->set_exit_condition(Reactor::EXIT_CONDITION_IOURING, [](Reactor *reactor, size_t &event_num) -> bool {
if (SwooleTG.async_iouring && SwooleTG.async_iouring->get_task_num() == 0 &&
SwooleTG.async_iouring->is_empty_wait_events()) {
SwooleTG.async_iouring->is_empty_waiting_tasks()) {
event_num--;
}
return true;
Expand Down Expand Up @@ -104,74 +96,78 @@ void AsyncIouring::delete_event() {
}

bool AsyncIouring::wakeup() {
unsigned num = 8192;
struct io_uring_cqe *cqes[num];
size_t cqes_size = num * sizeof(struct io_uring_cqe *);
unsigned count = 0;

unsigned i = 0;
unsigned num = 8192;
void *data = nullptr;
AsyncEvent *task = nullptr;
AsyncEvent *waiting_task = nullptr;
struct io_uring_cqe *cqe = nullptr;
AsyncEvent *waitEvent = nullptr;
struct io_uring_cqe *cqes[num];

while (true) {
memset(cqes, 0, cqes_size);
count = get_iouring_cqes(cqes, num);
count = io_uring_peek_batch_cqe(&ring, cqes, num);
if (count == 0) {
return true;
}

for (i = 0; i < count; i++) {
for (unsigned i = 0; i < count; i++) {
cqe = cqes[i];
data = get_iouring_cqe_data(cqe);
data = io_uring_cqe_get_data(cqe);
task = reinterpret_cast<AsyncEvent *>(data);
task->retval = (cqe->res >= 0 ? cqe->res : -1);
task_num--;
if (cqe->res < 0) {
errno = abs(cqe->res);
errno = -(cqe->res);
/**
* If the error code is EAGAIN, it indicates that the resource is temporarily unavailable,
* but it can be retried. However, for the fairness of the tasks, this task should be placed
* at the end of the queue.
*/
if (cqe->res == -EAGAIN) {
io_uring_cq_advance(&ring, 1);
waiting_tasks.push(task);
continue;
}
}

task_num--;

if (is_empty_wait_events()) {
task->callback(task);
continue;
}
task->retval = (cqe->res >= 0 ? cqe->res : -1);
io_uring_cq_advance(&ring, 1);
task->callback(task);

waitEvent = waitEvents.front();
waitEvents.pop();
if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_OPENAT) {
open(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_CLOSE) {
close(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_FSTAT ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_LSTAT) {
statx(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_READ ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_WRITE) {
wr(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_RENAMEAT) {
rename(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_UNLINK_FILE ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_UNLINK_DIR) {
unlink(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_MKDIRAT) {
mkdir(waitEvent);
} else if (waitEvent->opcode == AsyncIouring::SW_IORING_OP_FSYNC ||
waitEvent->opcode == AsyncIouring::SW_IORING_OP_FDATASYNC) {
fsync(waitEvent);
if (!is_empty_waiting_tasks()) {
waiting_task = waiting_tasks.front();
waiting_tasks.pop();
if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_OPENAT) {
open(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_CLOSE) {
close(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_FSTAT ||
waiting_task->opcode == AsyncIouring::SW_IORING_OP_LSTAT) {
statx(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_READ ||
waiting_task->opcode == AsyncIouring::SW_IORING_OP_WRITE) {
wr(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_RENAMEAT) {
rename(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_UNLINK_FILE ||
waiting_task->opcode == AsyncIouring::SW_IORING_OP_UNLINK_DIR) {
unlink(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_MKDIRAT) {
mkdir(waiting_task);
} else if (waiting_task->opcode == AsyncIouring::SW_IORING_OP_FSYNC ||
waiting_task->opcode == AsyncIouring::SW_IORING_OP_FDATASYNC) {
fsync(waiting_task);
}
}

task->callback(task);
}
finish_iouring_cqes(count);
}

return true;
}

bool AsyncIouring::open(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -195,7 +191,7 @@ bool AsyncIouring::open(AsyncEvent *event) {
bool AsyncIouring::close(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -216,7 +212,7 @@ bool AsyncIouring::close(AsyncEvent *event) {
bool AsyncIouring::wr(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -240,7 +236,7 @@ bool AsyncIouring::wr(AsyncEvent *event) {
bool AsyncIouring::statx(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand Down Expand Up @@ -271,7 +267,7 @@ bool AsyncIouring::statx(AsyncEvent *event) {
bool AsyncIouring::mkdir(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -293,7 +289,7 @@ bool AsyncIouring::mkdir(AsyncEvent *event) {
bool AsyncIouring::unlink(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -318,7 +314,7 @@ bool AsyncIouring::unlink(AsyncEvent *event) {
bool AsyncIouring::rename(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand All @@ -342,7 +338,7 @@ bool AsyncIouring::rename(AsyncEvent *event) {
bool AsyncIouring::fsync(AsyncEvent *event) {
struct io_uring_sqe *sqe = get_iouring_sqe();
if (!sqe) {
waitEvents.push(event);
waiting_tasks.push(event);
return true;
}

Expand Down
Loading