// fs_io_uring.zig (forked from tigerbeetle/tigerbeetle)

const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const IO_Uring = std.os.linux.IO_Uring;
const io_uring_cqe = std.os.linux.io_uring_cqe;

/// Using non-blocking io_uring, write a page, fsync the write, then read the page back in.
/// Rinse and repeat to iterate across a large file.
///
/// Note that this is all non-blocking, but with a pure single-threaded event loop, something that
/// is not otherwise possible on Linux for cached I/O.
///
/// There are several io_uring optimizations that we don't take advantage of here:
/// * SQPOLL to eliminate submission syscalls entirely.
/// * Registered file descriptors to eliminate atomic file referencing in the kernel.
/// * Registered buffers to eliminate page mapping in the kernel.
/// * Fine-grained submission: at present we simply submit a batch of events, then wait for a batch.
/// We could also submit a batch of events, and then submit partial batches as events complete.
pub fn main() !void {
if (builtin.os.tag != .linux) return error.LinuxRequired;
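
// Walk a 256 MiB scratch file one 4 KiB page at a time, and repeat the whole pass five times.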
const size: usize = 256 * 1024 * 1024;
const page: usize = 4096;
const runs: usize = 5;
const path = "file_io_uring";
const file = try std.fs.cwd().createFile(path, .{ .read = true, .truncate = true });
defer file.close();
defer std.fs.cwd().deleteFile(path) catch {};
const fd = file.handle;
var buffer_w = [_]u8{1} ** page;
var buffer_r = [_]u8{0} ** page;
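
// Distinct user_data tags so that each completion can be matched back to the kind of
// operation that produced it (write, fsync, or read).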
const event_w = 1;
const event_f = 2;
const event_r = 3;
var cqes: [512]io_uring_cqe = undefined;
var ring = try IO_Uring.init(cqes.len, 0);
defer ring.deinit();
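
// A sketch, not exercised by this demo, of the SQPOLL and registered-file optimizations noted
// in the header comment. It assumes a kernel and privileges that allow SQPOLL, and a std
// IO_Uring that exposes register_files:
//
//     var ring = try IO_Uring.init(cqes.len, std.os.linux.IORING_SETUP_SQPOLL);
//     try ring.register_files(&[_]std.os.fd_t{fd});
//
// With SQPOLL the kernel polls the SQ ring from its own thread, so submitting needs no syscall;
// registered files avoid the per-operation atomic file reference in the kernel.
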
var run: usize = 0;
while (run < runs) : (run += 1) {
var start = std.time.milliTimestamp();
var pages: usize = 0;
var syscalls: usize = 0;
var offset_submitted: usize = 0;
var offset_completed: usize = 0;
event_loop: while (true) {
// Consume groups of completed events:
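// A wait_nr of 0 means copy_cqes() drains only what has already completed, without blocking.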
const count = try ring.copy_cqes(cqes[0..], 0);
var i: usize = 0;
while (i < count) : (i += 1) {
const cqe = cqes[i];
switch (cqe.user_data) {
event_w => assert(cqe.res == page),
event_f => assert(cqe.res == 0),
event_r => {
assert(cqe.res == page);
pages += 1;
offset_completed += page;
if (offset_completed >= size) {
assert(offset_completed == offset_submitted);
break :event_loop;
}
},
else => {
std.debug.print("ERROR {}\n", .{cqe});
std.os.exit(1);
},
}
}

// Enqueue groups of write/fsync/read calls within the event loop:
var events: u32 = 0;
while (offset_submitted < size and events + 3 <= cqes.len) {
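// IOSQE_IO_LINK chains the write -> fsync -> read SQEs so that each starts only once the
// previous one has completed; if one fails, the rest of the chain is cancelled.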
var w = try ring.write(event_w, fd, buffer_w[0..], offset_submitted);
w.flags |= std.os.linux.IOSQE_IO_LINK;
var f = try ring.fsync(event_f, fd, 0);
f.flags |= std.os.linux.IOSQE_IO_LINK;
_ = try ring.read(event_r, fd, buffer_r[0..], offset_submitted);
offset_submitted += page;
events += 3;
}
// Up until now, we have only appended to the SQ ring buffer, but without any syscalls.
// Now submit and wait for these groups of events with a single syscall.
// If we used SQPOLL, we wouldn't need this syscall to submit, only to wait, which
// `copy_cqes(N)` also supports. At present, `copy_cqes(0)` above does not wait because
// we do that here using `submit_and_wait(N)`.
_ = try ring.submit_and_wait(events);
syscalls += 1;
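// Sketch of the SQPOLL variant the comment above alludes to: the submission syscall can be
// skipped and the wait moved into the CQE copy. Assumes the ring was set up with SQPOLL:
//
//     _ = try ring.submit(); // Usually no syscall under SQPOLL, unless the SQ thread slept.
//     _ = try ring.copy_cqes(cqes[0..], events); // Blocks until `events` completions arrive.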
}
std.debug.print("fs io_uring: write({})/fsync/read({}) * {} pages = {} syscalls: {}ms\n", .{
page,
page,
pages,
syscalls,
std.time.milliTimestamp() - start,
});
}
}