Skip to content

Commit

Permalink
apacheGH-42116: [C++] Support list-view typed arrays in array_take an…
Browse files Browse the repository at this point in the history
…d array_filter (apache#42117)

### Rationale for this change

Completing the type coverage in array_take and array_filter.

### What changes are included in this PR?

Add support for `ListView` and `LargeListView` in `"array_take"`, `"array_filter"` and all the functions that indirectly rely on these to do their thing.

### Are these changes tested?

New test cases were added.
* GitHub Issue: apache#42116

Authored-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
Signed-off-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
  • Loading branch information
felipecrv authored Jun 27, 2024
1 parent 1da71ba commit 62ee676
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 91 deletions.
36 changes: 15 additions & 21 deletions cpp/src/arrow/compute/kernels/vector_selection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,10 @@ using TakeState = OptionsWrapper<TakeOptions>;
// ----------------------------------------------------------------------
// DropNull Implementation

Result<std::shared_ptr<arrow::BooleanArray>> GetDropNullFilter(const Array& values,
MemoryPool* memory_pool) {
auto bitmap_buffer = values.null_bitmap();
std::shared_ptr<arrow::BooleanArray> out_array = std::make_shared<BooleanArray>(
values.length(), bitmap_buffer, nullptr, 0, values.offset());
return out_array;
std::shared_ptr<arrow::BooleanArray> MakeDropNullFilter(const Array& values) {
auto& bitmap_buffer = values.null_bitmap();
return std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0,
values.offset());
}

Result<Datum> DropNullArray(const std::shared_ptr<Array>& values, ExecContext* ctx) {
Expand All @@ -86,8 +84,7 @@ Result<Datum> DropNullArray(const std::shared_ptr<Array>& values, ExecContext* c
if (values->type()->id() == Type::type::NA) {
return std::make_shared<NullArray>(0);
}
ARROW_ASSIGN_OR_RAISE(auto drop_null_filter,
GetDropNullFilter(*values, ctx->memory_pool()));
auto drop_null_filter = Datum{MakeDropNullFilter(*values)};
return Filter(values, drop_null_filter, FilterOptions::Defaults(), ctx);
}

Expand Down Expand Up @@ -185,19 +182,16 @@ class DropNullMetaFunction : public MetaFunction {
Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
const FunctionOptions* options,
ExecContext* ctx) const override {
switch (args[0].kind()) {
case Datum::ARRAY: {
return DropNullArray(args[0].make_array(), ctx);
} break;
case Datum::CHUNKED_ARRAY: {
return DropNullChunkedArray(args[0].chunked_array(), ctx);
} break;
case Datum::RECORD_BATCH: {
return DropNullRecordBatch(args[0].record_batch(), ctx);
} break;
case Datum::TABLE: {
return DropNullTable(args[0].table(), ctx);
} break;
auto& values = args[0];
switch (values.kind()) {
case Datum::ARRAY:
return DropNullArray(values.make_array(), ctx);
case Datum::CHUNKED_ARRAY:
return DropNullChunkedArray(values.chunked_array(), ctx);
case Datum::RECORD_BATCH:
return DropNullRecordBatch(values.record_batch(), ctx);
case Datum::TABLE:
return DropNullTable(values.table(), ctx);
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,8 @@ void PopulateFilterKernels(std::vector<SelectionKernelData>* out) {
{InputType(Type::EXTENSION), plain_filter, ExtensionFilterExec},
{InputType(Type::LIST), plain_filter, ListFilterExec},
{InputType(Type::LARGE_LIST), plain_filter, LargeListFilterExec},
{InputType(Type::LIST_VIEW), plain_filter, ListViewFilterExec},
{InputType(Type::LARGE_LIST_VIEW), plain_filter, LargeListViewFilterExec},
{InputType(Type::FIXED_SIZE_LIST), plain_filter, FSLFilterExec},
{InputType(Type::DENSE_UNION), plain_filter, DenseUnionFilterExec},
{InputType(Type::SPARSE_UNION), plain_filter, SparseUnionFilterExec},
Expand All @@ -1119,6 +1121,8 @@ void PopulateFilterKernels(std::vector<SelectionKernelData>* out) {
{InputType(Type::EXTENSION), ree_filter, ExtensionFilterExec},
{InputType(Type::LIST), ree_filter, ListFilterExec},
{InputType(Type::LARGE_LIST), ree_filter, LargeListFilterExec},
{InputType(Type::LIST_VIEW), ree_filter, ListViewFilterExec},
{InputType(Type::LARGE_LIST_VIEW), ree_filter, LargeListViewFilterExec},
{InputType(Type::FIXED_SIZE_LIST), ree_filter, FSLFilterExec},
{InputType(Type::DENSE_UNION), ree_filter, DenseUnionFilterExec},
{InputType(Type::SPARSE_UNION), ree_filter, SparseUnionFilterExec},
Expand Down
74 changes: 74 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_selection_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,63 @@ struct ListSelectionImpl : public Selection<ListSelectionImpl<Type>, Type> {
}
};

template <typename Type>
struct ListViewSelectionImpl : public Selection<ListViewSelectionImpl<Type>, Type> {
using offset_type = typename Type::offset_type;

using Base = Selection<ListViewSelectionImpl<Type>, Type>;
LIFT_BASE_MEMBERS();

TypedBufferBuilder<offset_type> offsets_builder;
TypedBufferBuilder<offset_type> sizes_builder;

ListViewSelectionImpl(KernelContext* ctx, const ExecSpan& batch, int64_t output_length,
ExecResult* out)
: Base(ctx, batch, output_length, out),
offsets_builder(ctx->memory_pool()),
sizes_builder(ctx->memory_pool()) {}

template <typename Adapter>
Status GenerateOutput() {
auto* offsets = this->values.template GetValues<offset_type>(1);
auto* sizes = this->values.template GetValues<offset_type>(2);

offset_type null_list_view_offset = 0;
Adapter adapter(this);
RETURN_NOT_OK(adapter.Generate(
[&](int64_t index) {
offset_type value_offset = offsets[index];
offset_type value_length = sizes[index];
offsets_builder.UnsafeAppend(value_offset);
sizes_builder.UnsafeAppend(value_length);
null_list_view_offset = value_offset + value_length;
return Status::OK();
},
[&]() {
// 0 could be appended here, but by adding the last offset, we keep
// the buffer compatible with how offsets behave in ListType as well.
// The invariant that `offsets[i] + sizes[i] <= values.length` is
// trivially maintained by having `sizes[i]` set to 0 here.
offsets_builder.UnsafeAppend(null_list_view_offset);
sizes_builder.UnsafeAppend(0);
return Status::OK();
}));
return Status::OK();
}

Status Init() override {
RETURN_NOT_OK(offsets_builder.Reserve(output_length));
return sizes_builder.Reserve(output_length);
}

Status Finish() override {
RETURN_NOT_OK(offsets_builder.Finish(&out->buffers[1]));
RETURN_NOT_OK(sizes_builder.Finish(&out->buffers[2]));
out->child_data = {this->values.child_data[0].ToArrayData()};
return Status::OK();
}
};

struct DenseUnionSelectionImpl
: public Selection<DenseUnionSelectionImpl, DenseUnionType> {
using Base = Selection<DenseUnionSelectionImpl, DenseUnionType>;
Expand Down Expand Up @@ -858,6 +915,15 @@ Status LargeListFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
return FilterExec<ListSelectionImpl<LargeListType>>(ctx, batch, out);
}

Status ListViewFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
return FilterExec<ListViewSelectionImpl<ListViewType>>(ctx, batch, out);
}

Status LargeListViewFilterExec(KernelContext* ctx, const ExecSpan& batch,
ExecResult* out) {
return FilterExec<ListViewSelectionImpl<LargeListViewType>>(ctx, batch, out);
}

Status FSLFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
const ArraySpan& values = batch[0].array;

Expand Down Expand Up @@ -914,6 +980,14 @@ Status LargeListTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult*
return TakeExec<ListSelectionImpl<LargeListType>>(ctx, batch, out);
}

Status ListViewTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
return TakeExec<ListViewSelectionImpl<ListViewType>>(ctx, batch, out);
}

Status LargeListViewTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
return TakeExec<ListViewSelectionImpl<LargeListViewType>>(ctx, batch, out);
}

Status FSLTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
const ArraySpan& values = batch[0].array;

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_selection_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ void VisitPlainxREEFilterOutputSegments(
Status PrimitiveFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status ListFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeListFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status ListViewFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeListViewFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FSLFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status DenseUnionFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Status MapFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
Expand All @@ -76,6 +78,8 @@ Status LargeVarBinaryTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FixedWidthTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status ListTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeListTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status ListViewTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status LargeListViewTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status FSLTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status DenseUnionTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Status SparseUnionTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,8 @@ void PopulateTakeKernels(std::vector<SelectionKernelData>* out) {
{InputType(Type::EXTENSION), take_indices, ExtensionTake},
{InputType(Type::LIST), take_indices, ListTakeExec},
{InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec},
{InputType(Type::LIST_VIEW), take_indices, ListViewTakeExec},
{InputType(Type::LARGE_LIST_VIEW), take_indices, LargeListViewTakeExec},
{InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec},
{InputType(Type::DENSE_UNION), take_indices, DenseUnionTakeExec},
{InputType(Type::SPARSE_UNION), take_indices, SparseUnionTakeExec},
Expand Down
Loading

0 comments on commit 62ee676

Please sign in to comment.