Skip to content

Commit

Permalink
Add array builder for REE arrays
Browse files Browse the repository at this point in the history
Lead-authored-by: Tobias Zagorni <tobias@zagorni.eu>
Co-authored-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
  • Loading branch information
zagto and felipecrv committed Feb 2, 2023
1 parent e6dd3b3 commit 7ff1ce7
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 4 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ set(ARROW_SRCS
array/builder_binary.cc
array/builder_decimal.cc
array/builder_dict.cc
array/builder_run_end.cc
array/builder_nested.cc
array/builder_primitive.cc
array/builder_union.cc
Expand Down
41 changes: 41 additions & 0 deletions cpp/src/arrow/array/array_run_end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "arrow/array.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_run_end.h"
#include "arrow/chunked_array.h"
#include "arrow/pretty_print.h"
#include "arrow/scalar.h"
Expand Down Expand Up @@ -135,6 +136,46 @@ TEST_P(TestRunEndEncodedArray, FindOffsetAndLength) {
ASSERT_EQ(zero_length_at_end->FindPhysicalLength(), 0);
}

TEST_P(TestRunEndEncodedArray, Builder) {
auto expected_run_ends =
ArrayFromJSON(run_end_type, "[1, 3, 105, 165, 205, 305, 405, 505]");
auto expected_values = ArrayFromJSON(
utf8(),
R"(["unique", null, "common", "common", "appended", "common", "common", "appended"])");
auto appended_run_ends = ArrayFromJSON(run_end_type, "[100, 200]");
auto appended_values = ArrayFromJSON(utf8(), R"(["common", "appended"])");
ASSERT_OK_AND_ASSIGN(auto appended_array,
RunEndEncodedArray::Make(200, appended_run_ends, appended_values));
auto appended_span = ArraySpan(*appended_array->data());

ASSERT_OK_AND_ASSIGN(std::shared_ptr<ArrayBuilder> builder,
MakeBuilder(run_end_encoded(run_end_type, utf8())));
auto ree_builder = std::dynamic_pointer_cast<RunEndEncodedBuilder>(builder);
ASSERT_NE(ree_builder, NULLPTR);
ASSERT_OK(builder->AppendScalar(*MakeScalar("unique")));
ASSERT_OK(builder->AppendNull());
ASSERT_OK(builder->AppendScalar(*MakeNullScalar(utf8())));
ASSERT_OK(builder->AppendScalar(*MakeScalar("common"), 100));
ASSERT_OK(builder->AppendScalar(*MakeScalar("common")));
ASSERT_OK(builder->AppendScalar(*MakeScalar("common")));
// Append span that starts with the same value as the previous run ends. They
// are currently not merged for simplicity and performance. This is still a
// valid REE array
ASSERT_OK(builder->AppendArraySlice(appended_span, 40, 100));
ASSERT_OK(builder->AppendArraySlice(appended_span, 0, 100));
// Append an entire array
ASSERT_OK(builder->AppendArraySlice(appended_span, 0, appended_span.length));
ASSERT_EQ(builder->length(), 505);
ASSERT_EQ(*builder->type(), *run_end_encoded(run_end_type, utf8()));
ASSERT_OK_AND_ASSIGN(auto array, builder->Finish());
auto ree_array = std::dynamic_pointer_cast<RunEndEncodedArray>(array);
ASSERT_NE(ree_array, NULLPTR);
ASSERT_ARRAYS_EQUAL(*expected_run_ends, *ree_array->run_ends());
ASSERT_ARRAYS_EQUAL(*expected_values, *ree_array->values());
ASSERT_EQ(array->length(), 505);
ASSERT_EQ(array->offset(), 0);
}

TEST_P(TestRunEndEncodedArray, Validate) {
auto run_ends_good = ArrayFromJSON(run_end_type, "[10, 20, 30, 40]");
auto values = ArrayFromJSON(utf8(), R"(["A", "B", "C", null])");
Expand Down
213 changes: 213 additions & 0 deletions cpp/src/arrow/array/builder_run_end.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/array/builder_primitive.h"
#include "arrow/array/builder_run_end.h"

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

#include "arrow/scalar.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/ree_util.h"

namespace arrow {

// ----------------------------------------------------------------------
// RunEndEncodedBuilder

RunEndEncodedBuilder::RunEndEncodedBuilder(
MemoryPool* pool, const std::shared_ptr<ArrayBuilder>& run_end_builder,
const std::shared_ptr<ArrayBuilder>& value_builder, std::shared_ptr<DataType> type)
: ArrayBuilder(pool), type_(internal::checked_pointer_cast<RunEndEncodedType>(type)) {
children_ = {std::move(run_end_builder), std::move(value_builder)};
}

Status RunEndEncodedBuilder::Resize(int64_t) {
return Status::NotImplemented(
"Resizing an REE for a given number of logical elements is not possible, since the "
"physical length will vary depending on the contents. To allocate memory for a "
"certain number of runs, use ResizePhysical.");
}

void RunEndEncodedBuilder::Reset() {
capacity_ = length_ = null_count_ = 0;
value_builder().Reset();
run_end_builder().Reset();
current_value_.reset();
run_start_ = 0;
}

Status RunEndEncodedBuilder::AppendNull() { return AppendNulls(1); }

Status RunEndEncodedBuilder::AppendNulls(int64_t length) {
if (length == 0) {
return Status::OK();
}
if (current_value_) {
RETURN_NOT_OK(FinishRun());
current_value_ = {};
}
return AddLength(length);
}

Status RunEndEncodedBuilder::AppendEmptyValue() { return AppendNull(); }

Status RunEndEncodedBuilder::AppendEmptyValues(int64_t length) {
return AppendNulls(length);
}

Status RunEndEncodedBuilder::AppendScalar(const Scalar& scalar, int64_t n_repeats) {
if (n_repeats == 0) {
return Status::OK();
}
if (!scalar.is_valid) {
return AppendNulls(n_repeats);
}
if (!current_value_ || !current_value_->Equals(scalar)) {
RETURN_NOT_OK(FinishRun());
current_value_ = scalar.shared_from_this();
}
return AddLength(n_repeats);
}

Status RunEndEncodedBuilder::AppendScalars(const ScalarVector& scalars) {
for (auto scalar : scalars) {
RETURN_NOT_OK(AppendScalar(*scalar, 1));
}
return Status::OK();
}

template <typename RunEndsType>
Status RunEndEncodedBuilder::DoAppendArray(const ArraySpan& to_append) {
const int64_t physical_offset = ree_util::FindPhysicalOffset(to_append);
int64_t physical_length = 0;
for (auto it = ree_util::MergedRunsIterator<RunEndsType>(to_append);
it != ree_util::MergedRunsIterator(); ++it) {
physical_length++;
length_ += it.run_length();
RETURN_NOT_OK(DoAppendRunEnd<RunEndsType>());
}
return value_builder().AppendArraySlice(ree_util::ValuesArray(to_append),
physical_offset, physical_length);
}

Status RunEndEncodedBuilder::AppendArraySlice(const ArraySpan& array, int64_t offset,
int64_t length) {
// Finish eventual runs started using AppendScalars() and others before. We don't
// attempt to merge them with the runs from the appended array slice for now
RETURN_NOT_OK(FinishRun());

ARROW_DCHECK(offset + length <= array.length);
ARROW_DCHECK(array.type->Equals(type_));
// Create a slice of the slice for the part we actually want to add
ArraySpan to_append = array;
to_append.SetSlice(array.offset + offset, length);

switch (type_->run_end_type()->id()) {
case Type::INT16:
RETURN_NOT_OK(DoAppendArray<int16_t>(to_append));
break;
case Type::INT32:
RETURN_NOT_OK(DoAppendArray<int32_t>(to_append));
break;
case Type::INT64:
RETURN_NOT_OK(DoAppendArray<int64_t>(to_append));
break;
default:
return Status::Invalid("Invalid type for run ends array: ", type_->run_end_type());
}

// next run is not merged either
run_start_ = length_;
return Status::OK();
}

std::shared_ptr<DataType> RunEndEncodedBuilder::type() const { return type_; }

Status RunEndEncodedBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
ARROW_ASSIGN_OR_RAISE(auto run_ends_array, run_end_builder().Finish());
ARROW_ASSIGN_OR_RAISE(auto values_array, value_builder().Finish());
ARROW_ASSIGN_OR_RAISE(auto ree_array,
RunEndEncodedArray::Make(length_, run_ends_array, values_array));
*out = std::move(ree_array->data());
return Status::OK();
}

template <typename RunEndsType>
Status RunEndEncodedBuilder::DoAppendRunEnd() {
return internal::checked_cast<typename CTypeTraits<RunEndsType>::BuilderType*>(
children_[0].get())
->Append(static_cast<RunEndsType>(length_));
}

Status RunEndEncodedBuilder::FinishRun() {
if (length_ - run_start_ == 0) {
return Status::OK();
}
if (current_value_) {
RETURN_NOT_OK(value_builder().AppendScalar(*current_value_));
} else {
RETURN_NOT_OK(value_builder().AppendNull());
}
switch (type_->run_end_type()->id()) {
case Type::INT16:
RETURN_NOT_OK(DoAppendRunEnd<int16_t>());
break;
case Type::INT32:
RETURN_NOT_OK(DoAppendRunEnd<int32_t>());
break;
case Type::INT64:
RETURN_NOT_OK(DoAppendRunEnd<int64_t>());
break;
default:
return Status::Invalid("Invalid type for run ends array: ", type_->run_end_type());
}
current_value_.reset();
run_start_ = 0;
return Status::OK();
}

Status RunEndEncodedBuilder::ResizePhyiscal(int64_t capacity) {
RETURN_NOT_OK(value_builder().Resize(capacity));
return run_end_builder().Resize(capacity);
}

Status RunEndEncodedBuilder::AddLength(int64_t added_length) {
if (ARROW_PREDICT_FALSE(added_length < 0)) {
return Status::Invalid("Added length must be positive (requested: ", added_length,
")");
}
if (ARROW_PREDICT_FALSE(added_length > std::numeric_limits<int32_t>::max() ||
length_ + added_length > std::numeric_limits<int32_t>::max())) {
return Status::Invalid(
"Run-end encoded array length must fit in a 32-bit signed integer (adding ",
added_length, " items to array of length ", length_, ")");
}

length_ += added_length;
return Status::OK();
}

ArrayBuilder& RunEndEncodedBuilder::run_end_builder() { return *children_[0]; }

ArrayBuilder& RunEndEncodedBuilder::value_builder() { return *children_[1]; }

} // namespace arrow
85 changes: 85 additions & 0 deletions cpp/src/arrow/array/builder_run_end.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <limits>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/array/builder_base.h"

namespace arrow {

/// \addtogroup encoded-builders
///
/// @{

// ----------------------------------------------------------------------
// RunEndEncoded builder

class ARROW_EXPORT RunEndEncodedBuilder : public ArrayBuilder {
public:
RunEndEncodedBuilder(MemoryPool* pool,
const std::shared_ptr<ArrayBuilder>& run_end_builder,
const std::shared_ptr<ArrayBuilder>& value_builder,
std::shared_ptr<DataType> type);

Status FinishInternal(std::shared_ptr<ArrayData>* out) final;
/// \cond FALSE
using ArrayBuilder::Finish;
/// \endcond

Status Resize(int64_t capacity) final;
void Reset() final;

Status Finish(std::shared_ptr<RunEndEncodedArray>* out) { return FinishTyped(out); }
Status AppendNull() final;
Status AppendNulls(int64_t length) final;
Status AppendEmptyValue() final;
Status AppendEmptyValues(int64_t length) final;
Status AppendScalar(const Scalar& scalar, int64_t n_repeats) final;
Status AppendScalars(const ScalarVector& scalars) final;
Status AppendArraySlice(const ArraySpan& array, int64_t offset, int64_t length) final;
std::shared_ptr<DataType> type() const final;

/// \brief Allocate enough memory for a given number of runs. Like Resize on non-REE
/// builders, it does not account for variable size data.
Status ResizePhyiscal(int64_t capacity);

private:
Status FinishRun();
Status AddLength(int64_t added_length);
template <typename RunEndsType>
Status DoAppendArray(const ArraySpan& to_append);
template <typename RunEndsType>
Status DoAppendRunEnd();
ArrayBuilder& run_end_builder();
ArrayBuilder& value_builder();

std::shared_ptr<RunEndEncodedType> type_;
// must be null pointer for non-valid values
std::shared_ptr<const Scalar> current_value_;
int64_t run_start_ = 0;
};

/// @}

} // namespace arrow
8 changes: 8 additions & 0 deletions cpp/src/arrow/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ struct MakeBuilderImpl {
return Status::OK();
}

Status Visit(const RunEndEncodedType& ree_type) {
ARROW_ASSIGN_OR_RAISE(auto run_end_builder, ChildBuilder(ree_type.run_end_type()));
ARROW_ASSIGN_OR_RAISE(auto value_builder, ChildBuilder(ree_type.value_type()));
out.reset(new RunEndEncodedBuilder(pool, std::move(run_end_builder),
std::move(value_builder), type));
return Status::OK();
}

Status Visit(const ExtensionType&) { return NotImplemented(); }
Status Visit(const DataType&) { return NotImplemented(); }

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/array/builder_dict.h" // IWYU pragma: keep
#include "arrow/array/builder_nested.h" // IWYU pragma: keep
#include "arrow/array/builder_primitive.h" // IWYU pragma: keep
#include "arrow/array/builder_run_end.h" // IWYU pragma: keep
#include "arrow/array/builder_time.h" // IWYU pragma: keep
#include "arrow/array/builder_union.h" // IWYU pragma: keep
#include "arrow/status.h"
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ struct DenseUnionScalar;

class RunEndEncodedType;
class RunEndEncodedArray;
class RunEndEncodedBuilder;

template <typename TypeClass>
class NumericArray;
Expand Down
Loading

0 comments on commit 7ff1ce7

Please sign in to comment.