"for-loop" compliant @parallel for
amitmurthy committed Jan 31, 2017
1 parent 876549f commit d0ac82d
Showing 5 changed files with 364 additions and 67 deletions.
1 change: 1 addition & 0 deletions base/exports.jl
@@ -80,6 +80,7 @@ export
     ObjectIdDict,
     OrdinalRange,
     Pair,
+    ParallelAccumulator,
     PartialQuickSort,
     PollingFileWatcher,
     QuickSort,
258 changes: 227 additions & 31 deletions base/multi.jl
@@ -2034,42 +2034,34 @@ end
 eval_ew_expr(ex) = (eval(Main, ex); nothing)

 # Statically split range [1,N] into equal sized chunks for np processors
-function splitrange(N::Int, np::Int)
+function splitrange(N::Int, wlist::Array)
+    np = length(wlist)
     each = div(N,np)
     extras = rem(N,np)
     nchunks = each > 0 ? np : extras
-    chunks = Array{UnitRange{Int}}(nchunks)
+    chunks = Dict{Int, UnitRange{Int}}()
     lo = 1
     for i in 1:nchunks
         hi = lo + each - 1
         if extras > 0
             hi += 1
             extras -= 1
         end
-        chunks[i] = lo:hi
+        chunks[wlist[i]] = lo:hi
         lo = hi+1
     end
     return chunks
 end
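
For orientation, a small sketch of what the new `splitrange` returns; the worker pids `[2, 3, 4]` here are made up:

```julia
chunks = splitrange(10, [2, 3, 4])
# chunks == Dict(2 => 1:4, 3 => 5:7, 4 => 8:10)
# Chunks are now keyed by worker pid rather than by chunk index,
# and the remainder elements go to the earlier chunks.
```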

 function preduce(reducer, f, R)
-    N = length(R)
-    chunks = splitrange(N, nworkers())
-    all_w = workers()[1:length(chunks)]
-
     w_exec = Task[]
-    for (idx,pid) in enumerate(all_w)
-        t = Task(()->remotecall_fetch(f, pid, reducer, R, first(chunks[idx]), last(chunks[idx])))
-        schedule(t)
+    for (pid, r) in splitrange(length(R), workers())
+        t = @schedule remotecall_fetch(f, pid, reducer, R, first(r), last(r))
         push!(w_exec, t)
     end
     reduce(reducer, [wait(t) for t in w_exec])
 end
-
-function pfor(f, R)
-    [@spawn f(R, first(c), last(c)) for c in splitrange(length(R), nworkers())]
-end
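
For context, a sketch of the reducer form that exercises `preduce`: each worker reduces its own chunk, and the caller reduces the per-worker partial results. This form is deprecated by this commit (see the depwarn added below); it is shown only as illustration:

```julia
# Deprecated reducer form; each worker computes a partial sum of its chunk.
total = @parallel (+) for i in 1:100
    i
end
# total == 5050
```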

 function make_preduce_body(var, body)
     quote
         function (reducer, R, lo::Int, hi::Int)
@@ -2085,11 +2077,27 @@ function make_preduce_body(var, body)
         end
     end

+function pfor(f, R)
+    lenR = length(R)
+    chunks = splitrange(lenR, workers())
+    rrid = RRID()
+    task_local_storage(:JULIA_PACC_TRACKER, rrid)
+    [remotecall(f, p, R, first(c), last(c), rrid) for (p,c) in chunks]
+end

 function make_pfor_body(var, body)
     quote
-        function (R, lo::Int, hi::Int)
-            for $(esc(var)) in R[lo:hi]
-                $(esc(body))
+        function (R, lo::Int, hi::Int, rrid)
+            try
+                for $(esc(var)) in R[lo:hi]
+                    $(esc(body))
+                end
+                global pacc_registry
+                for pacc in get(pacc_registry, rrid, [])
+                    push!(pacc)
+                end
+            finally
+                delete!(pacc_registry, rrid)
+            end
         end
     end
@@ -2100,27 +2108,58 @@
 A parallel for loop of the form:
-    @parallel [reducer] for var = range
+    @parallel for var = range
         body
     end
-The specified range is partitioned and locally executed across all workers. In case an
-optional reducer function is specified, `@parallel` performs local reductions on each worker
-with a final reduction on the calling process.
+The loop is executed in parallel across all workers, with each worker executing a subset
+of the range. The call waits for completion of all iterations on all workers before returning.
+
+Any updates to variables outside the loop body are not reflected on the calling node.
+However, collecting results is a common requirement, and it can be achieved in a couple of
+ways. First, the loop body can update shared arrays, in which case the updates are visible
+on all nodes mapping the array. Second, [`ParallelAccumulator`](@ref) objects can be used
+to collect computed values efficiently. The former can be used only on a single node (with
+multiple workers mapping the same shared segment), while the latter can be used when a
+computation is distributed across nodes.
-Note that without a reducer function, `@parallel` executes asynchronously, i.e. it spawns
-independent tasks on all available workers and returns immediately without waiting for
-completion. To wait for completion, prefix the call with [`@sync`](@ref), like :
+
+Example with shared arrays:
+
+```jldoctest
+julia> a = SharedArray{Float64}(4);
-    @sync @parallel for var = range
-        body
-    end
+
+julia> c = 10;
+
+julia> @parallel for i=1:4
+           a[i] = i + c;
+       end
+
+julia> a
+4-element SharedArray{Float64,1}:
+ 11.0
+ 12.0
+ 13.0
+ 14.0
+```
+
+Example with a ParallelAccumulator:
+
+```jldoctest
+julia> acc = ParallelAccumulator(+, 0);
+
+julia> c = 100;
+
+julia> @parallel for i in 1:10
+           j = 2i + c
+           push!(acc, j)
+       end;
+
+julia> take!(acc)
+1110
+```
 """
 macro parallel(args...)
     na = length(args)
-    if na==1
+    if na == 1
         loop = args[1]
-    elseif na==2
+    elseif na == 2
+        depwarn("@parallel with a reducer is deprecated. Use ParallelAccumulators for reduction.", Symbol("@parallel"))
         reducer = args[1]
         loop = args[2]
     else
@@ -2132,14 +2171,171 @@ macro parallel(args...)
     var = loop.args[1].args[1]
     r = loop.args[1].args[2]
     body = loop.args[2]
-    if na==1
-        thecall = :(pfor($(make_pfor_body(var, body)), $(esc(r))))
+    if na == 1
+        thecall = :(foreach(wait, pfor($(make_pfor_body(var, body)), $(esc(r)))))
     else
         thecall = :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r))))
     end
     thecall
 end
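
Worth noting, since the expansion now wraps `pfor` in `foreach(wait, ...)`: a plain `@parallel` loop blocks until every chunk finishes, so the old `@sync @parallel` idiom is no longer needed. A small sketch (assumes worker processes are available, e.g. `julia -p 2`):

```julia
flags = SharedArray{Bool}(8)
@parallel for i in 1:8
    flags[i] = true
end
all(flags)   # true: @parallel returned only after all workers finished
```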

"""
ParallelAccumulator(f, initial)
Constructs a reducing accumulator designed to be used in conjunction with `@parallel`
for-loops. Arguments are a reducer function and an initial value.
The body of the `@parallel` for-loop can refer to multiple `ParallelAccumulator`s.
See [`@parallel`](@ref) for usage details.
ParallelAccumulators can also be used independent of `@parallel` loops.
For example:
```julia
acc = ParallelAccumulator{Int}(+)
for p in workers()
@spawnat p begin
for i in 1:10
push!(acc, i) # Local accumulation on each worker
end
push!(acc) # Explicit push of local accumulation to driver node (typically node 1)
end
end
result = take!(acc)
```
ParallelAccumulators used outside of a `@parallel` construct need to execute an explicit final
`push!(acc)` without a value. This pushes the locally accumulated value to the node driving the computation.
Use of the same accumulator in concurrent [`@parallel`](@ref) calls is not allowed. An accumulator
instance should be reused only after a `take!(acc)` is performed on the accumulator.
Note that the optional `initial` value is used on all workers. Hence while
`ParallelAccumulator(+, 0)` works as expected, `ParallelAccumulator(+, 25)` will
add a total of `25*nworkers()` to the final result.
"""
+type ParallelAccumulator{F,T}
+    f::F
+    initial::T
+    value::T
+    pending::Int      # Used on workers to detect end of loop
+    workers::Set{Int} # Used on the caller to detect arrival of all parts
+    cnt::Int          # Number of reductions accumulated at any given time
+    chnl::RemoteChannel
+    hval::Int         # A counter incremented each time a take! is performed.
+                      # Required to generate a new hash value so that global
+                      # accumulators are re-sent when re-used.
+
+    ParallelAccumulator(f, initial, chnl) = new(f, initial, initial, 0, Set{Int}(), 0, chnl, 0)
+end
+
+ParallelAccumulator{F,T}(f::F, initial::T) = ParallelAccumulator{F,T}(f, initial, RemoteChannel(()->Channel{Tuple}(Inf)))
+
+hash(pacc::ParallelAccumulator, h::UInt) = hash(hash(pacc.chnl), hash(pacc.hval, h))
+
+getindex(pacc::ParallelAccumulator) = pacc.value
+setindex!(pacc::ParallelAccumulator, v) = (pacc.value = v; v)
+
+function serialize(s::AbstractSerializer, pacc::ParallelAccumulator)
+    serialize_type(s, typeof(pacc))
+
+    serialize(s, pacc.f)
+    serialize(s, pacc.initial)
+    serialize(s, pacc.chnl)
+    rrid = task_local_storage(:JULIA_PACC_TRACKER)
+    serialize(s, rrid)
+
+    destpid = worker_id_from_socket(s.io)
+    @assert !(destpid in pacc.workers) # catch concurrent use of accumulators
+    push!(pacc.workers, destpid)
+    nothing
+end
+
+function deserialize(s::AbstractSerializer, t::Type{T}) where T <: ParallelAccumulator
+    f = deserialize(s)
+    initial = deserialize(s)
+    chnl = deserialize(s)
+    rrid = deserialize(s)
+
+    pacc = T(f, initial, chnl)
+
+    global pacc_registry
+    push!(get!(pacc_registry, rrid, []), pacc)
+    pacc
+end
+
+pacc_registry = Dict{RRID, Array}()
+
+"""
+    push!(pacc::ParallelAccumulator, v)
+
+Called in the body of [`@parallel`](@ref) for-loops to reduce and accumulate values.
+"""
+function push!(pacc::ParallelAccumulator, v)
+    pacc.cnt += 1
+    pacc.value = pacc.f(pacc.value, v)
+end
+
+"""
+    push!(pacc::ParallelAccumulator)
+
+When a [`ParallelAccumulator`](@ref) is used in a distributed fashion independently of
+[`@parallel`](@ref), the locally accumulated value must be explicitly pushed to the caller
+once on each worker. Called without a value to reduce, `push!(pacc)` sends the locally
+accumulated value to the node driving the computation.
+"""
+push!(pacc::ParallelAccumulator) = put!(pacc.chnl, (myid(), pacc.value, pacc.cnt))
+
+"""
+    wait(pacc::ParallelAccumulator)
+
+Waits for the locally accumulated values from all participating workers and reduces them.
+"""
+function wait(pacc::ParallelAccumulator)
+    while length(pacc.workers) > 0
+        (pid, v, cnt) = take!(pacc.chnl)
+        @assert pid in pacc.workers # catch concurrent use of accumulators
+        delete!(pacc.workers, pid)
+        pacc.value = pacc.f(pacc.value, v)
+        pacc.cnt += cnt
+    end
+    return nothing
+end
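
In a nutshell, the flush protocol is: each worker sends a single `(pid, value, cnt)` tuple over `pacc.chnl`, and `wait` drains the channel until every participating pid has reported. A hypothetical two-worker run, simulated here with a plain `Channel` (the pids and values are made up):

```julia
# ParallelAccumulator(+, 0) over 1:10, split as 1:5 and 6:10 across pids 2 and 3.
chnl = Channel{Tuple}(2)
put!(chnl, (2, 15, 5))   # worker 2: sum(1:5) after five push! calls
put!(chnl, (3, 40, 5))   # worker 3: sum(6:10)
value, cnt = 0, 0
for _ in 1:2
    pid, v, c = take!(chnl)
    value += v; cnt += c # what wait(pacc) does with the reducer `+`
end
value, cnt               # (55, 10)
```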


"""
fetch(pacc::ParallelAccumulator)
Waits for and returns the result of a parallel accumulation.
"""
fetch(pacc::ParallelAccumulator) = (wait(pacc); pacc.value)


"""
take!(pacc::ParallelAccumulator)
Waits for and returns the result of a parallel accumulation. Also resets the
accumulator object, thereby allowing it to be reused in an another call.
"""
function take!(pacc::ParallelAccumulator)
v = fetch(pacc)
pacc.pending = 0
empty!(pacc.workers)
pacc.value = pacc.initial
pacc.cnt = 0
pacc.hval += 1
return v
end
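
A minimal sketch of reuse after `take!`, per the docstring above (assumes worker processes are available):

```julia
acc = ParallelAccumulator(+, 0)
@parallel for i in 1:10
    push!(acc, i)
end
s1 = take!(acc)   # 55; take! also resets the accumulator

@parallel for i in 1:10
    push!(acc, 2i)
end
s2 = take!(acc)   # 110; reuse is safe only after take!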


"""
count(pacc::ParallelAccumulator)
Returns the number of reduction operations performed on this accumulator. In the context
of [`@parallel`](@ref) this is equal to the range length.
"""
count(pacc::ParallelAccumulator) = pacc.cnt

function check_master_connect()
timeout = worker_timeout()
(The remaining 3 changed files are not shown.)
