diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt index b579bc879..df9678533 100644 --- a/native-sql-engine/cpp/src/CMakeLists.txt +++ b/native-sql-engine/cpp/src/CMakeLists.txt @@ -7,7 +7,7 @@ include(FindPkgConfig) include(GNUInstallDirs) include(CheckCXXCompilerFlag) -set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) @@ -520,7 +520,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS precompile/builder.cc precompile/array.cc precompile/type.cc - precompile/sort.cc precompile/hash_arrays_kernel.cc precompile/unsafe_array.cc precompile/gandiva_projector.cc diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc index 5a0860ac3..2f2d699a2 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc @@ -152,20 +152,31 @@ class UniqueAction : public ActionBase { row_id_ = 0; in_null_count_ = in_->null_count(); // prepare evaluate lambda - *on_valid = [this](int dest_group_id) { - const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id_); - if (cache_validity_[dest_group_id] == false) { - if (!is_null) { + if (in_null_count_) { + *on_valid = [this](int dest_group_id) { + const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id_); + if (cache_validity_[dest_group_id] == false) { + if (!is_null) { + cache_validity_[dest_group_id] = true; + cache_[dest_group_id] = (CType)in_->GetView(row_id_); + } else { + cache_validity_[dest_group_id] = true; + null_flag_[dest_group_id] = true; + } + } + row_id_++; + return arrow::Status::OK(); + }; + } else { + *on_valid = [this](int dest_group_id) { + if (cache_validity_[dest_group_id] == false) { cache_validity_[dest_group_id] = true; cache_[dest_group_id] = (CType)in_->GetView(row_id_); - } else { - cache_validity_[dest_group_id] = true; - null_flag_[dest_group_id] = true; } - } - row_id_++; - return arrow::Status::OK(); - }; + row_id_++; + return arrow::Status::OK(); + }; + } *on_null = [this]() { row_id_++; @@ -1802,15 +1813,25 @@ class SumAction(in_->data()->GetValues(1)); row_id = 0; - *on_valid = [this](int dest_group_id) { - const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); - if (!is_null) { + if (!in_null_count_) { + *on_valid = [this](int dest_group_id) { cache_validity_[dest_group_id] = true; cache_[dest_group_id] += data_[row_id]; - } - row_id++; - return arrow::Status::OK(); - }; + + row_id++; + return arrow::Status::OK(); + }; + } else { + *on_valid = [this](int dest_group_id) { + const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); + if (!is_null) { + cache_validity_[dest_group_id] = true; + cache_[dest_group_id] += data_[row_id]; + } + row_id++; + return arrow::Status::OK(); + }; + } *on_null = [this]() { row_id++; @@ -1952,15 +1973,24 @@ class SumActionnull_count(); // prepare evaluate lambda row_id = 0; - *on_valid = [this](int dest_group_id) { - const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); - if (!is_null) { + if (in_null_count_) { + *on_valid = [this](int dest_group_id) { + const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); + if (!is_null) { + cache_validity_[dest_group_id] = true; + cache_[dest_group_id] += in_->GetView(row_id); + } + row_id++; + return arrow::Status::OK(); + }; + } else { + *on_valid = [this](int dest_group_id) { cache_validity_[dest_group_id] = true; cache_[dest_group_id] += in_->GetView(row_id); - } - row_id++; - return arrow::Status::OK(); - }; + row_id++; + return arrow::Status::OK(); + }; + } *on_null = [this]() { row_id++; @@ -2108,18 +2138,29 @@ class SumActionPartialnull_count(); - // prepare evaluate lambda + data_ = const_cast(in_->data()->GetValues(1)); row_id = 0; - *on_valid = [this](int dest_group_id) { - const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); - if (!is_null) { + // prepare evaluate lambda + if (in_null_count_) { + *on_valid = [this](int dest_group_id) { + const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); + if (!is_null) { + cache_validity_[dest_group_id] = true; + cache_[dest_group_id] += data_[row_id]; + } + row_id++; + return arrow::Status::OK(); + }; + } else { + *on_valid = [this](int dest_group_id) { cache_validity_[dest_group_id] = true; cache_[dest_group_id] += data_[row_id]; - } - row_id++; - return arrow::Status::OK(); - }; + + row_id++; + return arrow::Status::OK(); + }; + } *on_null = [this]() { row_id++; @@ -2263,17 +2304,28 @@ class SumActionPartial(in_list[0]); in_null_count_ = in_->null_count(); - // prepare evaluate lambda + row_id = 0; - *on_valid = [this](int dest_group_id) { - const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); - if (!is_null) { + // prepare evaluate lambda + if (in_null_count_) { + *on_valid = [this](int dest_group_id) { + const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); + if (!is_null) { + cache_validity_[dest_group_id] = true; + cache_[dest_group_id] += in_->GetView(row_id); + } + row_id++; + return arrow::Status::OK(); + }; + } else { + *on_valid = [this](int dest_group_id) { cache_validity_[dest_group_id] = true; cache_[dest_group_id] += in_->GetView(row_id); - } - row_id++; - return arrow::Status::OK(); - }; + + row_id++; + return arrow::Status::OK(); + }; + } *on_null = [this]() { row_id++; @@ -2785,16 +2837,26 @@ class SumCountAction(in_->data()->GetValues(1)); row_id = 0; - *on_valid = [this](int dest_group_id) { - const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); - if (!is_null) { + if (!in_null_count_) { + *on_valid = [this](int dest_group_id) { cache_sum_[dest_group_id] += data_[row_id]; cache_count_[dest_group_id] += 1; cache_validity_[dest_group_id] = true; - } - row_id++; - return arrow::Status::OK(); - }; + row_id++; + return arrow::Status::OK(); + }; + } else { + *on_valid = [this](int dest_group_id) { + const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); + if (!is_null) { + cache_sum_[dest_group_id] += data_[row_id]; + cache_count_[dest_group_id] += 1; + cache_validity_[dest_group_id] = true; + } + row_id++; + return arrow::Status::OK(); + }; + } *on_null = [this]() { row_id++; @@ -2963,16 +3025,27 @@ class SumCountActionnull_count(); // prepare evaluate lambda row_id = 0; - *on_valid = [this](int dest_group_id) { - const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); - if (!is_null) { + if (in_null_count_) { + *on_valid = [this](int dest_group_id) { + const bool is_null = in_null_count_ > 0 && in_->IsNull(row_id); + if (!is_null) { + cache_sum_[dest_group_id] += in_->GetView(row_id); + cache_count_[dest_group_id] += 1; + cache_validity_[dest_group_id] = true; + } + row_id++; + return arrow::Status::OK(); + }; + } else { + *on_valid = [this](int dest_group_id) { cache_sum_[dest_group_id] += in_->GetView(row_id); cache_count_[dest_group_id] += 1; cache_validity_[dest_group_id] = true; - } - row_id++; - return arrow::Status::OK(); - }; + + row_id++; + return arrow::Status::OK(); + }; + } *on_null = [this]() { row_id++; diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc index 2288865e8..c210102dc 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/hash_aggregate_kernel.cc @@ -911,14 +911,14 @@ class HashAggregateKernel::Impl { for (int i = 0; i < length; i++) { aggr_key_unsafe_row->reset(); - for (auto payload_arr : payloads) { + for (const auto& payload_arr : payloads) { payload_arr->Append(i, &aggr_key_unsafe_row); } - aggr_key = arrow::util::string_view(aggr_key_unsafe_row->data, - aggr_key_unsafe_row->cursor); + // FIXME(): all keys are null? aggr_hash_table_->GetOrInsert( - aggr_key, [](int) {}, [](int) {}, &(indices[i])); + aggr_key_unsafe_row->data, aggr_key_unsafe_row->cursor, [](int) {}, + [](int) {}, &(indices[i])); } } else { for (int i = 0; i < length; i++) { @@ -973,7 +973,6 @@ class HashAggregateKernel::Impl { arrow::Status Next(std::shared_ptr* out) { uint64_t out_length = 0; - int gp_idx = 0; std::vector> outputs; for (auto action : action_impl_list_) { action->Finish(offset_, batch_size_, &outputs); diff --git a/native-sql-engine/cpp/src/precompile/hash_map.cc b/native-sql-engine/cpp/src/precompile/hash_map.cc index d55156fc6..6c1ff6db6 100644 --- a/native-sql-engine/cpp/src/precompile/hash_map.cc +++ b/native-sql-engine/cpp/src/precompile/hash_map.cc @@ -27,27 +27,6 @@ namespace sparkcolumnarplugin { namespace precompile { -#define TYPED_SPARSE_HASH_MAP_IMPL(TYPENAME, TYPE) \ - class TYPENAME::Impl : public SparseHashMap { \ - public: \ - Impl(arrow::MemoryPool* pool) : SparseHashMap(pool) {} \ - }; \ - \ - TYPENAME::TYPENAME(arrow::MemoryPool* pool) { impl_ = std::make_shared(pool); } \ - arrow::Status TYPENAME::GetOrInsert(const TYPE& value, void (*on_found)(int32_t), \ - void (*on_not_found)(int32_t), \ - int32_t* out_memo_index) { \ - return impl_->GetOrInsert(value, on_found, on_not_found, out_memo_index); \ - } \ - int32_t TYPENAME::GetOrInsertNull(void (*on_found)(int32_t), \ - void (*on_not_found)(int32_t)) { \ - return impl_->GetOrInsertNull(on_found, on_not_found); \ - } \ - int32_t TYPENAME::Get(const TYPE& value) { return impl_->Get(value); } \ - int32_t TYPENAME::GetNull() { return impl_->GetNull(); } - -#undef TYPED_SPARSE_HASH_MAP_IMPL - #define TYPED_ARROW_HASH_MAP_IMPL(HASHMAPNAME, TYPENAME, TYPE, MEMOTABLETYPE) \ using MEMOTABLETYPE = \ typename arrow::internal::HashTraits::MemoTableType; \ @@ -72,6 +51,35 @@ namespace precompile { int32_t HASHMAPNAME::Get(const TYPE& value) { return impl_->Get(value); } \ int32_t HASHMAPNAME::GetNull() { return impl_->GetNull(); } +#define TYPED_ARROW_HASH_MAP_BINARY_IMPL(HASHMAPNAME, TYPENAME, TYPE, MEMOTABLETYPE) \ + using MEMOTABLETYPE = \ + typename arrow::internal::HashTraits::MemoTableType; \ + class HASHMAPNAME::Impl : public MEMOTABLETYPE { \ + public: \ + Impl(arrow::MemoryPool* pool) : MEMOTABLETYPE(pool, 128) {} \ + }; \ + \ + HASHMAPNAME::HASHMAPNAME(arrow::MemoryPool* pool) { \ + impl_ = std::make_shared(pool); \ + } \ + arrow::Status HASHMAPNAME::GetOrInsert(const TYPE& value, void (*on_found)(int32_t), \ + void (*on_not_found)(int32_t), \ + int32_t* out_memo_index) { \ + return impl_->GetOrInsert(value, on_found, on_not_found, out_memo_index); \ + } \ + arrow::Status HASHMAPNAME::GetOrInsert(const void* value, int len, void (*on_found)(int32_t), \ + void (*on_not_found)(int32_t), \ + int32_t* out_memo_index) { \ + return impl_->GetOrInsert(value, len, on_found, on_not_found, out_memo_index); \ + } \ + int32_t HASHMAPNAME::GetOrInsertNull(void (*on_found)(int32_t), \ + void (*on_not_found)(int32_t)) { \ + return impl_->GetOrInsertNull(on_found, on_not_found); \ + } \ + int32_t HASHMAPNAME::Size() { return impl_->size(); } \ + int32_t HASHMAPNAME::Get(const TYPE& value) { return impl_->Get(value); } \ + int32_t HASHMAPNAME::GetNull() { return impl_->GetNull(); } + #define TYPED_ARROW_HASH_MAP_DECIMAL_IMPL(HASHMAPNAME, TYPENAME, TYPE, MEMOTABLETYPE) \ using MEMOTABLETYPE = \ typename arrow::internal::HashTraits::MemoTableType; \ @@ -103,7 +111,7 @@ TYPED_ARROW_HASH_MAP_IMPL(FloatHashMap, FloatType, float, FloatMemoTableType) TYPED_ARROW_HASH_MAP_IMPL(DoubleHashMap, DoubleType, double, DoubleMemoTableType) TYPED_ARROW_HASH_MAP_IMPL(Date32HashMap, Date32Type, int32_t, Date32MemoTableType) TYPED_ARROW_HASH_MAP_IMPL(Date64HashMap, Date64Type, int64_t, Date64MemoTableType) -TYPED_ARROW_HASH_MAP_IMPL(StringHashMap, StringType, arrow::util::string_view, +TYPED_ARROW_HASH_MAP_BINARY_IMPL(StringHashMap, StringType, arrow::util::string_view, StringMemoTableType) TYPED_ARROW_HASH_MAP_DECIMAL_IMPL(Decimal128HashMap, Decimal128Type, arrow::Decimal128, DecimalMemoTableType) diff --git a/native-sql-engine/cpp/src/precompile/hash_map.h b/native-sql-engine/cpp/src/precompile/hash_map.h index 6ff5afec9..43fa95eb5 100644 --- a/native-sql-engine/cpp/src/precompile/hash_map.h +++ b/native-sql-engine/cpp/src/precompile/hash_map.h @@ -26,6 +26,8 @@ namespace precompile { TYPENAME(arrow::MemoryPool* pool); \ arrow::Status GetOrInsert(const TYPE& value, void (*on_found)(int32_t), \ void (*on_not_found)(int32_t), int32_t* out_memo_index); \ + arrow::Status GetOrInsert(const void* value, int len, void (*on_found)(int32_t), \ + void (*on_not_found)(int32_t), int32_t* out_memo_index); \ int32_t GetOrInsertNull(void (*on_found)(int32_t), void (*on_not_found)(int32_t)); \ int32_t Get(const TYPE& value); \ int32_t Size(); \ diff --git a/native-sql-engine/cpp/src/precompile/sort.cc b/native-sql-engine/cpp/src/precompile/sort.cc deleted file mode 100644 index a9d1515ca..000000000 --- a/native-sql-engine/cpp/src/precompile/sort.cc +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "precompile/sort.h" - -#include "third_party/ska_sort.hpp" - -using namespace sparkcolumnarplugin::codegen::arrowcompute::extra; -namespace sparkcolumnarplugin { -namespace precompile { -#define TYPED_ASC_SORT_IMPL(CTYPE) \ - void sort_asc(ArrayItemIndex* begin, ArrayItemIndex* end, \ - std::function extract_key) { \ - ska_sort(begin, end, extract_key); \ - } - -TYPED_ASC_SORT_IMPL(int32_t) -TYPED_ASC_SORT_IMPL(uint32_t) -TYPED_ASC_SORT_IMPL(int64_t) -TYPED_ASC_SORT_IMPL(uint64_t) -TYPED_ASC_SORT_IMPL(float) -TYPED_ASC_SORT_IMPL(double) -TYPED_ASC_SORT_IMPL(std::string) - -void sort_desc(ArrayItemIndex* begin, ArrayItemIndex* end, - std::function comp) { - // std::sort(begin, end, *comp.target()); - std::sort(begin, end, comp); -} -} // namespace precompile -} // namespace sparkcolumnarplugin diff --git a/native-sql-engine/cpp/src/precompile/sort.h b/native-sql-engine/cpp/src/precompile/sort.h deleted file mode 100644 index bbc93e4eb..000000000 --- a/native-sql-engine/cpp/src/precompile/sort.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include -#include - -#include "codegen/arrow_compute/ext/array_item_index.h" - -using namespace sparkcolumnarplugin::codegen::arrowcompute::extra; -namespace sparkcolumnarplugin { -namespace precompile { -#define TYPED_ASC_SORT_DEFINE(CTYPE) \ - void sort_asc(ArrayItemIndex*, ArrayItemIndex*, std::function); - -TYPED_ASC_SORT_DEFINE(int32_t) -TYPED_ASC_SORT_DEFINE(uint32_t) -TYPED_ASC_SORT_DEFINE(int64_t) -TYPED_ASC_SORT_DEFINE(uint64_t) -TYPED_ASC_SORT_DEFINE(float) -TYPED_ASC_SORT_DEFINE(double) -TYPED_ASC_SORT_DEFINE(std::string) - -void sort_desc(ArrayItemIndex*, ArrayItemIndex*, - std::function); - -} // namespace precompile -} // namespace sparkcolumnarplugin diff --git a/native-sql-engine/cpp/src/precompile/unsafe_array.h b/native-sql-engine/cpp/src/precompile/unsafe_array.h index 42b0c2fe3..e13314135 100644 --- a/native-sql-engine/cpp/src/precompile/unsafe_array.h +++ b/native-sql-engine/cpp/src/precompile/unsafe_array.h @@ -90,7 +90,22 @@ class TypedUnsafeArray> : public Unsaf setNullAt((*unsafe_row).get(), idx_); } else { auto v = typed_array_->GetView(i); - appendToUnsafeRow((*unsafe_row).get(), idx_, v); + switch (v.size()) { + case 1: + appendToUnsafeRow((*unsafe_row).get(), idx_, *(int8_t*)(v.data())); + break; + case 2: + appendToUnsafeRow((*unsafe_row).get(), idx_, *(int16_t*)(v.data())); + break; + case 4: + appendToUnsafeRow((*unsafe_row).get(), idx_, *(int32_t*)(v.data())); + break; + case 8: + appendToUnsafeRow((*unsafe_row).get(), idx_, *(int64_t*)(v.data())); + break; + default: + appendToUnsafeRow((*unsafe_row).get(), idx_, v); + } } return arrow::Status::OK(); } @@ -107,4 +122,4 @@ arrow::Status MakeUnsafeArray(std::shared_ptr type, int idx, const std::shared_ptr& in, std::shared_ptr* out); } // namespace precompile -} // namespace sparkcolumnarplugin \ No newline at end of file +} // namespace sparkcolumnarplugin