Skip to content

Commit

Permalink
fixup: undo previous commit. remove ST versions of Lock, Event, Semap…
Browse files Browse the repository at this point in the history
…hore. introduce new Threads.Condition
  • Loading branch information
vtjnash committed Dec 11, 2018
1 parent 237634a commit 06caab5
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 267 deletions.
59 changes: 8 additions & 51 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ called can be woken up. For level-triggered notifications, you must keep extra s
track of whether a notification has happened. The [`Channel`](@ref) and [`Event`](@ref) types do
this, and can be used for level-triggered events.
This object is NOT thread-safe. See [`Threads.ConditionMT`](@ref) for a thread-safe version.
This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-safe version.
"""
struct GenericCondition{L<:AbstractLock}
waitq::Vector{Any}
Expand Down Expand Up @@ -183,52 +183,9 @@ Return `true` if no tasks are waiting on the condition, `false` otherwise.
isempty(c::GenericCondition) = isempty(c.waitq)


"""
Event()
Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
`Event` are suspended and queued until `notify` is called on the `Event`.
After `notify` is called, the `Event` remains in a signaled state and
tasks will no longer block when waiting for it.
!!! compat "Julia 1.1"
This functionality requires at least Julia 1.1.
"""
mutable struct GenericEvent{L<:AbstractLock}
notify::GenericCondition{L}
set::Bool
GenericEvent{L}() where {L<:AbstractLock} = new{L}(GenericCondition{L}(), false)
end

function wait(e::GenericEvent)
e.set && return
lock(e.notify)
try
while !e.set
wait(e.notify)
end
finally
unlock(e.notify)
end
nothing
end

function notify(e::GenericEvent)
lock(e.notify)
try
if !e.set
e.set = true
notify(e.notify)
end
finally
unlock(e.notify)
end
nothing
end


const ConditionST = GenericCondition{AlwaysLockedST}
const EventST = GenericEvent{CooperativeLock}
# default (Julia v1.0) is currently single-threaded
# (although it uses MT-safe versions, when possible)
const Condition = GenericCondition{AlwaysLockedST}


## scheduler and work queue
Expand Down Expand Up @@ -433,11 +390,11 @@ Use [`isopen`](@ref) to check whether it is still active.
"""
mutable struct AsyncCondition
handle::Ptr{Cvoid}
cond::ConditionST
cond::Condition
isopen::Bool

function AsyncCondition()
this = new(Libc.malloc(_sizeof_uv_async), ConditionST(), true)
this = new(Libc.malloc(_sizeof_uv_async), Condition(), true)
associate_julia_struct(this.handle, this)
finalizer(uvfinalize, this)
err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
Expand Down Expand Up @@ -491,14 +448,14 @@ to check whether a timer is still active.
"""
mutable struct Timer
handle::Ptr{Cvoid}
cond::ConditionST
cond::Condition
isopen::Bool

function Timer(timeout::Real; interval::Real = 0.0)
timeout 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds"))
interval 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds"))

this = new(Libc.malloc(_sizeof_uv_timer), ConditionST(), true)
this = new(Libc.malloc(_sizeof_uv_timer), Condition(), true)
err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this)
if err != 0
#TODO: this codepath is currently not tested
Expand Down
117 changes: 93 additions & 24 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,22 @@ Creates a re-entrant lock for synchronizing [`Task`](@ref)s.
The same task can acquire the lock as many times as required.
Each [`lock`](@ref) must be matched with an [`unlock`](@ref).
"""
mutable struct GenericReentrantLock{ThreadLock<:AbstractLock} <: AbstractLock
mutable struct ReentrantLock <: AbstractLock
locked_by::Union{Task, Nothing}
cond_wait::GenericCondition{ThreadLock}
cond_wait::GenericCondition{Threads.SpinLock}
reentrancy_cnt::Int

GenericReentrantLock{ThreadLock}() where {ThreadLock<:AbstractLock} = new(nothing, GenericCondition{ThreadLock}(), 0)
ReentrantLock() = new(nothing, GenericCondition{Threads.SpinLock}(), 0)
end

# A basic single-threaded, Julia-aware lock:
const ReentrantLockST = GenericReentrantLock{CooperativeLock}


"""
islocked(lock) -> Status (Boolean)
Check whether the `lock` is held by any task/thread.
This should not be used for synchronization (see instead [`trylock`](@ref)).
"""
function islocked(rl::GenericReentrantLock)
function islocked(rl::ReentrantLock)
return rl.reentrancy_cnt != 0
end

Expand All @@ -40,7 +37,7 @@ return `false`.
Each successful `trylock` must be matched by an [`unlock`](@ref).
"""
function trylock(rl::GenericReentrantLock)
function trylock(rl::ReentrantLock)
t = current_task()
lock(rl.cond_wait)
try
Expand All @@ -67,7 +64,7 @@ wait for it to become available.
Each `lock` must be matched by an [`unlock`](@ref).
"""
function lock(rl::GenericReentrantLock)
function lock(rl::ReentrantLock)
t = current_task()
lock(rl.cond_wait)
try
Expand Down Expand Up @@ -95,7 +92,7 @@ Releases ownership of the `lock`.
If this is a recursive lock which has been acquired before, decrement an
internal counter and return immediately.
"""
function unlock(rl::GenericReentrantLock)
function unlock(rl::ReentrantLock)
t = current_task()
rl.reentrancy_cnt == 0 && error("unlock count must match lock count")
rl.locked_by == t || error("unlock from wrong thread")
Expand All @@ -112,7 +109,7 @@ function unlock(rl::GenericReentrantLock)
return
end

function unlockall(rl::GenericReentrantLock)
function unlockall(rl::ReentrantLock)
t = current_task()
n = rl.reentrancy_cnt
rl.locked_by == t || error("unlock from wrong thread")
Expand All @@ -128,7 +125,7 @@ function unlockall(rl::GenericReentrantLock)
return n
end

function relockall(rl::GenericReentrantLock, n::Int)
function relockall(rl::ReentrantLock, n::Int)
t = current_task()
lock(rl)
n1 = rl.reentrancy_cnt
Expand Down Expand Up @@ -157,20 +154,41 @@ function trylock(f, l::AbstractLock)
return false
end

@eval Threads begin
"""
Threads.Condition([lock])
A thread-safe version of [`Base.Condition`](@ref).
!!! compat "Julia 1.1"
This functionality requires at least Julia 1.1.
"""
const Condition = Base.GenericCondition{Base.ReentrantLock}

"""
Special note for [`Threads.Condition`](@ref):
The caller must be holding the [`lock`](@ref) that owns `c` before calling this method.
The calling task will be blocked until some other task wakes it,
usually by calling [`notify`](@ref)` on the same Condition object.
The lock will be atomically released when blocking (even if it was locked recursively),
and will be reacquired before returning.
"""
wait(c::Condition)
end

"""
Semaphore(sem_size)
Create a counting semaphore that allows at most `sem_size`
acquires to be in use at any time.
Each acquire must be matched with a release.
This construct is NOT threadsafe.
"""
mutable struct Semaphore
sem_size::Int
curr_cnt::Int
cond_wait::ConditionST
Semaphore(sem_size) = sem_size > 0 ? new(sem_size, 0, ConditionST()) : throw(ArgumentError("Semaphore size must be > 0"))
cond_wait::Threads.Condition
Semaphore(sem_size) = sem_size > 0 ? new(sem_size, 0, Threads.Condition()) : throw(ArgumentError("Semaphore size must be > 0"))
end

"""
Expand All @@ -180,14 +198,16 @@ Wait for one of the `sem_size` permits to be available,
blocking until one can be acquired.
"""
function acquire(s::Semaphore)
while true
if s.curr_cnt < s.sem_size
s.curr_cnt = s.curr_cnt + 1
return
else
lock(s.cond_wait)
try
while s.curr_cnt >= s.sem_size
wait(s.cond_wait)
end
s.curr_cnt = s.curr_cnt + 1
finally
unlock(s.cond_wait)
end
return
end

"""
Expand All @@ -198,8 +218,57 @@ possibly allowing another task to acquire it
and resume execution.
"""
function release(s::Semaphore)
@assert s.curr_cnt > 0 "release count must match acquire count"
s.curr_cnt -= 1
notify(s.cond_wait; all=false)
lock(s.cond_wait)
try
s.curr_cnt > 0 || error("release count must match acquire count")
s.curr_cnt -= 1
notify(s.cond_wait; all=false)
finally
unlock(s.cond_wait)
end
return
end


"""
Event()
Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
`Event` are suspended and queued until `notify` is called on the `Event`.
After `notify` is called, the `Event` remains in a signaled state and
tasks will no longer block when waiting for it.
!!! compat "Julia 1.1"
This functionality requires at least Julia 1.1.
"""
mutable struct Event
notify::Threads.Condition
set::Bool
Event() = new(Threads.Condition(), false)
end

function wait(e::Event)
e.set && return
lock(e.notify)
try
while !e.set
wait(e.notify)
end
finally
unlock(e.notify)
end
nothing
end

function notify(e::Event)
lock(e.notify)
try
if !e.set
e.set = true
notify(e.notify)
end
finally
unlock(e.notify)
end
nothing
end
Loading

0 comments on commit 06caab5

Please sign in to comment.