Skip to content

Commit

Permalink
make WeakKeyDict finalizer usage gc-safe
Browse files Browse the repository at this point in the history
also use this `client_refs.lock` to protect other data-structures from
being interrupted by finalizers, in the multi.jl logic

we may want to start indicating which mutable data-structures are safe
to call from finalizers, since generally that isn't possible

to make a finalizer API gc-safe, that code should observe the standard
thread-safety restrictions (there's no guarantee of which thread it'll run on).

in addition, if the data-structure uses locks for synchronization,
use the `islocked` pattern (demonstrated herein) in the `finalizer`
to re-schedule the finalizer whenever the mutable data-structure is not
available for mutation.
this ensures that the lock cannot be acquired recursively,
and furthermore, this pattern will continue to work if finalizers
get moved to their own separate thread.

close JuliaLang#14445
fix JuliaLang#16550
reverts workaround JuliaLang#14456 (shouldn't break JuliaLang#14295, due to new locks)
should fix JuliaLang#16091 (with JuliaLang#17619)
  • Loading branch information
vtjnash authored and mfasi committed Sep 5, 2016
1 parent c048321 commit e9bb7fe
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 124 deletions.
10 changes: 0 additions & 10 deletions base/dict.jl
Original file line number Diff line number Diff line change
Expand Up @@ -309,16 +309,6 @@ get!(o::ObjectIdDict, key, default) = (o[key] = get(o, key, default))

abstract AbstractSerializer

# Serializer type needed as soon as ObjectIdDict is available
type SerializationState{I<:IO} <: AbstractSerializer
io::I
counter::Int
table::ObjectIdDict
SerializationState(io::I) = new(io, 0, ObjectIdDict())
end

SerializationState(io::IO) = SerializationState{typeof(io)}(io)

# dict

# These can be changed, to trade off better performance for space
Expand Down
20 changes: 20 additions & 0 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ function unlock(rl::ReentrantLock)
return
end

# Acquire `l`, evaluate `f()`, and release `l` again -- the release happens
# whether `f` returns normally or throws. The value of `f()` is returned.
# Written with `f` first so it can be used with `do`-block syntax.
function lock(f, l)
    lock(l)
    result = try
        f()
    finally
        # always runs, so the lock is never left held by this call
        unlock(l)
    end
    return result
end

# Attempt to acquire `l` without blocking. When the attempt succeeds, evaluate
# `f()`, release `l` (even if `f` throws), and return the value of `f()`.
# When the lock could not be taken, `f` is not called and `false` is returned.
function trylock(f, l)
    # guard: bail out immediately if the lock is unavailable
    trylock(l) || return false
    try
        return f()
    finally
        unlock(l)
    end
end

"""
Semaphore(sem_size)
Expand Down
155 changes: 84 additions & 71 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -637,27 +637,38 @@ function deregister_worker(pg, pid)
# delete this worker from our remote reference client sets
ids = []
tonotify = []
for (id,rv) in pg.refs
if in(pid,rv.clientset)
push!(ids, id)
lock(client_refs) do
for (id,rv) in pg.refs
if in(pid,rv.clientset)
push!(ids, id)
end
if rv.waitingfor == pid
push!(tonotify, (id,rv))
end
end
if rv.waitingfor == pid
push!(tonotify, (id,rv))
for id in ids
del_client(pg, id, pid)
end
end
for id in ids
del_client(pg, id, pid)
end

# throw exception to tasks waiting for this pid
for (id,rv) in tonotify
notify_error(rv.c, ProcessExitedException())
delete!(pg.refs, id)
# throw exception to tasks waiting for this pid
for (id,rv) in tonotify
notify_error(rv.c, ProcessExitedException())
delete!(pg.refs, id)
end
end
end

## remote refs ##
const client_refs = WeakKeyDict()

"""
    client_refs

Tracks whether a particular `AbstractRemoteRef`
(identified by its RRID) exists on this worker.

The `client_refs` lock is also used to synchronize access to `.refs`
and the associated clientset state.
"""
const client_refs = WeakKeyDict{Any, Void}() # used as a WeakKeySet

abstract AbstractRemoteRef

Expand All @@ -680,34 +691,26 @@ type RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
end

function test_existing_ref(r::AbstractRemoteRef)
found = getkey(client_refs, r, false)
if found !== false
if client_refs[r] == true
@assert r.where > 0
if isa(r, Future) && isnull(found.v) && !isnull(r.v)
# we have recd the value from another source, probably a deserialized ref, send a del_client message
send_del_client(r)
found.v = r.v
end
return found
else
# just delete the entry.
delete!(client_refs, found)
end
found = getkey(client_refs, r, nothing)
if found !== nothing
@assert r.where > 0
if isa(r, Future) && isnull(found.v) && !isnull(r.v)
# we have recd the value from another source, probably a deserialized ref, send a del_client message
send_del_client(r)
found.v = r.v
end
return found::typeof(r)
end

client_refs[r] = true
client_refs[r] = nothing
finalizer(r, finalize_ref)
return r
end

function finalize_ref(r::AbstractRemoteRef)
if r.where > 0 # Handle the case of the finalizer having being called manually
if haskey(client_refs, r)
# NOTE: Change below line to deleting the entry once issue https://github.com/JuliaLang/julia/issues/14445
# is fixed.
client_refs[r] = false
end
if r.where > 0 # Handle the case of the finalizer having been called manually
islocked(client_refs) && return finalizer(r, finalize_ref) # delay finalizer for later, when it's not already locked
delete!(client_refs, r)
if isa(r, RemoteChannel)
send_del_client(r)
else
Expand All @@ -717,7 +720,7 @@ function finalize_ref(r::AbstractRemoteRef)
end
r.where = 0
end
return r
nothing
end

Future(w::LocalProcess) = Future(w.id)
Expand Down Expand Up @@ -791,23 +794,27 @@ A low-level API which returns the backing `AbstractChannel` for an `id` returned
The call is valid only on the node where the backing channel exists.
"""
function channel_from_id(id)
rv = get(PGRP.refs, id, false)
rv = lock(client_refs) do
return get(PGRP.refs, id, false)
end
if rv === false
throw(ErrorException("Local instance of remote reference not found"))
end
rv.c
return rv.c
end

lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(PGRP, rrid, f)
function lookup_ref(pg, rrid, f)
rv = get(pg.refs, rrid, false)
if rv === false
# first we've heard of this ref
rv = RemoteValue(f())
pg.refs[rrid] = rv
push!(rv.clientset, rrid.whence)
end
rv
return lock(client_refs) do
rv = get(pg.refs, rrid, false)
if rv === false
# first we've heard of this ref
rv = RemoteValue(f())
pg.refs[rrid] = rv
push!(rv.clientset, rrid.whence)
end
return rv
end::RemoteValue
end

"""
Expand All @@ -827,7 +834,7 @@ function isready(rr::Future)
!isnull(rr.v) && return true

rid = remoteref_id(rr)
if rr.where == myid()
return if rr.where == myid()
isready(lookup_ref(rid).c)
else
remotecall_fetch(rid->isready(lookup_ref(rid).c), rr.where, rid)
Expand All @@ -844,7 +851,7 @@ it can be safely used on a `Future` since they are assigned only once.
"""
function isready(rr::RemoteChannel, args...)
rid = remoteref_id(rr)
if rr.where == myid()
return if rr.where == myid()
isready(lookup_ref(rid).c, args...)
else
remotecall_fetch(rid->isready(lookup_ref(rid).c, args...), rr.where, rid)
Expand All @@ -855,11 +862,7 @@ del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid())

del_client(id, client) = del_client(PGRP, id, client)
function del_client(pg, id, client)
# As a workaround to issue https://github.com/JuliaLang/julia/issues/14445
# the dict/set updates are executed asynchronously so that they do
# not occur in the midst of a gc. The `@async` prefix must be removed once
# 14445 is fixed.
@async begin
lock(client_refs) do
rv = get(pg.refs, id, false)
if rv !== false
delete!(rv.clientset, client)
Expand Down Expand Up @@ -898,8 +901,10 @@ function send_del_client(rr)
end

function add_client(id, client)
rv = lookup_ref(id)
push!(rv.clientset, client)
lock(client_refs) do
rv = lookup_ref(id)
push!(rv.clientset, client)
end
nothing
end

Expand Down Expand Up @@ -999,19 +1004,21 @@ function run_work_thunk(thunk, print_error)
result = RemoteException(ce)
print_error && showerror(STDERR, ce)
end
result
return result
end
# Evaluate `thunk` through the two-argument `run_work_thunk` method (with error
# printing disabled) and store whatever it produces in `rv`.
# Always returns `nothing`.
function run_work_thunk(rv::RemoteValue, thunk)
    outcome = run_work_thunk(thunk, false)
    put!(rv, outcome)
    return nothing
end

function schedule_call(rid, thunk)
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
schedule(@task(run_work_thunk(rv,thunk)))
rv
return lock(client_refs) do
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
@schedule run_work_thunk(rv, thunk)
return rv
end
end

# make a thunk to call f on args in a way that simulates what would happen if
Expand All @@ -1026,13 +1033,13 @@ end
function remotecall(f, w::LocalProcess, args...; kwargs...)
rr = Future(w)
schedule_call(remoteref_id(rr), local_remotecall_thunk(f, args, kwargs))
rr
return rr
end

function remotecall(f, w::Worker, args...; kwargs...)
rr = Future(w)
send_msg(w, MsgHeader(remoteref_id(rr)), CallMsg{:call}(f, args, kwargs))
rr
return rr
end

"""
Expand All @@ -1046,7 +1053,7 @@ remotecall(f, id::Integer, args...; kwargs...) = remotecall(f, worker_from_id(id

function remotecall_fetch(f, w::LocalProcess, args...; kwargs...)
v=run_work_thunk(local_remotecall_thunk(f,args, kwargs), false)
isa(v, RemoteException) ? throw(v) : v
return isa(v, RemoteException) ? throw(v) : v
end

function remotecall_fetch(f, w::Worker, args...; kwargs...)
Expand All @@ -1057,8 +1064,10 @@ function remotecall_fetch(f, w::Worker, args...; kwargs...)
rv.waitingfor = w.id
send_msg(w, MsgHeader(RRID(0,0), oid), CallMsg{:call_fetch}(f, args, kwargs))
v = take!(rv)
delete!(PGRP.refs, oid)
isa(v, RemoteException) ? throw(v) : v
lock(client_refs) do
delete!(PGRP.refs, oid)
end
return isa(v, RemoteException) ? throw(v) : v
end

"""
Expand All @@ -1080,9 +1089,11 @@ function remotecall_wait(f, w::Worker, args...; kwargs...)
rr = Future(w)
send_msg(w, MsgHeader(remoteref_id(rr), prid), CallWaitMsg(f, args, kwargs))
v = fetch(rv.c)
delete!(PGRP.refs, prid)
lock(client_refs) do
delete!(PGRP.refs, prid)
end
isa(v, RemoteException) && throw(v)
rr
return rr
end

"""
Expand Down Expand Up @@ -1834,9 +1845,11 @@ function create_worker(manager, wconfig)

@schedule manage(w.manager, w.id, w.config, :register)
wait(rr_ntfy_join)
delete!(PGRP.refs, ntfy_oid)
lock(client_refs) do
delete!(PGRP.refs, ntfy_oid)
end

w.id
return w.id
end


Expand All @@ -1859,7 +1872,7 @@ function launch_additional(np::Integer, cmd::Cmd)
additional_io_objs[port] = io
end

addresses
return addresses
end

function redirect_output_from_additional_worker(pid, port)
Expand Down
1 change: 0 additions & 1 deletion base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,6 @@ precompile(Base.get, (Base.Dict{Any, Any}, Tuple{Int64, Int64}, Bool))
precompile(Base.LineEdit.refresh_multi_line, (Array{Any, 1}, Base.Terminals.TerminalBuffer, Base.Terminals.TTYTerminal, Base.IOBuffer, Base.LineEdit.InputAreaState, Base.LineEdit.PromptState))
precompile(Base.schedule, (Array{Any, 1}, Task, Void))
precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any}))
precompile(Base.weak_key_delete!, (Base.Dict{Any, Any}, Base.RemoteChannel))
precompile(==, (Base.RemoteChannel, WeakRef))
precompile(==, (Base.RemoteChannel, Base.RemoteChannel))
precompile(Base.send_del_client, (Base.RemoteChannel,))
Expand Down
11 changes: 10 additions & 1 deletion base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ import Base: GMP, Bottom, unsafe_convert, uncompressed_ast, datatype_pointerfree
import Core: svec
using Base: ViewIndex, index_lengths

export serialize, deserialize
export serialize, deserialize, SerializationState

# Serializer state, parameterized on the concrete type `I` of the IO stream it wraps.
type SerializationState{I<:IO} <: AbstractSerializer
# the stream this serializer is attached to
io::I
# starts at 0 -- NOTE(review): presumably numbers the entries recorded in `table`; confirm
counter::Int
# starts empty -- NOTE(review): presumably tracks already-seen objects by identity; confirm
table::ObjectIdDict
# inner constructor: wrap `io` with a zeroed counter and a fresh, empty table
SerializationState(io::I) = new(io, 0, ObjectIdDict())
end

# convenience constructor: take the type parameter from the concrete type of `io`
SerializationState(io::IO) = SerializationState{typeof(io)}(io)

## serializing values ##

Expand Down
2 changes: 1 addition & 1 deletion base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ include("reshapedarray.jl")
include("bitarray.jl")
include("intset.jl")
include("dict.jl")
include("weakkeydict.jl")
include("set.jl")
include("iterator.jl")

Expand Down Expand Up @@ -173,6 +172,7 @@ include("event.jl")
include("task.jl")
include("lock.jl")
include("threads.jl")
include("weakkeydict.jl")

# I/O
include("stream.jl")
Expand Down
Loading

0 comments on commit e9bb7fe

Please sign in to comment.