use pairwise order for mapreduce on arbitrary iterators #52397

Draft · wants to merge 22 commits into master

Conversation

@stevengj (Member) commented Dec 5, 2023

Currently, mapreduce uses a pairwise reduction order for arrays, but switches to mapfoldl order for other iterators. This has the unfortunate effect that pairwise summation is only used for arrays, and other iterators get much less accurate floating-point sums (and related reductions). For example, passing an array through a generator or an Iterators.filter would suddenly make sums less accurate.

I had long been under the mistaken impression that pairwise reduction required random access to an iterator, but @mikmoore pointed out in #52365 that this is not the case.
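To make the "no random access needed" point concrete, here is a minimal sketch (my own illustration, not the PR's actual implementation): reduce "the next n elements" of an iterator by recursively splitting them in half, threading the iteration state through both halves, so only sequential `iterate` calls are ever made. It assumes the iterator yields at least `n` elements and has a `length`.

```julia
# Sequential-access pairwise sum sketch (not the PR's code).
# psum reduces the next n elements of itr, returning (partial_sum, state).
function psum(itr, n, state...)
    if n == 1
        v, s = iterate(itr, state...)  # consume a single element
        return v, s
    end
    nleft = n >> 1
    a, s = psum(itr, nleft, state...)  # reduce the first half
    b, s = psum(itr, n - nleft, s)     # reduce the second half
    return a + b, s                    # combine the two partial sums
end

pairwise_sum(itr) = first(psum(itr, length(itr)))
```

The real implementation additionally uses a blocked base case (`pairwise_blocksize`) with an unrolled/`@simd` inner loop rather than recursing all the way down to single elements, but the state-threading idea is the same.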

This WIP PR changes mapreduce to use a pairwise order by default for arbitrary iterators. I've only done light testing so far, but it should make summation about equally accurate for arrays and other iterators, and the performance seems about the same as the old mapfoldl fallback.

More testing and benchmarking required, but I wanted to post some code to get the ball rolling. (Should not be a breaking change, in theory, since we explicitly document that the associativity of mapreduce is implementation-dependent.)

Note that if you want to try out this code on an older version of Julia, just define the following foldl-like functions:

import Base: _InitialValue, mapreduce_empty_iter, reduce_empty_iter, _xfadjoint, pairwise_blocksize, mapreduce_first, Generator, MappingRF, mapfoldl_impl
mapreduce_empty_iter(f::F, op::OP, itr) where {F,OP} = reduce_empty_iter(_xfadjoint(op, Generator(f, itr))...)
mapfoldp(f, op, itr; init=_InitialValue()) = mapreduce_impl(f, op, init, itr)
foldp(op, itr; init=_InitialValue()) = mapfoldp(identity, op, itr; init=init)

and copy the new mapreduce_impl and _mapreduce_impl methods from the PR (the chunk of the diff starting at macro _repeat).

@stevengj added labels Dec 5, 2023: iteration (Involves iteration or the iteration protocol), fold (sum, maximum, reduce, foldl, etc.), needs tests (Unit tests are required for this change), needs news (A NEWS entry is required for this change), needs nanosoldier run (This PR should have benchmarks run on it), needs pkgeval (Tests for all registered packages should be run with this change), speculative (Whether the change will be implemented is speculative)
@stevengj (Member, Author) commented Dec 5, 2023

Accuracy example:

a = rand(Float32, 10^6); itr = Iterators.take(a, length(a))

rerr(a,b) = abs(a - b) / abs(a) # relative error
exact = sum(Float64, a);

@show rerr(exact, sum(a)); # classic random-access pairwise
@show rerr(exact, sum(itr)); # this PR: in-order pairwise
@show rerr(exact, foldl(+, a)); # old sum(itr) behavior

gives (typical values):

rerr(exact, sum(a)) = 2.080020937942571e-8
rerr(exact, sum(itr)) = 2.080020937942571e-8
rerr(exact, foldl(+, a)) = 1.4169783303123386e-6

Benchmark example:

(Same test data as above.)

@btime sum($a); # classic random-access pairwise
@btime Base.mapreduce_impl(identity, +, $(Base._InitialValue()), $a);  # new in-order pairwise applied to array

@btime foldl(+, $itr); # old behavior of sum(itr)
@btime sum($itr); # new in-order pairwise applied to Take iterator

gives

  77.867 μs (0 allocations: 0 bytes)
  98.385 μs (0 allocations: 0 bytes)
  1.003 ms (0 allocations: 0 bytes)
  1.025 ms (0 allocations: 0 bytes)

so it's slightly slower than the old pairwise algorithm on random access arrays (where it is not used by default), but is comparable to the old mapfoldl method on iterators where it is actually used.

@stevengj (Member, Author) commented Dec 5, 2023

Note that the old mapreduce implementation called mapfoldl, which called foldl, which was optimized by @tkf in #33526. The pairwise code now takes a different code path and skips the transducer optimizations, so there might be a performance regression unless we make a similar optimization here.

On the other hand, with the original benchmark from #33526, the new mapreduce actually seems to be 2x faster:

julia> xs = [abs(x) < 1 ? x : missing for x in randn(1000)];

julia> @btime foldl(+, (x for x in $xs if x !== missing)); # older transducer-based foldl_impl
  1.860 μs (0 allocations: 0 bytes)

julia> @btime reduce(+, (x for x in $xs if x !== missing)); # this PR's pairwise mapreduce_impl
  967.846 ns (0 allocations: 0 bytes)

Am I missing something? Why is foldl now the slower option (but still faster than Julia 1.9 on my machine)? Is the transducer-based foldl no longer a good idea with the latest compiler improvements?

Maybe because of the @simd annotation in the mapreduce_impl, which allows re-association (which is not allowed by mapfoldl)?
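If the `@simd` hypothesis is right, the effect can be illustrated in isolation (a sketch, not code from the PR): the annotation licenses the compiler to reassociate the floating-point additions and vectorize the loop, which strict left-to-right foldl semantics forbid.

```julia
# Plain left-to-right sum: each iteration depends on the previous one,
# so the loop cannot be vectorized without changing the result.
function sum_seq(a)
    s = zero(eltype(a))
    for x in a
        s += x
    end
    return s
end

# @simd tells the compiler it may reassociate the +, enabling SIMD
# accumulation (several partial sums, combined at the end).
function sum_simd(a)
    s = zero(eltype(a))
    @simd for i in eachindex(a)
        @inbounds s += a[i]
    end
    return s
end
```

For floating-point data the two can differ in the last bits precisely because of the reassociation; `@btime` on a large `Vector{Float64}` should show the `@simd` version several times faster on SIMD-capable hardware.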

@stevengj stevengj mentioned this pull request Dec 5, 2023
@stevengj (Member, Author) commented Dec 6, 2023

Here's a plot of the accuracy, showing that it is comparable to the current random-access pairwise algorithm that we use for arrays, and is much better than the O(√N) mean error growth of the foldl algorithm that is currently used for iterators.

[figure: relative error vs. number of summands N]

Code:
mapfoldp(f, op, itr; init=Base._InitialValue()) = Base.mapreduce_impl(f, op, init, itr) # this PR's mapreduce_impl

using Xsum
accs = Float64[]
accp = Float64[]
accl = Float64[]
Nacc = round.(Int, exp10.(range(1,7,length=1000)))
rerr(x, exact) = Float64(abs(x - exact) / abs(exact))
for (i, N) in enumerate(Nacc)
    println("$i/$(length(Nacc)): N = $N")
    flush(stdout)
    a = rand(Float32, N)
    exact = xsum(a)
    push!(accs, rerr(sum(a), exact))
    push!(accp, rerr(mapfoldp(identity, +, a), exact))
    push!(accl, rerr(mapfoldl(identity, +, a), exact))
end

using PyPlot
loglog(Nacc, accs, "k--")
loglog(Nacc, accl, "b.")
loglog(Nacc, sqrt.(Nacc) .* 1e-8, "b-", linewidth=2)
loglog(Nacc, accp, "r.")
legend(["random-access pairwise", "left-associative", L"\sim \sqrt{N}", "streaming pairwise (new)"])
ylabel("relative error")
xlabel(L"number $N$ of summands")
title("summation accuracy for rand(Float32) values")

@stevengj (Member, Author) commented Dec 6, 2023

Here's a plot of the performance (larger = faster) on master, comparing it to both the current random-access pairwise algorithm that we use for arrays and the foldl algorithm that is currently used for iterators, all applied to a simple rand(N) array (which should be one of the fastest iterators). Right now it seems to be about 25% slower than foldl for small N (it was 2x slower before the unrolling mentioned in the update below), and 10x faster(!!) for larger N (probably SIMD, since foldl can't use @simd?).

(This is a Mac mini running a 3GHz Intel Core i5, from about 2018.)

[figure: summation performance vs. N for a rand(Float64, N) array]

Code:
mapfoldp(f, op, itr; init=Base._InitialValue()) = Base.mapreduce_impl(f, op, init, itr) # this PR's mapreduce_impl

using BenchmarkTools
Nbench = 2 .^ (0:16)
ts, tl, tp = (Float64[] for _=1:3)
for N in Nbench
    @show N
    a = rand(Float64, N)
    push!(ts, @belapsed sum($a))
    push!(tl, @belapsed mapfoldl(identity, +, $a))
    push!(tp, @belapsed mapfoldp(identity, +, $a))
end

using PyPlot
loglog(Nbench, 1e-9 .* Nbench ./ ts, "k--")
loglog(Nbench, 1e-9 .* Nbench ./ tl, "b.")
loglog(Nbench, 1e-9 .* Nbench ./ tp, "r.")
legend(["random-access pairwise", "left-associative", "streaming pairwise (new)"])
ylabel("summands / nanosecond")
xlabel(L"number $N$ of summands")
title("summation performance for rand(Float64) values")

For a different iterator Iterators.take(rand(N), N), the large-N performance advantage of the new implementation goes away, but at least it's about the same for large N (and still 25% slower for small N):

[figure: summation performance vs. N for Iterators.take(rand(Float64, N), N)]

Code:
mapfoldp(f, op, itr; init=Base._InitialValue()) = Base.mapreduce_impl(f, op, init, itr) # this PR's mapreduce_impl

using BenchmarkTools
Nbench = 2 .^ (0:16)
tl, tp = (Float64[] for _=1:2)
for N in Nbench
    @show N
    a = rand(Float64, N)
    itr = Iterators.take(a, N)
    push!(tl, @belapsed mapfoldl(identity, +, $itr))
    push!(tp, @belapsed mapfoldp(identity, +, $itr))
end

using PyPlot
semilogx(Nbench, 1e-9 .* Nbench ./ tl, "b.")
semilogx(Nbench, 1e-9 .* Nbench ./ tp, "r.")
legend(["left-associative", "streaming pairwise (new)"])
ylabel(L"summands / nanosecond ($= N / t$)")
xlabel(L"number $N$ of summands")
title("summation performance for Iterators.take(rand(Float64, N), N)")
ylim(0,1.5)

It would be good to reduce the penalty for small N. Update: I did a bit of unrolling, which improves things somewhat (the graphs above have been updated).

@stevengj stevengj marked this pull request as draft December 6, 2023 22:59
@stevengj (Member, Author) commented Dec 7, 2023

I tried and failed to re-implement mapreduce_impl as a reduce_impl in the transducer style, via:

mapreduce_impl(f::F, op::OP, ::_InitialValue, itr) where {F,OP} = reduce_impl(_xfadjoint(op, Generator(f, itr))...)

so that reduce_impl(op, itr) implicitly has f as part of op. However, I ran into trouble because the transducer-style foldl abstraction, at least as currently implemented in Base, seems incompatible with re-associating the reduction.

In particular, suppose you have reduced the first half of itr to v1 and the second half to v2. Now you want to combine them. Unfortunately, op(v1, v2) no longer works, because if we have op::MappingRF this is equivalent to op.rf(v1, op.f(v2)), whereas what we want is op.rf(v1, v2).

@MasonProtter, am I missing something about this abstraction? Do we just need a reducer(op) function that extracts the reducer from a transducer op?
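For concreteness, a hypothetical `reducer(op)` (my sketch, nothing like this exists in Base, and it leans on the undocumented internals `Base.BottomRF`, `Base.MappingRF`, and `Base.FilteringRF`, which could change between versions) might just peel off the wrapper layers:

```julia
# Hypothetical helper: recover the innermost binary reducing function from a
# (possibly wrapped) transducer-style reducing function, so that two partial
# results v1 and v2 can be combined as reducer(op)(v1, v2) rather than op(v1, v2).
# NOTE: BottomRF/MappingRF/FilteringRF are internal Base types (an assumption
# that they exist and keep their .rf field).
reducer(op) = op                                 # plain binary function
reducer(op::Base.BottomRF)    = reducer(op.rf)
reducer(op::Base.MappingRF)   = reducer(op.rf)
reducer(op::Base.FilteringRF) = reducer(op.rf)
```

This only addresses the combining step; it does nothing for stateful transducers, which have no valid combine at all.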

@stevengj (Member, Author) commented Dec 7, 2023

@StefanKarpinski, any thoughts? mapreduce is a pretty fundamental operation, so a change here will touch a lot of things.

@MasonProtter (Contributor) commented:

@stevengj yes, that's right: you'd need a way to extract out what Takafumi calls the "bottom" or "inner" reducing function. In Transducers.jl, there's code that looks like this:

abstract type AbstractReduction{innertype} <: Function end
struct BottomRF{F} <: AbstractReduction{F}
    inner::F
end
struct Reduction{X <: Transducer, I} <: AbstractReduction{I}
    xform::X
    inner::I

    Reduction{X, I}(xf, inner) where {X, I} = new{X, I}(xf, inner)

    function Reduction(xf::X, inner::I) where {X <: Transducer, I}
        if I <: AbstractReduction
            new{X, I}(xf, inner)
        else
            rf = ensurerf(inner)
            new{X, typeof(rf)}(xf, rf)
        end
    end
end
ensurerf(rf::AbstractReduction) = rf
ensurerf(f) = BottomRF(f)
inner(r::AbstractReduction) = r.inner

combine(rf::BottomRF, a, b) = combine(inner(rf), a, b)
combine(f, a, b) = f(a, b)
function combine(rf::Reduction, a, b)
    if ownsstate(rf, a)
        error("Stateful transducer ", xform(rf), " does not support `combine`")
    elseif ownsstate(rf, b)
        error("""
        Some thing went wrong in two ways:
        * `combine(rf, a, b)` is called but type of `a` and `b` are different.
        * `xform(rf) = $(xform(rf))` is stateful and does not support `combine`.
        """)
    else
        combine(inner(rf), a, b)
    end
end

This combine function is what gets used to join together different chunks of work when you reassociate.

Does that help?

@stevengj (Member, Author) commented Dec 7, 2023

you'd need a way to extract out what Takafumi calls the "bottom" or "inner" reducing function. In Transducers.jl, there's code that looks like this:

But there's currently nothing like that in Base, right? So we'd need to add that. And if we supported stateful transducers, we'd need a way to dispatch on that property in order to switch to a plain foldl algorithm if there is no combiner.

Anyway, writing/porting a lot of additional transducer functionality is something that I'd rather leave out of this PR.

@stevengj (Member, Author) commented:

@vtjnash, any ideas on further speeding up the small-N case?

@@ -613,6 +613,7 @@ test18695(r) = sum( t^2 for t in r )
@test_throws str -> ( occursin("reducing over an empty", str) &&
occursin("consider supplying `init`", str) &&
!occursin("or defining", str)) test18695(Any[])
@test_throws MethodError test18695(Any[])
A Member commented on the diff:

I blindly went to add some tests here as I started reviewing this, and I wrote the following...

Suggested change
@test_throws MethodError test18695(Any[])
@test_throws MethodError test18695(Any[])
@testset "issue #52457: pairwise reduction of iterators" begin
@test foldl(+, Iterators.repeated(Float16(1.0), 10000)) == maxintfloat(Float16)
@test mapfoldl(x->x^2, +, Iterators.repeated(Float16(1.0), 10000)) == maxintfloat(Float16)
@test foldr(+, Iterators.repeated(Float16(1.0), 10000)) == maxintfloat(Float16)
@test mapfoldr(x->x^2, +, Iterators.repeated(Float16(1.0), 10000)) == maxintfloat(Float16)
@test reduce(+, Iterators.repeated(Float16(1.0), 10000)) ≈ 10000
@test mapreduce(x->x^2, +, Iterators.repeated(Float16(1.0), 10000)) ≈ 10000
@test_throws ArgumentError reduce((x,y)->x+y, Iterators.repeated(Float16(1.0), 0))
@test_throws ArgumentError mapreduce(identity, (x,y)->x+y, Iterators.repeated(Float16(1.0), 0))
@test reduce(+, Iterators.repeated(Float16(1.0), 0); init=Float16(0)) === Float16(0)
@test mapreduce(identity, +, Iterators.repeated(Float16(1.0), 0); init=Float16(0)) === Float16(0)
@test reduce(+, Iterators.repeated(Float16(1.0), 10000); init=Float16(0)) ≈ 10000
@test mapreduce(identity, +, Iterators.repeated(Float16(1.0), 10000); init=Float16(0)) ≈ 10000
@test reduce((x,y)->x+y, Iterators.repeated(Float16(1.0), 0); init=Float16(0)) === Float16(0)
@test mapreduce(x->x^2, (x,y)->x+y, Iterators.repeated(Float16(1.0), 0); init=Float16(0)) === Float16(0)
@test reduce((x,y)->x+y, Iterators.repeated(Float16(1.0), 10000); init=Float16(0)) ≈ 10000
@test mapreduce(x->x^2, (x,y)->x+y, Iterators.repeated(Float16(1.0), 10000); init=Float16(0)) ≈ 10000
end

But that fails, because supplying init pushes you back into foldl land. I see you've considered this... but it still feels pretty unexpected.

Comment on lines +343 to +345
# for an arbitrary initial value, we need to call foldl,
# because op(nt, itr[i]) may have a different type than op(nt, itr[j])
# … it's not clear how to reliably do this without foldl associativity.
A Member replied:

I suppose it's not clear to me why this is necessarily a problem. Isn't it also true that op(op(itr[i], itr[j]), itr[k]) might have a different type than op(itr[i], op(itr[j], itr[k]))? Why is having a separately-passed initial value different from itr[i]?

@MasonProtter (Contributor) commented Feb 20, 2024:

The problem is that re-association can introduce the initial value multiple times. e.g. if I do

sum(1:10, init=1)

then the result depends on how many times I split the pairwise reduction. i.e. if that is just

foldl(+, 1:10, init=1)

you get 56. But if you do a pairwise split in the middle you get

(foldl(+, 1:5, init=1) + foldl(+, 6:10, init=1))

then we have a result of 57. This is similar to but distinct from the problems you encounter when op is not associative. In this case, it turns out to be important that init must be an identity element under op, i.e. you must have op(init, x) ≈ x
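The duplication is easy to check directly (a runnable restatement of the example above, with the split written as 1:5 and 6:10):

```julia
# init folded in exactly once: 1 + (1+2+...+10) = 56
left_sum = foldl(+, 1:10; init=1)

# a pairwise split re-introduces init in each half: (1+15) + (1+40) = 57
split_sum = foldl(+, 1:5; init=1) + foldl(+, 6:10; init=1)

# no discrepancy when init is an identity element of op
@assert foldl(+, 1:10; init=0) == foldl(+, 1:5; init=0) + foldl(+, 6:10; init=0)
```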

A Member replied:

But if you do a pairwise split in the middle you get

(foldl(+, 1:5, init=1) + foldl(+, 6:10, init=1))

Dumb question: why couldn't we do foldl(+, 1:5, init=1) + foldl(+, 6:10)? Isn't this already doing foldl for the first 14 operations anyway?

...

Oh. Right, I suppose this is where type comes into play. Because it'd be very weird and problematic if reduce(+, Int8.(1:100), init=big(0)) did half of its calculation mod $2^8$ and half in arbitrary precision. Yeah, ok.

@mbauman (Member) commented Feb 20, 2024:

Because it'd be very weird and problematic if reduce(+, Int8.(1:100), init=big(0)) did half of its calculation mod $2^8$ and half in arbitrary precision.

But on the other hand, isn't that what you sign up for with reduce? Since we very explicitly do not guarantee an evaluation order, I don't think you should be able to rely upon this sort of "cascading-promotion-from-the-init-value".

Is it really meaningfully different than the status quo:

julia> reduce(+, Integer[1; repeat([UInt8(1)], 10000)])
1297

julia> foldl(+, Integer[1; repeat([UInt8(1)], 10000)])
10001

?

A Contributor replied:

what I infer from the docstring is that init must be the identity element of the monoid defined by op and eltype(itr)

so I might expect it to "work" when typeof(op(init, x)) <: eltype(itr) for all x in itr

but in the example reduce(+, Int8.(1:100), init=big(0)), since the promoted return type of +(::BigInt, ::Int8) does not subtype Int8, I do not think that can reasonably be expected to always produce a sane result

A Member replied:

Oh yeah, we even go further than that and say:

It is unspecified whether init is used for non-empty collections.

A Contributor replied:

Yeah the docstring seems to give us a lot of leeway to either drop the init or use it each time and give a 'surprising' result if it's not the identity element.

@stevengj (Member, Author) replied:

Let's continue this conversation in #53871, e.g. see my comment at #53871 (comment)

@ViralBShah (Member) commented:

Bump. Is this close to being done and merged?

@stevengj (Member, Author) replied:

It's waiting on the resolution of #53945, at which point this PR can be updated to use init according to the new specification.
