From 726b4e4ab23e49e20bccfcf56087f05a07a22daf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?=
Date: Sat, 13 Feb 2021 12:43:08 +0100
Subject: [PATCH] implement faster innerjoin (#2612)

---
 NEWS.md                                |   5 +
 Project.toml                           |   2 +-
 benchmarks/innerjoin_performance.jl    |  96 +++++
 benchmarks/run.sh                      |   2 +
 benchmarks/runtests.jl                 |  12 +
 src/abstractdataframe/join.jl          | 573 ++++++++++++++++++++++++-
 src/groupeddataframe/fastaggregates.jl |  10 +-
 test/join.jl                           | 204 ++++++++-
 8 files changed, 887 insertions(+), 17 deletions(-)
 create mode 100644 benchmarks/innerjoin_performance.jl
 create mode 100644 benchmarks/run.sh
 create mode 100644 benchmarks/runtests.jl

diff --git a/NEWS.md b/NEWS.md
index ddbf7204cc..acb4f2fb14 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -33,6 +33,11 @@

 ## Other relevant changes

+* `innerjoin` is now much faster. It checks whether the passed data frames
+  are sorted by the `on` columns and takes into account whether the shorter
+  data frame being joined has unique values in the `on` columns. These
+  properties of the input data frames may affect the order of rows produced
+  in the output ([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612))

 # DataFrames v0.22 Release Notes

diff --git a/Project.toml b/Project.toml
index 69e3c599d1..043c22a27b 100644
--- a/Project.toml
+++ b/Project.toml
@@ -30,7 +30,7 @@ DataAPI = "1.4"
 InvertedIndices = "1"
 IteratorInterfaceExtensions = "0.1.1, 1"
 Missings = "0.4.2"
-PooledArrays = "0.5, 1.0"
+PooledArrays = "1.1"
 PrettyTables = "0.11"
 Reexport = "0.1, 0.2, 1.0"
 SortingAlgorithms = "0.1, 0.2, 0.3"
diff --git a/benchmarks/innerjoin_performance.jl b/benchmarks/innerjoin_performance.jl
new file mode 100644
index 0000000000..1c28199c3b
--- /dev/null
+++ b/benchmarks/innerjoin_performance.jl
@@ -0,0 +1,96 @@
+using CategoricalArrays
+using DataFrames
+using PooledArrays
+using Random
+
+fullgc() = (GC.gc(true); GC.gc(true); GC.gc(true); GC.gc(true))
+
+@assert length(ARGS) == 6
+@assert ARGS[3] in ["int", "pool", "cat", "str"]
+@assert ARGS[4] in ["uniq", "dup", "manydup"]
+@assert ARGS[5] in ["sort", "rand"]
+@assert ARGS[6] in ["1", "2"]
+
+@info ARGS
+
+llen = parse(Int, ARGS[1])
+rlen = parse(Int, ARGS[2])
+@assert llen > 1000
+@assert rlen > 2000
+
+pad = maximum(length.(string.((llen, rlen))))
+
+if ARGS[3] == "int"
+    if ARGS[4] == "uniq"
+        col1 = [1:llen;]
+        col2 = [1:rlen;]
+    elseif ARGS[4] == "dup"
+        col1 = repeat(1:llen ÷ 2, inner=2)
+        col2 = repeat(1:rlen ÷ 2, inner=2)
+    else
+        @assert ARGS[4] == "manydup"
+        col1 = repeat(1:llen ÷ 20, inner=20)
+        col2 = repeat(1:rlen ÷ 20, inner=20)
+    end
+elseif ARGS[3] == "pool"
+    if ARGS[4] == "dup"
+        col1 = PooledArray(repeat(string.(1:llen ÷ 2, pad=pad), inner=2))
+        col2 = PooledArray(repeat(string.(1:rlen ÷ 2, pad=pad), inner=2))
+    else
+        @assert ARGS[4] == "manydup"
+        col1 = PooledArray(repeat(string.(1:llen ÷ 20, pad=pad), inner=20))
+        col2 = PooledArray(repeat(string.(1:rlen ÷ 20, pad=pad), inner=20))
+    end
+elseif ARGS[3] == "cat"
+    if ARGS[4] == "dup"
+        col1 = categorical(repeat(string.(1:llen ÷ 2, pad=pad), inner=2))
+        col2 = categorical(repeat(string.(1:rlen ÷ 2, pad=pad), inner=2))
+    else
+        @assert ARGS[4] == "manydup"
+        col1 = categorical(repeat(string.(1:llen ÷ 20, pad=pad), inner=20))
+        col2 = categorical(repeat(string.(1:rlen ÷ 20, pad=pad), inner=20))
+    end
+else
+    @assert ARGS[3] == "str"
+    if ARGS[4] == "uniq"
+        col1 = string.(1:llen, pad=pad)
+        col2 = string.(1:rlen, pad=pad)
+    elseif ARGS[4] == "dup"
+        col1 = repeat(string.(1:llen ÷ 2, pad=pad), inner=2)
+        col2 = repeat(string.(1:rlen ÷ 2, pad=pad), inner=2)
+    else
+        @assert ARGS[4] == "manydup"
+        col1 = repeat(string.(1:llen ÷ 20, pad=pad), inner=20)
+        col2 = repeat(string.(1:rlen ÷ 20, pad=pad), inner=20)
+    end
+end
+
+Random.seed!(1234)
+
+if ARGS[5] == "rand"
+    shuffle!(col1)
+    shuffle!(col2)
+else
+    @assert ARGS[5] == "sort"
+end
+
+if ARGS[6] == "1"
+    df1 = DataFrame(id1 = col1)
+    df2 = DataFrame(id1 = col2)
+    innerjoin(df1[1:1000, :], df2[1:2000, :], on=:id1)
+    innerjoin(df2[1:2000, :], df1[1:1000, :], on=:id1)
+    fullgc()
+    @time innerjoin(df1, df2, on=:id1)
+    fullgc()
+    @time innerjoin(df2, df1, on=:id1)
+else
+    @assert ARGS[6] == "2"
+    df1 = DataFrame(id1 = col1, id2 = col1)
+    df2 = DataFrame(id1 = col2, id2 = col2)
+    innerjoin(df1[1:1000, :], df2[1:2000, :], on=[:id1, :id2])
+    innerjoin(df2[1:2000, :], df1[1:1000, :], on=[:id1, :id2])
+    fullgc()
+    @time innerjoin(df1, df2, on=[:id1, :id2])
+    fullgc()
+    @time innerjoin(df2, df1, on=[:id1, :id2])
+end
diff --git a/benchmarks/run.sh b/benchmarks/run.sh
new file mode 100644
index 0000000000..f23eef6a3a
--- /dev/null
+++ b/benchmarks/run.sh
@@ -0,0 +1,2 @@
+julia runtests.jl 100000 50000000
+julia runtests.jl 5000000 10000000
diff --git a/benchmarks/runtests.jl b/benchmarks/runtests.jl
new file mode 100644
index 0000000000..a17959f576
--- /dev/null
+++ b/benchmarks/runtests.jl
@@ -0,0 +1,12 @@
+@assert length(ARGS) == 2
+file_loc = joinpath(dirname(@__FILE__), "innerjoin_performance.jl")
+llen = ARGS[1]
+rlen = ARGS[2]
+
+for a3 in ["str", "int", "pool", "cat"],
+    a4 in ["uniq", "dup", "manydup"],
+    a5 in ["sort", "rand"],
+    a6 in ["1", "2"]
+    a4 == "uniq" && a3 in ["pool", "cat"] && continue
+    run(`julia $file_loc $llen $rlen $a3 $a4 $a5 $a6`)
+end
diff --git a/src/abstractdataframe/join.jl b/src/abstractdataframe/join.jl
index 0e391d4c99..206f8edde7 100644
--- a/src/abstractdataframe/join.jl
+++ b/src/abstractdataframe/join.jl
@@ -81,9 +81,6 @@ end

 Base.length(x::RowIndexMap) = length(x.orig)

-# composes the joined data table using the maps between the left and right
-# table rows and the indices of rows in the result
-
 _rename_cols(old_names::AbstractVector{Symbol},
              renamecols::Union{Function, Symbol, AbstractString},
              exclude::AbstractVector{Symbol} = Symbol[]) =
@@ -91,6 +88,570 @@ _rename_cols(old_names::AbstractVector{Symbol},
      (renamecols isa Function ? Symbol(renamecols(string(n))) :
       Symbol(n, renamecols)) for n in old_names]

+struct OnColRow{T}
+    row::Int
+    cols::T
+    h::Vector{UInt}
+
+    OnColRow(row::Union{Signed,Unsigned},
+             cols::NTuple{<:Any, AbstractVector}, h::Vector{UInt}) =
+        new{typeof(cols)}(Int(row), cols, h)
+end
+
+struct OnCol{T,N} <: AbstractVector{OnColRow{T}}
+    len::Int
+    cols::T
+    h::Vector{UInt}
+
+    function OnCol(cs::AbstractVector...)
+ @assert length(cs) > 1 + len = length(cs[1]) + @assert all(x -> firstindex(x) == 1, cs) + @assert all(x -> lastindex(x) == len, cs) + new{typeof(cs), length(cs)}(len, cs, UInt[]) + end +end + +Base.IndexStyle(::Type{<:OnCol}) = Base.IndexLinear() + +@inline Base.size(oc::OnCol) = (oc.len,) + +@inline function Base.getindex(oc::OnCol, i::Int) + @boundscheck checkbounds(oc, i) + return OnColRow(i, oc.cols, oc.h) +end + +Base.hash(ocr1::OnColRow, h::UInt) = throw(MethodError(hash, (ocr1, h))) +@inline Base.hash(ocr1::OnColRow) = @inbounds ocr1.h[ocr1.row] + +# Hashing one column at a time is faster since it can use SIMD +function _prehash(oc::OnCol) + h = oc.h + resize!(h, oc.len) + fill!(h, Base.tuplehash_seed) + for col in reverse(oc.cols) + h .= hash.(col, h) + end +end + +# TODO: rewrite isequal and isless to use @generated +# or some other approach that would keep them efficient and avoid code duplication + +Base.:(==)(x::OnColRow, y::OnColRow) = throw(MethodError(==, (x, y))) + +@inline function Base.isequal(ocr1::OnColRow{<:NTuple{2, AbstractVector}}, + ocr2::OnColRow{<:NTuple{2, AbstractVector}}) + r1 = ocr1.row + c11, c12 = ocr1.cols + r2 = ocr2.row + c21, c22 = ocr2.cols + + return @inbounds isequal(c11[r1], c21[r2]) && isequal(c12[r1], c22[r2]) +end + +Base.isequal(ocr1::OnColRow{<:NTuple{N,AbstractVector}}, + ocr2::OnColRow{<:NTuple{N,AbstractVector}}) where {N} = + isequal(ntuple(i -> @inbounds(ocr1.cols[i][ocr1.row]), N), + ntuple(i -> @inbounds(ocr2.cols[i][ocr2.row]), N)) + +@inline function Base.isless(ocr1::OnColRow{<:NTuple{2, AbstractVector}}, + ocr2::OnColRow{<:NTuple{2, AbstractVector}}) + r1 = ocr1.row + c11, c12 = ocr1.cols + r2 = ocr2.row + c21, c22 = ocr2.cols + + c11r = @inbounds c11[r1] + c12r = @inbounds c12[r1] + c21r = @inbounds c21[r2] + c22r = @inbounds c22[r2] + + isless(c11r, c21r) || (isequal(c11r, c21r) && isless(c12r, c22r)) +end + +@inline Base.isless(ocr1::OnColRow{<:NTuple{N,AbstractVector}}, + ocr2::OnColRow{<:NTuple{N,AbstractVector}}) where {N} = + isless(ntuple(i -> @inbounds(ocr1.cols[i][ocr1.row]), N), + ntuple(i -> @inbounds(ocr2.cols[i][ocr2.row]), N)) + +prepare_on_col() = throw(ArgumentError("at least one on column required when joining")) +prepare_on_col(c::AbstractVector) = c +prepare_on_col(cs::AbstractVector...) = OnCol(cs...) + +# Return if it is allowed to use refpool instead of the original array for joining. +# There are multiple conditions that must be met to allow for this. 
+# If it is allowed we are sure that missing can be used as a sentinel +check_mapping_allowed(short::AbstractVector, refarray_long::AbstractVector, + refpool_long, invrefpool_long) = + !isempty(short) && !isnothing(refpool_long) && !isnothing(invrefpool_long) && + eltype(refarray_long) <: Union{Signed, Unsigned} + +@noinline map_refarray(mapping::AbstractVector, refarray::AbstractVector, ::Val{fi}) where {fi} = + [@inbounds mapping[r - fi + 1] for r in refarray] + +function map2refs(x::AbstractVector, invrefpool) + x_refpool = DataAPI.refpool(x) + if x_refpool isa AbstractVector{<:Integer} && 0 <= firstindex(x_refpool) <= 1 + # here we know that x_refpool is AbstractVector that allows integer indexing + # and its firstindex must be an integer + # if firstindex is not 0 or 1 then we fallback to slow path for safety reasons + # all refpool we currently know have firstindex 0 or 1 + # if there is some very strange firstindex we might run into overflow issues + # below use function barrier as mapping is not type stable + mapping = [get(invrefpool, v, missing) for v in x_refpool] + return map_refarray(mapping, DataAPI.refarray(x), Val(Int(firstindex(x_refpool)))) + else + return [get(invrefpool, v, missing) for v in x] + end +end + +function compose_inner_table(joiner::DataFrameJoiner, + makeunique::Bool, + left_rename::Union{Function, AbstractString, Symbol}, + right_rename::Union{Function, AbstractString, Symbol}) + + right_len = length(joiner.dfr_on[!, 1]) + left_len = length(joiner.dfl_on[!, 1]) + right_shorter = right_len <= left_len + + left_cols = collect(eachcol(joiner.dfl_on)) + right_cols = collect(eachcol(joiner.dfr_on)) + + # if column of a longer table supports DataAPI.refpool and DataAPI.invrefpool + # remap matching left and right columns to use refs + if right_shorter + for i in eachindex(left_cols, right_cols) + rc = right_cols[i] + lc = left_cols[i] + + lc_refs = DataAPI.refarray(lc) + lc_refpool = DataAPI.refpool(lc) + lc_invrefpool = DataAPI.invrefpool(lc) + if check_mapping_allowed(rc, lc_refs, lc_refpool, lc_invrefpool) + right_cols[i] = map2refs(rc, lc_invrefpool) + left_cols[i] = lc_refs + end + end + else + for i in eachindex(left_cols, right_cols) + rc = right_cols[i] + lc = left_cols[i] + + rc_refs = DataAPI.refarray(rc) + rc_refpool = DataAPI.refpool(rc) + rc_invrefpool = DataAPI.invrefpool(rc) + if check_mapping_allowed(lc, rc_refs, rc_refpool, rc_invrefpool) + right_cols[i] = rc_refs + left_cols[i] = map2refs(lc, rc_invrefpool) + end + end + end + + # this is a workaround for https://github.com/JuliaData/CategoricalArrays.jl/issues/319 + # this path will be triggered only in rare cases when the refpool code above + # fails to convert CategoricalArray into refpool + disallow_sorted = false + + for (lc, rc) in zip(left_cols, right_cols) + @assert length(lc) == left_len + @assert length(rc) == right_len + lct = typeof(lc) + lcat = nameof(lct) === :CategoricalArray && nameof(parentmodule(lct)) === :CategoricalArrays + rct = typeof(rc) + rcat = nameof(rct) === :CategoricalArray && nameof(parentmodule(rct)) === :CategoricalArrays + disallow_sorted |= rcat ⊻ lcat + end + + # TODO: + # If DataAPI.invrefpool vectors are found in the "on" columns + # then potentially the following optimizations can be done: + # 1. identify rows in shorter table that should be dropped + # 2. 
develop custom _innerjoin_sorted and _innerjoin_unsorted that + # drop rows from shorter table that do not match rows from longer table based on + # PooledArray refpool check + # This optimization significantly complicates the code (especially sorted path). + # It should be added if in practice we find that the use case is often enough + # and that the benefits are significant. The two cases when the benefits should + # be expected are: + # 1. Shorter table is sorted when we drop rows not matching longer table rows + # 2. Shorter table does not have duplicates when we drop rows not matching longer table rows + + left_col = prepare_on_col(left_cols...) + right_col = prepare_on_col(right_cols...) + + local left_ixs + local right_ixs + + if isempty(left_col) || isempty(right_col) + # we treat this case separately so we know we have at least one element later + left_ixs, right_ixs = Int[], Int[] + else + # if sorting is not disallowed try using a fast algorithm that works + # on sorted columns; if it is not run or errors fall back to the unsorted case + # the try-catch is used to handle the case when columns on which we join + # contain values that are not comparable + already_joined = false + if !disallow_sorted + try + if issorted(left_col) && issorted(right_col) + left_ixs, right_ixs = _innerjoin_sorted(left_col, right_col) + already_joined = true + end + catch + # nothing to do - one of the columns is not sortable + end + end + if !already_joined + if right_shorter + if left_col isa AbstractVector{<:Union{Integer, Missing}} && + right_col isa AbstractVector{<:Union{Integer, Missing}} + left_ixs, right_ixs = _innerjoin_unsorted_int(left_col, right_col) + else + left_ixs, right_ixs = _innerjoin_unsorted(left_col, right_col) + end + else + if left_col isa AbstractVector{<:Union{Integer, Missing}} && + right_col isa AbstractVector{<:Union{Integer, Missing}} + right_ixs, left_ixs = _innerjoin_unsorted_int(right_col, left_col) + else + right_ixs, left_ixs = _innerjoin_unsorted(right_col, left_col) + end + end + end + end + + dfl = joiner.dfl[left_ixs, :] + dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)] + + ncleft = ncol(dfl) + cols = Vector{AbstractVector}(undef, ncleft + ncol(dfr_noon)) + + for (i, col) in enumerate(eachcol(dfl)) + cols[i] = col + end + for (i, col) in enumerate(eachcol(dfr_noon)) + cols[i+ncleft] = col + end + + new_names = vcat(_rename_cols(_names(joiner.dfl), left_rename, joiner.left_on), + _rename_cols(_names(dfr_noon), right_rename)) + res = DataFrame(cols, new_names, makeunique=makeunique, copycols=false) + + return res, nothing, nothing +end + +@inline function find_next_range(x::AbstractArray, start::Int, start_value) + stop_value = start_value + n = length(x) + stop = start + 1 + while stop <= n + @inbounds stop_value = x[stop] + isequal(start_value, stop_value) || break + stop += 1 + end + return stop, stop_value +end + +function _innerjoin_sorted(left::AbstractArray, right::AbstractArray) + left_n = length(left) + right_n = length(right) + + left_ixs = Int[] + right_ixs = Int[] + + (left_n == 0 || right_n == 0) && return left_ixs, right_ixs + + # lower bound assuming we get matches + sizehint!(left_ixs, min(left_n, right_n)) + sizehint!(right_ixs, min(left_n, right_n)) + + left_cur = 1 + left_val = left[left_cur] + left_new, left_tmp = find_next_range(left, left_cur, left_val) + + right_cur = 1 + right_val = right[right_cur] + right_new, right_tmp = find_next_range(right, right_cur, right_val) + + while left_cur <= left_n && right_cur <= right_n + if 
isequal(left_val, right_val) + if left_new - left_cur == right_new - right_cur == 1 + push!(left_ixs, left_cur) + push!(right_ixs, right_cur) + else + idx = length(left_ixs) + left_range = left_cur:left_new - 1 + right_range = right_cur:right_new - 1 + to_grow = Base.checked_add(idx, Base.checked_mul(length(left_range), length(right_range))) + resize!(left_ixs, to_grow) + resize!(right_ixs, to_grow) + @inbounds for right_i in right_range, left_i in left_range + idx += 1 + left_ixs[idx] = left_i + right_ixs[idx] = right_i + end + end + left_cur, left_val = left_new, left_tmp + left_new, left_tmp = find_next_range(left, left_cur, left_val) + right_cur, right_val = right_new, right_tmp + right_new, right_tmp = find_next_range(right, right_cur, right_val) + elseif isless(left_val, right_val) + left_cur, left_val = left_new, left_tmp + left_new, left_tmp = find_next_range(left, left_cur, left_val) + else + right_cur, right_val = right_new, right_tmp + right_new, right_tmp = find_next_range(right, right_cur, right_val) + end + end + + return left_ixs, right_ixs +end + +# optimistically assume that shorter table does not have duplicates in on column +# if this is not the case we call _innerjoin_dup +# which efficiently uses the work already done and continues with the more +# memory expensive algorithm that allows for duplicates +function _innerjoin_unsorted(left::AbstractArray, right::AbstractArray{T}) where {T} + dict = Dict{T, Int}() + + right_len = length(right) + sizehint!(dict, 2 * min(right_len, typemax(Int) >> 2)) + + right isa OnCol && _prehash(right) + left isa OnCol && _prehash(left) + + for (idx_r, val_r) in enumerate(right) + haskey(dict, val_r) && return _innerjoin_dup(left, right, dict, idx_r) + dict[val_r] = idx_r + end + + left_ixs = Int[] + right_ixs = Int[] + + # lower bound assuming we get matches + sizehint!(left_ixs, right_len) + sizehint!(right_ixs, right_len) + + for (idx_l, val_l) in enumerate(left) + # we know that dict contains only positive values + idx_r = get(dict, val_l, -1) + if idx_r != -1 + push!(left_ixs, idx_l) + push!(right_ixs, idx_r) + end + end + return left_ixs, right_ixs +end + +extrema_missing(x::AbstractVector{Missing}) = (1, 0) + +function extrema_missing(x::AbstractVector{T}) where {T<:Union{Integer, Missing}} + try + return extrema(skipmissing(x)) + catch + S = nonmissingtype(T) + return S(1), S(0) + end +end + +function _innerjoin_unsorted_int(left::AbstractVector{<:Union{Integer, Missing}}, + right::AbstractVector{<:Union{Integer, Missing}}) + minv, maxv = extrema_missing(right) + + val_range = big(maxv) - big(minv) + if val_range > typemax(Int) - 3 || val_range ÷ 2 > max(64, length(right)) || + minv < typemin(Int) + 2 || maxv > typemax(Int) - 3 + return _innerjoin_unsorted(left, right) + end + + offset = 1 - Int(minv) # we are now sure it does not overflow + len = Int(maxv) - Int(minv) + 2 + dict = zeros(Int, len) + + @inbounds for (idx_r, val_r) in enumerate(right) + i = ismissing(val_r) ? 
length(dict) : Int(val_r) + offset + if dict[i] > 0 + return _innerjoin_dup_int(left, right, dict, idx_r, offset, Int(minv), Int(maxv)) + end + dict[i] = idx_r + end + + left_ixs = Int[] + right_ixs = Int[] + + right_len = length(right) + sizehint!(left_ixs, right_len) + sizehint!(right_ixs, right_len) + + @inbounds for (idx_l, val_l) in enumerate(left) + # we use dict_index to make sure the following two operations are fast: + # - if index is found - get it and process it + # - if index is not found - do nothing + if ismissing(val_l) + idx_r = dict[end] + if idx_r > 0 + push!(left_ixs, idx_l) + push!(right_ixs, idx_r) + end + elseif minv <= val_l <= maxv + idx_r = dict[Int(val_l) + offset] + if idx_r > 0 + push!(left_ixs, idx_l) + push!(right_ixs, idx_r) + end + end + end + return left_ixs, right_ixs +end + +# we fall back to general case if we have duplicates +# normally it should happen fast as we reuse work already done +function _innerjoin_dup(left::AbstractArray, right::AbstractArray{T}, + dict::Dict{T, Int}, idx_r_start::Int) where {T} + ngroups = idx_r_start - 1 + right_len = length(right) + groups = Vector{Int}(undef, right_len) + groups[1:ngroups] = 1:ngroups + + @inbounds for idx_r in idx_r_start:right_len + val_r = right[idx_r] + # we know that group ids are positive + group_id = get(dict, val_r, -1) + if group_id == -1 + ngroups += 1 + groups[idx_r] = ngroups + dict[val_r] = ngroups + else + groups[idx_r] = group_id + end + end + + @assert ngroups > 0 # we should not get here with 0-length right + return _innerjoin_postprocess(left, dict, groups, ngroups, right_len) +end + +function _innerjoin_dup_int(left::AbstractVector{<:Union{Integer, Missing}}, + right::AbstractVector{<:Union{Integer, Missing}}, + dict::Vector{Int}, idx_r_start::Int, offset::Int, + minv::Int, maxv::Int) + ngroups = idx_r_start - 1 + right_len = length(right) + groups = Vector{Int}(undef, right_len) + groups[1:ngroups] = 1:ngroups + + @inbounds for idx_r in idx_r_start:right_len + val_r = right[idx_r] + i = ismissing(val_r) ? 
length(dict) : Int(val_r) + offset + dict_val = dict[i] + if dict_val > 0 + groups[idx_r] = dict_val + else + ngroups += 1 + groups[idx_r] = ngroups + dict[i] = ngroups + end + end + + @assert ngroups > 0 # we should not get here with 0-length right + return _innerjoin_postprocess_int(left, dict, groups, ngroups, right_len, offset, minv, maxv) +end + +function compute_join_indices!(groups::Vector{Int}, ngroups::Int, + starts::Vector, rperm::Vector) + @inbounds for gix in groups + starts[gix] += 1 + end + + cumsum!(starts, starts) + + @inbounds for (i, gix) in enumerate(groups) + rperm[starts[gix]] = i + starts[gix] -= 1 + end + push!(starts, length(groups)) + return nothing +end + +function _innerjoin_postprocess(left::AbstractArray, dict::Dict{T, Int}, + groups::Vector{Int}, ngroups::Int, right_len::Int) where {T} + starts = zeros(Int, ngroups) + rperm = Vector{Int}(undef, right_len) + + left_ixs = Int[] + right_ixs = Int[] + + # lower bound assuming we get matches + sizehint!(left_ixs, right_len) + sizehint!(right_ixs, right_len) + + compute_join_indices!(groups, ngroups, starts, rperm) + + n = 0 + @inbounds for (idx_l, val_l) in enumerate(left) + group_id = get(dict, val_l, -1) + if group_id != -1 + ref_stop = starts[group_id + 1] + l = ref_stop - starts[group_id] + newn = n + l + resize!(left_ixs, newn) + for i in n+1:n+l + left_ixs[i] = idx_l + end + resize!(right_ixs, newn) + for i in 1:l + right_ixs[n + i] = rperm[ref_stop - i + 1] + end + n = newn + end + end + + return left_ixs, right_ixs +end + +function _innerjoin_postprocess_int(left::AbstractVector{<:Union{Integer, Missing}}, + dict::Vector{Int}, + groups::Vector{Int}, ngroups::Int, right_len::Int, + offset::Int, minv::Int, maxv::Int) + starts = zeros(Int, ngroups) + rperm = Vector{Int}(undef, right_len) + + left_ixs = Int[] + right_ixs = Int[] + + sizehint!(left_ixs, right_len) + sizehint!(right_ixs, right_len) + + compute_join_indices!(groups, ngroups, starts, rperm) + + n = 0 + @inbounds for (idx_l, val_l) in enumerate(left) + if ismissing(val_l) + group_id = dict[end] + elseif minv <= val_l <= maxv + group_id = dict[Int(val_l) + offset] + else + group_id = 0 + end + + if group_id > 0 + ref_stop = starts[group_id + 1] + l = ref_stop - starts[group_id] + newn = n + l + resize!(left_ixs, newn) + for i in n+1:n+l + left_ixs[i] = idx_l + end + resize!(right_ixs, newn) + for i in 1:l + right_ixs[n + i] = rperm[ref_stop - i + 1] + end + n = newn + end + end + + return left_ixs, right_ixs +end + function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, left_ixs::RowIndexMap, leftonly_ixs::RowIndexMap, right_ixs::RowIndexMap, rightonly_ixs::RowIndexMap, @@ -383,12 +944,8 @@ function _join(df1::AbstractDataFrame, df2::AbstractDataFrame; left_indicator, right_indicator = nothing, nothing if kind == :inner - inner_row_maps = update_row_maps!(joiner.dfl_on, joiner.dfr_on, - group_rows(joiner.dfr_on), - true, false, true, false) joined, left_indicator, right_indicator = - compose_joined_table(joiner, kind, inner_row_maps..., - makeunique, left_rename, right_rename, nothing) + compose_inner_table(joiner, makeunique, left_rename, right_rename) elseif kind == :left left_row_maps = update_row_maps!(joiner.dfl_on, joiner.dfr_on, group_rows(joiner.dfr_on), diff --git a/src/groupeddataframe/fastaggregates.jl b/src/groupeddataframe/fastaggregates.jl index 039b8e9f49..6e432d2ad9 100644 --- a/src/groupeddataframe/fastaggregates.jl +++ b/src/groupeddataframe/fastaggregates.jl @@ -122,9 +122,8 @@ for (op, initf) in ((:max, :typemin), 
(:min, :typemax)) # !ismissing check is purely an optimization to avoid a copy later outcol = similar(incol, condf === !ismissing ? S : T, length(gd)) # Comparison is possible only between CatValues from the same pool - outcolT = typeof(outcol).name - if outcolT.name === :CategoricalArray && - nameof(outcolT.module) === :CategoricalArrays + resT = typeof(outcol) + if nameof(resT) === :CategoricalArray && nameof(parentmodule(resT)) === :CategoricalArrays # we know that CategoricalArray has `pool` field outcol.pool = incol.pool end @@ -214,9 +213,8 @@ function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Boo end # Reallocate Vector created in groupreduce_init with min or max # for CategoricalVector - resT = typeof(res).name - if resT.name === :CategoricalArray && - nameof(resT.module) === :CategoricalArrays + resT = typeof(res) + if nameof(resT) === :CategoricalArray && nameof(parentmodule(resT)) === :CategoricalArrays @assert op === min || op === max # we know that CategoricalArray has `pool` field @assert res.pool === incol.pool diff --git a/test/join.jl b/test/join.jl index ee7dd57068..ff709e8420 100644 --- a/test/join.jl +++ b/test/join.jl @@ -1,7 +1,7 @@ module TestJoin -using Test, DataFrames, Random, CategoricalArrays -using DataFrames: similar_missing +using Test, DataFrames, Random, CategoricalArrays, PooledArrays +using DataFrames: similar_missing, OnCol const ≅ = isequal name = DataFrame(ID = Union{Int, Missing}[1, 2, 3], @@ -936,4 +936,204 @@ end innerjoin(df1_view2, df2, on=:a) end +@testset "OnCol correctness tests" begin + Random.seed!(1234) + c1 = collect(1:10^2) + c2 = collect(Float64, 1:10^2) + c3 = collect(sort(string.(1:10^2))) + c4 = repeat(1:10, inner=10) + c5 = collect(Float64, repeat(1:50, inner=2)) + c6 = sort(string.(repeat(1:25,inner=4))) + c7 = repeat(20:-1:1, inner=5) + + @test_throws AssertionError OnCol() + @test_throws AssertionError OnCol(c1) + @test_throws AssertionError OnCol(c1, [1]) + @test_throws MethodError OnCol(c1, 1) + + oncols = [OnCol(c1, c2), OnCol(c3, c4), OnCol(c5, c6), OnCol(c1, c2, c3), + OnCol(c2, c3, c4), OnCol(c4, c5, c6), OnCol(c1, c2, c3, c4), + OnCol(c2, c3, c4, c5), OnCol(c3, c4, c5, c6), OnCol(c1, c2, c3, c4, c5), + OnCol(c2, c3, c4, c5, c6), OnCol(c1, c2, c3, c4, c5, c6), + OnCol(c4, c7), OnCol(c4, c5, c7), OnCol(c4, c5, c6, c7)] + tupcols = [tuple.(c1, c2), tuple.(c3, c4), tuple.(c5, c6), tuple.(c1, c2, c3), + tuple.(c2, c3, c4), tuple.(c4, c5, c6), tuple.(c1, c2, c3, c4), + tuple.(c2, c3, c4, c5), tuple.(c3, c4, c5, c6), tuple.(c1, c2, c3, c4, c5), + tuple.(c2, c3, c4, c5, c6), tuple.(c1, c2, c3, c4, c5, c6), + tuple.(c4, c7), tuple.(c4, c5, c7), tuple.(c4, c5, c6, c7)] + + for (oncol, tupcol) in zip(oncols, tupcols) + @test issorted(oncol) == issorted(tupcol) + @test IndexStyle(oncol) === IndexLinear() + @test_throws MethodError oncol[1] == oncol[2] + end + + for i in eachindex(c1), j in eachindex(oncols, tupcols) + @test_throws MethodError hash(oncols[j][1], zero(UInt)) + DataFrames._prehash(oncols[j]) + @test hash(oncols[j][i]) == hash(tupcols[j][i]) + for k in eachindex(c1) + @test isequal(oncols[j][i], oncols[j][k]) == isequal(tupcols[j][i], tupcols[j][k]) + @test isequal(oncols[j][k], oncols[j][i]) == isequal(tupcols[j][k], tupcols[j][i]) + @test isless(oncols[j][i], oncols[j][k]) == isless(tupcols[j][i], tupcols[j][k]) + @test isless(oncols[j][k], oncols[j][i]) == isless(tupcols[j][k], tupcols[j][i]) + end + end + + foreach(shuffle!, [c1, c2, c3, c4, c5, c6]) + + tupcols = [tuple.(c1, c2), tuple.(c3, 
c4), tuple.(c5, c6), tuple.(c1, c2, c3), + tuple.(c2, c3, c4), tuple.(c4, c5, c6), tuple.(c1, c2, c3, c4), + tuple.(c2, c3, c4, c5), tuple.(c3, c4, c5, c6), tuple.(c1, c2, c3, c4, c5), + tuple.(c2, c3, c4, c5, c6), tuple.(c1, c2, c3, c4, c5, c6), + tuple.(c4, c7), tuple.(c4, c5, c7), tuple.(c4, c5, c6, c7)] + + for i in eachindex(c1), j in eachindex(oncols, tupcols) + DataFrames._prehash(oncols[j]) + @test hash(oncols[j][i]) == hash(tupcols[j][i]) + for k in eachindex(c1) + @test isequal(oncols[j][i], oncols[j][k]) == isequal(tupcols[j][i], tupcols[j][k]) + @test isequal(oncols[j][k], oncols[j][i]) == isequal(tupcols[j][k], tupcols[j][i]) + @test isless(oncols[j][i], oncols[j][k]) == isless(tupcols[j][i], tupcols[j][k]) + @test isless(oncols[j][k], oncols[j][i]) == isless(tupcols[j][k], tupcols[j][i]) + end + end +end + +@testset "innerjoin correctness tests" begin + + @test_throws ArgumentError DataFrames.prepare_on_col() + + function test_innerjoin(df1, df2) + @assert names(df1) == ["id", "x"] + @assert names(df2) == ["id", "y"] + + dfres = DataFrame(id=[], x=[], y=[]) + for i in axes(df1, 1), j in axes(df2, 1) + if isequal(df1.id[i], df2.id[j]) + push!(dfres, (id=df1.id[i], x=df1.x[i], y=df2.y[j])) + end + end + + df1x = copy(df1) + df1x.id2 = copy(df1x.id) + df2x = copy(df2) + df2x.id2 = copy(df2x.id) + + df1x2 = copy(df1x) + df1x2.id3 = copy(df1x2.id) + df2x2 = copy(df2x) + df2x2.id3 = copy(df2x2.id) + + sort!(dfres) + dfres2 = copy(dfres) + insertcols!(dfres2, 3, :id2 => dfres2.id) + dfres3 = copy(dfres2) + insertcols!(dfres3, 4, :id3 => dfres3.id) + + return dfres ≅ sort(innerjoin(df1, df2, on=:id, matchmissing=:equal)) && + dfres2 ≅ sort(innerjoin(df1x, df2x, on=[:id, :id2], matchmissing=:equal)) && + dfres3 ≅ sort(innerjoin(df1x2, df2x2, on=[:id, :id2, :id3], matchmissing=:equal)) + end + + Random.seed!(1234) + for i in 1:5, j in 0:2 + for df1 in [DataFrame(id=rand(1:i+j, i+j), x=1:i+j), DataFrame(id=rand(1:i, i), x=1:i), + DataFrame(id=[rand(1:i+j, i+j); missing], x=1:i+j+1), + DataFrame(id=[rand(1:i, i); missing], x=1:i+1)], + df2 in [DataFrame(id=rand(1:i+j, i+j), y=1:i+j), DataFrame(id=rand(1:i, i), y=1:i), + DataFrame(id=[rand(1:i+j, i+j); missing], y=1:i+j+1), + DataFrame(id=[rand(1:i, i); missing], y=1:i+1)] + for opleft = [identity, sort, x -> unique(x, :id), x -> sort(unique(x, :id))], + opright = [identity, sort, x -> unique(x, :id), x -> sort(unique(x, :id))] + + # integers + @test test_innerjoin(opleft(df1), opright(df2)) + @test test_innerjoin(opleft(df1), opright(rename(df1, :x => :y))) + + # strings + df1s = copy(df1) + df1s[!, 1] = passmissing(string).(df1s[!, 1]) + df2s = copy(df2) + df2s[!, 1] = passmissing(string).(df2s[!, 1]) + @test test_innerjoin(opleft(df1s), opright(df2s)) + @test test_innerjoin(opleft(df1s), opright(rename(df1s, :x => :y))) + + # PooledArrays + df1p = copy(df1) + df1p[!, 1] = PooledArray(df1p[!, 1]) + df2p = copy(df2) + df2p[!, 1] = PooledArray(df2p[!, 1]) + @test test_innerjoin(opleft(df1), opright(df2p)) + @test test_innerjoin(opleft(df1p), opright(df2)) + @test test_innerjoin(opleft(df1p), opright(df2p)) + @test test_innerjoin(opleft(df1p), opright(rename(df1p, :x => :y))) + + # add unused level + df1p[1, 1] = 0 + df2p[1, 1] = 0 + df1p[1, 1] = 1 + df2p[1, 1] = 1 + @test test_innerjoin(opleft(df1), opright(df2p)) + @test test_innerjoin(opleft(df1p), opright(df2)) + @test test_innerjoin(opleft(df1p), opright(df2p)) + @test test_innerjoin(opleft(df1p), opright(rename(df1p, :x => :y))) + + # CategoricalArrays + df1c = copy(df1) + df1c[!, 1] 
= categorical(df1c[!, 1]) + df2c = copy(df2) + df2c[!, 1] = categorical(df2c[!, 1]) + @test test_innerjoin(opleft(df1), opright(df2c)) + @test test_innerjoin(opleft(df1c), opright(df2c)) + @test test_innerjoin(opleft(df1c), opright(df2)) + @test test_innerjoin(opleft(df1c), opright(rename(df1c, :x => :y))) + @test test_innerjoin(opleft(df1p), opright(df2c)) + @test test_innerjoin(opleft(df1c), opright(df2p)) + + # add unused level + df1c[1, 1] = 0 + df2c[1, 1] = 0 + df1c[1, 1] = 1 + df2c[1, 1] = 1 + @test test_innerjoin(opleft(df1), opright(df2c)) + @test test_innerjoin(opleft(df1c), opright(df2c)) + @test test_innerjoin(opleft(df1c), opright(df2)) + @test test_innerjoin(opleft(df1c), opright(rename(df1c, :x => :y))) + @test test_innerjoin(opleft(df1p), opright(df2c)) + @test test_innerjoin(opleft(df1c), opright(df2p)) + end + end + end + + # some special cases + @test innerjoin(DataFrame(id=[]), DataFrame(id=[]), on=:id) == DataFrame(id=[]) + @test innerjoin(DataFrame(id=[]), DataFrame(id=[1, 2, 3]), on=:id) == DataFrame(id=[]) + @test innerjoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[]), on=:id) == DataFrame(id=[]) + @test innerjoin(DataFrame(id=[4, 5, 6]), DataFrame(id=[1, 2, 3]), on=:id) == DataFrame(id=[]) + @test innerjoin(DataFrame(id=[1, 2, 3]), DataFrame(id=[4, 5, 6]), on=:id) == DataFrame(id=[]) + + @test innerjoin(DataFrame(id=[missing]), DataFrame(id=[1]), on=:id, matchmissing=:equal) == + DataFrame(id=[]) + @test innerjoin(DataFrame(id=Missing[]), DataFrame(id=[1]), on=:id, matchmissing=:equal) == + DataFrame(id=[]) + @test innerjoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[1]), on=:id, matchmissing=:equal) == + DataFrame(id=[]) + @test innerjoin(DataFrame(id=Union{Int, Missing}[]), DataFrame(id=[2, 1, 2]), on=:id, matchmissing=:equal) == + DataFrame(id=[]) + @test innerjoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1]), + on=:id, matchmissing=:equal) == DataFrame(id=[]) + @test innerjoin(DataFrame(id=[missing]), DataFrame(id=[1, missing]), + on=:id, matchmissing=:equal) ≅ DataFrame(id=[missing]) + @test innerjoin(DataFrame(id=Union{Int, Missing}[missing]), DataFrame(id=[1, missing]), + on=:id, matchmissing=:equal) ≅ DataFrame(id=[missing]) + + @test innerjoin(DataFrame(id=[typemin(Int) + 1, typemin(Int)]), DataFrame(id=[typemin(Int)]), on=:id) == + DataFrame(id=[typemin(Int)]) + @test innerjoin(DataFrame(id=[typemax(Int), typemax(Int) - 1]), DataFrame(id=[typemax(Int)]), on=:id) == + DataFrame(id=[typemax(Int)]) + @test innerjoin(DataFrame(id=[2000, 2, 100]), DataFrame(id=[2000, 1, 100]), on=:id) == + DataFrame(id=[2000, 100]) +end + end # module
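
Note on the sorted fast path: `_innerjoin_sorted` in the patch is a run-based merge join. `find_next_range` locates the run of equal keys starting at the current position on each side, and for two matching runs the cross product of their row indices is emitted. Below is a minimal self-contained sketch of the same idea, not the patch's implementation: it omits `sizehint!` and the overflow-checked resizing, emits runs with plain nested `push!` loops, and assumes 1-based indexing and inputs sorted by `isless` (`merge_join_sketch` is a made-up name):

# Minimal sketch of the run-based merge join behind _innerjoin_sorted.
# Assumes 1-based indexing and both inputs sorted; returns matching index pairs.
function merge_join_sketch(left::AbstractVector, right::AbstractVector)
    left_ixs, right_ixs = Int[], Int[]
    i, j = 1, 1
    while i <= length(left) && j <= length(right)
        if isless(left[i], right[j])
            i += 1
        elseif isless(right[j], left[i])
            j += 1
        else
            # locate the run of keys equal to the current key on each side
            i2 = i
            while i2 <= length(left) && isequal(left[i2], left[i])
                i2 += 1
            end
            j2 = j
            while j2 <= length(right) && isequal(right[j2], right[j])
                j2 += 1
            end
            # emit the cross product of the two matching runs
            for jj in j:j2-1, ii in i:i2-1
                push!(left_ixs, ii)
                push!(right_ixs, jj)
            end
            i, j = i2, j2
        end
    end
    return left_ixs, right_ixs
end

merge_join_sketch([1, 2, 2, 4], [2, 2, 3, 4])  # -> ([2, 3, 2, 3, 4], [1, 1, 2, 2, 4])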
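Note on the integer fast path: `_innerjoin_unsorted_int` in the patch replaces the `Dict` with direct addressing. When the keys are integers in a modest value range, a zero-initialized `Vector{Int}` indexed by key plus offset records each key's position, with the last slot reserved for `missing`. A hedged sketch of the lookup-table idea, simplified to unique keys in `right`, no `missing` values, and no range heuristics deciding whether the table is worth allocating (`int_join_sketch` is a made-up name):

# Sketch of the direct-addressing trick behind _innerjoin_unsorted_int:
# keys index into a zero-initialized table instead of hashing into a Dict
# (0 encodes "no match in right"); practical only for modest key ranges.
function int_join_sketch(left::Vector{Int}, right::Vector{Int})
    minv, maxv = extrema(right)
    offset = 1 - minv                    # shift keys to 1-based table indices
    table = zeros(Int, maxv - minv + 1)
    for (idx_r, val_r) in enumerate(right)
        table[val_r + offset] = idx_r    # remember where each key sits in right
    end
    left_ixs, right_ixs = Int[], Int[]
    for (idx_l, val_l) in enumerate(left)
        if minv <= val_l <= maxv         # keys outside the range cannot match
            idx_r = table[val_l + offset]
            if idx_r > 0
                push!(left_ixs, idx_l)
                push!(right_ixs, idx_r)
            end
        end
    end
    return left_ixs, right_ixs
end

int_join_sketch([5, 9, 7, 3], [7, 8, 9])  # -> ([2, 3], [3, 1])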
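Note on the refpool remapping in `compose_inner_table` (the reason for the `PooledArrays = "1.1"` compat bump, which guarantees `DataAPI.refpool`/`DataAPI.invrefpool` support): the shorter table's values are mapped through the longer table's `invrefpool`, so the join compares small integer ref codes instead of, say, strings, and `missing` serves as the sentinel for values absent from the pool. A small illustration of the idea; the inputs here are made up:

# Illustration of the refpool remapping done in compose_inner_table:
# remap the shorter side into the longer side's integer ref space.
using DataAPI, PooledArrays

long = PooledArray(["a", "b", "a", "c"])  # longer table's on column
short = ["b", "z", "a"]                   # shorter table's on column

invpool = DataAPI.invrefpool(long)        # maps value -> integer ref code
short_refs = [get(invpool, v, missing) for v in short]  # code of "b", missing, code of "a"
long_refs = DataAPI.refarray(long)
# the join can now run on integer codes (plus missing) instead of strings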
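Note on running the benchmarks: the driver added above takes six positional arguments, validated by its `@assert` block: left length and right length (must exceed 1000 and 2000 respectively), key type (int/pool/cat/str), duplication pattern (uniq/dup/manydup), ordering (sort/rand), and the number of `on` columns (1 or 2). `runtests.jl` sweeps all supported combinations, skipping uniq for pool and cat. A single configuration can also be run directly, for example with the sizes from run.sh:

julia benchmarks/innerjoin_performance.jl 100000 50000000 int dup rand 1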