RFC: "for-loop" compliant @parallel for.... take 2 #20259

Closed
wants to merge 1 commit

Conversation

@amitmurthy (Contributor) commented Jan 26, 2017

Rework of #20094

This addresses some of the concerns with @parallel as discussed at #19578 in a different manner.

@parallel for differs from a regular for-loop by:

  • implementing reducer functionality
  • treating the last line in the body as the value to be reduced
  • returning before the loop completes execution (non-reducer case)
  • not updating local arrays: folks are used to looping over local arrays and expect them to be updated, but this works only with shared arrays.

This PR:

  • deprecates the reducer mode of @parallel for
  • provides a way for the user to explicitly specify accumulators and use them in the main body
  • allows waiting on the accumulator(s)

The usage is a bit more verbose, but there is much less scope for confusion or misplaced expectations.

@parallel reducer for x in unit_range
  body
end

will now be written as

acc = ParallelAccumulator(initial_value)
@parallel for x in unit_range
  body
  acc[] = reducer_f(acc[], iteration_value)
end
result = reduce(reducer_f, acc)
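
As a concrete instance (a sketch assuming the proposed API described above; the loop body and values are illustrative), summing squares over 1:100:

acc = ParallelAccumulator(0)      # initial value of the local accumulations
@parallel for x in 1:100
    acc[] = acc[] + x^2           # accumulate locally on each worker
end
result = reduce(+, acc)           # final reduction on the caller; yields 338350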

Multiple accumulators can be referenced:

a = ParallelAccumulator(0.0)    # Specify an initial value. 
b = ParallelAccumulator(1.0)     
@parallel for i in 1:N
    a[] += foo(i)               # 0-arg indexation syntax for set/get. Similar to Ref.
                                
    b[] = min(b[], bar(i))   # In the loop any value can be assigned.
                             # The last value is sent to the master node
end
reduced_a = reduce(+, a)        # explicit reduce call for final reduction                  
reduced_b = reduce(min, b) 

Updating shared arrays works as before; there is no need for ParallelAccumulators there. However, ParallelAccumulators can be used in multi-node scenarios which shared memory cannot address.

As before, the input range is partitioned across workers, local reductions are performed on each worker, and a final reduction runs on the caller.

I feel this syntax and loop behavior are more in line with a regular for-loop. Updating arrays from the body is still not allowed (unless they are shared, of course); ParallelAccumulators cover that need.

ParallelAccumulator can also be used outside of @parallel, as shown below:

acc = ParallelAccumulator{Int}()
@sync for p in workers()
    @spawnat p begin
        for i in 1:10
            acc[] += i    # local accumulation
        end
        push!(acc)           # explicit push back to caller. This is implicitly done in case of `@parallel`
    end
end
result = reduce(+, acc)

API:

  • New export : ParallelAccumulator
  • acc[] - set/get current accumulated value on the workers
  • reduce(op, acc) - final reduction on the caller
  • push!(accumulator) - used on workers in non-@parallel mode to explicitly push local reductions to the caller.

Open for discussion:

  • bikeshedding the ParallelAccumulator name
  • any other API/usage suggestions

Todo:

  • NEWS
  • manual update
  • docstrings update
  • deprecate.jl entry

base/exports.jl Outdated
@@ -80,6 +80,7 @@ export
ObjectIdDict,
OrdinalRange,
Pair,
ParallelAccumulator,
@tkelman (Contributor) commented Jan 26, 2017

should go in stdlib doc index if exported

(oh, already in the todo)

Contributor: doesn't need to be listed twice

@tkelman tkelman added deprecation This change introduces or involves a deprecation needs news A NEWS entry is required for this change parallelism Parallel or distributed computation labels Jan 26, 2017
@amitmurthy amitmurthy changed the title RFC/WIP: "for-loop" compliant @parallel for.... take 2 RFC: "for-loop" compliant @parallel for.... take 2 Jan 27, 2017
@amitmurthy (Contributor Author)

Ready for review. It would be good if a couple of folks in addition to @tkelman had a look.

@clarkfitzg (Contributor)

New Julia user here, coming from Python, R, C. Pardon if my questions are naive, just looking to understand.

@aviks mentioned pmap in #19578. Looks like @parallel for and ParallelAccumulator essentially provide an alternate syntax for operations that can be done with the familiar map and reduce. Although there would need to be a parallel mapreduce() to directly reduce without an intermediate result.

The docs mention:

Julia’s pmap() is designed for the case where each function call does a large amount of work. In contrast, @parallel for can handle situations where each iteration is tiny, perhaps merely summing two numbers.

Curious, why is this the case? If they basically do the same thing then is it possible to share implementation?

base/multi.jl Outdated
t = Task(()->remotecall_fetch(f, pid, reducer, R, first(chunks[idx]), last(chunks[idx])))
schedule(t)
for (pid, r) in splitrange(length(R), workers())
t = @schedule remotecall_fetch(f, pid, reducer, R, first(r), last(r))
Contributor: Does this do dynamic load balancing across workers?

Contributor Author: No. The range is statically partitioned once.
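
For illustration, the static partitioning amounts to something like the sketch below (a hypothetical helper, not the actual splitrange code in Base):

# Split 1:N into np roughly equal contiguous chunks, one per worker.
function static_chunks(N::Int, np::Int)
    each, extras = divrem(N, np)
    chunks = UnitRange{Int}[]
    lo = 1
    for i in 1:np
        hi = lo + each - 1 + (i <= extras ? 1 : 0)  # first `extras` chunks get one extra element
        push!(chunks, lo:hi)
        lo = hi + 1
    end
    return chunks
end

static_chunks(10, 4)    # [1:3, 4:6, 7:8, 9:10] - fixed once, no rebalancing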

@JeffBezanson (Member)

How is the performance of this?

@amitmurthy (Contributor Author)

How is the performance of this?

Terrible. I am ashamed to post numbers here. Will try to come up with an alternative quickly.

@amitmurthy (Contributor Author)

@clarkfitzg, a couple of reasons:

  • folks do find the for-loop syntax more natural and convenient
  • deprecating the reducer but retaining the distributed loop helps in working towards a single model combining all types of loop parallelism: multiple nodes, @threads for, @simd for, and GPUs too.
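
To illustrate the granularity difference versus pmap (a sketch in 0.5-era syntax; both compute the same sum of squares):

# pmap ships one call per element to a worker: fine when each call is heavy,
# wasteful when each iteration is tiny.
result_pmap = reduce(+, pmap(i -> i^2, 1:1000))

# @parallel statically splits the range into one chunk per worker and reduces
# locally before the final reduction on the caller: cheap per iteration.
result_par = @parallel (+) for i in 1:1000
    i^2
end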

@amitmurthy amitmurthy force-pushed the amitm/parfor2 branch 2 times, most recently from d0ac82d to 912e102 Compare January 31, 2017 10:27
@amitmurthy (Contributor Author)

Have pushed an update. Numbers looking reasonable now.

With an empty body on 0.5, 4 workers:

julia> function foo(n)
           @parallel (+) for i in 1:n
               1
           end
       end;
julia> @time foo(10^9)
  0.000914 seconds (579 allocations: 46.250 KB)
1000000000

julia> @time foo(10^9)
  0.000934 seconds (562 allocations: 45.047 KB)
1000000000

This PR:

julia> function foo(n)
           a1 = ParallelAccumulator(+, 0)
           @parallel for i in 1:10^n
               push!(a1, 1)
           end
           take!(a1)
       end
julia> @time foo(9)
  0.002706 seconds (1.74 k allocations: 76.625 KiB)
1000000000

julia> @time foo(9)
  0.002627 seconds (1.75 k allocations: 77.266 KiB)
1000000000

With a minimal compute in the loop body

On 0.5

julia> function foo(n)
           @parallel (+) for i in 1:n
               rand()
           end
       end
julia> @time foo(10^9)
  0.665094 seconds (557 allocations: 44.531 KB)
5.0000125154458153e8

julia> @time foo(10^9)
  0.684806 seconds (569 allocations: 45.031 KB)
4.9999967851712906e8

On this PR:

julia> function foo(n)
           a1 = ParallelAccumulator(+, 0.0)
           @parallel for i in 1:n
               push!(a1, rand())
           end
           take!(a1)
       end
julia> @time foo(10^9)
  1.011015 seconds (1.80 k allocations: 78.375 KiB)
5.000151295992906e8

julia> @time foo(10^9)
  1.001658 seconds (1.85 k allocations: 72.500 KiB)
4.999923846122731e8

Given that the numbers are for 10^9 iterations, and that real-world code will have meatier compute, the numbers look very reasonable and can probably be improved a bit further too.

@amitmurthy (Contributor Author)

FWIW, the ParallelAccumulator values can also be directly accessed via []. With

function foo(n)
    a1 = ParallelAccumulator(+, 0.0)
    @parallel for i in 1:n
        a1[] = a1[] + rand()
    end
    take!(a1)
end
foo(10);
@time foo(10^9)

the time is further improved to 0.78 seconds.

base/multi.jl Outdated
end
global pacc_registry
for pacc in get(pacc_registry, rrid, [])
push!(pacc)
Contributor: what is this accomplishing?

@tkelman (Contributor) left a review comment:

(only partway through this version, posting my comments so far)

base/multi.jl Outdated
with a final reduction on the calling process.
The loop is executed in parallel across all workers, with each worker executing a subset
of the range. The call waits for completion of all iterations on all workers before returning.
Any updates to variables outside the loop body is not reflected on the calling node.
Contributor: "Any updates ... are not"

base/multi.jl Outdated
Note that without a reducer function, `@parallel` executes asynchronously, i.e. it spawns
independent tasks on all available workers and returns immediately without waiting for
completion. To wait for completion, prefix the call with [`@sync`](@ref), like :
Example with shared arrays:
Contributor: see elsewhere for how examples are usually formatted - something like # SharedArray Example would be more consistent here

base/multi.jl Outdated
julia> c = 10;

julia> @parallel for i=1:4
a[i] = i + c;
Contributor: should make indent consistent with other doctests, and semicolon not needed here (after the end maybe, or show the return value of the @parallel)

base/multi.jl Outdated
loop = args[1]
elseif na==2
elseif na == 2
depwarn("@parallel with a reducer is deprecated. Use ParallelAccumulators for reduction.", Symbol("@parallel"))
Contributor: this should leave a TODO note in deprecated.jl

@tkelman (Contributor) commented Feb 2, 2017

I'll continue reviewing this, but given it's a bit of a slowdown relative to 0.5 I think we should hold off and not make this change for 0.6.

@StefanKarpinski StefanKarpinski removed this from the 0.6.0 milestone Feb 2, 2017
@amitmurthy (Contributor Author)

but given it's a bit of a slowdown relative to 0.5 I think we should hold off and not make this change for 0.6.

I am fine with not doing this in 0.6; however, my reason would be to get the interface right. The performance numbers do not concern me that much right now.

  1. This change is more flexible relative to what we have now. We can have any number of parallel accumulators, and they can be used outside of @parallel for-loops too.
  2. The major slowdown is seen only on a billion iterations of an empty for-body. This is not real world usage, with even a small compute the overhead of the new machinery will be rendered inconsequential.
  3. We are in a feature freeze; better to get the API correct right now. There will be a couple of weeks to further improve performance.
  4. With inputs from @shashi , I have further simplified the interface on my local branch. Unless there is a demand to get this in now, it can wait to be merged early on in the next release cycle.

@shashi (Contributor) commented Feb 3, 2017

I agree we should get this in with the right API for 0.6, performance improvements can come later.

@amitmurthy want to push the DRef thing here? or create a new PR?

@tkelman (Contributor) commented Feb 3, 2017

There are a lot of changes here and 0.6 feature freeze is already a month overdue. The discussion on the triage call is that there are more important things to focus on for getting 0.6 feature freeze and this can wait.

@StefanKarpinski (Member)

There's also the issue that people are actually using our parallel for loops. If we tank their performance, that's not really ok.

@amitmurthy (Contributor Author)

There's also the issue that people are actually using our parallel for loops. If we tank their performance, that's not really ok.

Of course it is not OK. However, it is very important to get some perspective here. For an empty billion iterations (adding 1 in every iteration, actually) distributed over 4 workers, the slowdown is from 0.0009 to 0.002 seconds; this is fixed distribution overhead.

For minimal compute (a rand() in every iteration), the slowdown in this PR is from 0.67 (Julia 0.5) to 0.78 seconds, which I hope to remove entirely once we freeze the API.

"tanking their performance" is a bit of a stretch.

@amitmurthy (Contributor Author)

The slowdown seen in the case of summing floats is essentially #20452

@amitmurthy (Contributor Author)

  • The "it's embarrassing" comment also mentioned a "fix" and the PR was updated accordingly and numbers posted. Two numbers are being discussed here:

  • One, an "empty" do-nothing case seeing a radical 2.5x slowdown. However, it is important to view that in the right perspective:

    • Net, the 2.5x slowdown translated to an additional 1 millisecond in distributed mode. The additional 1 millisecond does not bother me because a) the replacement is more flexible, b) can support multiple reducers in a single loop c) is general enough to be used independent of @parallel for and finally d) the 1 millisecond extra is mostly network/serialization stuff on single node which will be subsumed by actual computation in real-world scenarios. In multi-node, I would expect even this 2.5x / 1 millisecond difference to disappear.
  • The other number reported a 15-30% slowdown using a rand() call to simulate "small loop computation". That was unexpected and has since been tracked down to a compiler optimization issue. It is being tracked in Performance difference between local Ref (allocated once) and a local float #20452.

@StefanKarpinski (Member)

So there is no practical slowdown here?

@amitmurthy (Contributor Author)

Practically, in real-world usage, IMO, there will not be.

Also, @shashi and myself have discussed a slightly improved version of the API. I'll update this PR with that today.



"""
push!(pacc::ParallelAccumulator)
Contributor: 1-arg push! doesn't make sense from an API standpoint. This has more to do with the communication than a collection operation.

Contributor Author: Hmmm, trying to reuse an existing exported verb. send(pacc::ParallelAccumulator)?

@amitmurthy (Contributor Author)

Updated.

Usage is now:

a = ParallelAccumulator(0.0)    # Specify an initial value. 
b = ParallelAccumulator(1.0)     
@parallel for i in 1:N
    a[] += foo(i)               # 0-arg indexation syntax for set/get. Similar to Ref.
                                
    b[] = min(b[], bar(i))   # In the loop any value can be assigned.
                             # The last value is sent to the master node
end
reduced_a = reduce(+, a)        # explicit reduce call for final reduction                  
reduced_b = reduce(min, b)           

Timings (seconds), each for 1 billion @parallel iterations with 8 local workers:

Sum of     master    this PR
1.0        0.14      0.13
rand()     0.50      0.60
sqrt(i)    1.21      1.20

@shashi (Contributor) commented Feb 8, 2017

Does it make sense to rename ParallelAccumulator to DRef (as a distributed version of Ref)?

@StefanKarpinski (Member)

Since we spell out RemoteRef and such, wouldn't DistributedRef be better, despite being a bit of a mouthful?

@Sacha0 (Member) commented Feb 8, 2017

IIUC, this loop construct / the accumulator does not necessarily involve distribution or even parallelism, but rather merely concurrency? If so, perhaps referencing concurrency as opposed to distribution or parallelism would yield a more appropriate name? Ref. #20486 (comment). Best! Corrected misunderstanding: This accumulator is specifically for distributed computing. DistributedRef seems like a great name.

@amitmurthy (Contributor Author)

IIUC, this loop construct / the accumulator does not necessarily involve distribution or even parallelism, but rather merely concurrency?

No, it is distributed and parallel rather than just concurrent, going by the following definition.

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once." - Rob Pike (https://blog.golang.org/concurrency-is-not-parallelism)

DistributedRef sounds great.

@StefanKarpinski (Member)

I've always found that quote a bit unclear. I think of it this way:

  • concurrency: expressing that things could happen at the same time;
  • parallelism: things actually happening at the same time.

@tkelman (Contributor) commented Feb 9, 2017

Couldn't you use this API from async tasks without anything being distributed? AccumulatorRef maybe, if that's what it's for?

@StefanKarpinski (Member)

Or maybe SyncRef since the key behavior seems to be synchronization?

@amitmurthy (Contributor Author)

The accumulator type is designed to work with @parallel, which distributes across processes. When used with @parallel, local accumulations on the workers are pushed automatically to the calling node. While it can be used with local task-only parallelism, there are no benefits to doing so; any regular object or a regular Ref will do.

ParallelAccumulator best states its functionality: each worker is accumulating in parallel. DistributedRef captures its nature: a Ref which is distributed across workers, though you do have to explicitly execute a final reduce on the caller.

@amitmurthy (Contributor Author) commented Feb 9, 2017

SyncRef: no, there is no barrier-like functionality exposed here. Calling reduce waits to collect accumulated values from all workers; however, synchronization is not the intent.

@Sacha0 (Member) commented Feb 11, 2017

With new perspective (#20486 (comment)), cheers for DistributedRef or DistributedAccumulator. Best!

Pushes the locally accumulated value to the calling node. Must be called once on each worker
when a DistributedRef is used independent of a [`@parallel`](@ref) construct.
"""
push!(dref::DistributedRef) = put_nowait!(dref.chnl, (myid(), dref.value))
Contributor: push doesn't make sense, it's not a growing collection - send would be better

Contributor Author: I agree.

Int(rand(Bool))
acc = DistributedRef(0)
@parallel for i=1:200000000
acc[] += Int(rand(Bool))
Contributor: 4 space indent in code examples

@amitmurthy (Contributor Author)

Have renamed to DistributedRef.

If we are having this in 0.6, would like @JeffBezanson to review once before merging.

There is a partial overlap in the functionality (not the implementation, which is quite different) between a DistributedRef and a DArray{T,1,T}: https://github.com/JuliaParallel/DistributedArrays.jl#working-with-distributed-non-array-data.

DistributedRef is specifically optimized for use with an @parallel for loop. With a DArray{T,1,T}, creating the distributed vector, local accumulations, and the final reduction are distinct operations w.r.t. the messages being transported.

With a DistributedRef, the remote refs are created as part of the @parallel serialization, and local accumulations are sent back when the local loops terminate.
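
For intuition, a minimal sketch of what the caller-side final reduction could look like, assuming hypothetical field names (initial for the seed value, chnl for the RemoteChannel that push! writes to, pending for the set of worker ids yet to report):

function reduce_sketch(op, dref)
    acc = dref.initial
    while !isempty(dref.pending)
        pid, val = take!(dref.chnl)   # blocks until some worker pushes its local value
        delete!(dref.pending, pid)
        acc = op(acc, val)            # fold in that worker's local accumulation
    end
    return acc
end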

Mentioning it here because I believe at some point in the future, module Distributed, DistributedArrays and SharedArrays will be part of a single DistributedComputing package, at which time DistributedRef may be replaced with a version of DArray{T,1,T} that works efficiently with @parallel.

However, for now I would like to see this PR merged in 0.6. It addresses two issues: 1) deprecation of the reducer mode in @parallel, and 2) its replacement with a different style of reduction, namely explicitly dealing with a distributed ref.

@musm (Contributor) commented Feb 16, 2017

+1 for having this in 0.6. For consistency, should DArray be renamed to DistributedArray to parallel DistributedRef, or should it be DArray and DRef? Perhaps there is some other precedent for the DArray name I'm not aware of.

@shashi (Contributor) commented Feb 17, 2017

julia> x = DistributedRef(0)
DistributedRef{Int64}(0, 0, Set{Int64}(), RemoteChannel{Channel{Tuple}}(1, 1, 5), 0)

julia> @parallel for i=1:10
           x[] += i
       end

julia> reduce(+, x)
55

julia> reduce(+, x)
55

julia> @parallel for i=1:10
           x[] += i
       end

julia> reduce(+, x)
55

Would it be better for the second @parallel for to use the old accumulated values of x on the workers? i.e. the answer should be 110 here.

This change would make this abstraction much more powerful than it currently is. It could be used outside @parallel, for example with explicit remotecalls. A caveat is that we will need a function to free the values in the ref if a user wishes to do so. But I figure for most use cases, e.g. just adding up numbers, you don't really need to care about it. You need such a thing now anyway if you are accumulating big objects (although you can do empty!(x.value) after the reduce, that's not really API)... This should also only result in the deletion of some logic in the setup and teardown of @parallel.

@StefanKarpinski (Member)

We should keep in mind that we'll probably have some kind of SyncedRef type that can be safely updated by multiple threads (Cilk has this kind of thing), so we should make the naming scheme coherent across parallel and distributed models.

@ViralBShah (Member)

Perhaps too old to reuse. @shashi any help here?

@vtjnash (Member) commented Feb 10, 2024

Moved to JuliaLang/Distributed.jl#39

@vtjnash vtjnash closed this Feb 10, 2024
@vtjnash vtjnash deleted the amitm/parfor2 branch February 10, 2024 22:46