Skip to content

Commit

Permalink
Fix for #13163: DefaultRwLock accumulates write-waiters, eventually f…
Browse files Browse the repository at this point in the history
…ails to write lock (#13180)

* Fix for: DefaultRwLock accumulates write-waiters, eventually fails to write lock #13163

* Comment out debug.print at the end of the last test.

* Code formatting

* - use equality test after lock/unlock rather than peeking into internals.
  however, this is still implementation specific and only done for
  DefaultRwLock.
- add num_reads maximum to ensure that reader threads stop if writer threads are
  starved
- use relaxed orderings for the read atomic counter
- don't check at the end for non-zero read ops, since the reader threads may
  only run once if they are starved

* More review changes
- Monotonic is sufficient for incrementing the reads counter
  • Loading branch information
jumpnbrownweasel authored Oct 17, 2022
1 parent ce3ffa5 commit 71f8762
Showing 1 changed file with 127 additions and 1 deletion.
128 changes: 127 additions & 1 deletion lib/std/Thread/RwLock.zig
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const RwLock = @This();
const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;

pub const Impl = if (builtin.single_threaded)
SingleThreadedRwLock
Expand Down Expand Up @@ -190,7 +191,7 @@ pub const DefaultRwLock = struct {
_ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .SeqCst);
rwl.mutex.lock();

const state = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst);
const state = @atomicRmw(usize, &rwl.state, .Add, IS_WRITING -% WRITER, .SeqCst);
if (state & READER_MASK != 0)
rwl.semaphore.wait();
}
Expand Down Expand Up @@ -247,3 +248,128 @@ pub const DefaultRwLock = struct {
rwl.semaphore.post();
}
};

test "DefaultRwLock - internal state" {
var rwl = DefaultRwLock{};

// The following failed prior to the fix for Issue #13163,
// where the WRITER flag was subtracted by the lock method.

rwl.lock();
rwl.unlock();
try testing.expectEqual(rwl, DefaultRwLock{});
}

test "RwLock - smoke test" {
var rwl = RwLock{};

rwl.lock();
try testing.expect(!rwl.tryLock());
try testing.expect(!rwl.tryLockShared());
rwl.unlock();

try testing.expect(rwl.tryLock());
try testing.expect(!rwl.tryLock());
try testing.expect(!rwl.tryLockShared());
rwl.unlock();

rwl.lockShared();
try testing.expect(!rwl.tryLock());
try testing.expect(rwl.tryLockShared());
rwl.unlockShared();
rwl.unlockShared();

try testing.expect(rwl.tryLockShared());
try testing.expect(!rwl.tryLock());
try testing.expect(rwl.tryLockShared());
rwl.unlockShared();
rwl.unlockShared();

rwl.lock();
rwl.unlock();
}

test "RwLock - concurrent access" {
if (builtin.single_threaded)
return;

const num_writers: usize = 2;
const num_readers: usize = 4;
const num_writes: usize = 10000;
const num_reads: usize = num_writes * 2;

const Runner = struct {
const Self = @This();

rwl: RwLock = .{},
writes: usize = 0,
reads: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),

term1: usize = 0,
term2: usize = 0,
term_sum: usize = 0,

fn reader(self: *Self) !void {
while (true) {
self.rwl.lockShared();
defer self.rwl.unlockShared();

if (self.writes >= num_writes or self.reads.load(.Unordered) >= num_reads)
break;

try self.check();

_ = self.reads.fetchAdd(1, .Monotonic);
}
}

fn writer(self: *Self, thread_idx: usize) !void {
var prng = std.rand.DefaultPrng.init(thread_idx);
var rnd = prng.random();

while (true) {
self.rwl.lock();
defer self.rwl.unlock();

if (self.writes >= num_writes)
break;

try self.check();

const term1 = rnd.int(usize);
self.term1 = term1;
try std.Thread.yield();

const term2 = rnd.int(usize);
self.term2 = term2;
try std.Thread.yield();

self.term_sum = term1 +% term2;
self.writes += 1;
}
}

fn check(self: *const Self) !void {
const term_sum = self.term_sum;
try std.Thread.yield();

const term2 = self.term2;
try std.Thread.yield();

const term1 = self.term1;
try testing.expectEqual(term_sum, term1 +% term2);
}
};

var runner = Runner{};
var threads: [num_writers + num_readers]std.Thread = undefined;

for (threads[0..num_writers]) |*t, i| t.* = try std.Thread.spawn(.{}, Runner.writer, .{ &runner, i });
for (threads[num_writers..]) |*t| t.* = try std.Thread.spawn(.{}, Runner.reader, .{&runner});

for (threads) |t| t.join();

try testing.expectEqual(num_writes, runner.writes);

//std.debug.print("reads={}\n", .{ runner.reads.load(.Unordered)});
}

0 comments on commit 71f8762

Please sign in to comment.