Skip to content

Commit

Permalink
Implement the PlainxREE filter kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecrv committed May 13, 2023
1 parent 3c6bef5 commit 8a66457
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 21 deletions.
219 changes: 198 additions & 21 deletions cpp/src/arrow/compute/kernels/vector_run_end_selection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include <memory>
#include <tuple>

#include "arrow/array/builder_decimal.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/builder_time.h"
#include "arrow/array/builder_union.h"
#include "arrow/array/util.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/kernels/ree_util_internal.h"
Expand Down Expand Up @@ -515,6 +520,65 @@ Status VisitREExPlainFilterOutputFragments(
return Status::OK();
}

/// \brief Callback invoked once for every contiguous range selected by a filter.
///
/// Arguments are `(range_start, range_length, flag)`. The Plain x REE visitor in
/// this file passes `true` as the last argument only when the range comes from
/// a null filter run that is emitted because of EMIT_NULL, and `false` for
/// every other emitted range.
using EmitRange = std::function<void(int64_t range_start, int64_t range_length, bool)>;

/// \brief Walk the runs of a run-end encoded filter and report every range of
/// the plain values array that passes the filter.
///
/// Unlike REE x REE and REE x Plain filtering, Plain x REE filtering does not
/// produce a run-end encoded output: the emitted ranges are meant to be
/// materialized into a plain output array.
///
/// \param pool not used by this function
/// \param values the plain values being filtered (same logical length as filter)
/// \param filter the run-end encoded boolean filter
/// \param null_selection whether null filter runs are emitted (as nulls) or dropped
/// \param emit_range called with (values.offset + logical start of the run,
///                   run length, whether the run must be emitted as nulls)
template <typename FilterRunEndType>
void VisitPlainxREEFilterOutputRanges(MemoryPool* pool, const ArraySpan& values,
                                      const ArraySpan& filter,
                                      FilterOptions::NullSelectionBehavior null_selection,
                                      const EmitRange& emit_range) {
  using FilterRunEndCType = typename FilterRunEndType::c_type;

  DCHECK_EQ(values.length, filter.length);

  // Physical buffers of the filter's boolean child array.
  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
  const int64_t physical_offset = filter_values.offset;
  const uint8_t* validity_bitmap = filter_values.buffers[0].data;
  const uint8_t* selection_bitmap = filter_values.buffers[1].data;
  const bool has_nulls =
      validity_bitmap != NULLPTR && filter_values.GetNullCount() != 0;

  const arrow::ree_util::RunEndEncodedArraySpan<FilterRunEndCType> filter_span(filter);

  // Fast path: without nulls only the selection bit matters.
  if (!has_nulls) {
    for (auto run = filter_span.begin(); !run.is_end(filter_span); ++run) {
      const int64_t bit = physical_offset + run.index_into_array();
      if (bit_util::GetBit(selection_bitmap, bit)) {
        emit_range(values.offset + run.logical_position(), run.run_length(), false);
      }
    }
    return;
  }

  // Null filter runs are kept and flagged so the caller can emit nulls.
  if (null_selection == FilterOptions::EMIT_NULL) {
    for (auto run = filter_span.begin(); !run.is_end(filter_span); ++run) {
      const int64_t bit = physical_offset + run.index_into_array();
      const bool run_is_null = !bit_util::GetBit(validity_bitmap, bit);
      if (run_is_null || bit_util::GetBit(selection_bitmap, bit)) {
        emit_range(values.offset + run.logical_position(), run.run_length(),
                   run_is_null);
      }
    }
    return;
  }

  // DROP: a run passes only when it is both valid and true.
  for (auto run = filter_span.begin(); !run.is_end(filter_span); ++run) {
    const int64_t bit = physical_offset + run.index_into_array();
    if (bit_util::GetBit(validity_bitmap, bit) &&
        bit_util::GetBit(selection_bitmap, bit)) {
      emit_range(values.offset + run.logical_position(), run.run_length(), false);
    }
  }
}

// This is called from templates with many instantiations, so we don't want to inline it.
ARROW_NOINLINE Status MakeNullREEData(int64_t logical_length, MemoryPool* pool,
ArrayData* out) {
Expand Down Expand Up @@ -775,11 +839,81 @@ class REExPlainFilterExecImpl final : public REEFilterExec {
}
};

/// \brief Filter executor for a plain values array and a run-end encoded
/// boolean filter. The output is a plain array built with an ArrayBuilder.
///
/// \tparam ValuesType the Arrow type of the plain values array
/// \tparam FilterRunEndType the Arrow type of the filter's run ends
template <typename ValuesType, typename FilterRunEndType>
class PlainxREEFilterExecImpl final : public REEFilterExec {
 private:
  MemoryPool* pool_;
  // Both spans are held by reference: they must outlive this executor.
  const ArraySpan& values_;
  const ArraySpan& filter_;
  const FilterOptions::NullSelectionBehavior null_selection_;

 public:
  PlainxREEFilterExecImpl(MemoryPool* pool, const ArraySpan& values,
                          const ArraySpan& filter,
                          FilterOptions::NullSelectionBehavior null_selection) noexcept
      : pool_(pool), values_(values), filter_(filter), null_selection_(null_selection) {}

  ~PlainxREEFilterExecImpl() override = default;

 private:
  /// \brief Forward to the Plain x REE range visitor with this executor's inputs.
  void VisitOutputRanges(const EmitRange& emit_range) {
    VisitPlainxREEFilterOutputRanges<FilterRunEndType>(pool_, values_, filter_,
                                                       null_selection_, emit_range);
  }

 public:
  /// \brief Number of logical elements the filtered output will contain.
  Result<int64_t> CalculateOutputSize() final {
    return CountREEFilterEmits<FilterRunEndType>(filter_, null_selection_);
  }

 private:
  /// \brief Materialize every emitted range into a freshly built plain array.
  ///
  /// \param logical_length the expected length of the output
  /// \param[out] out the output array data (not pre-allocated as we use a
  ///                 builder for Plain x REE filtering)
  Status ExecInternal(int64_t logical_length, ArrayData* out) {
    using BuilderType = typename TypeTraits<ValuesType>::BuilderType;
    auto builder = std::make_unique<BuilderType>(out->type, pool_);
    RETURN_NOT_OK(builder->Reserve(logical_length));

    [[maybe_unused]] int64_t written_length = 0;
    // The visitor callback cannot return a Status, so the first failure is
    // latched here and later ranges are skipped.
    Status append_status;
    VisitOutputRanges([&](int64_t i, int64_t range_length, bool emit_null) noexcept {
      if (ARROW_PREDICT_TRUE(append_status.ok())) {
        // A range flagged by the visitor comes from a null filter run under
        // EMIT_NULL: the output must be null there, not a copy of the values.
        append_status =
            emit_null
                ? builder->AppendNulls(range_length)
                : builder->AppendArraySlice(values_, i - values_.offset, range_length);
      }
      written_length += range_length;
    });
    RETURN_NOT_OK(append_status);
    DCHECK_EQ(written_length, logical_length);
    std::shared_ptr<ArrayData> array_data;
    RETURN_NOT_OK(builder->FinishInternal(&array_data));
    *out = *array_data;
    return Status::OK();
  }

 public:
  /// \brief Run the filter and store the resulting plain array in `out`.
  Status Exec(ArrayData* out) final {
    const int64_t logical_length =
        CountREEFilterEmits<FilterRunEndType>(filter_, null_selection_);
    if constexpr (std::is_same<ValuesType, NullType>::value) {
      // Null-typed values carry no data: an all-null array of the right
      // length is the complete answer.
      auto values_type = values_.type->GetSharedPtr();
      ARROW_ASSIGN_OR_RAISE(
          auto null_array,
          arrow::MakeArrayOfNull(std::move(values_type), logical_length, pool_));
      *out = *null_array->data();
      return Status::OK();
    } else {
      return ExecInternal(logical_length, out);
    }
  }
};

template <template <typename ArrowType> class FactoryFunctor>
REEFilterExec* MakeREEFilterExec(
MemoryPool* pool, const ArraySpan& values, const ArraySpan& filter,
MemoryPool* pool, const DataType& value_type, const ArraySpan& values,
const ArraySpan& filter,
FilterOptions::NullSelectionBehavior null_selection) noexcept {
switch (arrow::ree_util::ValuesArray(values).type->id()) {
switch (value_type.id()) {
case Type::NA:
return FactoryFunctor<NullType>{}(pool, values, filter, null_selection);
case Type::BOOL:
Expand Down Expand Up @@ -833,7 +967,7 @@ Status ValidateRunEndType(const ArraySpan& array) {
DCHECK_EQ(array.type->id(), Type::RUN_END_ENCODED);
auto run_end_type = arrow::ree_util::RunEndsArray(array).type->id();
if (ARROW_PREDICT_FALSE(!is_run_end_type(run_end_type))) {
return Status::Invalid("Invalid run end type: ", run_end_type);
return Status::Invalid("Invalid run-end type: ", run_end_type);
}
return Status::OK();
}
Expand Down Expand Up @@ -913,57 +1047,100 @@ struct REExPlainFilterExecFactory {
}
};

/// \brief Factory that instantiates the Plain x REE filter executor matching
/// the run-end type of the filter.
///
/// \tparam ArrowType The DataType of the plain values array
template <typename ArrowType>
struct PlainxREEFilterExecFactory {
  /// \return a heap-allocated executor; ownership passes to the caller
  REEFilterExec* operator()(MemoryPool* pool, const ArraySpan& values,
                            const ArraySpan& filter,
                            FilterOptions::NullSelectionBehavior null_selection) {
    using ValuesType = ArrowType;
    // `filter.type` is the run-end encoded type itself, so dispatch has to
    // look at the type of the run-ends child array instead.
    switch (arrow::ree_util::RunEndsArray(filter).type->id()) {
      case Type::INT16:
        return new PlainxREEFilterExecImpl<ValuesType, Int16Type>(pool, values, filter,
                                                                  null_selection);
      case Type::INT32:
        return new PlainxREEFilterExecImpl<ValuesType, Int32Type>(pool, values, filter,
                                                                  null_selection);
      default:
        // Every remaining run-end type is handled as 64-bit run ends.
        return new PlainxREEFilterExecImpl<ValuesType, Int64Type>(pool, values, filter,
                                                                  null_selection);
    }
  }
};

/// \brief Take ownership of a heap-allocated executor, or report that no
/// executor exists for the given type when the pointer is null.
///
/// \param type the type named in the NotImplemented message
/// \param ptr the raw executor pointer (may be null)
Result<std::unique_ptr<REEFilterExec>> UniquePtrFromHeapPtr(const DataType& type,
                                                            REEFilterExec* ptr) {
  if (ptr) {
    return std::unique_ptr<REEFilterExec>{ptr};
  }
  return Status::NotImplemented("MakeREEFilterExec: ArrowType=", type.ToString(), ".");
}

} // namespace

/// \brief Create the executor that filters run-end encoded values with a
/// run-end encoded filter.
///
/// Both inputs have their run-end type validated first. The executor is then
/// selected from the type of the values' child array; a NotImplemented status
/// is returned when no executor exists for that type.
Result<std::unique_ptr<REEFilterExec>> MakeREExREEFilterExec(
    MemoryPool* pool, const ArraySpan& values, const ArraySpan& filter,
    FilterOptions::NullSelectionBehavior null_selection) {
  RETURN_NOT_OK(ValidateRunEndType(values));
  RETURN_NOT_OK(ValidateRunEndType(filter));
  const auto* values_value_type = arrow::ree_util::ValuesArray(values).type;
  return UniquePtrFromHeapPtr(
      *values.type, MakeREEFilterExec<REExREEFilterExecFactory>(
                        pool, *values_value_type, values, filter, null_selection));
}

/// \brief Create the executor that filters run-end encoded values with a
/// plain filter.
///
/// The run-end type of `values` is validated first. The executor is then
/// selected from the type of the values' child array; a NotImplemented status
/// is returned when no executor exists for that type.
Result<std::unique_ptr<REEFilterExec>> MakeREExPlainFilterExec(
    MemoryPool* pool, const ArraySpan& values, const ArraySpan& filter,
    FilterOptions::NullSelectionBehavior null_selection) {
  RETURN_NOT_OK(ValidateRunEndType(values));
  const auto* values_value_type = arrow::ree_util::ValuesArray(values).type;
  return UniquePtrFromHeapPtr(
      *values.type, MakeREEFilterExec<REExPlainFilterExecFactory>(
                        pool, *values_value_type, values, filter, null_selection));
}

/// \brief Create the executor that filters plain values with a run-end
/// encoded filter.
///
/// The run-end type of `filter` is validated first. The executor is selected
/// from the type of `values` itself, since the values are not run-end encoded.
Result<std::unique_ptr<REEFilterExec>> MakePlainxREEFilterExec(
    MemoryPool* pool, const ArraySpan& values, const ArraySpan& filter,
    FilterOptions::NullSelectionBehavior null_selection) {
  RETURN_NOT_OK(ValidateRunEndType(filter));
  const DataType& plain_type = *values.type;
  REEFilterExec* raw_exec = MakeREEFilterExec<PlainxREEFilterExecFactory>(
      pool, plain_type, values, filter, null_selection);
  return UniquePtrFromHeapPtr(plain_type, raw_exec);
}

// Kernel state wrapper through which the exec functions below read the
// FilterOptions (e.g. null_selection_behavior) from the KernelContext.
using FilterState = OptionsWrapper<FilterOptions>;

/// \brief Kernel entry point: filter run-end encoded values with a run-end
/// encoded filter.
///
/// \param ctx supplies the FilterOptions and the memory pool
/// \param span span.values[0] is the values array, span.values[1] the filter
/// \param result receives the filtered array data
Status REExREEFilterExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) {
  // Declared exactly once, through the file-level FilterState alias.
  const auto null_selection = FilterState::Get(ctx).null_selection_behavior;
  const auto& values = span.values[0].array;
  const auto& filter = span.values[1].array;
  ArrayData* out = result->array_data().get();
  DCHECK(out->type->Equals(*values.type));
  ARROW_ASSIGN_OR_RAISE(auto exec, MakeREExREEFilterExec(ctx->memory_pool(), values,
                                                         filter, null_selection));
  return exec->Exec(out);
}

/// \brief Kernel entry point: filter run-end encoded values with a plain
/// filter.
///
/// \param ctx supplies the FilterOptions and the memory pool
/// \param span span.values[0] is the values array, span.values[1] the filter
/// \param result receives the filtered array data
Status REExPlainFilterExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) {
  // Declared exactly once, through the file-level FilterState alias.
  const auto null_selection = FilterState::Get(ctx).null_selection_behavior;
  const auto& values = span.values[0].array;
  const auto& filter = span.values[1].array;
  ArrayData* out = result->array_data().get();
  DCHECK(out->type->Equals(*values.type));
  ARROW_ASSIGN_OR_RAISE(auto exec, MakeREExPlainFilterExec(ctx->memory_pool(), values,
                                                           filter, null_selection));
  return exec->Exec(out);
}

/// \brief Kernel entry point: filter plain values with a run-end encoded
/// filter.
///
/// \param ctx supplies the FilterOptions and the memory pool
/// \param span span.values[0] is the values array, span.values[1] the filter
/// \param result receives the filtered array data
Status PlainxREEFilterExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) {
  const auto& plain_values = span.values[0].array;
  const auto& ree_filter = span.values[1].array;

  ArrayData* output = result->array_data().get();
  DCHECK(output->type->Equals(*plain_values.type));

  const auto null_behavior = FilterState::Get(ctx).null_selection_behavior;
  ARROW_ASSIGN_OR_RAISE(
      auto filter_exec,
      MakePlainxREEFilterExec(ctx->memory_pool(), plain_values, ree_filter,
                              null_behavior));
  return filter_exec->Exec(output);
}

} // namespace arrow::compute::internal
6 changes: 6 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_run_end_selection.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,14 @@ Result<std::unique_ptr<REEFilterExec>> MakeREExPlainFilterExec(
MemoryPool* pool, const ArraySpan& values, const ArraySpan& filter,
FilterOptions::NullSelectionBehavior null_selection);

/// \brief Create an executor that filters a plain `values` array with a
/// run-end encoded `filter`.
Result<std::unique_ptr<REEFilterExec>> MakePlainxREEFilterExec(
MemoryPool* pool, const ArraySpan& values, const ArraySpan& filter,
FilterOptions::NullSelectionBehavior null_selection);

/// \brief Kernel exec function for run-end encoded values filtered by a
/// run-end encoded filter.
Status REExREEFilterExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result);

/// \brief Kernel exec function for run-end encoded values filtered by a plain
/// filter.
Status REExPlainFilterExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result);

/// \brief Kernel exec function for plain values filtered by a run-end encoded
/// filter.
Status PlainxREEFilterExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result);

} // namespace arrow::compute::internal
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_selection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2506,6 +2506,10 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
// REE(*) x Boolean filtering
{InputType(match::RunEndEncoded(match::Primitive())), InputType(Type::BOOL),
REExPlainFilterExec},
// * x REE(Boolean) filtering
{InputType(match::Primitive()),
InputType(match::RunEndEncoded(match::SameTypeId(Type::BOOL))),
PlainxREEFilterExec},
};

VectorKernel filter_base;
Expand Down

0 comments on commit 8a66457

Please sign in to comment.