RFC/WIP: "for-loop" compliant @parallel for [ci skip] #20094

Closed

Conversation

amitmurthy
Contributor

This takes a different approach to addressing some of the concerns with @parallel discussed in #19578.

@parallel for differs from a regular for-loop by

  • implementing reducer functionality
  • treating the last line of the body as the value to be reduced
  • returning before the loop completes execution (in the non-reducer case)
  • not updating local arrays: folks are used to looping over local arrays and expect them to be updated, but this only works with shared arrays

This PR:

  • deprecates the reducer mode of @parallel for
  • provides a way for the user to explicitly specify accumulators and use them in the loop body
  • allows waiting on the accumulator(s) for the final result

The syntax is a bit more verbose, but there is much less scope for confusion or misplaced expectations.

@parallel reducer for x in unit_range
  body
end

will now be written as

acc = ParallelAccumulator(reducer, length)
@accumulate acc @parallel for x in unit_range
  body
  push!(acc, iteration_value)
end
result = wait(acc)
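
For a concrete (if trivial) example, a parallel sum of squares would change as follows (a sketch using the constructor form shown above; the count of 100 matches the loop length):

# old, deprecated reducer form
sumsq = @parallel (+) for i in 1:100
  i^2
end

# proposed form: an explicit accumulator pushed to from the body
acc = ParallelAccumulator(+, 100)
@accumulate acc @parallel for i in 1:100
  push!(acc, i^2)
end
sumsq = wait(acc)    # 338350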

Multiple accumulators can also be specified as a vector

a1 = ParallelAccumulator{Int}(+, 10)
a2 = ParallelAccumulator{Int}(*, 10)
@accumulate [a1,a2] @parallel for i in 1:10
  push!(a1, i)
  push!(a2, i)
end
results = [wait(a1), wait(a2)]

Updating shared arrays works as before; there is no need for ParallelAccumulators in that case. However, ParallelAccumulators can be used in multi-node scenarios, which shared memory cannot address.
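
For example, the existing shared-memory pattern keeps working unchanged (a minimal sketch; compute_value is a hypothetical per-element function, and depending on the Julia version the constructor is spelled SharedArray(Float64, 100) or SharedArray{Float64}(100)):

S = SharedArray(Float64, 100)
@sync @parallel for i in 1:100
  S[i] = compute_value(i)
end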

As before, the input range is partitioned across workers; each worker performs a local reduction, with a final reduction on the caller.
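
For illustration, the chunking is roughly along these lines (split_into_chunks is a hypothetical stand-in for the internal splitrange call seen in the review hunk below):

# Split a range of length len into nchunks roughly equal contiguous chunks, one per worker.
function split_into_chunks(len::Int, nchunks::Int)
    q, r = divrem(len, nchunks)
    chunks = UnitRange{Int}[]
    lo = 1
    for i in 1:nchunks
        hi = lo + q - 1 + (i <= r ? 1 : 0)
        push!(chunks, lo:hi)
        lo = hi + 1
    end
    return chunks
end

split_into_chunks(10, 3)    # => [1:4, 5:7, 8:10]
# Each worker reduces its chunk locally; the caller then reduces the per-worker results.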

I feel this syntax and loop behavior are more in line with a regular for-loop. Updating arrays from the body is still not supported (except for shared arrays, of course), but ParallelAccumulators do cover that need.

  • New exports: ParallelAccumulator, @accumulate
  • wait(::ParallelAccumulator) - waits for and returns the value of the distributed computation
  • ParallelAccumulator(reducer, length) - takes the reducer function and the count of values to be reduced over
  • push!(accumulator, value) - applies the reducer to the value and stores the result. Local ParallelAccumulators push results back to the caller only once, when the local range iteration is complete (see the simplified sketch below).
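
A much-simplified, single-process sketch of these push!/wait semantics (the real type additionally tracks per-destination lengths and channels results from workers back to the caller; the names below are illustrative, not the PR's internals):

type SimpleAccumulator{T}
    reducer::Function
    remaining::Int        # number of values still expected
    value::Nullable{T}    # running reduction, empty until the first push!
    cond::Condition       # released once all expected values have arrived
end

SimpleAccumulator(T::DataType, reducer::Function, n::Int) =
    SimpleAccumulator{T}(reducer, n, Nullable{T}(), Condition())

function Base.push!(acc::SimpleAccumulator, v)
    acc.value = isnull(acc.value) ? Nullable(v) : Nullable(acc.reducer(get(acc.value), v))
    acc.remaining -= 1
    acc.remaining == 0 && notify(acc.cond)
    return acc
end

function Base.wait(acc::SimpleAccumulator)
    acc.remaining > 0 && wait(acc.cond)
    return get(acc.value)
end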

Feedback on the overall API, suggestions on the syntax, and bikeshedding of names are welcome.

Note that with this PR the final result of both @accumulate acc for-loop and @accumulate acc @parallel for-loop is the same, as long as the loop only returns data via accumulators.

@amitmurthy
Contributor Author

@ararslan ararslan added the parallelism Parallel or distributed computation label Jan 17, 2017
chunks = splitrange(lenR, workers())
accums = get(task_local_storage(), :JULIA_ACCUMULATOR, ())
if accums !== ()
accums = accums[1]
Member

@ararslan ararslan Jan 17, 2017

Wouldn't it be preferable to use a different variable name, since here (and below) accums is changing type?

Contributor Author

Noted. However, I'll request that we first discuss the API and implementation model. Detailed code review can follow later.

@tkelman tkelman added needs docs Documentation for this change is required needs tests Unit tests are required for this change labels Jan 17, 2017
# A function which returns a length value when input the destination pid.
# Used to serialize the same object with different length values depending
# on the destination pid.
destf::Nullable{Function}
Contributor

@tkelman tkelman Jan 18, 2017

this could use a more descriptive name

and a description of what it means when the nullable fields are null

new(f, len, len, initial, initial, destf, chnl)
end

set_destf(pacc::ParallelAccumulator, f::Function) = (pacc.destf = f; pacc)
Contributor

!

return get(pacc.value)
end

function reset(pacc::ParallelAccumulator)
Contributor

!

Contributor Author

Another specialization of the existing exported reset. I am OK with a new export reset! too.

throw(ArgumentError(string(
"@accumulate : ",
"First argument must be a variable name pointing to a ParallelAccumulator ",
"or a vector of variable names pointing to ParallelAccumulators. ",
Contributor

why a vector rather than a tuple?

Contributor Author

When I think "list of accumulators", a vector comes naturally to mind. Tuple or vector, whatever is more natural (or both) can be made to work.

Contributor Author

Have removed the checks. It can be any collection of ParallelAccumulators.

Contributor Author

The specific checks were not required.

julia> acc = map(i->ParallelAccumulator{Array}(vcat, 10), 1:10);

julia> @accumulate acc @parallel for i in 1:10
         foreach(x->push!(x, myid()), acc)
       end

julia> [wait(a) for a in acc]
10-element Array{Array{Int64,1},1}:
 [4,4,5,5,2,2,2,3,3,3]
 [4,4,5,5,3,3,3,2,2,2]
 [4,4,3,3,3,5,5,2,2,2]
 [4,4,3,3,3,5,5,2,2,2]
 [4,4,3,3,3,5,5,2,2,2]
 [4,4,3,3,3,5,5,2,2,2]
 [4,4,3,3,3,5,5,2,2,2]
 [4,4,3,3,3,5,5,2,2,2]
 [4,4,3,3,3,5,5,2,2,2]
 [4,4,3,3,3,5,5,2,2,2]

@amitmurthy
Contributor Author

@tkelman , thanks for the review.

At a higher level, would folks like @parallel to change in this direction? Any other syntax possibilities to make @parallel for more like a regular for-loop and at the same time easy to use in a distributed fashion?

@amitmurthy amitmurthy added the needs decision A decision on this change is needed label Jan 19, 2017
@amitmurthy amitmurthy added this to the 0.6.0 milestone Jan 19, 2017
@amitmurthy
Contributor Author

Will add tests and docs if there is consensus on this design.

@amitmurthy
Contributor Author

I am inclined to take silence as consent.

Pinging a couple more folks for feedback - @ViralBShah, @alanedelman

@tkelman
Contributor

tkelman commented Jan 24, 2017

The reduction forms of @parallel are a bit awkward, but I'm not sure it's urgent that we rush in a redesign right now. It would possibly be better to iterate on these early in a release cycle, or to start moving all of this code to a package ASAP so it isn't tied to the language's releases any more, rather than feature-freezing on something still new and experimental for the duration of 0.6.

@amitmurthy
Contributor Author

No rush, but while we are still pre-feature-freeze I don't see any reason not to go ahead with this change, provided it has buy-in from folks - it is not a major code change or revamp.

#19578 has been open for some time now and some discussion has taken place, most of it agreeing that the reducer aspect of @parallel for must be removed.

@shashi
Contributor

shashi commented Jan 24, 2017

Are we going to use @parallel arbitrary_expression to mean @everywhere arbitrary_expression? I remember this was on the parallel roadmap document or somewhere...

@amitmurthy
Contributor Author

Are we going to use @parallel arbitrary_expression to mean @everywhere arbitrary_expression?

No. Only the reducer functionality of @parallel for is being removed, to make it more like a regular for-loop. To make it useful in distributed scenarios where shared arrays are not an option, a new mechanism via ParallelAccumulators is being provided.

loop = args[1]
elseif na==2
elseif na == 2
depwarn("@parallel with a reducer is deprecated. Use ParallelAccumulators for reduction.", :@parallel)
Contributor

it'll be a conflict magnet so it can wait a bit, but whenever this proceeds it would be good to leave a comment in the appropriate section of deprecated.jl as a reminder to remove this code

@shashi
Contributor

shashi commented Jan 24, 2017

Why is @accumulate acc needed?

@amitmurthy
Contributor Author

The same ParallelAccumulator object on the caller is serialized a little differently to each worker. Specifically, during serialization the length of the sub-range to be processed on the remote node is sent, depending on the target pid. This is done via a custom serialize implementation that looks up the list of accumulators specified in @accumulate acc and sets up a callback that returns the correct length for a given pid during serialization. The list of accumulators is passed via task-local storage.
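
Roughly, the macro only has to make the accumulator list discoverable from task-local storage while the @parallel expression runs. A sketch (not the PR's exact expansion, but it uses the :JULIA_ACCUMULATOR key visible in the review hunk above):

macro accumulate(accs, expr)
    quote
        # Make the accumulator(s) visible to the custom serializer for the
        # duration of the wrapped expression, then restore the previous value.
        task_local_storage(:JULIA_ACCUMULATOR, ($(esc(accs)),)) do
            $(esc(expr))
        end
    end
end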

The alternative, which would avoid requiring @accumulate acc, is to parse the body of @parallel for and extract all variables bound to ParallelAccumulator objects in pfor - after all macros in the body have been suitably expanded, I guess. How simple/complicated would that be?

@amitmurthy
Contributor Author

FWIW, there used to be code to identify variables of type RemoteRef in a thunk -

julia/base/multi.jl

Lines 1256 to 1265 in 7681878

if isa(env,Tuple)
    for v in env
        if isa(v,Box)
            v = v.contents
        end
        if isa(v,RemoteRef)
            p = v.where; break
        end
    end
end

If anyone can point out how to do this in the current codebase, I can work with it, and we should be able to remove @accumulate acc and auto-detect accumulators used in the @parallel for-loop body.

@shashi
Contributor

shashi commented Jan 24, 2017

How about:

  1. When serializing for a @parallel for loop, keep track of workers you are sending the accumulator to
  2. Workers report back updates to the acc when they are done executing their part of the for loop
  3. Once all workers to whom the accumulator was sent report back, reduce the answers at the master and notify the condition that releases wait(acc)...

I can see one problem with this: the reduction over the workers' results is not a tree-reduce (does preduce do that?). But that's not impossible to implement this way either.

@amitmurthy
Contributor Author

Workers report back updates to the acc when they are done executing their part of the for loop.

This is the issue. The accumulators need to know when the for-loop body is done on the worker. That is the information captured in the custom serialization of ParallelAccumulators.

The PR does 1, 2 and 3 exactly the way you mention. For step 2, the count to wait for is sent based on the remote pid.

@amitmurthy
Contributor Author

Have removed @accumulate. Looking much better now.

julia> g1 = ParallelAccumulator{Int}(+, 10);

julia> g2 = ParallelAccumulator{Int}(*, 10);

julia> function foo()
           l1 = ParallelAccumulator{String}(string, 10);
           l2 = ParallelAccumulator{Array}(vcat, 10);
           @parallel for i in 1:10
               push!(g1, i)
               push!(g2, i)
               push!(l1, i)
               push!(l2, (i, myid()))
           end

           results = [wait(x) for x in [g1,g2,l1,l2]]
       end
foo (generic function with 1 method)

julia> foo()
4-element Array{Any,1}:
      55                                                                                
 3628800                                                                                
        "78910456123"                                                                   
        Tuple{Int64,Int64}[(1,2),(2,2),(3,2),(4,3),(5,3),(6,3),(9,5),(10,5),(7,4),(8,4)]

Only one new export now: the type ParallelAccumulator.

@@ -1359,6 +1360,7 @@ export
@threadcall,

# multiprocessing
@accumulate,
Contributor

no longer needed?

Contributor Author

Not required.

A final cleanup, docs and tests are pending.

@StefanKarpinski
Member

bump

@amitmurthy
Contributor Author

Will take a day or two more. Reimplementing this in a simpler fashion.

@ViralBShah
Member

It would be really nice to get this into 0.6.

@amitmurthy
Contributor Author

Simpler/cleaner implementation cooking!

@amitmurthy
Contributor Author

Superseded by #20259

@amitmurthy amitmurthy closed this Jan 26, 2017
@StefanKarpinski StefanKarpinski deleted the amitm/parfor branch January 26, 2017 18:58
@StefanKarpinski StefanKarpinski restored the amitm/parfor branch January 26, 2017 18:58
@amitmurthy amitmurthy deleted the amitm/parfor branch January 26, 2017 19:13