Skip to content

Commit

Permalink
ARROW-9605: [C++] Speed up aggregate min/max compute kernels on integ…
Browse files Browse the repository at this point in the history
…er types

1. Use BitBlockCounter to speed up processing of data with a typical 0.01% null probability.
2. Enable compiler auto-vectorization (SIMD) for the no-nulls case on integer types. Float/double use fmin/fmax to handle NaN, which the compiler cannot vectorize.
3. Also add test cases to cover different null probabilities.

Closes apache#7871 from jianxind/kernel_min_max

Lead-authored-by: Frank Du <frank.du@intel.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
2 people authored and GeorgeAp committed Jun 7, 2021
1 parent cf8018a commit 85a9933
Show file tree
Hide file tree
Showing 7 changed files with 469 additions and 242 deletions.
12 changes: 6 additions & 6 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -387,17 +387,17 @@ if(ARROW_COMPUTE)
compute/kernels/vector_sort.cc)

if(CXX_SUPPORTS_AVX2)
list(APPEND ARROW_SRCS compute/kernels/aggregate_sum_avx2.cc)
set_source_files_properties(compute/kernels/aggregate_sum_avx2.cc PROPERTIES
list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx2.cc)
set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES
SKIP_PRECOMPILE_HEADERS ON)
set_source_files_properties(compute/kernels/aggregate_sum_avx2.cc PROPERTIES
set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc PROPERTIES
COMPILE_FLAGS ${ARROW_AVX2_FLAG})
endif()
if(CXX_SUPPORTS_AVX512)
list(APPEND ARROW_SRCS compute/kernels/aggregate_sum_avx512.cc)
set_source_files_properties(compute/kernels/aggregate_sum_avx512.cc PROPERTIES
list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx512.cc)
set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES
SKIP_PRECOMPILE_HEADERS ON)
set_source_files_properties(compute/kernels/aggregate_sum_avx512.cc PROPERTIES
set_source_files_properties(compute/kernels/aggregate_basic_avx512.cc PROPERTIES
COMPILE_FLAGS ${ARROW_AVX512_FLAG})
endif()
endif()
Expand Down
17 changes: 0 additions & 17 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,23 +130,6 @@ Result<Datum> MinMax(const Datum& value,
const MinMaxOptions& options = MinMaxOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Calculate the min / max of a numeric array.
///
/// This function returns both the min and max as a collection. The resulting
/// datum thus consists of two scalar datums: {Datum(min), Datum(max)}
///
/// NOTE(review): the min/max kernel implementation finalizes into a single
/// struct scalar with fields `min` and `max`; confirm whether the description
/// above still matches what this overload returns.
///
/// \param[in] array input array
/// \param[in] options see MinMaxOptions for more information; when omitted,
///            MinMaxOptions::Defaults() is used
/// \param[in] ctx the function execution context, optional
/// \return resulting datum containing a {min, max} collection
///
/// \since 1.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> MinMax(const Array& array,
const MinMaxOptions& options = MinMaxOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Calculate the modal (most common) value of a numeric array
///
/// This function returns both mode and count as a struct scalar, with type
Expand Down
225 changes: 16 additions & 209 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include <cmath>

#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/kernels/aggregate_basic_internal.h"
#include "arrow/compute/kernels/aggregate_internal.h"
Expand Down Expand Up @@ -132,212 +130,10 @@ std::unique_ptr<KernelState> MeanInit(KernelContext* ctx, const KernelInitArgs&
// ----------------------------------------------------------------------
// MinMax implementation

// Primary template for the running min/max accumulator of one Arrow type.
//
// It is intentionally empty: the usable definitions are the partial
// specializations selected through the `Enable` parameter (boolean, integer
// and floating-point types).
template <typename ArrowType, typename Enable = void>
struct MinMaxState {};

// Min/max accumulator for boolean inputs.
//
// For booleans the minimum is the logical AND of all merged values and the
// maximum is their logical OR, so the neutral starting point is
// (min = true, max = false).
template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_boolean<ArrowType>> {
  using ThisType = MinMaxState<ArrowType>;
  using T = typename ArrowType::c_type;

  T min = true;
  T max = false;
  bool has_nulls = false;
  bool has_values = false;

  // Fold a single value into the running min/max.
  void MergeOne(T value) {
    if (!value) {
      min = false;
    }
    if (value) {
      max = true;
    }
  }

  // Combine another partial state into this one.
  ThisType& operator+=(const ThisType& rhs) {
    has_nulls = has_nulls || rhs.has_nulls;
    has_values = has_values || rhs.has_values;
    if (!rhs.min) {
      min = false;
    }
    if (rhs.max) {
      max = true;
    }
    return *this;
  }
};

// Min/max accumulator for integer inputs.
//
// The state starts at (min = largest representable value,
// max = smallest representable value) so that the first merged value always
// replaces both bounds.
template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_integer<ArrowType>> {
  using ThisType = MinMaxState<ArrowType>;
  using T = typename ArrowType::c_type;

  T min = std::numeric_limits<T>::max();
  // For integer types lowest() and min() denote the same value.
  T max = std::numeric_limits<T>::lowest();
  bool has_nulls = false;
  bool has_values = false;

  // Fold a single value into the running min/max.
  void MergeOne(T value) {
    min = (value < min) ? value : min;
    max = (max < value) ? value : max;
  }

  // Combine another partial state into this one.
  ThisType& operator+=(const ThisType& rhs) {
    has_nulls = has_nulls || rhs.has_nulls;
    has_values = has_values || rhs.has_values;
    min = (rhs.min < min) ? rhs.min : min;
    max = (max < rhs.max) ? rhs.max : max;
    return *this;
  }
};

// Min/max accumulator for floating-point inputs.
//
// std::fmin / std::fmax are used (rather than std::min / std::max) because
// they handle NaN operands. The state starts at (+infinity, -infinity).
template <typename ArrowType>
struct MinMaxState<ArrowType, enable_if_floating_point<ArrowType>> {
  using ThisType = MinMaxState<ArrowType>;
  using T = typename ArrowType::c_type;

  T min = std::numeric_limits<T>::infinity();
  T max = -std::numeric_limits<T>::infinity();
  bool has_nulls = false;
  bool has_values = false;

  // Fold a single value into the running min/max.
  void MergeOne(T value) {
    min = std::fmin(min, value);
    max = std::fmax(max, value);
  }

  // Combine another partial state into this one.
  ThisType& operator+=(const ThisType& rhs) {
    has_nulls = has_nulls || rhs.has_nulls;
    has_values = has_values || rhs.has_values;
    min = std::fmin(min, rhs.min);
    max = std::fmax(max, rhs.max);
    return *this;
  }
};

template <typename ArrowType>
struct MinMaxImpl : public ScalarAggregator {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using ThisType = MinMaxImpl<ArrowType>;
using StateType = MinMaxState<ArrowType>;

MinMaxImpl(const std::shared_ptr<DataType>& out_type, const MinMaxOptions& options)
: out_type(out_type), options(options) {}

void Consume(KernelContext*, const ExecBatch& batch) override {
StateType local;

ArrayType arr(batch[0].array());

const auto null_count = arr.null_count();
local.has_nulls = null_count > 0;
local.has_values = (arr.length() - null_count) > 0;

if (local.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL) {
this->state = local;
return;
}

if (local.has_nulls) {
BitmapReader reader(arr.null_bitmap_data(), arr.offset(), arr.length());
for (int64_t i = 0; i < arr.length(); i++) {
if (reader.IsSet()) {
local.MergeOne(arr.Value(i));
}
reader.Next();
}
} else {
for (int64_t i = 0; i < arr.length(); i++) {
local.MergeOne(arr.Value(i));
}
}
this->state = local;
}

void MergeFrom(KernelContext*, const KernelState& src) override {
const auto& other = checked_cast<const ThisType&>(src);
this->state += other.state;
}

void Finalize(KernelContext*, Datum* out) override {
using ScalarType = typename TypeTraits<ArrowType>::ScalarType;

std::vector<std::shared_ptr<Scalar>> values;
if (!state.has_values ||
(state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL)) {
// (null, null)
values = {std::make_shared<ScalarType>(), std::make_shared<ScalarType>()};
} else {
values = {std::make_shared<ScalarType>(state.min),
std::make_shared<ScalarType>(state.max)};
}
out->value = std::make_shared<StructScalar>(std::move(values), this->out_type);
}

std::shared_ptr<DataType> out_type;
MinMaxOptions options;
MinMaxState<ArrowType> state;
};

// Boolean flavour of MinMaxImpl. Instead of visiting every value, it derives
// min and max from the array's null count and true count.
struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType> {
  using MinMaxImpl::MinMaxImpl;

  void Consume(KernelContext*, const ExecBatch& batch) override {
    ArrayType bools(batch[0].array());
    const auto total = bools.length();
    const auto num_nulls = bools.null_count();
    const auto num_valid = total - num_nulls;

    StateType batch_state;
    batch_state.has_nulls = num_nulls > 0;
    batch_state.has_values = num_valid > 0;

    // Under OUTPUT_NULL a null settles the result; skip counting trues.
    const bool result_is_null =
        batch_state.has_nulls && options.null_handling == MinMaxOptions::OUTPUT_NULL;
    if (!result_is_null) {
      const auto num_true = bools.true_count();
      const auto num_false = num_valid - num_true;
      // max is true as soon as one true exists; min is true only when no
      // false exists.
      batch_state.max = num_true > 0;
      batch_state.min = num_false == 0;
    }
    state = batch_state;
  }
};

// Type visitor that creates the min/max kernel state matching an input type.
//
// The visitor keeps references to `in_type`, `out_type` and `options`, so it
// must not outlive the objects passed to its constructor. Call Create() to
// run the dispatch; on unsupported types the error status is recorded on
// `ctx` and the returned state is left empty.
struct MinMaxInitState {
  std::unique_ptr<KernelState> state;
  KernelContext* ctx;
  const DataType& in_type;
  const std::shared_ptr<DataType>& out_type;
  const MinMaxOptions& options;

  MinMaxInitState(KernelContext* ctx, const DataType& in_type,
                  const std::shared_ptr<DataType>& out_type, const MinMaxOptions& options)
      : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {}

  // Fallback for every type without a dedicated overload below.
  Status Visit(const DataType&) {
    return Status::NotImplemented("No min/max implemented");
  }

  // Half floats are explicitly unsupported. The message previously read
  // "No sum implemented", a leftover from the sum kernel; it now names the
  // operation that is actually missing.
  Status Visit(const HalfFloatType&) {
    return Status::NotImplemented("No min/max implemented");
  }

  // Booleans use the count-based implementation.
  Status Visit(const BooleanType&) {
    state.reset(new BooleanMinMaxImpl(out_type, options));
    return Status::OK();
  }

  // All remaining numeric types use the generic element-wise implementation.
  template <typename Type>
  enable_if_number<Type, Status> Visit(const Type&) {
    state.reset(new MinMaxImpl<Type>(out_type, options));
    return Status::OK();
  }

  // Dispatch on `in_type`, record the resulting status on `ctx`, and hand
  // ownership of the created state (possibly null) to the caller.
  std::unique_ptr<KernelState> Create() {
    ctx->SetStatus(VisitTypeInline(in_type, this));
    return std::move(state);
  }
};

// Kernel-init callback for the baseline (non-SIMD) min/max kernels.
//
// Builds a MinMaxInitState visitor from the first input's type, the kernel
// signature's output type and the MinMaxOptions carried by `args`, then
// returns the state produced by the visitor's Create().
//
// NOTE(review): `visitor` is declared twice below (once as a non-template
// MinMaxInitState, once as MinMaxInitState<SimdLevel::NONE>). This looks like
// the before/after sides of a diff rendered together; only one declaration
// can remain for this to compile. Confirm which one is current.
std::unique_ptr<KernelState> MinMaxInit(KernelContext* ctx, const KernelInitArgs& args) {
MinMaxInitState visitor(ctx, *args.inputs[0].type,
args.kernel->signature->out_type().type(),
static_cast<const MinMaxOptions&>(*args.options));
MinMaxInitState<SimdLevel::NONE> visitor(
ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(),
static_cast<const MinMaxOptions&>(*args.options));
return visitor.Create();
}

Expand All @@ -363,8 +159,7 @@ void AddBasicAggKernels(KernelInit init,

void AddMinMaxKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE) {
ScalarAggregateFunction* func, SimdLevel::type simd_level) {
for (const auto& ty : types) {
// array[T] -> scalar[struct<min: T, max: T>]
auto out_ty = struct_({field("min", ty), field("max", ty)});
Expand Down Expand Up @@ -431,6 +226,18 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
&default_minmax_options);
aggregate::AddMinMaxKernels(aggregate::MinMaxInit, {boolean()}, func.get());
aggregate::AddMinMaxKernels(aggregate::MinMaxInit, NumericTypes(), func.get());
// Add the SIMD variants for min max
#if defined(ARROW_HAVE_RUNTIME_AVX2)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) {
aggregate::AddMinMaxAvx2AggKernels(func.get());
}
#endif
#if defined(ARROW_HAVE_RUNTIME_AVX512)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX512)) {
aggregate::AddMinMaxAvx512AggKernels(func.get());
}
#endif

DCHECK_OK(registry->AddFunction(std::move(func)));

DCHECK_OK(registry->AddFunction(aggregate::AddModeAggKernels()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ std::unique_ptr<KernelState> MeanInitAvx2(KernelContext* ctx,
return visitor.Create();
}

// ----------------------------------------------------------------------
// MinMax implementation

// Kernel-init callback for the AVX2 min/max kernels: dispatches on the first
// input's type through MinMaxInitState<SimdLevel::AVX2> and returns the state
// it creates.
std::unique_ptr<KernelState> MinMaxInitAvx2(KernelContext* ctx,
                                            const KernelInitArgs& args) {
  const auto& minmax_options = static_cast<const MinMaxOptions&>(*args.options);
  MinMaxInitState<SimdLevel::AVX2> state_factory(
      ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(),
      minmax_options);
  return state_factory.Create();
}

void AddSumAvx2AggKernels(ScalarAggregateFunction* func) {
AddBasicAggKernels(SumInitAvx2, internal::SignedIntTypes(), int64(), func,
SimdLevel::AVX2);
Expand All @@ -81,6 +92,12 @@ void AddMeanAvx2AggKernels(ScalarAggregateFunction* func) {
SimdLevel::AVX2);
}

// Register the AVX2 min/max kernels on `func`.
void AddMinMaxAvx2AggKernels(ScalarAggregateFunction* func) {
  // Only integer types get AVX2 variants. Float/double are left out because
  // they go through fmin/fmax for NaN handling, which the compiler does not
  // auto-vectorize.
  const auto& int_types = internal::IntTypes();
  AddMinMaxKernels(MinMaxInitAvx2, int_types, func, SimdLevel::AVX2);
}

} // namespace aggregate
} // namespace compute
} // namespace arrow
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ std::unique_ptr<KernelState> MeanInitAvx512(KernelContext* ctx,
return visitor.Create();
}

// ----------------------------------------------------------------------
// MinMax implementation

// Kernel-init callback for the AVX512 min/max kernels: dispatches on the
// first input's type through MinMaxInitState<SimdLevel::AVX512> and returns
// the state it creates.
std::unique_ptr<KernelState> MinMaxInitAvx512(KernelContext* ctx,
                                              const KernelInitArgs& args) {
  const auto& minmax_options = static_cast<const MinMaxOptions&>(*args.options);
  MinMaxInitState<SimdLevel::AVX512> state_factory(
      ctx, *args.inputs[0].type, args.kernel->signature->out_type().type(),
      minmax_options);
  return state_factory.Create();
}

void AddSumAvx512AggKernels(ScalarAggregateFunction* func) {
AddBasicAggKernels(SumInitAvx512, internal::SignedIntTypes(), int64(), func,
SimdLevel::AVX512);
Expand All @@ -82,6 +93,12 @@ void AddMeanAvx512AggKernels(ScalarAggregateFunction* func) {
SimdLevel::AVX512);
}

// Register the AVX512 min/max kernels on `func`.
void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func) {
  // Only the 32- and 64-bit integer types get AVX512 variants; there is no
  // advantage for 8- and 16-bit integers.
  AddMinMaxKernels(MinMaxInitAvx512,
                   {int32(), uint32(), int64(), uint64()},
                   func,
                   SimdLevel::AVX512);
}

} // namespace aggregate
} // namespace compute
} // namespace arrow
Loading

0 comments on commit 85a9933

Please sign in to comment.