Skip to content

Commit

Permalink
improve GroupBy documentation and remove some more old julia version …
Browse files Browse the repository at this point in the history
…support (#20)

* improve GroupBy documentation and remove some more old julia version support

* fix tests referring to PartitionableArray

* fix doctest

* another doc fix

* yet another doc fix
  • Loading branch information
ExpandingMan authored Jul 20, 2023
1 parent b0a0f1d commit 372c888
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 119 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Transducers"
uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999"
authors = ["Takafumi Arakaki <aka.tkf@gmail.com>"]
version = "0.4.78"
version = "0.4.79"

[deps]
Adapt = "79e6a3ab-5dfb-504d-930d-738a2a938a0e"
Expand Down
2 changes: 1 addition & 1 deletion src/Transducers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ import Setfield
import Tables
using ArgCheck
using BangBang.Experimental: modify!!, mergewith!!
using BangBang.NoBang: SingletonVector
using BangBang.NoBang: SingletonVector, SingletonDict
using BangBang:
@!, BangBang, Empty, append!!, collector, empty!!, finish!, push!!, setindex!!, union!!
using Baselet
Expand Down
25 changes: 3 additions & 22 deletions src/basics.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,10 @@ julia> _unzip(((1, 2, 3), (4, 5, 6)))
"""
_unzip(xs::Tuple{Vararg{NTuple{N,Any}}}) where {N} = ntuple(i -> map(x -> x[i], xs), N)

if isdefined(Iterators, :Zip1) # VERSION < v"1.1-"
arguments(xs::Iterators.Zip1) = (xs.a,)
arguments(xs::Iterators.Zip2) = (xs.a, xs.b)
arguments(xs::Iterators.Zip) = (xs.a, arguments(xs.z)...)
const _Zip = Iterators.AbstractZipIterator
else
arguments(xs::Iterators.Zip) = xs.is
const _Zip = Iterators.Zip
end
arguments(xs::Iterators.Zip) = xs.is
const _Zip = Iterators.Zip

if VERSION < v"1.3"
_Channel(f, ::Type{T}, size; kwargs...) where {T} =
Channel(f; ctype = T, csize = size, kwargs...)
else
_Channel(f, ::Type{T}, size; kwargs...) where {T} = Channel{T}(f, size; kwargs...)
end
_Channel(f, ::Type{T}, size; kwargs...) where {T} = Channel{T}(f, size; kwargs...)

_typeof(::Type{T}) where {T} = Type{T}
_typeof(::T) where {T} = T
Expand Down Expand Up @@ -56,13 +44,6 @@ prefixed_type_name(@nospecialize x) =
const DenseSubVector{T} =
SubArray{T, 1, Vector{T}, Tuple{UnitRange{Int}}, true}

# https://github.com/JuliaLang/julia/pull/33533
if VERSION < v"1.4"
const PartitionableArray = Vector
else
const PartitionableArray = AbstractArray
end


const _non_executable_transducer_msg = """
Output type of the transducer is inferred to be a `Union{}`. This
Expand Down
14 changes: 2 additions & 12 deletions src/core.jl
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,7 @@ AbstractFilter

abstract type AbstractReduction{innertype} <: Function end

if VERSION >= v"1.3" # post https://github.com/JuliaLang/julia/pull/31916
@inline (rf::AbstractReduction)(state, input) = next(rf, state, input)
end
@inline (rf::AbstractReduction)(state, input) = next(rf, state, input)

InnerType(::Type{<:AbstractReduction{T}}) where T = T

Expand Down Expand Up @@ -321,10 +319,6 @@ end

Base.:(==)(r1::Reduction, r2::Reduction) = (r1.xform == r2.xform) && (r1.inner == r2.inner)

if VERSION < v"1.3" # pre https://github.com/JuliaLang/julia/pull/31916
@inline (rf::Reduction)(state, input) = next(rf, state, input)
end

prependxf(rf::AbstractReduction, xf) = Reduction(xf, rf)
setinner(rf::Reduction, inner) = Reduction(xform(rf), inner)
setxform(rf::Reduction, xform) = Reduction(xform, inner(rf))
Expand Down Expand Up @@ -393,11 +387,7 @@ Composition of transducers.
@inline Base.:∘(f::IdentityTransducer, ::IdentityTransducer) = f
@inline Base.:∘(::IdentityTransducer, f::Composition) = f # disambiguation

if VERSION >= v"1.3"
(xf::Transducer)(itr) = eduction(xf, itr)
else
Base.:|>(itr, xf::Transducer) = eduction(xf, itr)
end
(xf::Transducer)(itr) = eduction(xf, itr)

"""
ReducingFunctionTransform(xf)
Expand Down
67 changes: 38 additions & 29 deletions src/groupby.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,26 @@
GroupBy(key, rf, [init])
Group the input stream by a function `key` and then fan-out each group
of key-value pairs to the reducing function `rf`.
of key-value pairs to the eduction `xf'(step)`. This is similar to the
`groupby` relational database operation.
For example, if `GroupBy` is used as in:
For example
[1,2,1,2,3] |> GroupBy(string, Map(last)'(+)) |> foldxl(right)
returns a result equivalent to `Dict("1"=>2, "2"=>4, "3"=>3)` while
[1,2,1,2,3] |> GroupBy(string, Map(last) ⨟ Map(Transducers.SingletonVector), append!!) |> foldxl(right)
returns a result equivalent to `Dict("1"=>[1,1], "2"=>[2,2], "3"=>[3])`.
Alternatively, one can provide a reducing function directly, though this is disfavored since it prevents
results from being combined with [`Transducers.combine`](@ref), so such a `GroupBy` cannot be used
with [`foldxt`](@ref) or [`foldxd`](@ref). For example, if `GroupBy` is used as in:
xs |> Map(upstream) |> GroupBy(key, rf, init) |> Map(downstream)
then the "function signatures" would be:
then the function signatures would be:
upstream(_) :: V
key(::V) :: K
Expand All @@ -31,36 +44,32 @@ That is to say,
the current and all preceding results is then fed into the
`downstream`.
The signature `GroupBy(key, ::Transducer, [step, [init]])` is preferred since
results can only be combined with [`Transducers.combine`](@ref) (e.g. for use in `foldxt`)
when the inner-most reducing function is accessible. For example, instead of
```
GroupBy(key, (y, (k, v)) -> y + v.a, init)
```
one should write
```
GroupBy(key, Map(kv -> kv[2].a)'(+), init)
```
The difference is that in the latter case Transducers can figure out how to sum separately from
accessing the value.
See also `groupreduce` in
[SplitApplyCombine.jl](https://github.com/JuliaData/SplitApplyCombine.jl).
!!! compat "Transducers.jl 0.3"
# Examples
```jldoctest; setup = :(using Transducers)
julia> [1,2,3,4] |> GroupBy(iseven, Map(last)'(+)) |> foldxl(right)
Transducers.GroupByViewDict{Bool,Int64,…}(...):
0 => 4
1 => 6
```
New in version 0.3.
```jldoctest; setup = :(using Transducers)
julia> using Transducers: SingletonDict;
# Examples
```jldoctest
julia> using Transducers
using BangBang # for `push!!`
julia> foldl(right, GroupBy(string, Map(last), push!!), [1, 2, 1, 2, 3])
Transducers.GroupByViewDict{String,Vector{Int64},…}(...):
"1" => [1, 1]
"2" => [2, 2]
"3" => [3]
julia> using BangBang; # for merge!!
julia> x = [(a="A", b=1, c=1), (a="B", b=2, c=2), (a="A", b=3, c=3)];
julia> inner = Map(last) ⨟ Map() do ξ
SingletonDict(ξ.b => ξ.c)
end;
julia> x |> GroupBy(ξ -> ξ.a, inner, merge!!) |> foldxl(right)
Transducers.GroupByViewDict{String,Dict{Int64, Int64},…}(...):
"B" => Dict(2=>2)
"A" => Dict(3=>3, 1=>1)
```
Note that the reduction stops if one of the groups returns a
Expand All @@ -72,7 +81,7 @@ it is fine:
julia> result = transduce(
GroupBy(
string,
opcompose(Map(last), Scan(+), ReduceIf(x -> x > 3)),
Map(last) ⨟ Scan(+) ⨟ ReduceIf(x -> x > 3),
),
right,
nothing,
Expand Down
6 changes: 0 additions & 6 deletions src/library.jl
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,6 @@ OfType(T::Type) = OfType{T}()
@inline _next_oftype(T, inner, result, input) =
input isa T ? next(inner, result, input) : result

# Workaround StackOverflowError in Julia 1.0
# https://travis-ci.com/JuliaFolds/Transducers.jl/jobs/171732596
if VERSION >= v"1.1-"

@inline _next_oftype(T, inner, result, input::Tuple) =
_next_oftype_t(T, inner, result, (), input...)

Expand Down Expand Up @@ -395,8 +391,6 @@ end
# Not using `Base.tail(input)` for a Tuple-of-(possibly)-Union seems
# to be a nice strategy for achieving type-stability.

end # if

# https://clojure.github.io/clojure/clojure.core-api.html#clojure.core/take
# https://clojuredocs.org/clojure.core/take
"""
Expand Down
3 changes: 0 additions & 3 deletions src/partitionby.jl
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,3 @@ end

(f::PartitionBy)(xs::AbstractArray) = array_partitionby(f.f, xs)

if VERSION < v"1.3"
Base.:|>(xs::AbstractArray, f::PartitionBy) = array_partitionby(f.f, xs)
end
27 changes: 3 additions & 24 deletions src/progress.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,7 @@ julia> xf = opcompose(
end,
);
julia> if VERSION >= v"1.3-alpha"
# Calling `sleep` in thread is safe in Julia 1.3:
foldxt(+, xf, withprogress(1:10; interval=1e-3); basesize=1)
else
foldl(+, xf, withprogress(1:10; interval=1e-3))
end
julia> foldxt(+, xf, withprogress(1:10; interval=1e-3); basesize=1)
220
```
"""
Expand Down Expand Up @@ -183,24 +178,8 @@ _reduce(ctx, rf, init, coll::SizedReducible{<:ProgressLoggingFoldable}) =
_reduce(ctx, rf, init, coll)
end

if VERSION >= v"1.2"
_reduce_threads_for(rf, init, coll::SizedReducible{<:ProgressLoggingFoldable}) =
_reduce_progress(_reduce_threads_for, rf, init, coll)
else
# In earlier versions, Channel was not thread-safe (?)
# https://github.com/JuliaLang/julia/pull/30186
_reduce_threads_for(rf, init, coll::SizedReducible{<:ProgressLoggingFoldable}) =
_reduce_threads_for(
rf,
init,
(@set coll.reducible = coll.reducible.foldable),
)
end

if VERSION < v"1.3-alpha"
maybe_collect(coll::ProgressLoggingFoldable) =
@set coll.foldable = maybe_collect(coll.foldable)
end
_reduce_threads_for(rf, init, coll::SizedReducible{<:ProgressLoggingFoldable}) =
_reduce_progress(_reduce_threads_for, rf, init, coll)

struct RemoteReduceWithLogging{C} <: Function
chan::C
Expand Down
13 changes: 3 additions & 10 deletions src/reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,14 @@ function transduce_assoc(
return result
end

if VERSION >= v"1.3-alpha"
maybe_collect(coll) = coll
else
maybe_collect(coll::AbstractArray) = coll
maybe_collect(coll) = collect(coll)
end

function _transduce_assoc_nocomplete(
rf::F,
init,
coll,
basesize,
ctx::DACContext = NoopDACContext(),
) where {F}
reducible = SizedReducible(maybe_collect(coll), basesize)
reducible = SizedReducible(coll, basesize)
return _reduce(ctx, rf, init, reducible)
end

Expand Down Expand Up @@ -224,7 +217,7 @@ _might_return_reduced(rf, init, coll) =
) !== Union{}

_reduce_dummy(rf, init, coll) =
__reduce_dummy(rf, init, SizedReducible(maybe_collect(coll), 1))
__reduce_dummy(rf, init, SizedReducible(coll, 1))

function __reduce_dummy(rf, init, reducible)
if issmall(reducible)
Expand Down Expand Up @@ -378,7 +371,7 @@ tcopy(xf, T::Type{<:AbstractSet}, reducible; kwargs...) =
function tcopy(
::typeof(Map(identity)),
T::Type{<:AbstractSet},
array::PartitionableArray;
array::AbstractArray;
basesize::Integer = max(1, length(array) ÷ Threads.nthreads()),
kwargs...,
)
Expand Down
8 changes: 1 addition & 7 deletions src/unordered.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ function _unsafe_sync_end(tasks)
end
end

if VERSION < v"1.5-"
const sync_end = Base.sync_end
else
const sync_end = _unsafe_sync_end
end

function transduce_commutative!(
xform::Transducer,
step,
Expand Down Expand Up @@ -85,7 +79,7 @@ function transduce_commutative!(
rethrow()
end
end
sync_end(tasks)
_unsafe_sync_end(tasks)
return foldl(combine_step(rf), Map(fetch), tasks)
end

Expand Down
8 changes: 4 additions & 4 deletions test/threads/test_parallel_reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ end

@testset "tcopy(Set, ...)" begin
@testset for xs in [[1], [1, 1], [1, 1, 1], [1, 1, 1, 1], [1, 1, 1, 1, 1]]
@test tcopy(Set, xs::Transducers.PartitionableArray) == Set([1])
@test tcopy(Set, xs::AbstractArray) == Set([1])
@testset for basesize in 1:3
@test tcopy(Set, xs::Transducers.PartitionableArray, basesize = basesize) ==
@test tcopy(Set, xs::AbstractArray, basesize = basesize) ==
Set([1])
end
end
@testset "empty" begin
@test tcopy(Set, Int[]::Transducers.PartitionableArray) === Empty(Set)
@test tcopy(Set, Int[]) === Empty(Set)
@testset for basesize in 1:3
@test tcopy(Set, Int[]::Transducers.PartitionableArray, basesize = basesize) ==
@test tcopy(Set, Int[], basesize = basesize) ==
Empty(Set)
end
end
Expand Down

0 comments on commit 372c888

Please sign in to comment.