Skip to content

Commit

Permalink
Move min/max to header and rework with BitBlockCounter
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Du <frank.du@intel.com>
  • Loading branch information
frankdjx committed Aug 10, 2020
1 parent 1452a5e commit 10d2217
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 204 deletions.
204 changes: 0 additions & 204 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include <cmath>

#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/kernels/aggregate_basic_internal.h"
#include "arrow/compute/kernels/aggregate_internal.h"
Expand Down Expand Up @@ -132,208 +130,6 @@ std::unique_ptr<KernelState> MeanInit(KernelContext* ctx, const KernelInitArgs&
// ----------------------------------------------------------------------
// MinMax implementation

template <typename ArrowType, typename Enable = void>
struct MinMaxState {};

template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_boolean<ArrowType>> {
using ThisType = MinMaxState<ArrowType>;
using T = typename ArrowType::c_type;

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = this->min && rhs.min;
this->max = this->max || rhs.max;
return *this;
}

void MergeOne(T value) {
this->min = this->min && value;
this->max = this->max || value;
}

T min = true;
T max = false;
bool has_nulls = false;
bool has_values = false;
};

template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_integer<ArrowType>> {
using ThisType = MinMaxState<ArrowType>;
using T = typename ArrowType::c_type;

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = std::min(this->min, rhs.min);
this->max = std::max(this->max, rhs.max);
return *this;
}

void MergeOne(T value) {
this->min = std::min(this->min, value);
this->max = std::max(this->max, value);
}

T min = std::numeric_limits<T>::max();
T max = std::numeric_limits<T>::min();
bool has_nulls = false;
bool has_values = false;
};

template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_floating_point<ArrowType>> {
using ThisType = MinMaxState<ArrowType>;
using T = typename ArrowType::c_type;

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = std::fmin(this->min, rhs.min);
this->max = std::fmax(this->max, rhs.max);
return *this;
}

void MergeOne(T value) {
this->min = std::fmin(this->min, value);
this->max = std::fmax(this->max, value);
}

T min = std::numeric_limits<T>::infinity();
T max = -std::numeric_limits<T>::infinity();
bool has_nulls = false;
bool has_values = false;
};

template <typename ArrowType>
struct MinMaxImpl : public ScalarAggregator {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using ThisType = MinMaxImpl<ArrowType>;
using StateType = MinMaxState<ArrowType>;

MinMaxImpl(const std::shared_ptr<DataType>& out_type, const MinMaxOptions& options)
: out_type(out_type), options(options) {}

void Consume(KernelContext*, const ExecBatch& batch) override {
StateType local;

ArrayType arr(batch[0].array());

const auto null_count = arr.null_count();
local.has_nulls = null_count > 0;
local.has_values = (arr.length() - null_count) > 0;

if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) {
this->state = local;
return;
}

if (local.has_nulls) {
BitmapReader reader(arr.null_bitmap_data(), arr.offset(), arr.length());
for (int64_t i = 0; i < arr.length(); i++) {
if (reader.IsSet()) {
local.MergeOne(arr.Value(i));
}
reader.Next();
}
} else {
for (int64_t i = 0; i < arr.length(); i++) {
local.MergeOne(arr.Value(i));
}
}
this->state = local;
}

void MergeFrom(KernelContext*, const KernelState& src) override {
const auto& other = checked_cast<const ThisType&>(src);
this->state += other.state;
}

void Finalize(KernelContext*, Datum* out) override {
using ScalarType = typename TypeTraits<ArrowType>::ScalarType;

std::vector<std::shared_ptr<Scalar>> values;
if (!state.has_values ||
(state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) {
// (null, null)
values = {std::make_shared<ScalarType>(), std::make_shared<ScalarType>()};
} else {
values = {std::make_shared<ScalarType>(state.min),
std::make_shared<ScalarType>(state.max)};
}
out->value = std::make_shared<StructScalar>(std::move(values), this->out_type);
}

std::shared_ptr<DataType> out_type;
MinMaxOptions options;
MinMaxState<ArrowType> state;
};

struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType> {
using MinMaxImpl::MinMaxImpl;

void Consume(KernelContext*, const ExecBatch& batch) override {
StateType local;
ArrayType arr(batch[0].array());

const auto arr_length = arr.length();
const auto null_count = arr.null_count();
const auto valid_count = arr_length - null_count;

local.has_nulls = null_count > 0;
local.has_values = valid_count > 0;
if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) {
this->state = local;
return;
}

const auto true_count = arr.true_count();
const auto false_count = valid_count - true_count;
local.max = true_count > 0;
local.min = false_count == 0;

this->state = local;
}
};

struct MinMaxInitState {
std::unique_ptr<KernelState> state;
KernelContext* ctx;
const DataType& in_type;
const std::shared_ptr<DataType>& out_type;
const MinMaxOptions& options;

MinMaxInitState(KernelContext* ctx, const DataType& in_type,
const std::shared_ptr<DataType>& out_type, const MinMaxOptions& options)
: ctx(ctx), in_type(in_type), out_type(out_type), options(options) {}

Status Visit(const DataType&) {
return Status::NotImplemented("No min/max implemented");
}

Status Visit(const HalfFloatType&) {
return Status::NotImplemented("No sum implemented");
}

Status Visit(const BooleanType&) {
state.reset(new BooleanMinMaxImpl(out_type, options));
return Status::OK();
}

template <typename Type>
enable_if_number<Type, Status> Visit(const Type&) {
state.reset(new MinMaxImpl<Type>(out_type, options));
return Status::OK();
}

std::unique_ptr<KernelState> Create() {
ctx->SetStatus(VisitTypeInline(in_type, this));
return std::move(state);
}
};

std::unique_ptr<KernelState> MinMaxInit(KernelContext* ctx, const KernelInitArgs& args) {
MinMaxInitState visitor(ctx, *args.inputs[0].type,
args.kernel->signature->out_type().type(),
Expand Down
Loading

0 comments on commit 10d2217

Please sign in to comment.