From b061d4f38a90ab9b01f1c25ad8b0e71913defc24 Mon Sep 17 00:00:00 2001 From: Sam O'Connor Date: Sun, 14 Feb 2016 21:41:16 +1100 Subject: [PATCH] add WorkerPool and remote() Refinements per @amitmurthy comments: https://github.com/JuliaLang/julia/pull/15073 Update _default_worker_pool via JoinPGRPMsg and JoinCompleteMsg per https://github.com/JuliaLang/julia/pull/15073#discussion_r54993714 Use Nullable for _default_worker_pool. Init only when myid() == 1. --- base/exports.jl | 1 + base/multi.jl | 7 +++- base/sysimg.jl | 1 + base/workerpool.jl | 75 +++++++++++++++++++++++++++++++++++++++++ doc/stdlib/parallel.rst | 6 ++++ test/parallel_exec.jl | 61 +++++++++++++++++++++++++++++++++ 6 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 base/workerpool.jl diff --git a/base/exports.jl b/base/exports.jl index 61577108c702a..5b637bf6b71b5 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1223,6 +1223,7 @@ export pmap, procs, put!, + remote, remotecall, remotecall_fetch, remotecall_wait, diff --git a/base/multi.jl b/base/multi.jl index 7b6e9b2560ae7..5526e9f3ffdb3 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -49,6 +49,7 @@ type JoinPGRPMsg <: AbstractMsg self_is_local::Bool notify_oid::Tuple topology::Symbol + worker_pool end type JoinCompleteMsg <: AbstractMsg notify_oid::Tuple @@ -1045,6 +1046,8 @@ function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream) for wt in wait_tasks; wait(wt); end + set_default_worker_pool(msg.worker_pool) + send_msg_now(controller, JoinCompleteMsg(msg.notify_oid, Sys.CPU_CORES, getpid())) end @@ -1070,6 +1073,8 @@ function handle_msg(msg::JoinCompleteMsg, r_stream, w_stream) ntfy_channel = lookup_ref(msg.notify_oid) put!(ntfy_channel, w.id) + + put!(default_worker_pool(), w) end function disable_threaded_libs() @@ -1384,7 +1389,7 @@ function create_worker(manager, wconfig) end all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), join_list) - send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology)) + send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology, default_worker_pool())) @schedule manage(w.manager, w.id, w.config, :register) wait(rr_ntfy_join) diff --git a/base/sysimg.jl b/base/sysimg.jl index a26eecb7c23fd..fd5b0e9ebd96c 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -226,6 +226,7 @@ include("serialize.jl") importall .Serializer include("channels.jl") include("multi.jl") +include("workerpool.jl") include("managers.jl") # code loading diff --git a/base/workerpool.jl b/base/workerpool.jl new file mode 100644 index 0000000000000..5f97c3ec39c26 --- /dev/null +++ b/base/workerpool.jl @@ -0,0 +1,75 @@ +# This file is a part of Julia. License is MIT: http://julialang.org/license + + +type WorkerPool + channel::RemoteChannel{Channel{Int}} +end + + +""" + WorkerPool([workers]) + +Create a WorkerPool from a vector of worker ids. +""" +function WorkerPool(workers=Int[]) + + # Create a shared queue of available workers... + pool = WorkerPool(RemoteChannel(()->Channel{Int}(128))) + + # Add workers to the pool... + for w in workers + put!(pool, w) + end + + return pool +end + + +put!(pool::WorkerPool, w::Int) = put!(pool.channel, w) +put!(pool::WorkerPool, w::Worker) = put!(pool.channel, w.id) + +isready(pool::WorkerPool) = isready(pool.channel) + +take!(pool::WorkerPool) = take!(pool.channel) + + +""" + remotecall_fetch(f, pool::WorkerPool, args...) + +Call `f(args...)` on one of the workers in `pool`. +""" +function remotecall_fetch(f, pool::WorkerPool, args...) + worker = take!(pool) + try + remotecall_fetch(f, worker, args...) + finally + put!(pool, worker) + end +end + + +""" + default_worker_pool + +WorkerPool containing idle `workers()` (used by `remote(f)`). +""" +_default_worker_pool = Nullable{WorkerPool}() +function default_worker_pool() + if isnull(_default_worker_pool) && myid() == 1 + set_default_worker_pool(WorkerPool()) + end + return get(_default_worker_pool) +end + +function set_default_worker_pool(p::WorkerPool) + global _default_worker_pool = Nullable(p) +end + + +""" + remote(f) + +Returns a lambda that executes function `f` on an available worker +using `remotecall_fetch`. +""" +remote(f) = (args...)->remotecall_fetch(f, default_worker_pool(), args...) diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index 2174c42b99bca..8391c5d9cfa99 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -339,6 +339,12 @@ General Parallel Computing Support Perform ``fetch(remotecall(...))`` in one message. Any remote exceptions are captured in a ``RemoteException`` and thrown. +.. function:: remote(f) + + .. Docstring generated from Julia source + + Returns a lambda that executes function ``f`` on an available worker using ``remotecall_fetch``. + .. function:: put!(RemoteChannel, value) .. Docstring generated from Julia source diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index eb6d382607094..fa1ec83655d62 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -11,6 +11,67 @@ elseif Base.JLOptions().code_coverage == 2 end addprocs(3; exeflags=`$cov_flag $inline_flag --check-bounds=yes --depwarn=error`) +# Test remote() + +let + pool = Base.default_worker_pool() + + count = 0 + count_condition = Condition() + + function remote_wait(c) + @async begin + count += 1 + remote(take!)(c) + count -= 1 + notify(count_condition) + end + yield() + end + + testchannels = [RemoteChannel() for i in 1:nworkers()] + testcount = 0 + @test isready(pool) == true + for c in testchannels + @test count == testcount + remote_wait(c) + testcount += 1 + end + @test count == testcount + @test isready(pool) == false + + for c in testchannels + @test count == testcount + put!(c, "foo") + testcount -= 1 + wait(count_condition) + @test count == testcount + @test isready(pool) == true + end + + @test count == 0 + + for c in testchannels + @test count == testcount + remote_wait(c) + testcount += 1 + end + @test count == testcount + @test isready(pool) == false + + for c in reverse(testchannels) + @test count == testcount + put!(c, "foo") + testcount -= 1 + wait(count_condition) + @test count == testcount + @test isready(pool) == true + end + + @test count == 0 +end + + id_me = myid() id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]