-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsocket.zig
91 lines (70 loc) · 2.47 KB
/
socket.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
const std = @import("std");
const pike = @import("pike");
const io = @import("io.zig");
const sync = @import("sync.zig");
const net = std.net;
const meta = std.meta;
pub const Side = packed enum(u1) {
client,
server,
};
pub const Options = struct {
max_connections_per_client: usize = 16,
max_connections_per_server: usize = 128,
protocol_type: type = void,
message_type: type = []const u8,
context_type: type = void,
write_queue_size: usize = 128,
read_buffer_size: usize = 4 * 1024 * 1024,
write_buffer_size: usize = 4 * 1024 * 1024,
};
pub fn yield() void {
suspend {
var task = pike.Task.init(@frame());
pike.dispatch(&task, .{ .use_lifo = true });
}
}
pub fn Socket(comptime side: Side, comptime opts: Options) type {
return struct {
const Self = @This();
pub const Reader = io.Reader(pike.Socket, opts.read_buffer_size);
pub const Writer = io.Writer(pike.Socket, opts.write_buffer_size);
const WriteQueue = sync.Queue(opts.message_type, opts.write_queue_size);
const Protocol = opts.protocol_type;
const Context = opts.context_type;
inner: pike.Socket,
address: net.Address,
context: Context = undefined,
write_queue: WriteQueue = .{},
pub fn init(inner: pike.Socket, address: net.Address) Self {
return Self{ .inner = inner, .address = address };
}
pub fn deinit(self: *Self) void {
self.inner.deinit();
}
pub inline fn unwrap(self: *Self) *pike.Socket {
return &self.inner;
}
pub fn write(self: *Self, message: opts.message_type) !void {
try self.write_queue.push(message);
}
pub fn run(self: *Self, protocol: Protocol) !void {
var reader = Reader.init(self.unwrap());
var writer = async self.runWriter(protocol);
defer {
self.write_queue.close();
await writer catch {};
}
yield();
try protocol.read(side, self, &reader);
}
fn runWriter(self: *Self, protocol: Protocol) !void {
var writer = Writer.init(self.unwrap());
var queue: @TypeOf(self.write_queue.items) = undefined;
while (true) {
const num_items = try self.write_queue.pop(queue[0..]);
try protocol.write(side, self, &writer, queue[0..num_items]);
}
}
};
}