Skip to content

Commit

Permalink
RemoteValue can hold any type of AbstractChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Aug 3, 2015
1 parent dce9d18 commit ef917b0
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 100 deletions.
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export
Markdown,

# Types
AbstractChannel,
AbstractMatrix,
AbstractSparseArray,
AbstractSparseMatrix,
Expand Down
127 changes: 48 additions & 79 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ function deregister_worker(pg, pid)

# throw exception to tasks waiting for this pid
for (id,rv) in tonotify
notify_error(rv.full, ProcessExitedException())
notify_error(rv.c, ProcessExitedException())
delete!(pg.refs, id)
end
end
Expand All @@ -476,45 +476,51 @@ type RemoteRef
finalizer(r, send_del_client)
r
end
end

REQ_ID::Int = 0
function RemoteRef(pid::Integer)
rr = RemoteRef(pid, myid(), REQ_ID)
REQ_ID += 1
rr
end
let REF_ID::Int = 1
global next_ref_id
next_ref_id() = (id = REF_ID; REF_ID += 1; id)

global next_rrid_tuple
next_rrid_tuple() = (myid(),next_ref_id())
end

RemoteRef(w::LocalProcess) = RemoteRef(w.id)
RemoteRef(w::Worker) = RemoteRef(w.id)
RemoteRef() = RemoteRef(myid())
RemoteRef(w::LocalProcess) = RemoteRef(w.id)
RemoteRef(w::Worker) = RemoteRef(w.id)
RemoteRef(pid::Integer=myid()) = RemoteRef(pid, myid(), next_ref_id())

global next_id
next_id() = (id=(myid(),REQ_ID); REQ_ID+=1; id)
function RemoteRef(f::Function, pid::Integer=myid())
remotecall_fetch(pid, f-> begin
rr = RemoteRef()
lookup_ref(rr2id(rr), f)
rr
end, f)
end

hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h))
==(r::RemoteRef, s::RemoteRef) = (r.whence==s.whence && r.id==s.id)

rr2id(r::RemoteRef) = (r.whence, r.id)

lookup_ref(id) = lookup_ref(PGRP, id)
function lookup_ref(pg, id)
lookup_ref(id, f=def_rv_channel) = lookup_ref(PGRP, id, f)
function lookup_ref(pg, id, f)
rv = get(pg.refs, id, false)
if rv === false
# first we've heard of this ref
rv = RemoteValue()
rv = RemoteValue(f)
pg.refs[id] = rv
push!(rv.clientset, id[1])
end
rv
end

function isready(rr::RemoteRef)
function isready(rr::RemoteRef, args...)
rid = rr2id(rr)
if rr.where == myid()
lookup_ref(rid).done
isready(lookup_ref(rid).c, args...)
else
remotecall_fetch(rr.where, id->lookup_ref(id).done, rid)
remotecall_fetch(rr.where, id->isready(lookup_ref(rid).c, args...), rid)
end
end

Expand Down Expand Up @@ -607,38 +613,16 @@ function deserialize(s::SerializationState, t::Type{RemoteRef})
end

# data stored by the owner of a RemoteRef
def_rv_channel() = Channel(1)
type RemoteValue
done::Bool
result
full::Condition # waiting for a value
empty::Condition # waiting for value to be removed
c::AbstractChannel
clientset::IntSet
waitingfor::Int # processor we need to hear from to fill this, or 0

RemoteValue() = new(false, nothing, Condition(), Condition(), IntSet(), 0)
RemoteValue(f::Function) = new(f(), IntSet(), 0)
end

function work_result(rv::RemoteValue)
v = rv.result
if isa(v,WeakRef)
v = v.value
end
v
end

function wait_full(rv::RemoteValue)
while !rv.done
wait(rv.full)
end
return work_result(rv)
end

function wait_empty(rv::RemoteValue)
while rv.done
wait(rv.empty)
end
return nothing
end
wait(rv::RemoteValue) = wait(rv.c)

## core messages: do, call, fetch, wait, ref, put! ##
type RemoteException <: Exception
Expand Down Expand Up @@ -670,7 +654,7 @@ function run_work_thunk(rv::RemoteValue, thunk)
end

function schedule_call(rid, thunk)
rv = RemoteValue()
rv = RemoteValue(def_rv_channel)
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid[1])
schedule(@task(run_work_thunk(rv,thunk)))
Expand Down Expand Up @@ -736,11 +720,11 @@ end
function remotecall_fetch(w::Worker, f, args...)
# can be weak, because the program will have no way to refer to the Ref
# itself, it only gets the result.
oid = next_id()
oid = next_rrid_tuple()
rv = lookup_ref(oid)
rv.waitingfor = w.id
send_msg(w, CallMsg{:call_fetch}(f, args, oid))
v = wait_full(rv)
v = take!(rv)
delete!(PGRP.refs, oid)
isa(v, RemoteException) ? throw(v) : v
end
Expand All @@ -752,12 +736,12 @@ remotecall_fetch(id::Integer, f, args...) =
remotecall_wait(w::LocalProcess, f, args...) = wait(remotecall(w,f,args...))

function remotecall_wait(w::Worker, f, args...)
prid = next_id()
prid = next_rrid_tuple()
rv = lookup_ref(prid)
rv.waitingfor = w.id
rr = RemoteRef(w)
send_msg(w, CallWaitMsg(f, args, rr2id(rr), prid))
wait_full(rv)
wait(rv)
delete!(PGRP.refs, prid)
rr
end
Expand Down Expand Up @@ -791,36 +775,25 @@ function call_on_owner(f, rr::RemoteRef, args...)
end
end

wait_ref(rid) = (wait_full(lookup_ref(rid)); nothing)
wait(r::RemoteRef) = (call_on_owner(wait_ref, r); r)
wait_ref(rid, args...) = (wait(lookup_ref(rid).c, args...); nothing)
wait(r::RemoteRef, args...) = (call_on_owner(wait_ref, r, args...); r)

fetch_ref(rid) = wait_full(lookup_ref(rid))
fetch(r::RemoteRef) = call_on_owner(fetch_ref, r)
fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...)
fetch(r::RemoteRef, args...) = call_on_owner(fetch_ref, r, args...)
fetch(x::ANY) = x

# storing a value to a RemoteRef
function put!(rv::RemoteValue, val::ANY)
wait_empty(rv)
rv.result = val
rv.done = true
notify_full(rv)
rv
end
put!(rv::RemoteValue, args...) = put!(rv.c, args...)
put_ref(rid, args...) = put!(lookup_ref(rid), args...)
put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr)

put_ref(rid, v) = put!(lookup_ref(rid), v)
put!(rr::RemoteRef, val::ANY) = (call_on_owner(put_ref, rr, val); rr)
take!(rv::RemoteValue, args...) = take!(rv.c, args...)
take_ref(rid, args...) = take!(lookup_ref(rid), args...)
take!(rr::RemoteRef, args...) = call_on_owner(take_ref, rr, args...)

function take!(rv::RemoteValue)
wait_full(rv)
val = rv.result
rv.done = false
rv.result = nothing
notify_empty(rv)
val
end
close_ref(rid) = (close(lookup_ref(rid).c); nothing)
close(rr::RemoteRef) = call_on_owner(close_ref, rr)

take_ref(rid) = take!(lookup_ref(rid))
take!(rr::RemoteRef) = call_on_owner(take_ref, rr)

function deliver_result(sock::IO, msg, oid, value)
#print("$(myid()) sending result $oid\n")
Expand Down Expand Up @@ -848,10 +821,6 @@ function deliver_result(sock::IO, msg, oid, value)
end
end

# notify waiters that a certain job has finished or RemoteRef has been emptied
notify_full( rv::RemoteValue) = notify(rv.full, work_result(rv))
notify_empty(rv::RemoteValue) = notify(rv.empty)

## message event handlers ##
process_messages(r_stream::TCPSocket, w_stream::TCPSocket) = @schedule process_tcp_streams(r_stream, w_stream)

Expand Down Expand Up @@ -926,7 +895,7 @@ end
function handle_msg(msg::CallWaitMsg, r_stream, w_stream)
@schedule begin
rv = schedule_call(msg.response_oid, ()->msg.f(msg.args...))
deliver_result(w_stream, :call_wait, msg.notify_oid, wait_full(rv))
deliver_result(w_stream, :call_wait, msg.notify_oid, wait(rv))
end
end

Expand Down Expand Up @@ -1234,7 +1203,7 @@ function create_worker(manager, wconfig)
finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end)

# set when the new worker has finshed connections with all other workers
ntfy_oid = next_id()
ntfy_oid = next_rrid_tuple()
rr_ntfy_join = lookup_ref(ntfy_oid)
rr_ntfy_join.waitingfor = myid()

Expand Down Expand Up @@ -1286,7 +1255,7 @@ function create_worker(manager, wconfig)
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology))

@schedule manage(w.manager, w.id, w.config, :register)
wait_full(rr_ntfy_join)
wait(rr_ntfy_join)
delete!(PGRP.refs, ntfy_oid)

w.id
Expand Down
7 changes: 0 additions & 7 deletions base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,6 @@ precompile(Base.normpath, (ASCIIString,))
precompile(Base.normpath, (UTF8String, UTF8String))
precompile(Base.normpath, (UTF8String,))
precompile(Base.notify, (Condition, Any))
precompile(Base.notify_empty, (Base.RemoteValue,))
precompile(Base.notify_full, (Base.RemoteValue,))
precompile(Base.open, (ASCIIString, ASCIIString))
precompile(Base.parse_input_line, (ASCIIString,))
precompile(Base.parse, (Type{Int}, ASCIIString, Int))
Expand Down Expand Up @@ -426,9 +424,6 @@ precompile(Base.uvfinalize, (Base.TTY,))
precompile(Base.vcat, (Base.LineEdit.Prompt,))
precompile(Base.wait, ())
precompile(Base.wait, (RemoteRef,))
precompile(Base.wait_empty, (Base.RemoteValue,))
precompile(Base.wait_full, (Base.RemoteValue,))
precompile(Base.work_result, (Base.RemoteValue,))
precompile(Base.write, (Base.Terminals.TTYTerminal, ASCIIString))
precompile(Base.write, (Base.Terminals.TerminalBuffer, ASCIIString))
precompile(Base.write, (IOBuffer, Vector{UInt8}))
Expand All @@ -446,7 +441,6 @@ precompile(Base.Sort.sort!, (Array{Any,1},))
precompile(Base.Sort.sort!, (Array{VersionNumber, 1}, Int, Int, Base.Sort.InsertionSortAlg, Base.Order.ForwardOrdering))
precompile(Base.info, (ASCIIString,))
precompile(Base.isempty, (Array{Void, 1},))
precompile(Base.setindex!, (Dict{Any, Any}, Base.RemoteValue, (Int, Int)))
precompile(Base.setindex!, (Dict{ByteString, VersionNumber}, VersionNumber, ASCIIString))
precompile(Base.spawn, (Cmd, (Base.TTY, Base.TTY, Base.TTY), Bool, Bool))
precompile(Base.spawn, (Cmd,))
Expand All @@ -470,7 +464,6 @@ precompile(Base.LineEdit.init_state, (Base.Terminals.TTYTerminal, Base.LineEdit.
precompile(Base.setindex!, (Base.Dict{Any, Any}, Base.LineEdit.PrefixSearchState, Base.LineEdit.PrefixHistoryPrompt{Base.REPL.REPLHistoryProvider}))
precompile(Base.take_ref, (Tuple{Int64, Int64},))
precompile(Base.get, (Base.Dict{Any, Any}, Tuple{Int64, Int64}, Bool))
precompile(Base.setindex!, (Base.Dict{Any, Any}, Base.RemoteValue, Tuple{Int64, Int64}))
precompile(Base.LineEdit.refresh_multi_line, (Array{Any, 1}, Base.Terminals.TerminalBuffer, Base.Terminals.TTYTerminal, Base.IOBuffer, Base.LineEdit.InputAreaState, Base.LineEdit.PromptState))
precompile(Base.schedule, (Array{Any, 1}, Task, Void))
precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any}))
Expand Down
24 changes: 24 additions & 0 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,30 @@ variable takes on all values added to the channel. An empty, closed channel
causes the ``for`` loop to terminate.


RemoteRefs and AbstractChannels
-------------------------------

A ``RemoteRef`` is a proxy for an implementation of an ``AbstractChannel``

A concrete implementation of an ``AbstractChannel`` (like ``Channel``), is required
to implement ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait``. The remote object
referred to by a ``RemoteRef()`` or ``RemoteRef(pid)`` is stored in a ``Channel{Any}(1)``,
i.e., a channel of size 1 capable of holding objects of ``Any`` type.

Methods ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait`` on a ``RemoteRef`` are proxied onto
the backing store on the remote process.

The constructor ``RemoteRef(f::Function, pid)`` allows us to construct references to channels holding
more than one value of a specific type. ``f()`` is a function executed on ``pid`` and it must return
an ``AbstractChannel``.

For example, ``RemoteRef(()->Channel{Int}(10), pid)``, will return a reference to a channel of type ``Int``
and size 10.

``RemoteRef`` can thus be used to refer to user implemented ``AbstractChannel`` objects. A simple
example of this is provided in ``examples/dictchannel.jl`` which uses a dictionary as its remote store.


Shared Arrays
-------------

Expand Down
24 changes: 10 additions & 14 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,9 @@ General Parallel Computing Support

.. function:: put!(RemoteRef, value)

Store a value to a remote reference. Implements "shared queue of length 1" semantics: if a value is already present, blocks until the value is removed with ``take!``. Returns its first argument.
Store a value to a remote reference. Constructors ``RemoteRef()`` and ``RemoteRef(pid)`` refer to a remote
channel of size 1 - if a value is already present, blocks until the value is removed
with ``take!``. Returns its first argument.

::
put!(Channel, value)
Expand All @@ -534,7 +536,7 @@ General Parallel Computing Support

.. function:: take!(RemoteRef)

Fetch the value of a remote reference, removing it so that the reference is empty again.
Fetches and removes a value from the remote reference.

::
take!(Channel)
Expand Down Expand Up @@ -586,24 +588,18 @@ General Parallel Computing Support

.. function:: RemoteRef()

Make an uninitialized remote reference on the local machine.

::
RemoteRef(n)

Make an uninitialized remote reference on process ``n``.

.. function:: RemoteRef(n)
Make an uninitialized remote reference on the local machine. Backing store is a ``Channel{Any}(1)``.

::
RemoteRef()
RemoteRef(p)

Make an uninitialized remote reference on the local machine.
Make an uninitialized remote reference on process ``p``. Backing store is a ``Channel{Any}(1)``.

::
RemoteRef(n)
RemoteRef(f::Function , pid)

Make an uninitialized remote reference on process ``n``.
``f()`` is executed on ``pid`` and must return an object of type ``AbstractChannel`` which will be the
backing store for the reference.

.. function:: timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)

Expand Down
Loading

0 comments on commit ef917b0

Please sign in to comment.