-
Notifications
You must be signed in to change notification settings - Fork 12.8k
/
futex.rs
320 lines (277 loc) · 10.6 KB
/
futex.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
use crate::sync::atomic::{
AtomicU32,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
pub struct RwLock {
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
// Bits 0..30:
// 0: Unlocked
// 1..=0x3FFF_FFFE: Locked by N readers
// 0x3FFF_FFFF: Write locked
// Bit 30: Readers are waiting on this futex.
// Bit 31: Writers are waiting on the writer_notify futex.
state: AtomicU32,
// The 'condition variable' to notify writers through.
// Incremented on every signal.
writer_notify: AtomicU32,
}
const READ_LOCKED: u32 = 1;
const MASK: u32 = (1 << 30) - 1;
const WRITE_LOCKED: u32 = MASK;
const MAX_READERS: u32 = MASK - 1;
const READERS_WAITING: u32 = 1 << 30;
const WRITERS_WAITING: u32 = 1 << 31;
#[inline]
fn is_unlocked(state: u32) -> bool {
state & MASK == 0
}
#[inline]
fn is_write_locked(state: u32) -> bool {
state & MASK == WRITE_LOCKED
}
#[inline]
fn has_readers_waiting(state: u32) -> bool {
state & READERS_WAITING != 0
}
#[inline]
fn has_writers_waiting(state: u32) -> bool {
state & WRITERS_WAITING != 0
}
#[inline]
fn is_read_lockable(state: u32) -> bool {
// This also returns false if the counter could overflow if we tried to read lock it.
//
// We don't allow read-locking if there's readers waiting, even if the lock is unlocked
// and there's no writers waiting. The only situation when this happens is after unlocking,
// at which point the unlocking thread might be waking up writers, which have priority over readers.
// The unlocking thread will clear the readers waiting bit and wake up readers, if necessary.
state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
}
#[inline]
fn has_reached_max_readers(state: u32) -> bool {
state & MASK == MAX_READERS
}
impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) }
}
#[inline]
pub fn try_read(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
.is_ok()
}
#[inline]
pub fn read(&self) {
let state = self.state.load(Relaxed);
if !is_read_lockable(state)
|| self
.state
.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
.is_err()
{
self.read_contended();
}
}
#[inline]
pub unsafe fn read_unlock(&self) {
let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
// It's impossible for a reader to be waiting on a read-locked RwLock,
// except if there is also a writer waiting.
debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
// Wake up a writer if we were the last reader and there's a writer waiting.
if is_unlocked(state) && has_writers_waiting(state) {
self.wake_writer_or_readers(state);
}
}
#[cold]
fn read_contended(&self) {
let mut state = self.spin_read();
loop {
// If we can lock it, lock it.
if is_read_lockable(state) {
match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
{
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}
// Check for overflow.
if has_reached_max_readers(state) {
panic!("too many active read locks on RwLock");
}
// Make sure the readers waiting bit is set before we go to sleep.
if !has_readers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}
// Wait for the state to change.
futex_wait(&self.state, state | READERS_WAITING, None);
// Spin again after waking up.
state = self.spin_read();
}
}
#[inline]
pub fn try_write(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
.is_ok()
}
#[inline]
pub fn write(&self) {
if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
self.write_contended();
}
}
#[inline]
pub unsafe fn write_unlock(&self) {
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
debug_assert!(is_unlocked(state));
if has_writers_waiting(state) || has_readers_waiting(state) {
self.wake_writer_or_readers(state);
}
}
#[cold]
fn write_contended(&self) {
let mut state = self.spin_write();
let mut other_writers_waiting = 0;
loop {
// If it's unlocked, we try to lock it.
if is_unlocked(state) {
match self.state.compare_exchange_weak(
state,
state | WRITE_LOCKED | other_writers_waiting,
Acquire,
Relaxed,
) {
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}
// Set the waiting bit indicating that we're waiting on it.
if !has_writers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}
// Other writers might be waiting now too, so we should make sure
// we keep that bit on once we manage lock it.
other_writers_waiting = WRITERS_WAITING;
// Examine the notification counter before we check if `state` has changed,
// to make sure we don't miss any notifications.
let seq = self.writer_notify.load(Acquire);
// Don't go to sleep if the lock has become available,
// or if the writers waiting bit is no longer set.
state = self.state.load(Relaxed);
if is_unlocked(state) || !has_writers_waiting(state) {
continue;
}
// Wait for the state to change.
futex_wait(&self.writer_notify, seq, None);
// Spin again after waking up.
state = self.spin_write();
}
}
/// Wake up waiting threads after unlocking.
///
/// If both are waiting, this will wake up only one writer, but will fall
/// back to waking up readers if there was no writer to wake up.
#[cold]
fn wake_writer_or_readers(&self, mut state: u32) {
assert!(is_unlocked(state));
// The readers waiting bit might be turned on at any point now,
// since readers will block when there's anything waiting.
// Writers will just lock the lock though, regardless of the waiting bits,
// so we don't have to worry about the writer waiting bit.
//
// If the lock gets locked in the meantime, we don't have to do
// anything, because then the thread that locked the lock will take
// care of waking up waiters when it unlocks.
// If only writers are waiting, wake one of them up.
if state == WRITERS_WAITING {
match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
Ok(_) => {
self.wake_writer();
return;
}
Err(s) => {
// Maybe some readers are now waiting too. So, continue to the next `if`.
state = s;
}
}
}
// If both writers and readers are waiting, leave the readers waiting
// and only wake up one writer.
if state == READERS_WAITING + WRITERS_WAITING {
if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
// The lock got locked. Not our problem anymore.
return;
}
if self.wake_writer() {
return;
}
// No writers were actually blocked on futex_wait, so we continue
// to wake up readers instead, since we can't be sure if we notified a writer.
state = READERS_WAITING;
}
// If readers are waiting, wake them all up.
if state == READERS_WAITING {
if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
futex_wake_all(&self.state);
}
}
}
/// This wakes one writer and returns true if we woke up a writer that was
/// blocked on futex_wait.
///
/// If this returns false, it might still be the case that we notified a
/// writer that was about to go to sleep.
fn wake_writer(&self) -> bool {
self.writer_notify.fetch_add(1, Release);
futex_wake(&self.writer_notify)
// Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
// up any threads or not, and always return `false` here. That still
// results in correct behaviour: it just means readers get woken up as
// well in case both readers and writers were waiting.
}
/// Spin for a while, but stop directly at the given condition.
#[inline]
fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
let mut spin = 100; // Chosen by fair dice roll.
loop {
let state = self.state.load(Relaxed);
if f(state) || spin == 0 {
return state;
}
crate::hint::spin_loop();
spin -= 1;
}
}
#[inline]
fn spin_write(&self) -> u32 {
// Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}
#[inline]
fn spin_read(&self) -> u32 {
// Stop spinning when it's unlocked or read locked, or when there's waiting threads.
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
})
}
}