Skip to content

Commit

Permalink
Merge pull request #15073 from samoconnor/workerpool_branch
Browse files Browse the repository at this point in the history
WorkerPool and remote()
  • Loading branch information
amitmurthy committed Mar 9, 2016
2 parents 039f57b + b061d4f commit ec6f886
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 1 deletion.
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,7 @@ export
pmap,
procs,
put!,
remote,
remotecall,
remotecall_fetch,
remotecall_wait,
Expand Down
7 changes: 6 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type JoinPGRPMsg <: AbstractMsg
self_is_local::Bool
notify_oid::Tuple
topology::Symbol
worker_pool
end
type JoinCompleteMsg <: AbstractMsg
notify_oid::Tuple
Expand Down Expand Up @@ -1053,6 +1054,8 @@ function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream)

for wt in wait_tasks; wait(wt); end

set_default_worker_pool(msg.worker_pool)

send_msg_now(controller, JoinCompleteMsg(msg.notify_oid, Sys.CPU_CORES, getpid()))
end

Expand All @@ -1078,6 +1081,8 @@ function handle_msg(msg::JoinCompleteMsg, r_stream, w_stream)

ntfy_channel = lookup_ref(msg.notify_oid)
put!(ntfy_channel, w.id)

put!(default_worker_pool(), w)
end

function disable_threaded_libs()
Expand Down Expand Up @@ -1392,7 +1397,7 @@ function create_worker(manager, wconfig)
end

all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), join_list)
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology))
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology, default_worker_pool()))

@schedule manage(w.manager, w.id, w.config, :register)
wait(rr_ntfy_join)
Expand Down
1 change: 1 addition & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ include("serialize.jl")
importall .Serializer
include("channels.jl")
include("multi.jl")
include("workerpool.jl")
include("managers.jl")
include("mapiterator.jl")

Expand Down
75 changes: 75 additions & 0 deletions base/workerpool.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license


type WorkerPool
channel::RemoteChannel{Channel{Int}}
end


"""
WorkerPool([workers])
Create a WorkerPool from a vector of worker ids.
"""
function WorkerPool(workers=Int[])

# Create a shared queue of available workers...
pool = WorkerPool(RemoteChannel(()->Channel{Int}(128)))

# Add workers to the pool...
for w in workers
put!(pool, w)
end

return pool
end


put!(pool::WorkerPool, w::Int) = put!(pool.channel, w)
put!(pool::WorkerPool, w::Worker) = put!(pool.channel, w.id)

isready(pool::WorkerPool) = isready(pool.channel)

take!(pool::WorkerPool) = take!(pool.channel)


"""
remotecall_fetch(f, pool::WorkerPool, args...)
Call `f(args...)` on one of the workers in `pool`.
"""
function remotecall_fetch(f, pool::WorkerPool, args...)
worker = take!(pool)
try
remotecall_fetch(f, worker, args...)
finally
put!(pool, worker)
end
end


"""
default_worker_pool
WorkerPool containing idle `workers()` (used by `remote(f)`).
"""
_default_worker_pool = Nullable{WorkerPool}()
function default_worker_pool()
if isnull(_default_worker_pool) && myid() == 1
set_default_worker_pool(WorkerPool())
end
return get(_default_worker_pool)
end

function set_default_worker_pool(p::WorkerPool)
global _default_worker_pool = Nullable(p)
end


"""
remote(f)
Returns a lambda that executes function `f` on an available worker
using `remotecall_fetch`.
"""
remote(f) = (args...)->remotecall_fetch(f, default_worker_pool(), args...)
6 changes: 6 additions & 0 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ General Parallel Computing Support
Perform ``fetch(remotecall(...))`` in one message. Any remote exceptions are captured in a ``RemoteException`` and thrown.

.. function:: remote(f)

.. Docstring generated from Julia source
Returns a lambda that executes function ``f`` on an available worker using ``remotecall_fetch``.

.. function:: put!(RemoteChannel, value)

.. Docstring generated from Julia source
Expand Down
61 changes: 61 additions & 0 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,67 @@ elseif Base.JLOptions().code_coverage == 2
end
addprocs(3; exeflags=`$cov_flag $inline_flag --check-bounds=yes --depwarn=error`)

# Test remote()

let
pool = Base.default_worker_pool()

count = 0
count_condition = Condition()

function remote_wait(c)
@async begin
count += 1
remote(take!)(c)
count -= 1
notify(count_condition)
end
yield()
end

testchannels = [RemoteChannel() for i in 1:nworkers()]
testcount = 0
@test isready(pool) == true
for c in testchannels
@test count == testcount
remote_wait(c)
testcount += 1
end
@test count == testcount
@test isready(pool) == false

for c in testchannels
@test count == testcount
put!(c, "foo")
testcount -= 1
wait(count_condition)
@test count == testcount
@test isready(pool) == true
end

@test count == 0

for c in testchannels
@test count == testcount
remote_wait(c)
testcount += 1
end
@test count == testcount
@test isready(pool) == false

for c in reverse(testchannels)
@test count == testcount
put!(c, "foo")
testcount -= 1
wait(count_condition)
@test count == testcount
@test isready(pool) == true
end

@test count == 0
end


id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]

Expand Down

0 comments on commit ec6f886

Please sign in to comment.