From 6e35e296fbdef047cfdb043d54f80375331b47b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Sat, 1 May 2021 17:59:29 +0200 Subject: [PATCH] fix row_group_slots_threading (#2736) --- Project.toml | 2 +- src/groupeddataframe/groupeddataframe.jl | 5 +- src/groupeddataframe/utils.jl | 135 +++++++++++++++-------- src/other/precompile.jl | 2 +- src/other/utils.jl | 12 +- test/grouping.jl | 51 ++++++++- test/utils.jl | 30 ++++- 7 files changed, 185 insertions(+), 52 deletions(-) diff --git a/Project.toml b/Project.toml index a89d128dce..8ade6d8eea 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "DataFrames" uuid = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" -version = "1.0.1" +version = "1.0.2" [deps] Compat = "34da2185-b29b-5c13-b0c7-acf172513d20" diff --git a/src/groupeddataframe/groupeddataframe.jl b/src/groupeddataframe/groupeddataframe.jl index 590c05f92a..df8502209a 100644 --- a/src/groupeddataframe/groupeddataframe.jl +++ b/src/groupeddataframe/groupeddataframe.jl @@ -51,7 +51,8 @@ into row groups. `DataAPI.refpool` in which case the order of groups follows the order of values returned by `DataAPI.refpool`. As a particular application of this rule if all `cols` are `CategoricalVector`s then groups are always sorted - irrespective of the value of `sort`. + Integer columns with a narrow range also use this this optimization, so + to the order of groups when grouping on integer columns is undefined. - `skipmissing` : whether to skip groups with `missing` values in one of the grouping columns `cols` @@ -65,7 +66,7 @@ In particular if it is an empty vector then a single-group `GroupedDataFrame` is created. A `GroupedDataFrame` also supports -indexing by groups, `map` (which applies a function to each group) +indexing by groups, `select`, `transform`, and `combine` (which applies a function to each group and combines the result into a data frame). diff --git a/src/groupeddataframe/utils.jl b/src/groupeddataframe/utils.jl index 5074d0c77b..52d1486bba 100644 --- a/src/groupeddataframe/utils.jl +++ b/src/groupeddataframe/utils.jl @@ -5,11 +5,13 @@ function hashrows_col!(h::Vector{UInt}, v::AbstractVector{T}, rp::Nothing, firstcol::Bool) where T - @inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h) - el = v[i] - h[i] = hash(el, h[i]) - if length(n) > 0 - n[i] |= ismissing(el) + @spawn_for_chunks 1_000_000 for i in eachindex(h) + @inbounds begin + el = v[i] + h[i] = hash(el, h[i]) + if length(n) > 0 + n[i] |= ismissing(el) + end end end h @@ -31,19 +33,19 @@ function hashrows_col!(h::Vector{UInt}, fira = firstindex(ra) hashes = Vector{UInt}(undef, length(rp)) - @inbounds @spawn_for_chunks 1_000_000 for i in eachindex(hashes) - hashes[i] = hash(rp[i+firp-1]) + @spawn_for_chunks 1_000_000 for i in eachindex(hashes) + @inbounds hashes[i] = hash(rp[i+firp-1]) end # here we rely on the fact that `DataAPI.refpool` has a continuous # block of indices - @inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h) - ref = ra[i+fira-1] - h[i] = hashes[ref+1-firp] + @spawn_for_chunks 1_000_000 for i in eachindex(h) + @inbounds ref = ra[i+fira-1] + @inbounds h[i] = hashes[ref+1-firp] end else - @inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h, v) - h[i] = hash(v[i], h[i]) + @spawn_for_chunks 1_000_000 for i in eachindex(h, v) + @inbounds h[i] = hash(v[i], h[i]) end end # Doing this step separately is faster, as it would disable SIMD above @@ -286,7 +288,6 @@ function row_group_slots(cols::NTuple{N, AbstractVector}, newcols, refpools, refarrays, hash, groups, skipmissing, sort) end - seen = fill(false, ngroups) strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N, Int} firstinds = map(firstindex, refpools) if sort @@ -303,6 +304,21 @@ function row_group_slots(cols::NTuple{N, AbstractVector}, else sorted = false end + + lg = length(groups) + nt = Threads.nthreads() + # disable threading if we are processing a small data frame or number of groups is large + if lg < 1_000_000 || ngroups > lg * (0.5 - 1 / (2 * nt)) / (2 * nt) + nt = 1 + end + seen = fill(false, ngroups) + seen_vec = Vector{Vector{Bool}}(undef, nt) + seen_vec[1] = seen + for i in 2:nt + seen_vec[i] = fill(false, ngroups) + end + range_chunks = split_to_chunks(lg, nt) + if sort && !sorted # Compute vector mapping missing to -1 if skipmissing=true refmaps = map(cols, refpools, missinginds, nminds) do col, refpool, missingind, nmind @@ -323,45 +339,78 @@ function row_group_slots(cols::NTuple{N, AbstractVector}, end refmap end - @inbounds @spawn_for_chunks 1_000_000 for i in eachindex(groups) - local refs_i - let i=i # Workaround for julia#15276 - refs_i = map(c -> c[i], refarrays) - end - vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds) - j = sum(vals) + 1 - # x < 0 happens with -1 in refmap, which corresponds to missing - if skipmissing && any(x -> x < 0, vals) - j = 0 - else - seen[j] = true + @sync for (seeni, range_chunk) in zip(seen_vec, range_chunks) + @spawn for i in range_chunk + @inbounds begin + local refs_i + let i=i # Workaround for julia#15276 + refs_i = map(refarrays) do c + return @inbounds c[i] + end + end + vals = map(refmaps, refs_i, strides, firstinds) do m, r, s, fi + return @inbounds m[r-fi+1] * s + end + j = sum(vals) + 1 + # x < 0 happens with -1 in refmap, which corresponds to missing + if skipmissing && any(x -> x < 0, vals) + j = 0 + else + seeni[j] = true + end + groups[i] = j + end end - groups[i] = j end else - @inbounds @spawn_for_chunks 1_000_000 for i in eachindex(groups) - local refs_i - let i=i # Workaround for julia#15276 - refs_i = map(refarrays, missinginds) do ref, missingind - r = Int(ref[i]) - if skipmissing - return r == missingind ? -1 : (r > missingind ? r-1 : r) + @sync for (seeni, range_chunk) in zip(seen_vec, range_chunks) + @spawn for i in range_chunk + @inbounds begin + local refs_i + let i=i # Workaround for julia#15276 + refs_i = map(refarrays, missinginds) do ref, missingind + r = @inbounds Int(ref[i]) + if skipmissing + return r == missingind ? -1 : (r > missingind ? r-1 : r) + else + return r + end + end + end + vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds) + j = sum(vals) + 1 + # x < 0 happens with -1, which corresponds to missing + if skipmissing && any(x -> x < 0, vals) + j = 0 else - return r + seeni[j] = true end + groups[i] = j end end - vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds) - j = sum(vals) + 1 - # x < 0 happens with -1, which corresponds to missing - if skipmissing && any(x -> x < 0, vals) - j = 0 - else - seen[j] = true - end - groups[i] = j end end + + function reduce_or!(x::AbstractVector{Vector{Bool}}) + len = length(x) + if len < 2 + return + elseif len == 2 + x[1] .|= x[2] + else + xl = view(x, 1:len ÷ 2) + xr = view(x, len ÷ 2 + 1:len) + t1 = @spawn reduce_or!(xl) + t2 = @spawn reduce_or!(xr) + fetch(t1) + fetch(t2) + xl[1] .|= xr[1] + end + return + end + + reduce_or!(seen_vec) + # If some groups are unused, compress group indices to drop them # sum(seen) is faster than all(seen) when not short-circuiting, # and short-circuit would only happen in the slower case anyway diff --git a/src/other/precompile.jl b/src/other/precompile.jl index 10d60892db..fae084bca0 100644 --- a/src/other/precompile.jl +++ b/src/other/precompile.jl @@ -1586,7 +1586,7 @@ function precompile(all=false) Base.precompile(Tuple{Reduce{typeof(max), Nothing, Nothing},Vector{Int},GroupedDataFrame{DataFrame}}) Base.precompile(Tuple{Type{OnCol},Vector{String},Vararg{AbstractVector{T} where T, N} where N}) - for v in ([1, 2], [2, 1], [2, 2, 1]), + for v in ([1, 2], [2, 1], [2, 2, 1], Int32[1, 2], Int32[2, 1], Int32[2, 2, 1]), op in (identity, x -> string.(x), x -> PooledArrays.PooledArray(string.(x))), on in (:v1, [:v1, :v2]) df = DataFrame(v1=op(v), v2=v) diff --git a/src/other/utils.jl b/src/other/utils.jl index 9f631bf5ee..6f7bea3883 100644 --- a/src/other/utils.jl +++ b/src/other/utils.jl @@ -88,7 +88,17 @@ funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) # This method ensures balanced sizes by avoiding a small last chunk function split_indices(len::Integer, basesize::Integer) len′ = Int64(len) # Avoid overflow on 32-bit machines - np = max(1, div(len, basesize)) + @assert len′ > 0 + @assert basesize > 0 + np = Int64(max(1, len ÷ basesize)) + return split_to_chunks(len′, np) +end + +function split_to_chunks(len::Integer, np::Integer) + len′ = Int64(len) # Avoid overflow on 32-bit machines + np′ = Int64(np) + @assert len′ > 0 + @assert 0 < np′ <= len′ return (Int(1 + ((i - 1) * len′) ÷ np):Int((i * len′) ÷ np) for i in 1:np) end diff --git a/test/grouping.jl b/test/grouping.jl index 6a4f0afaee..f58cf61709 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3888,9 +3888,58 @@ end @testset "extra tests of wrapper corner cases" begin df = DataFrame(a=1:2) - gdf = groupby(df, :a) + gdf = groupby_checked(df, :a) @test_throws ArgumentError combine(gdf, x -> x.a[1] == 1 ? 1 : x[1, :]) @test_throws ArgumentError combine(gdf, x -> x.a[1] == 1 ? (a=1, b=2) : Ref(1)) end +@testset "grouping correctness with threading" begin + function cmp_gdf(gdf1::GroupedDataFrame, gdf2::GroupedDataFrame) + @test gdf1.ngroups == gdf2.ngroups + @test gdf1.groups == gdf2.groups + @test gdf1.starts == gdf2.starts + @test gdf1.ends == gdf2.ends + @test gdf1.idx == gdf2.idx + end + + Random.seed!(1234) + for levs in (100, 99_000), sz in (100_000, 1_100_000) + df = DataFrame(x_int=rand(1:levs, sz)) + df.x_str = string.(df.x_int, pad=5) + df.x_pool = PooledArray(df.x_str) + g_str = groupby_checked(df, :x_str) + g_pool = groupby_checked(df, :x_pool) + cmp_gdf(g_str, g_pool) + g_int = groupby_checked(df, :x_int, sort=true) + g_str = groupby_checked(df, :x_str, sort=true) + g_pool = groupby_checked(df, :x_pool, sort=true) + cmp_gdf(g_int, g_pool) + cmp_gdf(g_str, g_pool) + + df = df[reverse(1:nrow(df)), :] + g_str = groupby_checked(df, :x_str, sort=true) + g_pool = groupby_checked(df, :x_pool, sort=true) + cmp_gdf(g_str, g_pool) + + df = DataFrame(x_int=[1:levs; rand(1:levs, sz)]) + df.x_str = string.(df.x_int, pad=5) + df.x_pool = PooledArray(df.x_str) + allowmissing!(df) + df[rand(levs+1:sz, 10_000), :] .= missing + g_str = groupby_checked(df, :x_str) + g_pool = groupby_checked(df, :x_pool) + cmp_gdf(g_str, g_pool) + for sm in (false, true) + g_str = groupby_checked(df, :x_str, skipmissing=sm) + g_pool = groupby_checked(df, :x_pool, skipmissing=sm) + cmp_gdf(g_str, g_pool) + g_int = groupby_checked(df, :x_int, sort=true, skipmissing=sm) + g_str = groupby_checked(df, :x_str, sort=true, skipmissing=sm) + g_pool = groupby_checked(df, :x_pool, sort=true, skipmissing=sm) + cmp_gdf(g_int, g_pool) + cmp_gdf(g_str, g_pool) + end + end +end + end # module diff --git a/test/utils.jl b/test/utils.jl index f5efeaf6d9..209f12eaab 100644 --- a/test/utils.jl +++ b/test/utils.jl @@ -102,17 +102,19 @@ end end @testset "split_indices" begin - for len in 0:12 - basesize = 10 + for len in 1:100, basesize in 1:10 x = DataFrames.split_indices(len, basesize) @test length(x) == max(1, div(len, basesize)) - @test reduce(vcat, x) === 1:len + @test reduce(vcat, x) == 1:len vmin, vmax = extrema(length(v) for v in x) @test vmin + 1 == vmax || vmin == vmax @test len < basesize || vmin >= basesize end + @test_throws AssertionError DataFrames.split_indices(0, 10) + @test_throws AssertionError DataFrames.split_indices(10, 0) + # Check overflow on 32-bit len = typemax(Int32) basesize = 100_000_000 @@ -125,4 +127,26 @@ end @test len < basesize || vmin >= basesize end +@testset "split_to_chunks" begin + for lg in 1:100, nt in 1:11 + if lg < nt + @test_throws AssertionError DataFrames.split_to_chunks(lg, nt) + continue + end + x = collect(DataFrames.split_to_chunks(lg, nt)) + @test reduce(vcat, x) == 1:lg + @test sum(length, x) == lg + @test first(x[1]) == 1 + @test last(x[end]) == lg + @test length(x) == nt + for i in 1:nt-1 + @test first(x[i+1])-last(x[i]) == 1 + end + end + + @test_throws AssertionError DataFrames.split_to_chunks(0, 10) + @test_throws AssertionError DataFrames.split_to_chunks(10, 0) + @test_throws AssertionError DataFrames.split_to_chunks(10, 11) +end + end # module