Skip to content

Commit

Permalink
implement correct buffer wrapping logic in std.event.Channel
Browse files Browse the repository at this point in the history
  • Loading branch information
qbradley authored and andrewrk committed Nov 28, 2019
1 parent ca61a5f commit a6c9c5f
Showing 1 changed file with 30 additions and 3 deletions.
33 changes: 30 additions & 3 deletions lib/std/event/channel.zig
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ pub fn Channel(comptime T: type) type {
/// For a zero length buffer, use `[0]T{}`.
/// TODO https://github.com/ziglang/zig/issues/2765
pub fn init(self: *SelfChannel, buffer: []T) void {
// The ring buffer implementation only works with power of 2 buffer sizes
// because of relying on subtracting across zero. For example (0 -% 1) % 10 == 5
assert(buffer.len == 0 or @popCount(usize, buffer.len) == 1);

self.* = SelfChannel{
.buffer_len = 0,
.buffer_nodes = buffer,
Expand Down Expand Up @@ -184,11 +188,11 @@ pub fn Channel(comptime T: type) type {
const get_node = &self.getters.get().?.data;
switch (get_node.data) {
GetNode.Data.Normal => |info| {
info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
info.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
},
GetNode.Data.OrNull => |info| {
_ = self.or_null_queue.remove(info.or_null);
info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
info.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
},
}
global_event_loop.onNextTick(get_node.tick_node);
Expand Down Expand Up @@ -222,7 +226,7 @@ pub fn Channel(comptime T: type) type {
while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
const put_node = &self.putters.get().?.data;

self.buffer_nodes[self.buffer_index] = put_node.data;
self.buffer_nodes[self.buffer_index % self.buffer_nodes.len] = put_node.data;
global_event_loop.onNextTick(put_node.tick_node);
self.buffer_index +%= 1;
self.buffer_len += 1;
Expand Down Expand Up @@ -283,6 +287,29 @@ test "std.event.Channel" {
await putter;
}

test "std.event.Channel wraparound" {

// TODO provide a way to run tests in evented I/O mode
if (!std.io.is_async) return error.SkipZigTest;

const channel_size = 2;

var buf : [channel_size]i32 = undefined;
var channel: Channel(i32) = undefined;
channel.init(&buf);
defer channel.deinit();

// add items to channel and pull them out until
// the buffer wraps around, make sure it doesn't crash.
var result : i32 = undefined;
channel.put(5);
testing.expectEqual(@as(i32, 5), channel.get());
channel.put(6);
testing.expectEqual(@as(i32, 6), channel.get());
channel.put(7);
testing.expectEqual(@as(i32, 7), channel.get());
}

async fn testChannelGetter(channel: *Channel(i32)) void {
const value1 = channel.get();
testing.expect(value1 == 1234);
Expand Down

0 comments on commit a6c9c5f

Please sign in to comment.