GH-32107: [C++] Create Filter Kernels for REE values #35001

Draft
wants to merge 46 commits into
base: main
46 commits
9b7a93a
Create filter kernel on REE data
felipecrv Jun 16, 2023
6151525
s/RLE/REE
felipecrv Jun 16, 2023
a7e6ebe
s/RunEndsType/RunEndType
felipecrv Feb 27, 2023
4968050
Add vector_run_end_selection.h/cc
felipecrv Mar 21, 2023
c7f7f89
Combine runs when filtering REE values with a REE filter
felipecrv Mar 21, 2023
9e7a4e5
Move all the REE execution to the dedicated file (still broken)
felipecrv Apr 7, 2023
4bd5b7e
Change the REExREE loop: keep an open run instead of closing immediately
felipecrv Apr 7, 2023
bb853b9
Move GetREExREEFilterOutputSizeImpl into template class
felipecrv Apr 7, 2023
207255d
Introduce VisitREExREEFilterCombinedOutputRuns and use it for countin…
felipecrv Apr 7, 2023
5dc9bad
Define the filtering process in terms of VisitREExREEFilterCombinedOu…
felipecrv Apr 7, 2023
1f32e88
Enable instantiation of REExREE filters for all types
felipecrv Apr 7, 2023
3480f80
Inject MemoryPool* and return Status from kernels
felipecrv May 10, 2023
b42fe8a
Turn VisitREExREEFilterOutputFragments into a templated functor
felipecrv May 9, 2023
636b96f
Implement the REExPlain filter kernel
felipecrv Jun 16, 2023
e9b71d5
Remove Result and unique_ptr wrapping from factories
felipecrv May 12, 2023
0f4a8c6
Pass EmitRun as a std::function reference
felipecrv May 12, 2023
8ff7a66
Another std::function, another reduction
felipecrv May 12, 2023
3ded672
Define EmitFragment as an std::function
felipecrv May 12, 2023
b21027f
Pass the VisitFilterOutputFragments as a function pointer
felipecrv May 12, 2023
c0108fc
Define VisitFilterOutputFragments explicitly
felipecrv May 12, 2023
eb73f8d
Implement the PlainxREE filter kernel
felipecrv Jun 16, 2023
f24927b
Prepare test code for expansion in test cases
felipecrv May 15, 2023
dfb30b6
Also assert the output sizes resulting from the filtering
felipecrv May 16, 2023
97f9e76
Assert the output of the REExREE filtering kernel
felipecrv May 16, 2023
ea8bc13
Collapse CalculateOutputSize and Exec tests into one set of asserts
felipecrv May 16, 2023
73acfec
Make the meaning of callback parameters consistent and FIX bugs
felipecrv May 17, 2023
279c450
Extract GenericTestInvocations from REExREEFilterTest
felipecrv May 17, 2023
ab01f8e
Add tests for REExPlain filters
felipecrv May 17, 2023
dd81d34
Add tests for PlainxREE filters
felipecrv May 17, 2023
899f271
ARROW_EXPORT so tests using the DLL build on Windows
felipecrv May 17, 2023
29e5bfc
FIX: Remove UB that didn't affect correctnes, but triggered UBSAN
felipecrv May 18, 2023
bdee147
Undo initial changes I've made to vector_selection_test.cc
felipecrv May 19, 2023
8fa5b86
Complete wiring for all the filter kernels involving REEs
felipecrv Jun 16, 2023
e4d2297
FIX: Add missing HALF_FLOAT to the cases
felipecrv May 20, 2023
13086ab
Add ValidateOutput() checks to the run-end selection tests
felipecrv May 20, 2023
73ce739
s/Json/JSON
felipecrv Jun 14, 2023
ceb60ff
tests: Move all the test code in vector_selection_test.cc
felipecrv Jun 16, 2023
8533115
tests: Fix RunEndTypes template struct
felipecrv Jun 17, 2023
443e288
tests: Remove emit_null_ and drop_ members
felipecrv Jun 16, 2023
74c0097
tests: Simplify by removing one param from many asserts
felipecrv Jun 16, 2023
cd5c37e
tests: Remove REEFilterExec from tests
felipecrv Jun 16, 2023
9a51bf8
tests: Generalize the assert functions to remove half of them
felipecrv Jun 17, 2023
d82732f
tests: Make filter tests faster
felipecrv Jun 17, 2023
2337e8d
Remove unnecessary argument to function call
felipecrv Aug 21, 2023
feb61d6
Remove the separate Plain x REE implementation
felipecrv Aug 21, 2023
8c41d52
Fix test and find a bug (TODO: fix)
felipecrv Aug 22, 2023
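
The commit titles above mention combining runs when REE values are filtered by a REE filter, and keeping a run open instead of closing it immediately. The following is a minimal sketch of that idea, not the PR's implementation: it assumes int32 run ends, no nulls, no array offsets, and equal logical lengths. `ReeArray` and `FilterReeByRee` are hypothetical names introduced only for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-in for a run-end encoded array.
template <typename T>
struct ReeArray {
  std::vector<int32_t> run_ends;  // exclusive logical end of each run
  std::vector<T> values;          // one value per run
};

// Filters REE values with a REE boolean filter. Both inputs are walked in
// lockstep; a run stays "open" until a different value is about to be emitted,
// so fragments with equal values are merged into a single output run.
inline ReeArray<int> FilterReeByRee(const ReeArray<int>& values,
                                    const ReeArray<bool>& filter) {
  auto logical_length = [](const std::vector<int32_t>& run_ends) {
    return run_ends.empty() ? 0 : run_ends.back();
  };
  assert(logical_length(values.run_ends) == logical_length(filter.run_ends));

  ReeArray<int> out;
  int32_t out_length = 0;  // logical length emitted so far
  bool has_open_run = false;
  int open_value = 0;
  auto close_open_run = [&]() {
    if (has_open_run) {
      out.run_ends.push_back(out_length);
      out.values.push_back(open_value);
      has_open_run = false;
    }
  };

  std::size_t i = 0;  // index into the values runs
  std::size_t j = 0;  // index into the filter runs
  int32_t pos = 0;    // logical position reached so far
  while (i < values.run_ends.size() && j < filter.run_ends.size()) {
    // The next fragment ends where either input run ends first.
    const int32_t end = std::min(values.run_ends[i], filter.run_ends[j]);
    if (filter.values[j] && end > pos) {
      if (has_open_run && open_value != values.values[i]) close_open_run();
      has_open_run = true;
      open_value = values.values[i];
      out_length += end - pos;
    }
    pos = end;
    if (values.run_ends[i] == end) ++i;
    if (filter.run_ends[j] == end) ++j;
  }
  close_open_run();
  return out;
}
```

With values `1,1,1,2,2,1,1,1` and a filter that keeps positions 0, 1, 5 and 6, the two surviving fragments carry the same value, so the sketch emits a single run of length 4.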
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -461,6 +461,7 @@ if(ARROW_COMPUTE)
compute/kernels/vector_rank.cc
compute/kernels/vector_replace.cc
compute/kernels/vector_run_end_encode.cc
compute/kernels/vector_run_end_selection.cc
compute/kernels/vector_select_k.cc
compute/kernels/vector_sort.cc)

3 changes: 1 addition & 2 deletions cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
@@ -245,10 +245,9 @@ ARROW_NOINLINE Status RunEndEncodeNullArray(const std::shared_ptr<DataType>& run
RETURN_NOT_OK(ValidateRunEndType(run_end_type, input_array.length));

ARROW_ASSIGN_OR_RAISE(
- auto output_array_data,
+ output->value,
ree_util::MakeNullREEArray(run_end_type, input_length, ctx->memory_pool()));

- output->value = std::move(output_array_data);
return Status::OK();
}

973 changes: 973 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_run_end_selection.cc

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_run_end_selection.h
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>

#include "arrow/array/data.h"
#include "arrow/compute/api_vector.h"
#include "arrow/result.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/ree_util.h"

// Filtering from and using run-end encoded filter arrays.
//
// Used by vector_selection.cc to implement the actual selection compute kernels.

namespace arrow::compute::internal {

ARROW_EXPORT Status REExREEFilterExec(KernelContext* ctx, const ExecSpan& span,
ExecResult* result);

ARROW_EXPORT Status REExPlainFilterExec(KernelContext* ctx, const ExecSpan& span,
ExecResult* result);

} // namespace arrow::compute::internal
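
The header only declares the two entry points, and the 973-line implementation is not rendered on this page. As a rough illustration of what a "REE values x plain boolean filter" kernel has to compute, here is a minimal standalone sketch. It is not the PR's code: `ReeInts` and `FilterReeByPlain` are hypothetical names, nulls and offsets are ignored, and merging adjacent equal-valued output runs is a choice made for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified run-end encoded array of ints.
struct ReeInts {
  std::vector<int32_t> run_ends;  // exclusive logical end of each run
  std::vector<int> values;        // one value per run
};

// Filters REE values with a plain (one bool per logical element) filter.
inline ReeInts FilterReeByPlain(const ReeInts& values,
                                const std::vector<bool>& filter) {
  assert(values.run_ends.empty()
             ? filter.empty()
             : static_cast<std::size_t>(values.run_ends.back()) == filter.size());
  ReeInts out;
  int32_t out_length = 0;  // logical length emitted so far
  int32_t run_start = 0;   // logical start of the current input run
  for (std::size_t i = 0; i < values.run_ends.size(); ++i) {
    const int32_t run_end = values.run_ends[i];
    // Count how many elements of this run survive the filter.
    int32_t kept = 0;
    for (int32_t pos = run_start; pos < run_end; ++pos) {
      if (filter[static_cast<std::size_t>(pos)]) ++kept;
    }
    run_start = run_end;
    if (kept == 0) continue;  // the whole run was filtered out
    out_length += kept;
    if (!out.values.empty() && out.values.back() == values.values[i]) {
      out.run_ends.back() = out_length;  // extend the previous output run
    } else {
      out.run_ends.push_back(out_length);
      out.values.push_back(values.values[i]);
    }
  }
  return out;
}
```

The output physical length is not known up front: it depends on how many input runs keep at least one element and on how many neighbours end up with equal values.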
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -44,6 +44,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/bitmap_reader.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/int_util.h"

namespace arrow {
27 changes: 25 additions & 2 deletions cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
@@ -29,6 +29,7 @@
#include "arrow/compute/exec.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/kernels/vector_run_end_selection.h"
#include "arrow/compute/kernels/vector_selection_filter_internal.h"
#include "arrow/compute/kernels/vector_selection_internal.h"
#include "arrow/datum.h"
@@ -40,6 +41,7 @@
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/ree_util.h"

namespace arrow {

@@ -1040,9 +1042,12 @@ std::unique_ptr<Function> MakeFilterMetaFunction() {
void PopulateFilterKernels(std::vector<SelectionKernelData>* out) {
auto plain_filter = InputType(Type::BOOL);
auto ree_filter = InputType(match::RunEndEncoded(Type::BOOL));
auto ree_values = [](auto in) {
return InputType(match::RunEndEncoded(std::move(in)));
};

*out = {
- // * x Boolean
+ // Plain(*) x Plain(Boolean)
{InputType(match::Primitive()), plain_filter, PrimitiveFilterExec},
{InputType(match::BinaryLike()), plain_filter, BinaryFilterExec},
{InputType(match::LargeBinaryLike()), plain_filter, BinaryFilterExec},
@@ -1060,7 +1065,7 @@ void PopulateFilterKernels(std::vector<SelectionKernelData>* out) {
{InputType(Type::STRUCT), plain_filter, StructFilterExec},
{InputType(Type::MAP), plain_filter, MapFilterExec},

- // * x REE(Boolean)
+ // Plain(*) x REE(Boolean)
{InputType(match::Primitive()), ree_filter, PrimitiveFilterExec},
{InputType(match::BinaryLike()), ree_filter, BinaryFilterExec},
{InputType(match::LargeBinaryLike()), ree_filter, BinaryFilterExec},
@@ -1077,6 +1082,24 @@ void PopulateFilterKernels(std::vector<SelectionKernelData>* out) {
{InputType(Type::SPARSE_UNION), ree_filter, SparseUnionFilterExec},
{InputType(Type::STRUCT), ree_filter, StructFilterExec},
{InputType(Type::MAP), ree_filter, MapFilterExec},

// REE(*) x REE(Boolean)
{ree_values(match::Primitive()), ree_filter, REExREEFilterExec},
{ree_values(match::BinaryLike()), ree_filter, REExREEFilterExec},
{ree_values(match::LargeBinaryLike()), ree_filter, REExREEFilterExec},
{ree_values(Type::FIXED_SIZE_BINARY), ree_filter, REExREEFilterExec},
{ree_values(Type::NA), ree_filter, REExREEFilterExec},
{ree_values(Type::DECIMAL128), ree_filter, REExREEFilterExec},
{ree_values(Type::DECIMAL256), ree_filter, REExREEFilterExec},

// REE(*) x Plain(Boolean)
{ree_values(match::Primitive()), plain_filter, REExPlainFilterExec},
{ree_values(match::BinaryLike()), plain_filter, REExPlainFilterExec},
{ree_values(match::LargeBinaryLike()), plain_filter, REExPlainFilterExec},
{ree_values(Type::FIXED_SIZE_BINARY), plain_filter, REExPlainFilterExec},
{ree_values(Type::NA), plain_filter, REExPlainFilterExec},
{ree_values(Type::DECIMAL128), plain_filter, REExPlainFilterExec},
{ree_values(Type::DECIMAL256), plain_filter, REExPlainFilterExec},
};
}
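
The table above is organized into four blocks, one per combination of values encoding and filter encoding. A minimal sketch of that mapping for primitive value types only, written as a plain function rather than Arrow's `InputType` matching: the `Encoding` enum and `SelectPrimitiveFilterKernel` are hypothetical names, while the returned kernel names are the ones that appear in the diff.

```cpp
#include <string>

// Hypothetical enum, used only for this illustration.
enum class Encoding { kPlain, kRunEndEncoded };

// Returns the name of the exec function the table registers for primitive
// values, given how the values and the boolean filter are encoded.
inline std::string SelectPrimitiveFilterKernel(Encoding values, Encoding filter) {
  if (values == Encoding::kPlain) {
    // Plain(*) x Plain(Boolean) and Plain(*) x REE(Boolean) share one kernel.
    return "PrimitiveFilterExec";
  }
  return filter == Encoding::kRunEndEncoded ? "REExREEFilterExec"
                                            : "REExPlainFilterExec";
}
```

Only the REE-values rows are new in this diff; the plain-values rows already existed and just had their comments renamed.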

Expand Down
31 changes: 31 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_selection_internal.cc
@@ -82,6 +82,37 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
return Status::OK();
}

Status PreallocateDataREE(KernelContext* ctx, int64_t physical_length, int bit_width,
bool allocate_validity, ArrayData* out) {
// Preallocate memory
out->buffers = {NULLPTR};
out->child_data = {NULLPTR, NULLPTR};

auto& ree_type = checked_cast<RunEndEncodedType&>(*out->type);
auto values_array = std::make_shared<ArrayData>(ree_type.value_type(), physical_length);
values_array->buffers = {NULLPTR, NULLPTR};
auto run_ends_array = std::make_shared<ArrayData>(ree_type.run_end_type(),
physical_length, /*null_count=*/0);
run_ends_array->buffers = {NULLPTR, NULLPTR};

if (allocate_validity) {
ARROW_ASSIGN_OR_RAISE(values_array->buffers[0], ctx->AllocateBitmap(physical_length));
}
if (bit_width == 1) {
ARROW_ASSIGN_OR_RAISE(values_array->buffers[1], ctx->AllocateBitmap(physical_length));
} else {
ARROW_ASSIGN_OR_RAISE(values_array->buffers[1],
ctx->Allocate(physical_length * bit_width / 8));
}
ARROW_ASSIGN_OR_RAISE(
run_ends_array->buffers[1],
ctx->Allocate(physical_length * ree_type.run_end_type()->bit_width() / 8));

out->child_data[0] = std::move(run_ends_array);
out->child_data[1] = std::move(values_array);
return Status::OK();
}
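
The buffer sizes requested by `PreallocateDataREE` follow from simple arithmetic on the physical length. A minimal sketch of that arithmetic, assuming bitmaps take one bit per element rounded up to whole bytes and ignoring any padding the allocator may add: `ReeBufferSizes` and `ComputeReeBufferSizes` are hypothetical names, not part of the PR.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical summary of the three allocations made for a REE output.
struct ReeBufferSizes {
  int64_t validity_bytes;  // validity bitmap of the values child (0 if skipped)
  int64_t values_bytes;    // data buffer of the values child
  int64_t run_ends_bytes;  // data buffer of the run-ends child
};

// Bytes needed to hold `bits` bits, rounded up.
inline int64_t BytesForBits(int64_t bits) { return (bits + 7) / 8; }

inline ReeBufferSizes ComputeReeBufferSizes(int64_t physical_length,
                                            int value_bit_width,
                                            int run_end_bit_width,
                                            bool allocate_validity) {
  assert(physical_length >= 0);
  ReeBufferSizes sizes;
  sizes.validity_bytes = allocate_validity ? BytesForBits(physical_length) : 0;
  // Boolean values are bit-packed; wider values use bit_width / 8 bytes each.
  sizes.values_bytes = value_bit_width == 1
                           ? BytesForBits(physical_length)
                           : physical_length * value_bit_width / 8;
  sizes.run_ends_bytes = physical_length * run_end_bit_width / 8;
  return sizes;
}
```

For example, 10 physical runs of 32-bit values with 16-bit run ends need a 2-byte validity bitmap, 40 bytes of values and 20 bytes of run ends.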

namespace {

/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_selection_internal.h
@@ -51,6 +51,9 @@ void RegisterSelectionFunction(const std::string& name, FunctionDoc doc,
Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit_width,
bool allocate_validity, ArrayData* out);

Status PreallocateDataREE(KernelContext* ctx, int64_t physical_length, int bit_width,
bool allocate_validity, ArrayData* out);

/// \brief Callback type for VisitPlainxREEFilterOutputSegments.
///
/// position is the logical position in the values array relative to its offset.