-
Notifications
You must be signed in to change notification settings - Fork 1
/
wrflock.nim
513 lines (451 loc) · 17.9 KB
/
wrflock.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
## Copyright (c) 2021 Shayan Habibi
## Copyright (c) 2019 Mariusz Orlikowski - algorithm
##
## Pure Nim implementation of a specialised lock, known as the WRFLock, which
## enforces a linear order of write, read and free/deallocate operations while
## allowing multiple readers, a single writer and a single deallocator.
##
## Principally, the WRFLock acts as a state machine. Its advantages are the use
## of futexes or schedule yielding and an incredibly small memory footprint (8 bytes).
##
## Its primary implementation purpose is the Single Producer Multiple Consumer
## ring buffer queue proposed by the algorithm author Mariusz Orlikowski. Its use
## is quite flexible, however, and it can be used in a variety of designs.
##
## While the schedule yielding waits should be platform independent, the blocking
## waits are only implemented on operating systems that have been tested for functional
## implementations of futexes. Linux futex, darwin (macosx) ulocks and windows
## WaitOnAddress kernel primitives are used.
import std/times
export times.`<`
import wrflock/futexes
import wrflock/spec
export wWaitBlock, wWaitYield, rWaitBlock, rWaitYield, fWaitBlock, fWaitYield
type
  WRFLockObj = object
    ## Storage of the lock: a single word holding both the state half and the
    ## counters half (assumes `uint` is 64 bits wide - TODO confirm on 32-bit
    ## targets).
    data: uint
  WRFLockObjU = object
    ## The same storage viewed as two 32-bit halves, indexed with
    ## `stateOffset` / `countersOffset`.
    data: array[2, uint32]
  WRFLock* = ptr WRFLockObj ## Handle to a lock allocated by `initWRFLock`.
  WRFLockU = ptr WRFLockObjU ## Split-word view used by the atomic helpers.
  WaitType* = enum ## Flags for setting the wait behaviour of WRFLock
    WriteBlock = wWaitBlock
    WriteYield = wWaitYield
    ReadBlock = rWaitBlock
    ReadYield = rWaitYield
    FreeBlock = fWaitBlock
    FreeYield = fWaitYield
  WRFLockOp* = enum ## The three kinds of access the lock sequences.
    Write
    Read
    Free
# ============================================================================ #
# Define helpers
# ============================================================================ #
template loadState(lock: WRFLock, order = ATOMIC_RELAXED): uint32 =
  ## Atomically reads the 32-bit state half of `lock` using memory `order`
  ## (relaxed unless the caller asks for something stronger).
  atomicLoadN(addr cast[WRFLockU](lock).data[stateOffset], order)
proc `[]`(lock: WRFLock, idx: int): var uint32 {.inline.} =
  ## Mutable access to 32-bit half `idx` of the lock word; callers take its
  ## address as the target of atomic operations and futex waits/wakes.
  result = cast[WRFLockU](lock)[].data[idx]
# ============================================================================ #
# Define Constructors and Destructors
# ============================================================================ #
proc initWRFLock*(waitType: set[WaitType] = {}; pshared: bool = false): WRFLock =
  ## Allocates and initialises a WRFLock on the shared heap; release it with
  ## `freeWRFLock`. pShared arg is nonfunctional at the moment.
  ##
  ## The lock starts with the write phase marked as next, so the first
  ## `wAcquire` puts it straight into the write state.
  ##
  ## Default operation for write, read and free waits are blocking. Pass WriteYield,
  ## ReadYield and/or FreeYield to waitType to change the operations to schedule yielding
  ## respectively.
  ##
  ## Note: Yield flags take precedence over Block flags when there are conflicting
  ## flags in the waitType set
  result = createShared(WRFLockObj)
  result.data = 0u or nextStateWriteMask64
  # "Private" means the lock is NOT shared between processes, so the flag is
  # set exactly when `pshared` is false.
  if not pshared:
    result.data = result.data or privateMask64
  # Only the Yield flags need bits; blocking is the all-zero default.
  if WriteYield in waitType:
    result.data = result.data or wWaitYieldMask64
  if ReadYield in waitType:
    result.data = result.data or rWaitYieldMask64
  if FreeYield in waitType:
    result.data = result.data or fWaitYieldMask64
proc freeWRFLock*(lock: WRFLock) =
  ## Returns the lock's memory to the shared heap. The handle must not be
  ## used afterwards.
  lock.freeShared()
# ============================================================================ #
# Define Acquires
# ============================================================================ #
template wAcquireImpl(lock: WRFLock): bool =
  ## Registers the (single) writer in the state word. Evaluates to false if a
  ## writer is already registered.
  mixin loadState
  var acquired = false
  var expected = lock.loadState
  var desired: uint32
  # A failed CAS refreshes `expected`, so the "already registered" test is
  # re-evaluated on every retry.
  while (expected and wrAcquireValueMask32) == 0u:
    desired = expected or wrAcquireValueMask32
    # With a free access registered, raise the next-loop flag: new readers
    # then wait in rAcquire until wRelease clears it.
    if (desired and frAcquireValueMask32) != 0u:
      desired = desired or rdNxtLoopFlagMask32
    # Write is the next phase: enter the write state immediately.
    if (desired and nextStateWriteMask32) != 0u:
      desired = desired xor (nextStateWriteMask32 or currStateWriteMask32)
    if lock[stateOffset].addr.atomicCompareExchange(expected.addr, desired.addr,
        true, ATOMIC_RELAXED, ATOMIC_RELAXED):
      acquired = true
      break
  acquired
template rAcquireImpl(lock: WRFLock): bool =
  ## Registers one more reader. First waits (blocking or yielding, per the
  ## read yield flag) while the next-loop flag is raised, then increments the
  ## reader counter and marks read access as requested in the state word,
  ## entering the read state if read/free is the next phase.
  ## Evaluates to false only when the reader counter is saturated.
  mixin loadState
  var res: bool
  var newData, data: uint32
  data = lock.loadState
  block acqloop:
    # Readers may not join while the next-loop flag is raised (set by
    # wAcquire, cleared by wRelease).
    while (data and rdNxtLoopFlagMask32) != 0u:
      if (data and rWaitYieldMask32) != 0u:
        cpuRelax()
      else:
        wait(lock[stateOffset].addr, data)
      data = lock.loadState
    # The reader counter lives in the counters half: seed the CAS from that
    # word, not from the state word, so the saturation test sees real counts.
    data = lock[countersOffset].addr.atomicLoadN(ATOMIC_RELAXED)
    while true:
      if (data and rdAcquireCounterMask32) == rdAcquireCounterMask32:
        # Overflow error
        break acqloop
      newData = data + (1 shl rdAcquireCounterShift32)
      if lock[countersOffset].addr.atomicCompareExchange(data.addr, newData.addr,
          true, ATOMIC_RELAXED, ATOMIC_RELAXED):
        break
    data = lock.loadState
    while true:
      newData = data or rdAcquireValueMask32
      if (newData and nextStateReadFreeMask32) != 0u:
        newData = newData xor (nextStateReadFreeMask32 or currStateReadMask32)
      if lock[stateOffset].addr.atomicCompareExchange(data.addr, newData.addr,
          true, ATOMIC_RELAXED, ATOMIC_RELAXED):
        break
    res = true
  res
template fAcquireImpl(lock: WRFLock): bool =
  ## Registers the (single) free/deallocator access. If read/free is the next
  ## phase, the lock enters the free state straight away. Evaluates to false
  ## if a free access is already registered.
  mixin loadState
  var registered = false
  var seen = lock.loadState
  var proposed: uint32
  # A failed CAS refreshes `seen`; the loop condition re-checks registration.
  while (seen and frAcquireValueMask32) == 0u:
    proposed = seen or frAcquireValueMask32
    if (proposed and nextStateReadFreeMask32) != 0u:
      proposed = proposed xor (nextStateReadFreeMask32 or currStateFreeMask32)
    if lock[stateOffset].addr.atomicCompareExchange(seen.addr, proposed.addr,
        true, ATOMIC_RELAXED, ATOMIC_RELAXED):
      registered = true
      break
  registered
proc wAcquire*(lock: WRFLock): bool {.discardable.} =
  ## Registers the caller as the writer of `lock`. Returns false when a
  ## writer is already registered.
  ##
  ## Does not block. A successful call must be followed by a successful
  ## wWait/wTryWait before writing, and then by wRelease.
  result = wAcquireImpl(lock)

proc rAcquire*(lock: WRFLock): bool {.discardable.} =
  ## Registers one more reader on `lock`. Returns false when the reader
  ## counter is saturated (documented as 65535 readers).
  ##
  ## May wait (blocking or yielding, per the read flag) while new readers are
  ## held back by the next-loop flag. A successful call must be followed by a
  ## successful rWait/rTryWait before reading, and then by rRelease.
  result = rAcquireImpl(lock)

proc fAcquire*(lock: WRFLock): bool {.discardable.} =
  ## Registers the caller as the free/deallocator of `lock`. Returns false
  ## when a free access is already registered.
  ##
  ## Does not block. A successful call must be followed by a successful
  ## fWait/fTryWait before freeing, and then by fRelease.
  result = fAcquireImpl(lock)

proc acquire*(lock: WRFLock, op: static WRFLockOp): bool {.discardable.} =
  ## Registers the access named by `op`, chosen at compile time:
  ## `acquire(lock, Read)` == `rAcquire(lock)`,
  ## `acquire(lock, Write)` == `wAcquire(lock)`,
  ## `acquire(lock, Free)` == `fAcquire(lock)`.
  when op == Read:
    result = rAcquireImpl(lock)
  elif op == Write:
    result = wAcquireImpl(lock)
  else:
    result = fAcquireImpl(lock)
# ============================================================================ #
# Define releases
# ============================================================================ #
template wReleaseImpl(lock: WRFLock): bool =
  ## Ends the write phase. Clears the writer registration, the write state and
  ## the next-loop flag, then enters the read state (readers registered), the
  ## free state (free access registered) or marks read/free as the next phase.
  ## Wakes blocking waiters that may now proceed. Evaluates to false if no
  ## writer was registered.
  mixin loadState
  var res: bool
  var newData, data: uint32
  data = lock.loadState
  block impl:
    while true:
      if (data and wrAcquireValueMask32) == 0u:
        # Overflow error
        break impl
      newData = data and not(wrAcquireValueMask32 or currStateWriteMask32 or rdNxtLoopFlagMask32)
      if (newData and rdAcquireValueMask32) != 0u:
        newData = newData or currStateReadMask32
      elif (newData and frAcquireValueMask32) != 0u:
        newData = newData or currStateFreeMask32
      else:
        newData = newData or nextStateReadFreeMask32
      if lock[stateOffset].addr.atomicCompareExchange(data.addr, newData.addr,
          true, ATOMIC_RELEASE, ATOMIC_RELAXED):
        break
    # After a successful CAS `data` still holds the pre-release state. The
    # next-loop flag is always cleared in `newData`, so it must be tested on
    # `data`; otherwise readers parked on it in rAcquire are never woken.
    let wakeReaders =
      (newData and rWaitYieldMask32) == 0u and
      ((newData and currStateReadMask32) != 0u or
       (data and rdNxtLoopFlagMask32) != 0u)
    let wakeFree =
      (newData and fWaitYieldMask32) == 0u and
      (newData and currStateFreeMask32) != 0u
    if wakeReaders or wakeFree:
      wakeAll(lock[stateOffset].addr)
    res = true
  res
template rReleaseImpl(lock: WRFLock): bool =
  ## Drops one reader registration. Works on the whole 64-bit lock word so the
  ## counter and the state flags change in a single CAS. When the last reader
  ## leaves, the read request is withdrawn and the read state is toggled into
  ## the free state (free access registered) or into "read/free next".
  ## Evaluates to false if no reader was registered.
  mixin loadState
  var released = false
  var before = lock.data.addr.atomicLoadN(ATOMIC_RELAXED)
  var after: uint
  # A failed CAS refreshes `before`; the loop condition re-checks the counter.
  while (before and rdAcquireCounterMask64) != 0u:
    after = before - (1 shl rdAcquireCounterShift64)
    if (after and rdAcquireCounterMask64) == 0u:
      # Last reader out.
      after = after and not(rdAcquireValueMask64)
      if (after and frAcquireValueMask64) != 0u:
        after = after xor (currStateReadMask64 or currStateFreeMask64)
      else:
        after = after xor (currStateReadMask64 or nextStateReadFreeMask64)
    if lock.data.addr.atomicCompareExchange(before.addr, after.addr,
        true, ATOMIC_RELEASE, ATOMIC_RELAXED):
      released = true
      break
  if released and (after and fWaitYieldMask64) == 0u and
      (after and currStateFreeMask64) != 0u:
    wakeAll(lock[stateOffset].addr)
  released
template fReleaseImpl(lock: WRFLock): bool =
  ## Ends the free phase: drops the free registration and the free state, then
  ## enters the write state if a writer is registered, otherwise marks write
  ## as the next phase. Wakes blocking writers when the write state is
  ## entered. Evaluates to false if no free access was registered.
  mixin loadState
  var released = false
  var seen = lock.loadState
  var proposed: uint32
  # A failed CAS refreshes `seen`; the loop condition re-checks registration.
  while (seen and frAcquireValueMask32) != 0u:
    proposed = seen and not(frAcquireValueMask32 or currStateFreeMask32)
    if (proposed and wrAcquireValueMask32) != 0u:
      proposed = proposed or currStateWriteMask32
    else:
      proposed = proposed or nextStateWriteMask32
    if lock[stateOffset].addr.atomicCompareExchange(seen.addr, proposed.addr,
        true, ATOMIC_RELEASE, ATOMIC_RELAXED):
      released = true
      break
  if released and (proposed and wWaitYieldMask32) == 0u and
      (proposed and currStateWriteMask32) != 0u:
    wakeAll(lock[stateOffset].addr)
  released
proc wRelease*(lock: WRFLock): bool {.discardable.} =
  ## Ends the caller's write access. Returns false when no writer was
  ## registered. Does not block; pairs with a prior wAcquire.
  ##
  ## On success the lock enters the read state if readers are registered,
  ## otherwise the free state if a free access is registered, otherwise
  ## read/free becomes the next phase. Pending rWait calls can then succeed.
  result = wReleaseImpl(lock)

proc rRelease*(lock: WRFLock): bool {.discardable.} =
  ## Ends one reader's access. Returns false when no reader was registered.
  ## Does not block; pairs with a prior rAcquire.
  ##
  ## Each success decrements the reader counter. When the last reader leaves,
  ## a registered free access is let through (its fWait succeeds).
  result = rReleaseImpl(lock)

proc fRelease*(lock: WRFLock): bool {.discardable.} =
  ## Ends the caller's free/deallocator access. Returns false when no free
  ## access was registered. Does not block; pairs with a prior fAcquire.
  ##
  ## On success a registered writer is let through (its wWait succeeds);
  ## otherwise write becomes the next phase.
  result = fReleaseImpl(lock)

proc release*(lock: WRFLock, op: static WRFLockOp): bool {.discardable.} =
  ## Ends the access named by `op`, chosen at compile time:
  ## `release(lock, Read)` == `rRelease(lock)`,
  ## `release(lock, Write)` == `wRelease(lock)`,
  ## `release(lock, Free)` == `fRelease(lock)`.
  when op == Read:
    result = rReleaseImpl(lock)
  elif op == Write:
    result = wReleaseImpl(lock)
  else:
    result = fReleaseImpl(lock)
# ============================================================================ #
# Define waits
# ============================================================================ #
template waitImpl(lock: WRFLock, time: int, op: static WRFLockOp): bool =
  ## Shared body of wWait/rWait/fWait/wait.
  ##
  ## Re-reads the state word until the current-state bit for `op` is set,
  ## then issues an acquire fence and evaluates to true. Between reads it
  ## either sleeps on the state word (op's yield flag clear) or calls
  ## `cpuRelax` (yield flag set).
  ##
  ## `time` is a timeout in msecs, 0 meaning no timeout. A positive timeout
  ## is tracked against one fixed deadline, so wake-ups that do not grant
  ## access cannot restart it. Evaluates to false on timeout or when the
  ## blocking wait reports failure.
  mixin loadState
  const currStateMask =
    case op
    of Write: currStateWriteMask32
    of Read: currStateReadMask32
    of Free: currStateFreeMask32
  const yieldMask =
    case op
    of Write: wWaitYieldMask32
    of Read: rWaitYieldMask32
    of Free: fWaitYieldMask32
  var res = false
  let timed = time > 0
  let deadline = getTime() + initDuration(milliseconds = max(time, 0))
  var data: uint32
  while true:
    data = lock.loadState
    if (data and currStateMask) != 0u:
      atomicThreadFence(ATOMIC_ACQUIRE)
      res = true
      break
    # Microseconds left before the deadline (unused for untimed waits).
    let leftUs =
      if timed: (deadline - getTime()).inMicroseconds
      else: 0'i64
    if timed and leftUs <= 0:
      # timed out
      break
    if (data and yieldMask) == 0u:
      # Sleep at most until the deadline, rounded up to whole msecs so a
      # sub-millisecond remainder is never passed as 0 (treated as "no
      # timeout" by the public API). Untimed waits forward `time` unchanged.
      let sleepMs =
        if timed: int((leftUs + 999) div 1000)
        else: time
      if not wait(lock[stateOffset].addr, data, sleepMs):
        # timed out or the wait primitive failed
        break
    else:
      cpuRelax()
  res
proc wWait*(lock: WRFLock; time: int = 0): bool =
  ## Waits up to `time` msecs (0 = no timeout) until the lock is in the write
  ## state, i.e. until the registered writer may write. Returns false on
  ## timeout or if the underlying blocking wait fails (OS dependent).
  ## The timeout is approximate.
  result = waitImpl(lock, time, WRFLockOp.Write)

proc rWait*(lock: WRFLock; time: int = 0): bool =
  ## Waits up to `time` msecs (0 = no timeout) until the lock is in the read
  ## state, i.e. until registered readers may read. Returns false on timeout
  ## or if the underlying blocking wait fails (OS dependent).
  ## The timeout is approximate.
  result = waitImpl(lock, time, WRFLockOp.Read)

proc fWait*(lock: WRFLock; time: int = 0): bool =
  ## Waits up to `time` msecs (0 = no timeout) until the lock is in the free
  ## state, i.e. until the registered free/deallocator may proceed. Returns
  ## false on timeout or if the underlying blocking wait fails (OS dependent).
  ## The timeout is approximate.
  result = waitImpl(lock, time, WRFLockOp.Free)

proc wait*(lock: WRFLock; op: static WRFLockOp; time: int = 0): bool =
  ## Generic form of wWait/rWait/fWait; `op` selects the state waited for.
  result = waitImpl(lock, time, op)
template tryWaitImpl(lock: WRFLock, op: static WRFLockOp): bool =
  ## One acquire-ordered load of the state word; true when the current-state
  ## bit for `op` is set.
  mixin loadState
  const currStateMask =
    case op
    of Read: currStateReadMask32
    of Write: currStateWriteMask32
    of Free: currStateFreeMask32
  let state = lock.loadState(ATOMIC_ACQUIRE)
  (state and currStateMask) != 0u
proc wTryWait*(lock: WRFLock): bool =
  ## Non-blocking check: true when the lock is in the write state, so the
  ## registered writer may perform its write actions.
  result = tryWaitImpl(lock, WRFLockOp.Write)

proc rTryWait*(lock: WRFLock): bool =
  ## Non-blocking check: true when the lock is in the read state, so the
  ## registered readers may perform their read actions.
  result = tryWaitImpl(lock, WRFLockOp.Read)

proc fTryWait*(lock: WRFLock): bool =
  ## Non-blocking check: true when the lock is in the free state, so the
  ## registered free/deallocator may perform its cleaning actions.
  result = tryWaitImpl(lock, WRFLockOp.Free)

proc tryWait*(lock: WRFLock, op: static WRFLockOp): bool =
  ## Generic form of wTryWait/rTryWait/fTryWait; `op` selects the state.
  result = tryWaitImpl(lock, op)
proc setFlags*(lock: WRFLock, flags: set[WaitType]) =
  ## EXPERIMENTAL - non blocking change of flags on a lock.
  ##
  ## For each op (write/read/free), a Yield flag in `flags` switches it to
  ## schedule yielding and a Block flag switches it to blocking. When both are
  ## given, Yield takes precedence, as in `initWRFLock`. Ops named by neither
  ## keep their current mode.
  ##
  ## Any change from a blocking wait to a schedule yield wakes all sleepers on
  ## the lock. Blocking operations go back to sleep after re-checking their
  ## condition, while yielding operations yield after checking theirs.
  var data = lock.loadState
  var newData: uint32
  var mustWake: bool

  template apply(yieldFlag, blockFlag: WaitType; mask: untyped) =
    # Yield wins over Block, so asking for both never toggles the mode.
    if yieldFlag in flags:
      if (data and mask) == 0u:
        # blocking -> yielding: sleepers must be woken to notice the change
        mustWake = true
        newData = newData or mask
    elif blockFlag in flags and (data and mask) != 0u:
      newData = newData xor mask

  while true:
    mustWake = false
    newData = data
    apply(WriteYield, WriteBlock, wWaitYieldMask32)
    apply(ReadYield, ReadBlock, rWaitYieldMask32)
    apply(FreeYield, FreeBlock, fWaitYieldMask32)
    if lock[stateOffset].addr.atomicCompareExchange(data.addr, newData.addr,
        true, ATOMIC_RELEASE, ATOMIC_RELAXED):
      if mustWake:
        wakeAll(lock[stateOffset].addr)
      break
    else:
      data = lock.loadState
proc getCurrState*(lock: WRFLock): WRFLockOp =
  ## Debugging aid: reports which phase the lock is currently in, testing the
  ## read, write and free state bits in that order.
  ##
  ## Raises ValueError if no current-state bit is set. This includes a freshly
  ## initialised lock, which only has write marked as the *next* state.
  let state = lock.loadState
  if (state and currStateReadMask32) != 0u:
    return WRFLockOp.Read
  if (state and currStateWriteMask32) != 0u:
    return WRFLockOp.Write
  if (state and currStateFreeMask32) != 0u:
    return WRFLockOp.Free
  raise newException(ValueError, "Tried to read the state of a uninitialised WRFLock")
template withLock*(lock: WRFLock; op: static WRFLockOp; body: untyped): untyped =
  ## Convenience template: registers `op` access, blocks until the lock grants
  ## it, runs `body`, then releases the access.
  ##
  ## Raises OverflowError if `op` access cannot be registered (too many
  ## holders of that access). The release runs even if `body` raises or exits
  ## early (return/break), so the lock is never left stuck in this phase.
  let lk = lock # evaluate the lock expression only once
  if not acquire(lk, op):
    raise newException(OverflowError, "Failed to acquire " & $op & " status to a WRFLock")
  else:
    discard wait(lk, op)
    try:
      body
    finally:
      doAssert release(lk, op), "Releasing " & $op & " status of the WRFLock was unsuccesful"
template whileTryingLock*(lock: WRFLock; op: static WRFLockOp; body: untyped; succ: untyped): untyped =
  ## Convenience template; raises OverflowError if `op` access cannot be
  ## registered (too many holders of that access).
  ##
  ## Registers access, then repeatedly evaluates `body` until the lock grants
  ## `op`. At that point it runs `succ` and releases the access. The release
  ## also runs if `succ` raises or exits early.
  ##
  ## Only `succ` is guarded. The release paths (e.g. the XOR state toggle in
  ## rRelease) assume the access was already granted, so nothing is released
  ## if `body` raises while still polling.
  let lk = lock # evaluate the lock expression only once
  if not acquire(lk, op):
    raise newException(OverflowError, "Failed to acquire " & $op & " status to a WRFLock")
  while not tryWait(lk, op):
    body
  try:
    succ
  finally:
    doAssert release(lk, op), "Releasing " & $op & " status of the WRFLock was unsuccesful"