Skip to content

Commit

Permalink
NAS-130090 / add backpressure mechanism io_uring (#406)
Browse files Browse the repository at this point in the history
In some edge cases where the storage backend can't keep up with client
writes, io_uring / samba will allow queueing up writes beyond what
is reasonable, leading to OOM errors in the most extreme cases.

This commit keeps a counter of outstanding write requests per-TCON
and switches to pwrite(2) once the queue depth limit is reached.
This effectively applies backpressure to the client and prevents excessive
memory consumption. A total count of synchronous writes performed is
logged when the TCON is disconnected if the log level for the VFS module is
sufficiently high (3 or higher). To facilitate easier inspection of
io_uring behavior, a custom debug class for the module has been added.
  • Loading branch information
anodos325 authored Aug 24, 2024
1 parent aa643d5 commit 8cb5606
Showing 1 changed file with 109 additions and 3 deletions.
112 changes: 109 additions & 3 deletions source3/modules/vfs_io_uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ struct open_how;
#define IO_URING_ASYNC_WRITE 0x02
#define IO_URING_ASYNC_FSYNC 0x04

/*
 * Default for the per-share "io_uring:write_queue_sz" parameter; also used
 * as the fallback when the configured value is not greater than zero.
 */
#define VFS_URING_WRITEQ_DEFAULT 10

/*
 * Debug class used by this module. Starts out as the generic VFS class and
 * is replaced by a dedicated "io_uring" class in vfs_io_uring_init() when
 * debug_add_class() succeeds.
 */
static int vfs_io_uring_debug_level = DBGC_VFS;

/* Route the DBG_* macros used below through the module's own debug class. */
#undef DBGC_CLASS
#define DBGC_CLASS vfs_io_uring_debug_level

struct vfs_io_uring_request;

struct vfs_io_uring_config {
Expand All @@ -64,6 +71,10 @@ struct vfs_io_uring_config {
int async_ops;
struct vfs_io_uring_request *queue;
struct vfs_io_uring_request *pending;
int uring_write_queue_sz;
uint uring_write_op_cnt;
uint sync_write_cnt;
uint async_write_cnt;
};

struct vfs_io_uring_request {
Expand Down Expand Up @@ -238,6 +249,19 @@ static int vfs_io_uring_connect(vfs_handle_struct *handle, const char *service,
config->async_ops |= IO_URING_ASYNC_WRITE;
}

config->uring_write_queue_sz = lp_parm_int(SNUM(handle->conn),
"io_uring",
"write_queue_sz",
VFS_URING_WRITEQ_DEFAULT);
if (config->uring_write_queue_sz <= 0) {
DBG_ERR("%d: write_queue_sz parameter must be greater than 0. "
"setting to default of %d\n",
config->uring_write_queue_sz,
VFS_URING_WRITEQ_DEFAULT);
config->uring_write_queue_sz = VFS_URING_WRITEQ_DEFAULT;
}


ret = io_uring_queue_init(num_entries, &config->uring, flags);
if (ret < 0) {
SMB_VFS_NEXT_DISCONNECT(handle);
Expand Down Expand Up @@ -611,6 +635,7 @@ struct vfs_io_uring_pwrite_state {
struct iovec iov;
size_t nwritten;
struct vfs_io_uring_request ur;
bool is_sync_write;
};

static void vfs_io_uring_pwrite_submit(struct vfs_io_uring_pwrite_state *state);
Expand Down Expand Up @@ -646,6 +671,32 @@ static struct tevent_req *vfs_io_uring_pwrite_send(struct vfs_handle_struct *han
if (req == NULL) {
return NULL;
}

/*
* Apply backpressure to client by performing synchronous write
*
*/
if (config->uring_write_op_cnt > config->uring_write_queue_sz) {
ok = sys_valid_io_range(offset, n);
if (!ok) {
tevent_req_error(req, EINVAL);
return tevent_req_post(req, ev);
}

config->sync_write_cnt++;
state->is_sync_write = true;
state->nwritten = pwrite(fsp_get_io_fd(fsp), data, n, offset);
if (state->nwritten == -1) {
DBG_ERR("%s: write to file failed with error: %s\n",
fsp_str_dbg(fsp), strerror(errno));
tevent_req_error(req, errno);
return tevent_req_post(req, ev);
}

tevent_req_done(req);
return tevent_req_post(req, ev);
}

if (config->async_ops & IO_URING_ASYNC_WRITE) {
state->ur.sqe_flags |= IOSQE_ASYNC;
}
Expand All @@ -654,6 +705,11 @@ static struct tevent_req *vfs_io_uring_pwrite_send(struct vfs_handle_struct *han
state->ur.completion_fn = vfs_io_uring_pwrite_completion;
state->ur.destructor_fn = vfs_io_uring_pwrite_destructor;

// Increment because at this point we'll hit the receive_fn
// which decrements the op_cnt
config->uring_write_op_cnt++;
config->async_write_cnt++;

SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p,
state->ur.profile_bytes, n);
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->ur.profile_bytes);
Expand Down Expand Up @@ -733,7 +789,6 @@ static void vfs_io_uring_pwrite_completion(struct vfs_io_uring_request *cur,
tevent_req_done(cur->req);
return;
}

/*
* sys_valid_io_range() already checked the boundaries
* now try to write the rest.
Expand All @@ -749,10 +804,27 @@ static ssize_t vfs_io_uring_pwrite_recv(struct tevent_req *req,
req, struct vfs_io_uring_pwrite_state);
ssize_t ret;

if (state->is_sync_write) {
if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
tevent_req_received(req);
return -1;
}

vfs_aio_state->error = 0;
ret = state->nwritten;

tevent_req_received(req);
return ret;
}

SMBPROFILE_BYTES_ASYNC_END(state->ur.profile_bytes);
vfs_aio_state->duration = nsec_time_diff(&state->ur.end_time,
&state->ur.start_time);

SMB_ASSERT(state->ur.config != NULL);
SMB_ASSERT(state->ur.config->uring_write_op_cnt > 0);
state->ur.config->uring_write_op_cnt--;

if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
tevent_req_received(req);
return -1;
Expand Down Expand Up @@ -869,8 +941,25 @@ static int vfs_io_uring_fsync_recv(struct tevent_req *req,
return 0;
}

/*
 * disconnect_fn for the io_uring VFS module.
 *
 * If any write on this connection fell back to the synchronous path
 * (sync_write_cnt != 0), log the synchronous and asynchronous write totals
 * at NOTICE level. The disconnect is then handed on to the next module in
 * the VFS stack so that modules below this one also see it.
 *
 * Panics via smb_panic() if the module's private config cannot be fetched
 * from the handle.
 */
static void vfs_io_uring_disconnect(vfs_handle_struct *handle)
{
	struct vfs_io_uring_config *config = NULL;

	SMB_VFS_HANDLE_GET_DATA(handle, config,
				struct vfs_io_uring_config,
				smb_panic(__location__));

	if (config->sync_write_cnt) {
		DBG_NOTICE("Performed %u synchronous writes and "
			   "%u async writes.\n",
			   config->sync_write_cnt,
			   config->async_write_cnt);
	}

	/*
	 * A module that implements disconnect_fn has to chain explicitly;
	 * without this call the rest of the stack never gets the disconnect.
	 */
	SMB_VFS_NEXT_DISCONNECT(handle);
}

static struct vfs_fn_pointers vfs_io_uring_fns = {
.connect_fn = vfs_io_uring_connect,
.disconnect_fn = vfs_io_uring_disconnect,
.pread_send_fn = vfs_io_uring_pread_send,
.pread_recv_fn = vfs_io_uring_pread_recv,
.pwrite_send_fn = vfs_io_uring_pwrite_send,
Expand All @@ -882,6 +971,23 @@ static struct vfs_fn_pointers vfs_io_uring_fns = {
static_decl_vfs;
/*
 * Module entry point: register the "io_uring" VFS module and then try to
 * register a dedicated "io_uring" debug class for it.
 *
 * Returns the status of smb_register_vfs(). Failing to register the debug
 * class is not fatal: the module keeps logging under DBGC_VFS.
 */
NTSTATUS vfs_io_uring_init(TALLOC_CTX *ctx)
{
	NTSTATUS status;

	/*
	 * The status must be captured rather than returned directly,
	 * otherwise the debug class registration below is never reached.
	 */
	status = smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
				  "io_uring", &vfs_io_uring_fns);
	if (!NT_STATUS_IS_OK(status)) {
		return status;
	}

	vfs_io_uring_debug_level = debug_add_class("io_uring");
	if (vfs_io_uring_debug_level == -1) {
		/* Fall back to the generic VFS class. */
		vfs_io_uring_debug_level = DBGC_VFS;
		DBG_ERR("%s: Couldn't register custom debugging class!\n",
			"vfs_io_uring_init");
	} else {
		DBG_DEBUG("%s: Debug class number of io_uring: %d\n",
			  "vfs_io_uring_init", vfs_io_uring_debug_level);
	}

	return status;
}

0 comments on commit 8cb5606

Please sign in to comment.