Skip to content

Commit

Permalink
fix row_group_slots_threading (#2736)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkamins authored May 1, 2021
1 parent 57f1f95 commit 6e35e29
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 52 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "DataFrames"
uuid = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
version = "1.0.1"
version = "1.0.2"

[deps]
Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
Expand Down
5 changes: 3 additions & 2 deletions src/groupeddataframe/groupeddataframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ into row groups.
`DataAPI.refpool` in which case the order of groups follows the order of
values returned by `DataAPI.refpool`. As a particular application of this rule
if all `cols` are `CategoricalVector`s then groups are always sorted
irrespective of the value of `sort`.
Integer columns with a narrow range also use this optimization, so
the order of groups when grouping on integer columns is undefined.
- `skipmissing` : whether to skip groups with `missing` values in one of the
grouping columns `cols`
Expand All @@ -65,7 +66,7 @@ In particular if it is an empty vector then a single-group `GroupedDataFrame`
is created.
A `GroupedDataFrame` also supports
indexing by groups, `map` (which applies a function to each group)
indexing by groups, `select`, `transform`,
and `combine` (which applies a function to each group
and combines the result into a data frame).
Expand Down
135 changes: 92 additions & 43 deletions src/groupeddataframe/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ function hashrows_col!(h::Vector{UInt},
v::AbstractVector{T},
rp::Nothing,
firstcol::Bool) where T
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h)
el = v[i]
h[i] = hash(el, h[i])
if length(n) > 0
n[i] |= ismissing(el)
@spawn_for_chunks 1_000_000 for i in eachindex(h)
@inbounds begin
el = v[i]
h[i] = hash(el, h[i])
if length(n) > 0
n[i] |= ismissing(el)
end
end
end
h
Expand All @@ -31,19 +33,19 @@ function hashrows_col!(h::Vector{UInt},
fira = firstindex(ra)

hashes = Vector{UInt}(undef, length(rp))
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(hashes)
hashes[i] = hash(rp[i+firp-1])
@spawn_for_chunks 1_000_000 for i in eachindex(hashes)
@inbounds hashes[i] = hash(rp[i+firp-1])
end

# here we rely on the fact that `DataAPI.refpool` has a continuous
# block of indices
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h)
ref = ra[i+fira-1]
h[i] = hashes[ref+1-firp]
@spawn_for_chunks 1_000_000 for i in eachindex(h)
@inbounds ref = ra[i+fira-1]
@inbounds h[i] = hashes[ref+1-firp]
end
else
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(h, v)
h[i] = hash(v[i], h[i])
@spawn_for_chunks 1_000_000 for i in eachindex(h, v)
@inbounds h[i] = hash(v[i], h[i])
end
end
# Doing this step separately is faster, as it would disable SIMD above
Expand Down Expand Up @@ -286,7 +288,6 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
newcols, refpools, refarrays, hash, groups, skipmissing, sort)
end

seen = fill(false, ngroups)
strides = (cumprod(collect(reverse(ngroupstup)))[end-1:-1:1]..., 1)::NTuple{N, Int}
firstinds = map(firstindex, refpools)
if sort
Expand All @@ -303,6 +304,21 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
else
sorted = false
end

lg = length(groups)
nt = Threads.nthreads()
# disable threading if we are processing a small data frame or number of groups is large
if lg < 1_000_000 || ngroups > lg * (0.5 - 1 / (2 * nt)) / (2 * nt)
nt = 1
end
seen = fill(false, ngroups)
seen_vec = Vector{Vector{Bool}}(undef, nt)
seen_vec[1] = seen
for i in 2:nt
seen_vec[i] = fill(false, ngroups)
end
range_chunks = split_to_chunks(lg, nt)

if sort && !sorted
# Compute vector mapping missing to -1 if skipmissing=true
refmaps = map(cols, refpools, missinginds, nminds) do col, refpool, missingind, nmind
Expand All @@ -323,45 +339,78 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
end
refmap
end
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(c -> c[i], refarrays)
end
vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1 in refmap, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
@sync for (seeni, range_chunk) in zip(seen_vec, range_chunks)
@spawn for i in range_chunk
@inbounds begin
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refarrays) do c
return @inbounds c[i]
end
end
vals = map(refmaps, refs_i, strides, firstinds) do m, r, s, fi
return @inbounds m[r-fi+1] * s
end
j = sum(vals) + 1
# x < 0 happens with -1 in refmap, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seeni[j] = true
end
groups[i] = j
end
end
groups[i] = j
end
else
@inbounds @spawn_for_chunks 1_000_000 for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refarrays, missinginds) do ref, missingind
r = Int(ref[i])
if skipmissing
return r == missingind ? -1 : (r > missingind ? r-1 : r)
@sync for (seeni, range_chunk) in zip(seen_vec, range_chunks)
@spawn for i in range_chunk
@inbounds begin
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refarrays, missinginds) do ref, missingind
r = @inbounds Int(ref[i])
if skipmissing
return r == missingind ? -1 : (r > missingind ? r-1 : r)
else
return r
end
end
end
vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
return r
seeni[j] = true
end
groups[i] = j
end
end
vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
end
groups[i] = j
end
end

# Tree-reduce the element-wise OR of all `Vector{Bool}`s in `x` into `x[1]`.
# Halves are reduced in parallel tasks; after the call `x[1]` holds the
# union of the per-task "seen" flags (the other entries are left in an
# intermediate, partially reduced state).
function reduce_or!(x::AbstractVector{Vector{Bool}})
    n = length(x)
    n < 2 && return
    if n == 2
        x[1] .|= x[2]
        return
    end
    half = n ÷ 2
    lo = view(x, 1:half)
    hi = view(x, half + 1:n)
    task_lo = @spawn reduce_or!(lo)
    task_hi = @spawn reduce_or!(hi)
    fetch(task_lo)
    fetch(task_hi)
    lo[1] .|= hi[1]
    return
end

reduce_or!(seen_vec)

# If some groups are unused, compress group indices to drop them
# sum(seen) is faster than all(seen) when not short-circuiting,
# and short-circuit would only happen in the slower case anyway
Expand Down
2 changes: 1 addition & 1 deletion src/other/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,7 @@ function precompile(all=false)
Base.precompile(Tuple{Reduce{typeof(max), Nothing, Nothing},Vector{Int},GroupedDataFrame{DataFrame}})
Base.precompile(Tuple{Type{OnCol},Vector{String},Vararg{AbstractVector{T} where T, N} where N})

for v in ([1, 2], [2, 1], [2, 2, 1]),
for v in ([1, 2], [2, 1], [2, 2, 1], Int32[1, 2], Int32[2, 1], Int32[2, 2, 1]),
op in (identity, x -> string.(x), x -> PooledArrays.PooledArray(string.(x))),
on in (:v1, [:v1, :v2])
df = DataFrame(v1=op(v), v2=v)
Expand Down
12 changes: 11 additions & 1 deletion src/other/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,17 @@ funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner))
# This method ensures balanced sizes by avoiding a small last chunk
# Split `1:len` into chunks of roughly `basesize` elements each,
# ensuring balanced sizes by avoiding a small last chunk.
# Throws `AssertionError` when `len <= 0` or `basesize <= 0`
# (callers rely on `AssertionError` here — see the tests).
function split_indices(len::Integer, basesize::Integer)
    len′ = Int64(len) # Avoid overflow on 32-bit machines
    @assert len′ > 0
    @assert basesize > 0
    # NOTE(review): removed a stale duplicate assignment of `np`
    # (`np = max(1, div(len, basesize))`) that was immediately overwritten.
    np = Int64(max(1, len ÷ basesize))
    return split_to_chunks(len′, np)
end

"""
    split_to_chunks(len::Integer, np::Integer)

Lazily partition `1:len` into `np` contiguous, nearly equal-length
`UnitRange{Int}` chunks (returned as a generator).
Requires `len > 0` and `0 < np <= len` (asserted).
"""
function split_to_chunks(len::Integer, np::Integer)
    total = Int64(len) # Avoid overflow on 32-bit machines
    parts = Int64(np)
    @assert total > 0
    @assert 0 < parts <= total
    return (Int(1 + ((k - 1) * total) ÷ parts):Int((k * total) ÷ parts) for k in 1:parts)
end

Expand Down
51 changes: 50 additions & 1 deletion test/grouping.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3888,9 +3888,58 @@ end

@testset "extra tests of wrapper corner cases" begin
df = DataFrame(a=1:2)
# NOTE(review): removed the stale pre-change line `gdf = groupby(df, :a)`
# that was immediately shadowed; groupby_checked is presumably a test
# helper wrapping groupby with extra validation — confirm in test setup.
gdf = groupby_checked(df, :a)
@test_throws ArgumentError combine(gdf, x -> x.a[1] == 1 ? 1 : x[1, :])
@test_throws ArgumentError combine(gdf, x -> x.a[1] == 1 ? (a=1, b=2) : Ref(1))
end

# Verify that grouping produces identical results regardless of column type
# (integer / string / pooled) across data sizes that straddle the
# 1_000_000-row threshold enabling the threaded grouping path.
@testset "grouping correctness with threading" begin
# Compare two GroupedDataFrames field by field for exact structural equality.
function cmp_gdf(gdf1::GroupedDataFrame, gdf2::GroupedDataFrame)
@test gdf1.ngroups == gdf2.ngroups
@test gdf1.groups == gdf2.groups
@test gdf1.starts == gdf2.starts
@test gdf1.ends == gdf2.ends
@test gdf1.idx == gdf2.idx
end

Random.seed!(1234)
# levs: number of distinct group levels; sz: row count (1_100_000 exercises
# the multi-threaded code path, 100_000 the serial one).
for levs in (100, 99_000), sz in (100_000, 1_100_000)
df = DataFrame(x_int=rand(1:levs, sz))
# pad=5 keeps lexicographic string order consistent with numeric order
df.x_str = string.(df.x_int, pad=5)
df.x_pool = PooledArray(df.x_str)
g_str = groupby_checked(df, :x_str)
g_pool = groupby_checked(df, :x_pool)
cmp_gdf(g_str, g_pool)
g_int = groupby_checked(df, :x_int, sort=true)
g_str = groupby_checked(df, :x_str, sort=true)
g_pool = groupby_checked(df, :x_pool, sort=true)
cmp_gdf(g_int, g_pool)
cmp_gdf(g_str, g_pool)

# Reverse row order to test sorting with a different input permutation.
df = df[reverse(1:nrow(df)), :]
g_str = groupby_checked(df, :x_str, sort=true)
g_pool = groupby_checked(df, :x_pool, sort=true)
cmp_gdf(g_str, g_pool)

# Prepend all levels so every group exists, then inject missing values
# to exercise the skipmissing code paths.
df = DataFrame(x_int=[1:levs; rand(1:levs, sz)])
df.x_str = string.(df.x_int, pad=5)
df.x_pool = PooledArray(df.x_str)
allowmissing!(df)
df[rand(levs+1:sz, 10_000), :] .= missing
g_str = groupby_checked(df, :x_str)
g_pool = groupby_checked(df, :x_pool)
cmp_gdf(g_str, g_pool)
for sm in (false, true)
g_str = groupby_checked(df, :x_str, skipmissing=sm)
g_pool = groupby_checked(df, :x_pool, skipmissing=sm)
cmp_gdf(g_str, g_pool)
g_int = groupby_checked(df, :x_int, sort=true, skipmissing=sm)
g_str = groupby_checked(df, :x_str, sort=true, skipmissing=sm)
g_pool = groupby_checked(df, :x_pool, sort=true, skipmissing=sm)
cmp_gdf(g_int, g_pool)
cmp_gdf(g_str, g_pool)
end
end
end

end # module
30 changes: 27 additions & 3 deletions test/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,19 @@ end
end

@testset "split_indices" begin
for len in 0:12
basesize = 10
for len in 1:100, basesize in 1:10
x = DataFrames.split_indices(len, basesize)

@test length(x) == max(1, div(len, basesize))
@test reduce(vcat, x) === 1:len
@test reduce(vcat, x) == 1:len
vmin, vmax = extrema(length(v) for v in x)
@test vmin + 1 == vmax || vmin == vmax
@test len < basesize || vmin >= basesize
end

@test_throws AssertionError DataFrames.split_indices(0, 10)
@test_throws AssertionError DataFrames.split_indices(10, 0)

# Check overflow on 32-bit
len = typemax(Int32)
basesize = 100_000_000
Expand All @@ -125,4 +127,26 @@ end
@test len < basesize || vmin >= basesize
end

# Exhaustively check split_to_chunks for small sizes: the chunks must
# exactly cover 1:lg, be contiguous, and number exactly nt.
@testset "split_to_chunks" begin
for lg in 1:100, nt in 1:11
# more chunks than elements is rejected via assertion
if lg < nt
@test_throws AssertionError DataFrames.split_to_chunks(lg, nt)
continue
end
x = collect(DataFrames.split_to_chunks(lg, nt))
@test reduce(vcat, x) == 1:lg
@test sum(length, x) == lg
@test first(x[1]) == 1
@test last(x[end]) == lg
@test length(x) == nt
# consecutive chunks must be adjacent with no gap or overlap
for i in 1:nt-1
@test first(x[i+1])-last(x[i]) == 1
end
end

# degenerate arguments must be rejected via assertion
@test_throws AssertionError DataFrames.split_to_chunks(0, 10)
@test_throws AssertionError DataFrames.split_to_chunks(10, 0)
@test_throws AssertionError DataFrames.split_to_chunks(10, 11)
end

end # module

2 comments on commit 6e35e29

@bkamins
Copy link
Member Author

@bkamins bkamins commented on 6e35e29 May 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/35791

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v1.0.2 -m "<description of version>" 6e35e296fbdef047cfdb043d54f80375331b47b5
git push origin v1.0.2

Please sign in to comment.