From d0ac82d4b5fdd30c26ccbb67569f31ac8edee493 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Tue, 17 Jan 2017 20:39:19 +0530 Subject: [PATCH] "for-loop" compliant @parallel for --- base/exports.jl | 1 + base/multi.jl | 258 +++++++++++++++++++++++---- doc/src/manual/parallel-computing.md | 33 ++-- doc/src/stdlib/parallel.md | 7 + test/parallel_exec.jl | 132 ++++++++++++-- 5 files changed, 364 insertions(+), 67 deletions(-) diff --git a/base/exports.jl b/base/exports.jl index 52e5786a37aefe..b215b966882f0f 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -80,6 +80,7 @@ export ObjectIdDict, OrdinalRange, Pair, + ParallelAccumulator, PartialQuickSort, PollingFileWatcher, QuickSort, diff --git a/base/multi.jl b/base/multi.jl index 6d50948880c576..f4fe1cad8cccbb 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -2034,11 +2034,12 @@ end eval_ew_expr(ex) = (eval(Main, ex); nothing) # Statically split range [1,N] into equal sized chunks for np processors -function splitrange(N::Int, np::Int) +function splitrange(N::Int, wlist::Array) + np = length(wlist) each = div(N,np) extras = rem(N,np) nchunks = each > 0 ? np : extras - chunks = Array{UnitRange{Int}}(nchunks) + chunks = Dict{Int, UnitRange{Int}}() lo = 1 for i in 1:nchunks hi = lo + each - 1 @@ -2046,30 +2047,21 @@ function splitrange(N::Int, np::Int) hi += 1 extras -= 1 end - chunks[i] = lo:hi + chunks[wlist[i]] = lo:hi lo = hi+1 end return chunks end function preduce(reducer, f, R) - N = length(R) - chunks = splitrange(N, nworkers()) - all_w = workers()[1:length(chunks)] - w_exec = Task[] - for (idx,pid) in enumerate(all_w) - t = Task(()->remotecall_fetch(f, pid, reducer, R, first(chunks[idx]), last(chunks[idx]))) - schedule(t) + for (pid, r) in splitrange(length(R), workers()) + t = @schedule remotecall_fetch(f, pid, reducer, R, first(r), last(r)) push!(w_exec, t) end reduce(reducer, [wait(t) for t in w_exec]) end -function pfor(f, R) - [@spawn f(R, first(c), last(c)) for c in splitrange(length(R), nworkers())] -end - function make_preduce_body(var, body) quote function (reducer, R, lo::Int, hi::Int) @@ -2085,11 +2077,27 @@ function make_preduce_body(var, body) end end +function pfor(f, R) + lenR = length(R) + chunks = splitrange(lenR, workers()) + rrid = RRID() + task_local_storage(:JULIA_PACC_TRACKER, rrid) + [remotecall(f, p, R, first(c), last(c), rrid) for (p,c) in chunks] +end + function make_pfor_body(var, body) quote - function (R, lo::Int, hi::Int) - for $(esc(var)) in R[lo:hi] - $(esc(body)) + function (R, lo::Int, hi::Int, rrid) + try + for $(esc(var)) in R[lo:hi] + $(esc(body)) + end + global pacc_registry + for pacc in get(pacc_registry, rrid, []) + push!(pacc) + end + finally + delete!(pacc_registry, rrid) end end end @@ -2100,27 +2108,58 @@ end A parallel for loop of the form : - @parallel [reducer] for var = range + @parallel for var = range body end -The specified range is partitioned and locally executed across all workers. In case an -optional reducer function is specified, `@parallel` performs local reductions on each worker -with a final reduction on the calling process. +The loop is executed in parallel across all workers, with each worker executing a subset +of the range. The call waits for completion of all iterations on all workers before returning. +Any updates to variables outside the loop body is not reflected on the calling node. +However, this is a common requirement and can be achieved in a couple of ways. One, the loop body +can update shared arrays, wherein the updates are visible on all nodes mapping the array. Second, +[`ParallelAccumulator`](@ref) objects can be used to collect computed values efficiently. +The former can be used only on a single node (with multiple workers mapping the same shared segment), while +the latter can be used when a computation is distributed across nodes. -Note that without a reducer function, `@parallel` executes asynchronously, i.e. it spawns -independent tasks on all available workers and returns immediately without waiting for -completion. To wait for completion, prefix the call with [`@sync`](@ref), like : +Example with shared arrays: +```jldoctest +julia> a = SharedArray{Float64}(4); - @sync @parallel for var = range - body - end +julia> c = 10; + +julia> @parallel for i=1:4 + a[i] = i + c; + end + +julia> a +4-element SharedArray{Float64,1}: + 11.0 + 12.0 + 13.0 + 14.0 + ``` + +Example with a ParallelAccumulator: +```jldoctest +julia> acc = ParallelAccumulator(+, 0); + +julia> c = 100; + +julia> @parallel for i in 1:10 + j = 2i + c + push!(acc, j) + end; + +julia> take!(acc) +1110 +``` """ macro parallel(args...) na = length(args) - if na==1 + if na == 1 loop = args[1] - elseif na==2 + elseif na == 2 + depwarn("@parallel with a reducer is deprecated. Use ParallelAccumulators for reduction.", Symbol("@parallel")) reducer = args[1] loop = args[2] else @@ -2132,14 +2171,171 @@ macro parallel(args...) var = loop.args[1].args[1] r = loop.args[1].args[2] body = loop.args[2] - if na==1 - thecall = :(pfor($(make_pfor_body(var, body)), $(esc(r)))) + if na == 1 + thecall = :(foreach(wait, pfor($(make_pfor_body(var, body)), $(esc(r))))) else thecall = :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r)))) end thecall end +""" + ParallelAccumulator(f, initial) + +Constructs a reducing accumulator designed to be used in conjunction with `@parallel` +for-loops. Arguments are a reducer function and an initial value. + +The body of the `@parallel` for-loop can refer to multiple `ParallelAccumulator`s. + +See [`@parallel`](@ref) for usage details. + +ParallelAccumulators can also be used independent of `@parallel` loops. + +For example: +```julia +acc = ParallelAccumulator{Int}(+) +for p in workers() + @spawnat p begin + for i in 1:10 + push!(acc, i) # Local accumulation on each worker + end + push!(acc) # Explicit push of local accumulation to driver node (typically node 1) + end +end +result = take!(acc) +``` + +ParallelAccumulators used outside of a `@parallel` construct need to execute an explicit final +`push!(acc)` without a value. This pushes the locally accumulated value to the node driving the computation. + +Use of the same accumulator in concurrent [`@parallel`](@ref) calls is not allowed. An accumulator +instance should be reused only after a `take!(acc)` is performed on the accumulator. + +Note that the optional `initial` value is used on all workers. Hence while +`ParallelAccumulator(+, 0)` works as expected, `ParallelAccumulator(+, 25)` will +add a total of `25*nworkers()` to the final result. +""" +type ParallelAccumulator{F,T} + f::F + initial::T + value::T + pending::Int # Used on workers to detect end of loop + workers::Set{Int} # Used on caller to detect arrival of all parts + cnt::Int # Number of reductions accumulated at any given time. + chnl::RemoteChannel + hval::Int # A counter incremented each time a take! is performed. + # Required to generate a new hash value so that global + # accumulators are re-sent when re-used. + + ParallelAccumulator(f, initial, chnl) = new(f, initial, initial, 0, Set{Int}(), 0, chnl, 0) +end + +ParallelAccumulator{F,T}(f::F, initial::T) = ParallelAccumulator{F,T}(f, initial, RemoteChannel(()->Channel{Tuple}(Inf))) + +hash(pacc::ParallelAccumulator, h::UInt) = hash(hash(pacc.chnl), hash(pacc.hval, h)) + +getindex(pacc::ParallelAccumulator) = pacc.value +setindex!(pacc::ParallelAccumulator, v) = (pacc.value = v; v) + + +function serialize(s::AbstractSerializer, pacc::ParallelAccumulator) + serialize_type(s, typeof(pacc)) + + serialize(s, pacc.f) + serialize(s, pacc.initial) + serialize(s, pacc.chnl) + rrid = task_local_storage(:JULIA_PACC_TRACKER) + serialize(s, rrid) + + destpid = worker_id_from_socket(s.io) + @assert !(destpid in pacc.workers) # catch concurrent use of accumulators + push!(pacc.workers, destpid) + nothing +end + +function deserialize(s::AbstractSerializer, t::Type{T}) where T <: ParallelAccumulator + f = deserialize(s) + initial = deserialize(s) + chnl = deserialize(s) + rrid = deserialize(s) + + pacc = T(f, initial, chnl) + + global pacc_registry + push!(get!(pacc_registry, rrid, []), pacc) + pacc +end + +pacc_registry=Dict{RRID, Array}() + +""" + push!(pacc::ParallelAccumulator, v) + +Called in the body of [`@parallel`](@ref) for-loops to reduce and accumulate values. +""" +function push!(pacc::ParallelAccumulator, v) + pacc.cnt += 1 + pacc.value = pacc.f(pacc.value, v) +end + +""" + push!(pacc::ParallelAccumulator) + +When a [`ParallelAccumulator`](@ref) is used in a distributed fashion independent of [`@parallel`](@ref), +locally accumulated values must be explicitly pushed to the caller once on each worker. When called without a +value to reduce, `push!(acc)` sends locally accumulated values to the node driving the computation. +""" +push!(pacc::ParallelAccumulator) = put!(pacc.chnl, (myid(), pacc.value, pacc.cnt)) + +""" + wait(pacc::ParallelAccumulator) + +Waits to reduce and accumulate values from all participating workers. +""" +function wait(pacc::ParallelAccumulator) + while length(pacc.workers) > 0 + (pid, v, cnt) = take!(pacc.chnl) + @assert pid in pacc.workers # catch concurrent use of accumulators + delete!(pacc.workers, pid) + pacc.value = pacc.f(pacc.value, v) + pacc.cnt += cnt + end + return nothing +end + + +""" + fetch(pacc::ParallelAccumulator) + +Waits for and returns the result of a parallel accumulation. +""" +fetch(pacc::ParallelAccumulator) = (wait(pacc); pacc.value) + + +""" + take!(pacc::ParallelAccumulator) + +Waits for and returns the result of a parallel accumulation. Also resets the +accumulator object, thereby allowing it to be reused in an another call. +""" +function take!(pacc::ParallelAccumulator) + v = fetch(pacc) + pacc.pending = 0 + empty!(pacc.workers) + pacc.value = pacc.initial + pacc.cnt = 0 + pacc.hval += 1 + return v +end + + +""" + count(pacc::ParallelAccumulator) + +Returns the number of reduction operations performed on this accumulator. In the context +of [`@parallel`](@ref) this is equal to the range length. +""" +count(pacc::ParallelAccumulator) = pacc.cnt function check_master_connect() timeout = worker_timeout() diff --git a/doc/src/manual/parallel-computing.md b/doc/src/manual/parallel-computing.md index c9731bc61e38b2..41b7a81c3b61ab 100644 --- a/doc/src/manual/parallel-computing.md +++ b/doc/src/manual/parallel-computing.md @@ -353,17 +353,18 @@ of processes, we can use a *parallel for loop*, which can be written in Julia us like this: ```julia -nheads = @parallel (+) for i=1:200000000 - Int(rand(Bool)) +acc = ParallelAccumulator{Int}(+) +@parallel for i=1:200000000 + push!(acc, Int(rand(Bool))) end +nheads = take!(acc) ``` This construct implements the pattern of assigning iterations to multiple processes, and combining -them with a specified reduction (in this case `(+)`). The result of each iteration is taken as -the value of the last expression inside the loop. The whole parallel loop expression itself evaluates -to the final answer. +them with a specified reduction (in this case `(+)`). The result of each iteration is accumulated locally +by the [`ParallelAccumulator`](@ref) with final reductions performed on the caller. -Note that although parallel for loops look like serial for loops, their behavior is dramatically +Note that although parallel for loops look like serial for loops, their behavior is quite different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processes. Any variables used inside the parallel loop will be copied and broadcast to each process. @@ -378,8 +379,8 @@ end ``` This code will not initialize all of `a`, since each process will have a separate copy of it. -Parallel for loops like these must be avoided. Fortunately, [Shared Arrays](@ref man-shared-arrays) can be used -to get around this limitation: +Parallel for loops like these must be avoided. Fortunately, [Shared Arrays](@ref man-shared-arrays) and +[`ParallelAccumulator`](@ref)s can be used to get around this limitation: ```julia a = SharedArray{Float64}(10) @@ -392,19 +393,15 @@ Using "outside" variables in parallel loops is perfectly reasonable if the varia ```julia a = randn(1000) -@parallel (+) for i=1:100000 - f(a[rand(1:end)]) +acc = ParallelAccumulator{Int}(+) +@parallel for i=1:200000000 + push!(acc, f(a[rand(1:end)])) end +result = take!(acc) ``` Here each iteration applies `f` to a randomly-chosen sample from a vector `a` shared by all processes. -As you could see, the reduction operator can be omitted if it is not needed. In that case, the -loop executes asynchronously, i.e. it spawns independent tasks on all available workers and returns -an array of [`Future`](@ref) immediately without waiting for completion. The caller can wait for -the [`Future`](@ref) completions at a later point by calling [`fetch()`](@ref) on them, or wait -for completion at the end of the loop by prefixing it with [`@sync`](@ref), like `@sync @parallel for`. - In some cases no reduction operator is needed, and we merely wish to apply a function to all integers in some range (or, more generally, to all elements in some collection). This is another useful operation called *parallel map*, implemented in Julia as the [`pmap()`](@ref) function. For example, @@ -418,8 +415,8 @@ pmap(svd, M) Julia's [`pmap()`](@ref) is designed for the case where each function call does a large amount of work. In contrast, `@parallel for` can handle situations where each iteration is tiny, perhaps merely summing two numbers. Only worker processes are used by both [`pmap()`](@ref) and `@parallel for` -for the parallel computation. In case of `@parallel for`, the final reduction is done on the calling -process. +for the parallel computation. In case of `@parallel for` used in conjunction with [`ParallelAccumulator`](@ref)s, +the final reduction is done on the calling process. ## Synchronization With Remote References diff --git a/doc/src/stdlib/parallel.md b/doc/src/stdlib/parallel.md index 433c5730f9b3d1..2cf7ae7516c5ea 100644 --- a/doc/src/stdlib/parallel.md +++ b/doc/src/stdlib/parallel.md @@ -74,6 +74,13 @@ Base.@fetchfrom Base.@async Base.@sync Base.@parallel +Base.ParallelAccumulator +Base.push!(::ParallelAccumulator, ::Any) +Base.push!(::ParallelAccumulator) +Base.wait(::ParallelAccumulator) +Base.fetch(::ParallelAccumulator) +Base.take!(::ParallelAccumulator) +Base.count(::ParallelAccumulator) Base.@everywhere Base.clear!(::Any, ::Any; ::Any) Base.remoteref_id diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index 9fe3144a2547ac..e7e01deda65e8b 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -13,9 +13,11 @@ end # Test a few "remote" invocations when no workers are present @test remote(myid)() == 1 @test pmap(identity, 1:100) == [1:100...] -@test 100 == @parallel (+) for i in 1:100 - 1 - end +acc = ParallelAccumulator(+, 0) +@parallel for i in 1:100 + push!(acc, 1) +end +@test take!(acc) == 100 addprocs(4; exeflags=`$cov_flag $inline_flag --check-bounds=yes --startup-file=no --depwarn=error`) @@ -470,8 +472,9 @@ for T in [Void, ShmemFoo] end # Issue #14664 -d = SharedArray{Int}(10) -@sync @parallel for i=1:10 +# Also tests that @parallel blocks by default. +d = SharedArray{Int}(100) +@parallel for i=1:100 d[i] = i end @@ -482,7 +485,7 @@ end # complex sd = SharedArray{Int}(10) se = SharedArray{Int}(10) -@sync @parallel for i=1:10 +@parallel for i=1:10 sd[i] = i se[i] = i end @@ -511,12 +514,16 @@ finalize(d) # Test @parallel load balancing - all processors should get either M or M+1 # iterations out of the loop range for some M. -ids = @parallel((a,b)->[a;b], for i=1:7; myid(); end) +acc = ParallelAccumulator((a,b)->[a;b], []) +@parallel(for i=1:7; push!(acc, myid()); end) +ids = take!(acc) workloads = Int[sum(ids .== i) for i in 2:nprocs()] @test maximum(workloads) - minimum(workloads) <= 1 # @parallel reduction should work even with very short ranges -@test @parallel(+, for i=1:2; i; end) == 3 +acc = ParallelAccumulator(+, 0) +@parallel(for i=1:2; push!(acc, i); end) +@test take!(acc) == 3 @test_throws ArgumentError sleep(-1) @test_throws ArgumentError timedwait(()->false, 0.1, pollint=-0.5) @@ -998,10 +1005,12 @@ end # issue #8207 let A = Any[] - @parallel (+) for i in (push!(A,1); 1:2) - i + acc = ParallelAccumulator(+, 0) + @parallel for i in (push!(A, 1); 1:2) + push!(acc, i) end @test length(A) == 1 + @test take!(acc) == 3 end # issue #13168 @@ -1116,16 +1125,19 @@ let end # issue #16451 -rng=RandomDevice() -retval = @parallel (+) for _ in 1:10 - rand(rng) +rng = RandomDevice() +acc = ParallelAccumulator(+, 0.0) +@parallel for _ in 1:10 + push!(acc, rand(rng)) end +retval = take!(acc) @test retval > 0.0 && retval < 10.0 rand(rng) -retval = @parallel (+) for _ in 1:10 - rand(rng) +@parallel for _ in 1:10 + push!(acc, rand(rng)) end +retval = take!(acc) @test retval > 0.0 && retval < 10.0 # serialization tests @@ -1415,11 +1427,12 @@ s = convert(SharedArray, [1,2,3,4]) #6760 if true a = 2 - x = @parallel (vcat) for k=1:2 - sin(a) + acc = ParallelAccumulator(vcat, []) + @parallel for k=1:2 + push!(acc, sin(a)) end end -@test x == map(_->sin(2), 1:2) +@test take!(acc) == map(_->sin(2), 1:2) # Testing clear! function setup_syms(n, pids) @@ -1463,3 +1476,86 @@ syms = setup_syms(3, workers()) clear!(syms, workers()) test_clear(syms, workers()) +# ParallelAccumulator and @parallel tests. +# Simple cases of ParallelAccumulator and @parallel has been tested previously +# and hence not repeated here. + +# ParallelAccumulator used independent of a @parallel +acc = ParallelAccumulator(+, 0) +for p in workers() + @spawnat p begin + for i in 1:10 + push!(acc, 2) + end + push!(acc) + end +end +wait(acc) +@test count(acc) == nworkers()*10 +@test take!(acc) == 2*nworkers()*10 + +# take! should have reset the accumulator +@test acc.cnt == 0 +@test isempty(acc.workers) + +# reuse the accumulator +@parallel for i in 1:10 + push!(acc, i) +end + +# fetch must not reset the accumulator +@test fetch(acc) == 55 +@test fetch(acc) == 55 +@test take!(acc) == 55 + +# Multiple accumulators in a @parallel call, mix of local and global accumulators. +global gacc1 = ParallelAccumulator(+, 0) +global gacc2 = ParallelAccumulator(*, 1) +global gval = 1 +function foo() + local lval = 2 + local l1 = ParallelAccumulator(string, ""); + local l2 = ParallelAccumulator((a,b)->[a;b], []); # accumulator with an anonymous reducer + @parallel for i in 11:20 + j = i - 10 + push!(gacc1, lval*j) + push!(gacc2, gval*j) + push!(l1, j) + push!(l2, myid()) + end + + @test take!(gacc1) == 110 + @test take!(gacc2) == 3628800 + v = take!(l1) + @test length(v) == 11 + @test all(x -> x>=0 && x <= 9, [parse(Int, string(x)) for x in v]) + wlist = workers() + @test all(x -> x in wlist, take!(l2)) +end + +foo() + +# Parallel accumulator with a regular for loop +acc = ParallelAccumulator(+, 0) +for i in 1:10 + push!(acc, i) +end +@test take!(acc) == 55 + +# With an initial value and without, simulate an empty loop +@test take!(ParallelAccumulator(+, 0)) == 0 +@test take!(ParallelAccumulator(+, 0)) == 0 + +# concurrent use of an accumulator should trigger assertion checks +function pacc_error_test() + acc = ParallelAccumulator(+, 0) + @parallel for i in 1:nworkers() + sleep(1) + push!(acc, i) + end + @parallel for i in 1:nworkers() + sleep(1) + push!(acc, i) + end +end +@test_throws AssertionError pacc_error_test()