diff --git a/base/exports.jl b/base/exports.jl index 8799710e5ea2a..81815305b651e 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -23,6 +23,7 @@ export Markdown, # Types + AbstractChannel, AbstractMatrix, AbstractSparseArray, AbstractSparseMatrix, diff --git a/base/multi.jl b/base/multi.jl index 7ae1b50d705d1..0d243117ac4c4 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -451,7 +451,7 @@ function deregister_worker(pg, pid) # throw exception to tasks waiting for this pid for (id,rv) in tonotify - notify_error(rv.full, ProcessExitedException()) + notify_error(rv.c, ProcessExitedException()) delete!(pg.refs, id) end end @@ -476,20 +476,26 @@ type RemoteRef finalizer(r, send_del_client) r end +end - REQ_ID::Int = 0 - function RemoteRef(pid::Integer) - rr = RemoteRef(pid, myid(), REQ_ID) - REQ_ID += 1 - rr - end +let REF_ID::Int = 1 + global next_ref_id + next_ref_id() = (id = REF_ID; REF_ID += 1; id) + + global next_rrid_tuple + next_rrid_tuple() = (myid(),next_ref_id()) +end - RemoteRef(w::LocalProcess) = RemoteRef(w.id) - RemoteRef(w::Worker) = RemoteRef(w.id) - RemoteRef() = RemoteRef(myid()) +RemoteRef(w::LocalProcess) = RemoteRef(w.id) +RemoteRef(w::Worker) = RemoteRef(w.id) +RemoteRef(pid::Integer=myid()) = RemoteRef(pid, myid(), next_ref_id()) - global next_id - next_id() = (id=(myid(),REQ_ID); REQ_ID+=1; id) +function RemoteRef(f::Function, pid::Integer=myid()) + remotecall_fetch(pid, f-> begin + rr = RemoteRef() + lookup_ref(rr2id(rr), f) + rr + end, f) end hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h)) @@ -497,24 +503,24 @@ hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h)) rr2id(r::RemoteRef) = (r.whence, r.id) -lookup_ref(id) = lookup_ref(PGRP, id) -function lookup_ref(pg, id) +lookup_ref(id, f=def_rv_channel) = lookup_ref(PGRP, id, f) +function lookup_ref(pg, id, f) rv = get(pg.refs, id, false) if rv === false # first we've heard of this ref - rv = RemoteValue() + rv = RemoteValue(f) pg.refs[id] = rv push!(rv.clientset, id[1]) end rv end -function isready(rr::RemoteRef) +function isready(rr::RemoteRef, args...) rid = rr2id(rr) if rr.where == myid() - lookup_ref(rid).done + isready(lookup_ref(rid).c, args...) else - remotecall_fetch(rr.where, id->lookup_ref(id).done, rid) + remotecall_fetch(rr.where, id->isready(lookup_ref(rid).c, args...), rid) end end @@ -607,38 +613,16 @@ function deserialize(s::SerializationState, t::Type{RemoteRef}) end # data stored by the owner of a RemoteRef +def_rv_channel() = Channel(1) type RemoteValue - done::Bool - result - full::Condition # waiting for a value - empty::Condition # waiting for value to be removed + c::AbstractChannel clientset::IntSet waitingfor::Int # processor we need to hear from to fill this, or 0 - RemoteValue() = new(false, nothing, Condition(), Condition(), IntSet(), 0) + RemoteValue(f::Function) = new(f(), IntSet(), 0) end -function work_result(rv::RemoteValue) - v = rv.result - if isa(v,WeakRef) - v = v.value - end - v -end - -function wait_full(rv::RemoteValue) - while !rv.done - wait(rv.full) - end - return work_result(rv) -end - -function wait_empty(rv::RemoteValue) - while rv.done - wait(rv.empty) - end - return nothing -end +wait(rv::RemoteValue) = wait(rv.c) ## core messages: do, call, fetch, wait, ref, put! ## type RemoteException <: Exception @@ -670,7 +654,7 @@ function run_work_thunk(rv::RemoteValue, thunk) end function schedule_call(rid, thunk) - rv = RemoteValue() + rv = RemoteValue(def_rv_channel) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid[1]) schedule(@task(run_work_thunk(rv,thunk))) @@ -736,11 +720,11 @@ end function remotecall_fetch(w::Worker, f, args...) # can be weak, because the program will have no way to refer to the Ref # itself, it only gets the result. - oid = next_id() + oid = next_rrid_tuple() rv = lookup_ref(oid) rv.waitingfor = w.id send_msg(w, CallMsg{:call_fetch}(f, args, oid)) - v = wait_full(rv) + v = take!(rv) delete!(PGRP.refs, oid) isa(v, RemoteException) ? throw(v) : v end @@ -752,12 +736,12 @@ remotecall_fetch(id::Integer, f, args...) = remotecall_wait(w::LocalProcess, f, args...) = wait(remotecall(w,f,args...)) function remotecall_wait(w::Worker, f, args...) - prid = next_id() + prid = next_rrid_tuple() rv = lookup_ref(prid) rv.waitingfor = w.id rr = RemoteRef(w) send_msg(w, CallWaitMsg(f, args, rr2id(rr), prid)) - wait_full(rv) + wait(rv) delete!(PGRP.refs, prid) rr end @@ -791,36 +775,25 @@ function call_on_owner(f, rr::RemoteRef, args...) end end -wait_ref(rid) = (wait_full(lookup_ref(rid)); nothing) -wait(r::RemoteRef) = (call_on_owner(wait_ref, r); r) +wait_ref(rid, args...) = (wait(lookup_ref(rid).c, args...); nothing) +wait(r::RemoteRef, args...) = (call_on_owner(wait_ref, r, args...); r) -fetch_ref(rid) = wait_full(lookup_ref(rid)) -fetch(r::RemoteRef) = call_on_owner(fetch_ref, r) +fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) +fetch(r::RemoteRef, args...) = call_on_owner(fetch_ref, r, args...) fetch(x::ANY) = x # storing a value to a RemoteRef -function put!(rv::RemoteValue, val::ANY) - wait_empty(rv) - rv.result = val - rv.done = true - notify_full(rv) - rv -end +put!(rv::RemoteValue, args...) = put!(rv.c, args...) +put_ref(rid, args...) = put!(lookup_ref(rid), args...) +put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr) -put_ref(rid, v) = put!(lookup_ref(rid), v) -put!(rr::RemoteRef, val::ANY) = (call_on_owner(put_ref, rr, val); rr) +take!(rv::RemoteValue, args...) = take!(rv.c, args...) +take_ref(rid, args...) = take!(lookup_ref(rid), args...) +take!(rr::RemoteRef, args...) = call_on_owner(take_ref, rr, args...) -function take!(rv::RemoteValue) - wait_full(rv) - val = rv.result - rv.done = false - rv.result = nothing - notify_empty(rv) - val -end +close_ref(rid) = (close(lookup_ref(rid).c); nothing) +close(rr::RemoteRef) = call_on_owner(close_ref, rr) -take_ref(rid) = take!(lookup_ref(rid)) -take!(rr::RemoteRef) = call_on_owner(take_ref, rr) function deliver_result(sock::IO, msg, oid, value) #print("$(myid()) sending result $oid\n") @@ -848,10 +821,6 @@ function deliver_result(sock::IO, msg, oid, value) end end -# notify waiters that a certain job has finished or RemoteRef has been emptied -notify_full( rv::RemoteValue) = notify(rv.full, work_result(rv)) -notify_empty(rv::RemoteValue) = notify(rv.empty) - ## message event handlers ## process_messages(r_stream::TCPSocket, w_stream::TCPSocket) = @schedule process_tcp_streams(r_stream, w_stream) @@ -926,7 +895,7 @@ end function handle_msg(msg::CallWaitMsg, r_stream, w_stream) @schedule begin rv = schedule_call(msg.response_oid, ()->msg.f(msg.args...)) - deliver_result(w_stream, :call_wait, msg.notify_oid, wait_full(rv)) + deliver_result(w_stream, :call_wait, msg.notify_oid, wait(rv)) end end @@ -1234,7 +1203,7 @@ function create_worker(manager, wconfig) finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end) # set when the new worker has finshed connections with all other workers - ntfy_oid = next_id() + ntfy_oid = next_rrid_tuple() rr_ntfy_join = lookup_ref(ntfy_oid) rr_ntfy_join.waitingfor = myid() @@ -1286,7 +1255,7 @@ function create_worker(manager, wconfig) send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology)) @schedule manage(w.manager, w.id, w.config, :register) - wait_full(rr_ntfy_join) + wait(rr_ntfy_join) delete!(PGRP.refs, ntfy_oid) w.id diff --git a/base/precompile.jl b/base/precompile.jl index 59e72ffbb68a9..bc86ab48d5a53 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -301,8 +301,6 @@ precompile(Base.normpath, (ASCIIString,)) precompile(Base.normpath, (UTF8String, UTF8String)) precompile(Base.normpath, (UTF8String,)) precompile(Base.notify, (Condition, Any)) -precompile(Base.notify_empty, (Base.RemoteValue,)) -precompile(Base.notify_full, (Base.RemoteValue,)) precompile(Base.open, (ASCIIString, ASCIIString)) precompile(Base.parse_input_line, (ASCIIString,)) precompile(Base.parse, (Type{Int}, ASCIIString, Int)) @@ -426,9 +424,6 @@ precompile(Base.uvfinalize, (Base.TTY,)) precompile(Base.vcat, (Base.LineEdit.Prompt,)) precompile(Base.wait, ()) precompile(Base.wait, (RemoteRef,)) -precompile(Base.wait_empty, (Base.RemoteValue,)) -precompile(Base.wait_full, (Base.RemoteValue,)) -precompile(Base.work_result, (Base.RemoteValue,)) precompile(Base.write, (Base.Terminals.TTYTerminal, ASCIIString)) precompile(Base.write, (Base.Terminals.TerminalBuffer, ASCIIString)) precompile(Base.write, (IOBuffer, Vector{UInt8})) @@ -446,7 +441,6 @@ precompile(Base.Sort.sort!, (Array{Any,1},)) precompile(Base.Sort.sort!, (Array{VersionNumber, 1}, Int, Int, Base.Sort.InsertionSortAlg, Base.Order.ForwardOrdering)) precompile(Base.info, (ASCIIString,)) precompile(Base.isempty, (Array{Void, 1},)) -precompile(Base.setindex!, (Dict{Any, Any}, Base.RemoteValue, (Int, Int))) precompile(Base.setindex!, (Dict{ByteString, VersionNumber}, VersionNumber, ASCIIString)) precompile(Base.spawn, (Cmd, (Base.TTY, Base.TTY, Base.TTY), Bool, Bool)) precompile(Base.spawn, (Cmd,)) @@ -470,7 +464,6 @@ precompile(Base.LineEdit.init_state, (Base.Terminals.TTYTerminal, Base.LineEdit. precompile(Base.setindex!, (Base.Dict{Any, Any}, Base.LineEdit.PrefixSearchState, Base.LineEdit.PrefixHistoryPrompt{Base.REPL.REPLHistoryProvider})) precompile(Base.take_ref, (Tuple{Int64, Int64},)) precompile(Base.get, (Base.Dict{Any, Any}, Tuple{Int64, Int64}, Bool)) -precompile(Base.setindex!, (Base.Dict{Any, Any}, Base.RemoteValue, Tuple{Int64, Int64})) precompile(Base.LineEdit.refresh_multi_line, (Array{Any, 1}, Base.Terminals.TerminalBuffer, Base.Terminals.TTYTerminal, Base.IOBuffer, Base.LineEdit.InputAreaState, Base.LineEdit.PromptState)) precompile(Base.schedule, (Array{Any, 1}, Task, Void)) precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any})) diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index f58438dc2139b..9087eed1f30b8 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -463,6 +463,30 @@ variable takes on all values added to the channel. An empty, closed channel causes the ``for`` loop to terminate. +RemoteRefs and AbstractChannels +------------------------------- + +A ``RemoteRef`` is a proxy for an implementation of an ``AbstractChannel`` + +A concrete implementation of an ``AbstractChannel`` (like ``Channel``), is required +to implement ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait``. The remote object +referred to by a ``RemoteRef()`` or ``RemoteRef(pid)`` is stored in a ``Channel{Any}(1)``, +i.e., a channel of size 1 capable of holding objects of ``Any`` type. + +Methods ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait`` on a ``RemoteRef`` are proxied onto +the backing store on the remote process. + +The constructor ``RemoteRef(f::Function, pid)`` allows us to construct references to channels holding +more than one value of a specific type. ``f()`` is a function executed on ``pid`` and it must return +an ``AbstractChannel``. + +For example, ``RemoteRef(()->Channel{Int}(10), pid)``, will return a reference to a channel of type ``Int`` +and size 10. + +``RemoteRef`` can thus be used to refer to user implemented ``AbstractChannel`` objects. A simple +example of this is provided in ``examples/dictchannel.jl`` which uses a dictionary as its remote store. + + Shared Arrays ------------- diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index 8de090be91295..41576d94f83be 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -513,7 +513,9 @@ General Parallel Computing Support .. function:: put!(RemoteRef, value) - Store a value to a remote reference. Implements "shared queue of length 1" semantics: if a value is already present, blocks until the value is removed with ``take!``. Returns its first argument. + Store a value to a remote reference. Constructors ``RemoteRef()`` and ``RemoteRef(pid)`` refer to a remote + channel of size 1 - if a value is already present, blocks until the value is removed + with ``take!``. Returns its first argument. :: put!(Channel, value) @@ -534,7 +536,7 @@ General Parallel Computing Support .. function:: take!(RemoteRef) - Fetch the value of a remote reference, removing it so that the reference is empty again. + Fetches and removes a value from the remote reference. :: take!(Channel) @@ -586,24 +588,18 @@ General Parallel Computing Support .. function:: RemoteRef() - Make an uninitialized remote reference on the local machine. - - :: - RemoteRef(n) - - Make an uninitialized remote reference on process ``n``. - -.. function:: RemoteRef(n) + Make an uninitialized remote reference on the local machine. Backing store is a ``Channel{Any}(1)``. :: - RemoteRef() + RemoteRef(p) - Make an uninitialized remote reference on the local machine. + Make an uninitialized remote reference on process ``p``. Backing store is a ``Channel{Any}(1)``. :: - RemoteRef(n) + RemoteRef(f::Function , pid) - Make an uninitialized remote reference on process ``n``. + ``f()`` is executed on ``pid`` and must return an object of type ``AbstractChannel`` which will be the + backing store for the reference. .. function:: timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) diff --git a/examples/dictchannel.jl b/examples/dictchannel.jl new file mode 100644 index 0000000000000..86097be68339f --- /dev/null +++ b/examples/dictchannel.jl @@ -0,0 +1,55 @@ +import Base: put!, wait, isready, take!, fetch + +type DictChannel <: AbstractChannel + d::Dict + cond_take::Condition # waiting for data to become available + DictChannel() = new(Dict(), Condition()) +end + +function put!(D::DictChannel, k, v) + D.d[k] = v + notify(D.cond_take) + D +end + +function take!(D::DictChannel, k) + v=fetch(D,k) + delete!(D.d, k) + v +end + +isready(D::DictChannel) = length(D.d) > 1 +isready(D::DictChannel, k) = haskey(D.d,k) +function fetch(D::DictChannel, k) + wait(D,k) + D.d[k] +end + +function wait(D::DictChannel, k) + while !isready(D, k) + wait(D.cond_take) + end +end + +# Usage: + +# RemoteRef to a DictChannel on worker pid +# dc_ref=RemoteRef(()->DictChannel(), pid) + +# Test if there is any data +# isready(dc_ref) + +# add +# put!(dc_ref, 1, 2) + +# Test if key 1 exists +# isready(dc_ref, 1) + +# fetch key 1 +# fetch(dc_ref, 1) + +# fetch and remove key 1 +# take!(dc_ref, 1) + +# wait for key 3 to be added +# wait(dc_ref, 3) diff --git a/test/examples.jl b/test/examples.jl index 303b667e27a5b..0516e940a9f9f 100644 --- a/test/examples.jl +++ b/test/examples.jl @@ -48,9 +48,32 @@ include(joinpath(dir, "queens.jl")) end end +dc_path = joinpath(dir, "dictchannel.jl") +include(dc_path) + +w_set=filter!(x->x != myid(), workers()) +pid = length(w_set) > 0 ? w_set[1] : myid() + +remotecall_fetch(pid, f->(include(f); nothing), dc_path) +dc=RemoteRef(()->DictChannel(), pid) + +@test isready(dc) == false +put!(dc, 1, 2) +put!(dc, "Hello", "World") +@test isready(dc) == true +@test isready(dc, 1) == true +@test isready(dc, "Hello") == true +@test isready(dc, 2) == false +@test fetch(dc, 1) == 2 +@test fetch(dc, "Hello") == "World" +@test take!(dc, 1) == 2 +@test isready(dc, 1) == false + + # At least make sure code loads include(joinpath(dir, "wordcount.jl")) + # the 0mq clustermanager depends on package ZMQ. Just making sure the # code loads using a stub module definition for ZMQ. zmq_found=true diff --git a/test/parallel.jl b/test/parallel.jl index 541ab76f9bdd0..1344a3e361d9a 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -231,6 +231,7 @@ function test_channel(c) end test_channel(Channel(10)) +test_channel(RemoteRef(()->Channel(10))) c=Channel{Int}(1) @test_throws MethodError put!(c, "Hello")