From 72d5a7cc9f771cffe0e790adef13e41a9261da33 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Fri, 19 Feb 2021 05:36:30 +0000 Subject: [PATCH] refactor sort kernels --- .../codegen/arrow_compute/ext/sort_kernel.cc | 1499 +++-------------- 1 file changed, 251 insertions(+), 1248 deletions(-) diff --git a/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc b/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc index 7fe2fe373..f9ce516f7 100644 --- a/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc +++ b/cpp/src/codegen/arrow_compute/ext/sort_kernel.cc @@ -78,11 +78,13 @@ using namespace sparkcolumnarplugin::precompile; class SortArraysToIndicesKernel::Impl { public: Impl() {} - Impl(arrow::compute::FunctionContext* ctx, std::shared_ptr result_schema, + Impl(arrow::compute::FunctionContext* ctx, + std::shared_ptr result_schema, std::shared_ptr key_projector, std::vector> projected_types, std::vector> key_field_list, - std::vector sort_directions, std::vector nulls_order, + std::vector sort_directions, + std::vector nulls_order, bool NaN_check) : ctx_(ctx), result_schema_(result_schema), @@ -90,86 +92,118 @@ class SortArraysToIndicesKernel::Impl { key_field_list_(key_field_list), sort_directions_(sort_directions), nulls_order_(nulls_order), - asc_(sort_directions[0]), - nulls_first_(nulls_order[0]), projected_types_(projected_types), NaN_check_(NaN_check) { + #ifdef DEBUG + std::cout << "use SortArraysToIndicesKernel::Impl" << std::endl; + #endif for (auto field : key_field_list) { auto indices = result_schema->GetAllFieldIndices(field->name()); if (indices.size() != 1) { - std::cout << "[ERROR] SortArraysToIndicesKernel::Impl can't find key " + std::cout << "[ERROR] SortMultiplekeyCodegenKernel can't find key " << field->ToString() << " from " << result_schema->ToString() << std::endl; throw; } key_index_list_.push_back(indices[0]); } + col_num_ = result_schema->num_fields(); + if (!key_projector) { + auto status = LoadJITFunction(key_field_list); + if (!status.ok()) { + std::cout << "LoadJITFunction failed, msg is " << status.message() << std::endl; + throw; + } + } else { + int i = 0; + for (auto type : projected_types) { + auto field = arrow::field(std::to_string(i), type); + projected_field_list_.push_back(field); + i++; + } + auto status = LoadJITFunction(projected_field_list_); + if (!status.ok()) { + std::cout << "LoadJITFunction failed, msg is " << status.message() << std::endl; + throw; + } + } } + virtual ~Impl() {} + virtual arrow::Status LoadJITFunction( - std::vector> key_field_list, - std::shared_ptr result_schema) { + std::vector> key_field_list) { // generate ddl signature std::stringstream func_args_ss; - func_args_ss << (key_projector_ ? "project" : "original"); - int col_num = sort_directions_.size(); - for (int i = 0; i < col_num; i++) { + for (int i = 0; i < sort_directions_.size(); i++) { func_args_ss << "[Sorter]" << (nulls_order_[i] ? "nulls_first" : "nulls_last") << "|" << (sort_directions_[i] ? "asc" : "desc"); } - for (auto i : key_index_list_) { - auto field = result_schema->field(i); - func_args_ss << "[sort_key_" << i << "]" << field->type()->ToString(); - } - - func_args_ss << "[schema]"; - for (auto field : result_schema->fields()) { - func_args_ss << field->type()->ToString(); + for (int i = 0; i < key_field_list.size(); i++) { + auto field = key_field_list[i]; + func_args_ss << "[Type]" << field->type()->ToString(); } - #ifdef DEBUG std::cout << "func_args_ss is " << func_args_ss.str() << std::endl; #endif - std::stringstream signature_ss; signature_ss << std::hex << std::hash{}(func_args_ss.str()); signature_ = signature_ss.str(); auto file_lock = FileSpinLock(); - auto status = LoadLibrary(signature_, ctx_, &sorter); + auto status = LoadLibrary(signature_, ctx_, &sorter_); if (!status.ok()) { // process - auto codes = ProduceCodes(result_schema); + auto codes = ProduceCodes(); // compile codes RETURN_NOT_OK(CompileCodes(codes, signature_)); - RETURN_NOT_OK(LoadLibrary(signature_, ctx_, &sorter)); + RETURN_NOT_OK(LoadLibrary(signature_, ctx_, &sorter_)); } FileSpinUnLock(file_lock); return arrow::Status::OK(); } virtual arrow::Status Evaluate(const ArrayList& in) { - std::vector> outputs; - if (key_projector_) { + num_batches_++; + if (cached_.size() <= col_num_) { + cached_.resize(col_num_ + 1); + } + for (int i = 0; i < col_num_; i++) { + cached_[i].push_back(in[i]); + } + if (!key_projector_) { + ArrayList key_cols; + for (auto idx : key_index_list_) { + key_cols.push_back(in[idx]); + } + sorter_->Evaluate(key_cols); + } else { + std::vector> projected_batch; + // do projection here, and the projected arrays are used for comparison auto length = in.size() > 0 ? in[0]->length() : 0; auto in_batch = arrow::RecordBatch::Make(result_schema_, length, in); - RETURN_NOT_OK(key_projector_->Evaluate(*in_batch, ctx_->memory_pool(), &outputs)); + RETURN_NOT_OK( + key_projector_->Evaluate(*in_batch, ctx_->memory_pool(), &projected_batch)); + sorter_->Evaluate(projected_batch); } - RETURN_NOT_OK(sorter->Evaluate(in, outputs)); + items_total_ += in[0]->length(); + length_list_.push_back(in[0]->length()); return arrow::Status::OK(); } virtual arrow::Status MakeResultIterator( std::shared_ptr schema, std::shared_ptr>* out) { - RETURN_NOT_OK(sorter->MakeResultIterator(schema, out)); + std::shared_ptr indices_out; + RETURN_NOT_OK(sorter_->FinishInternal(&indices_out)); + *out = std::make_shared(ctx_, schema, indices_out, cached_); return arrow::Status::OK(); } virtual arrow::Status MakeResultIterator( std::shared_ptr schema, std::shared_ptr>* out) { - RETURN_NOT_OK(sorter->MakeResultIterator(schema, out)); + RETURN_NOT_OK(sorter_->MakeResultIterator(schema, out)); return arrow::Status::OK(); } @@ -179,29 +213,30 @@ class SortArraysToIndicesKernel::Impl { std::string GetSignature() { return signature_; } protected: - std::shared_ptr sorter; + std::shared_ptr sorter_; + std::vector cached_; arrow::compute::FunctionContext* ctx_; std::string signature_; std::vector key_index_list_; std::shared_ptr result_schema_; std::shared_ptr key_projector_; std::vector> projected_types_; + std::vector> projected_field_list_; std::vector> key_field_list_; + std::vector length_list_; + uint64_t num_batches_ = 0; + uint64_t items_total_ = 0; // true for asc, false for desc std::vector sort_directions_; // true for nulls_first, false for nulls_last std::vector nulls_order_; - // keep the direction and nulls order for the first key - bool nulls_first_; - bool asc_; bool NaN_check_; + int col_num_; class TypedSorterCodeGenImpl { public: - TypedSorterCodeGenImpl(std::string indice, std::shared_ptr data_type, - std::string name) - : indice_(indice), data_type_(data_type), name_(name) {} - std::string GetTypeNameString() { return "arrow::" + GetTypeString(data_type_); } + TypedSorterCodeGenImpl(std::string indice, std::shared_ptr data_type) + : indice_(indice), data_type_(data_type) {} std::string GetCachedVariablesDefine() { std::stringstream ss; ss << "using ArrayType_" << indice_ << " = " << GetTypeString(data_type_, "Array") @@ -210,94 +245,41 @@ class SortArraysToIndicesKernel::Impl { << "_;" << std::endl; return ss.str(); } - std::string GetResultIterDefine() { - std::stringstream ss; - ss << "cached_" << indice_ << "_ = cached_" << indice_ << ";" << std::endl; - ss << "builder_" + indice_ + "_ = std::make_shared<" - << GetTypeString(data_type_, "Builder") << ">(ctx_->memory_pool());" - << std::endl; - return ss.str(); - } - std::string GetFieldDefine() { - return "arrow::field(\"" + name_ + "\", data_type_" + indice_ + ")"; - } - std::string GetResultIterVariables() { - std::stringstream ss; - ss << "using ArrayType_" << indice_ << " = " + GetTypeString(data_type_, "Array") - << ";" << std::endl; - ss << "using BuilderType_" << indice_ << " = " - << GetTypeString(data_type_, "Builder") << ";" << std::endl; - ss << "std::vector> cached_" << indice_ - << "_;" << std::endl; - ss << "std::shared_ptr builder_" << indice_ << "_;" - << std::endl; - ss << "std::shared_ptr data_type_" << indice_ - << " = arrow::" + GetArrowTypeDefString(data_type_) << ";" << std::endl; - return ss.str(); - } private: std::string indice_; - std::string name_; std::shared_ptr data_type_; }; - virtual std::string ProduceCodes(std::shared_ptr result_schema) { + virtual std::string ProduceCodes() { int indice = 0; - std::vector> shuffle_typed_codegen_list; - for (auto field : result_schema->fields()) { - auto codegen = std::make_shared( - std::to_string(indice), field->type(), field->name()); - shuffle_typed_codegen_list.push_back(codegen); - indice++; + std::vector> key_typed_codegen_list; + if (key_projector_) { + for (auto field : projected_field_list_) { + auto codegen = std::make_shared( + std::to_string(indice), field->type()); + key_typed_codegen_list.push_back(codegen); + indice++; + } + } else { + for (auto field : key_field_list_) { + auto codegen = std::make_shared( + std::to_string(indice), field->type()); + key_typed_codegen_list.push_back(codegen); + indice++; + } } - std::string cached_insert_str = GetCachedInsert( - shuffle_typed_codegen_list.size(), projected_types_.size(), key_projector_, - key_index_list_); - std::string comp_func_str = - GetCompFunction(key_index_list_, key_projector_, projected_types_, - key_field_list_, sort_directions_, nulls_order_); - std::string comp_func_str_without_null = - GetCompFunctionWithoutNull(key_index_list_, key_projector_, projected_types_, - key_field_list_, sort_directions_); - std::string pre_sort_valid_str = GetPreSortValid(); + std::string cached_insert_str = GetCachedInsert(); + + std::string comp_func_str = GetCompFunction(true); - std::string pre_sort_null_str = GetPreSortNull(); + std::string comp_func_str_without_null = GetCompFunction(false); std::string sort_func_str = GetSortFunction(); - std::string make_result_iter_str = - GetMakeResultIter(shuffle_typed_codegen_list.size()); - std::string cached_variables_define_str = - GetCachedVariablesDefine(shuffle_typed_codegen_list); - - std::string result_iter_param_define_str = - GetResultIterParamsDefine(shuffle_typed_codegen_list.size()); - - std::string result_iter_define_str = GetResultIterDefine(shuffle_typed_codegen_list); - - std::string typed_build_str = GetTypedBuild(shuffle_typed_codegen_list.size()); - - std::string result_variables_define_str = - GetResultIterVariables(shuffle_typed_codegen_list); - - std::string typed_res_array_build_str = - GetTypedResArrayBuild(shuffle_typed_codegen_list.size()); - - std::string typed_res_array_str = GetTypedResArray(shuffle_typed_codegen_list.size()); - - std::string projected_variables_str = - GetProjectedVariables(key_projector_, projected_types_); - - std::string cur_col_str = GetCurCol(key_index_list_[0], key_projector_); - - std::string first_cmp_col_str = GetFirstCmpCol(key_index_list_[0], key_projector_); - - std::string key_relation_list_str = GetKeyRelationList(key_index_list_); - - std::string make_key_relation_str = GetMakeKeyRelation(shuffle_typed_codegen_list); + GetCachedVariablesDefine(key_typed_codegen_list); return BaseCodes() + R"( #include @@ -317,13 +299,12 @@ class TypedSorterImpl : public CodeGenBase { public: TypedSorterImpl(arrow::compute::FunctionContext* ctx) : ctx_(ctx) {} - arrow::Status Evaluate(const ArrayList& in, const ArrayList& projected_batch) override { + arrow::Status Evaluate(const ArrayList& in) override { num_batches_++; )" + cached_insert_str + - cur_col_str + R"( + auto cur = cached_0_[cached_0_.size() - 1]; items_total_ += cur->length(); - nulls_total_ += cur->null_count(); length_list_.push_back(cur->length()); return arrow::Status::OK(); @@ -360,89 +341,16 @@ class TypedSorterImpl : public CodeGenBase { RETURN_NOT_OK(MakeFixedSizeBinaryArray(out_type, items_total_, indices_buf, out)); return arrow::Status::OK(); } - - arrow::Status MakeResultIterator( - std::shared_ptr schema, - std::shared_ptr>* out) override { - std::shared_ptr indices_out; - RETURN_NOT_OK(FinishInternal(&indices_out)); - )" + make_result_iter_str + - R"( - return arrow::Status::OK(); - } private: )" + cached_variables_define_str + - projected_variables_str + R"( std::vector length_list_; arrow::compute::FunctionContext* ctx_; uint64_t num_batches_ = 0; uint64_t items_total_ = 0; - uint64_t nulls_total_ = 0; + // If all batches has no null value, has_null_ will remain false bool has_null_ = false; - - class SortRelationResultIterator : public ResultIterator { - public: - SortRelationResultIterator(std::shared_ptr sort_relation) - : sort_relation_(sort_relation) {} - arrow::Status Next(std::shared_ptr* out) { - *out = sort_relation_; - return arrow::Status::OK(); - } - - private: - std::shared_ptr sort_relation_; - }; - - class SorterResultIterator : public ResultIterator { - public: - SorterResultIterator(arrow::compute::FunctionContext* ctx, - std::shared_ptr indices_in, - )" + result_iter_param_define_str + - R"(): ctx_(ctx), total_length_(indices_in->length()), indices_in_cache_(indices_in) { - )" + result_iter_define_str + - R"( - indices_begin_ = (ArrayItemIndexS*)indices_in->value_data(); - } - - std::string ToString() override { return "SortArraysToIndicesResultIterator"; } - - bool HasNext() override { - if (offset_ >= total_length_) { - return false; - } - return true; - } - - arrow::Status Next(std::shared_ptr* out) { - auto length = (total_length_ - offset_) > )" + - std::to_string(GetBatchSize()) + R"( ? )" + std::to_string(GetBatchSize()) + - R"( : (total_length_ - offset_); - uint64_t count = 0; - while (count < length) { - auto item = indices_begin_ + offset_ + count++; - )" + typed_build_str + - R"( - } - offset_ += length; - )" + typed_res_array_build_str + - R"( - *out = arrow::RecordBatch::Make(result_schema_, length, {)" + - typed_res_array_str + R"(}); - return arrow::Status::OK(); - } - - private: - )" + result_variables_define_str + - R"( - std::shared_ptr indices_in_cache_; - uint64_t offset_ = 0; - ArrayItemIndexS* indices_begin_; - const uint64_t total_length_; - std::shared_ptr result_schema_; - arrow::compute::FunctionContext* ctx_; - }; }; extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, @@ -452,115 +360,85 @@ extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, )"; } - std::string GetCachedInsert(int shuffle_size, int projected_size, - const std::shared_ptr& key_projector, - const std::vector& sort_key_index_list) { + + std::string GetCachedInsert() { std::stringstream ss; - for (int i = 0; i < shuffle_size; i++) { - ss << "cached_" << i << "_.push_back(std::make_shared(in[" - << i << "]));" << std::endl; - } - if (key_projector) { - for (int i = 0; i < projected_size; i++) { - ss << "projected_" << i << "_.push_back(std::make_shared(projected_batch[" << i << "]));" << std::endl; - } - } - if (key_projector) { - for (int i = 0; i < projected_size; i++) { - ss << "if (!has_null_ && projected_" << i - << "_[projected_0_.size() - 1]->null_count() > 0) { " << "has_null_ = true;}" - << std::endl; - } - } else { - for (int i = 0; i < sort_key_index_list.size(); i++) { - int key_id = sort_key_index_list[i]; - ss << "if (!has_null_ && cached_" << key_id << "_[cached_" << key_id - << "_.size() - 1]->null_count() > 0) {" - << "has_null_ = true;}" << std::endl; - } + for (int i = 0; i < key_index_list_.size(); i++) { + ss << "cached_" << i << "_.push_back(std::make_shared(in[" << i << "]));\n" + // update has_null_ + << "if (!has_null_ && cached_" << i << "_[cached_" << i + << "_.size() - 1]->null_count() > 0) {" + << "has_null_ = true;}" << std::endl; } return ss.str(); } - std::string GetCompFunction( - const std::vector& sort_key_index_list, - const std::shared_ptr& key_projector, - const std::vector>& projected_types, - const std::vector>& key_field_list, - const std::vector& sort_directions, const std::vector& nulls_order) { + + std::string GetSortFunction() { std::stringstream ss; - bool projected; - if (key_projector) { - projected = true; - } else { - projected = false; - } - ss << "auto comp = [this](ArrayItemIndexS x, ArrayItemIndexS y) {" - << GetCompFunction_(0, projected, sort_key_index_list, key_field_list, - projected_types, sort_directions, nulls_order) - << "};\n"; + ss << "if (has_null_) {\n" + << "gfx::timsort(indices_begin, indices_begin + items_total_, comp);} else {\n" + << "gfx::timsort(indices_begin, indices_begin + items_total_, comp_without_null);}" + << std::endl; return ss.str(); + } - std::string GetCompFunctionWithoutNull( - const std::vector& sort_key_index_list, - const std::shared_ptr& key_projector, - const std::vector>& projected_types, - const std::vector>& key_field_list, - const std::vector& sort_directions) { + + std::string GetCompFunction(bool has_null) { std::stringstream ss; bool projected; - if (key_projector) { + if (key_projector_) { projected = true; } else { projected = false; } - ss << "auto comp_without_null = [this](ArrayItemIndexS x, ArrayItemIndexS y) {" - << GetCompFunction_Without_Null_(0, projected, sort_key_index_list, key_field_list, - projected_types, sort_directions) - << "};\n"; + if (has_null) { + ss << "auto comp = [this](ArrayItemIndexS x, ArrayItemIndexS y) {" + << GetCompFunction_(0, projected, key_field_list_, + projected_types_, sort_directions_, nulls_order_); + } else { + ss << "auto comp_without_null = [this](ArrayItemIndexS x, ArrayItemIndexS y) {" + << GetCompFunction_Without_Null_(0, projected, key_field_list_, + projected_types_, sort_directions_); + } + ss << "};\n"; return ss.str(); } + std::string GetCompFunction_( - int cur_key_index, bool projected, const std::vector& sort_key_index_list, + int cur_key_idx, bool projected, const std::vector>& key_field_list, const std::vector>& projected_types, const std::vector& sort_directions, const std::vector& nulls_order) { std::string comp_str; - int cur_key_id; - auto field = key_field_list[cur_key_index]; - bool asc = sort_directions[cur_key_index]; - bool nulls_first = nulls_order[cur_key_index]; + bool asc = sort_directions[cur_key_idx]; + bool nulls_first = nulls_order[cur_key_idx]; std::shared_ptr data_type; std::string array; // if projected, use projected batch to compare, and use projected type + array = "cached_"; if (projected) { - array = "projected_"; - data_type = projected_types[cur_key_index]; - // use the index of projected key - cur_key_id = cur_key_index; + data_type = projected_types[cur_key_idx]; } else { - array = "cached_"; - data_type = field->type(); - // use the key_id - cur_key_id = sort_key_index_list[cur_key_index]; + data_type = key_field_list[cur_key_idx]->type(); } auto x_num_value = - array + std::to_string(cur_key_id) + "_[x.array_id]->GetView(x.id)"; + array + std::to_string(cur_key_idx) + "_[x.array_id]->GetView(x.id)"; auto x_str_value = - array + std::to_string(cur_key_id) + "_[x.array_id]->GetString(x.id)"; + array + std::to_string(cur_key_idx) + "_[x.array_id]->GetString(x.id)"; auto y_num_value = - array + std::to_string(cur_key_id) + "_[y.array_id]->GetView(y.id)"; + array + std::to_string(cur_key_idx) + "_[y.array_id]->GetView(y.id)"; auto y_str_value = - array + std::to_string(cur_key_id) + "_[y.array_id]->GetString(y.id)"; - auto is_x_null = array + std::to_string(cur_key_id) + "_[x.array_id]->IsNull(x.id)"; - auto is_y_null = array + std::to_string(cur_key_id) + "_[y.array_id]->IsNull(y.id)"; + array + std::to_string(cur_key_idx) + "_[y.array_id]->GetString(y.id)"; + auto is_x_null = array + std::to_string(cur_key_idx) + "_[x.array_id]->IsNull(x.id)"; + auto is_y_null = array + std::to_string(cur_key_idx) + "_[y.array_id]->IsNull(y.id)"; auto x_null_count = - array + std::to_string(cur_key_id) + "_[x.array_id]->null_count() > 0"; + array + std::to_string(cur_key_idx) + "_[x.array_id]->null_count() > 0"; auto y_null_count = - array + std::to_string(cur_key_id) + "_[y.array_id]->null_count() > 0"; + array + std::to_string(cur_key_idx) + "_[y.array_id]->null_count() > 0"; auto x_null = "(" + x_null_count + " && " + is_x_null + " )"; - auto y_null = "(" + y_null_count + " && " + is_y_null + " )"; + auto y_null = "(" + y_null_count + " && " + is_y_null + " )"; auto is_x_nan = "std::isnan(" + x_num_value + ")"; auto is_y_nan = "std::isnan(" + y_num_value + ")"; @@ -618,7 +496,7 @@ extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, } } comp_str = ss.str(); - if ((cur_key_index + 1) == sort_key_index_list.size()) { + if ((cur_key_idx + 1) == sort_directions.size()) { return comp_str; } // clear the contents of stringstream @@ -638,43 +516,37 @@ extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, << " == " << y_num_value << ")) {"; } } - ss << GetCompFunction_(cur_key_index + 1, projected, sort_key_index_list, - key_field_list, projected_types, sort_directions, nulls_order) + ss << GetCompFunction_(cur_key_idx + 1, projected, key_field_list, + projected_types, sort_directions, nulls_order) << "} else { " << comp_str << "}"; return ss.str(); } + std::string GetCompFunction_Without_Null_( - int cur_key_index, bool projected, const std::vector& sort_key_index_list, + int cur_key_idx, bool projected, const std::vector>& key_field_list, const std::vector>& projected_types, const std::vector& sort_directions) { std::string comp_str; - int cur_key_id; - auto field = key_field_list[cur_key_index]; - bool asc = sort_directions[cur_key_index]; + bool asc = sort_directions[cur_key_idx]; std::shared_ptr data_type; std::string array; // if projected, use projected batch to compare, and use projected type + array = "cached_"; if (projected) { - array = "projected_"; - data_type = projected_types[cur_key_index]; - // use the index of projected key - cur_key_id = cur_key_index; + data_type = projected_types[cur_key_idx]; } else { - array = "cached_"; - data_type = field->type(); - // use the key_id - cur_key_id = sort_key_index_list[cur_key_index]; + data_type = key_field_list[cur_key_idx]->type(); } auto x_num_value = - array + std::to_string(cur_key_id) + "_[x.array_id]->GetView(x.id)"; + array + std::to_string(cur_key_idx) + "_[x.array_id]->GetView(x.id)"; auto x_str_value = - array + std::to_string(cur_key_id) + "_[x.array_id]->GetString(x.id)"; + array + std::to_string(cur_key_idx) + "_[x.array_id]->GetString(x.id)"; auto y_num_value = - array + std::to_string(cur_key_id) + "_[y.array_id]->GetView(y.id)"; + array + std::to_string(cur_key_idx) + "_[y.array_id]->GetView(y.id)"; auto y_str_value = - array + std::to_string(cur_key_id) + "_[y.array_id]->GetString(y.id)"; + array + std::to_string(cur_key_idx) + "_[y.array_id]->GetString(y.id)"; auto is_x_nan = "std::isnan(" + x_num_value + ")"; auto is_y_nan = "std::isnan(" + y_num_value + ")"; @@ -719,7 +591,7 @@ extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, ss << "}" << std::endl; } comp_str = ss.str(); - if ((cur_key_index + 1) == sort_key_index_list.size()) { + if ((cur_key_idx + 1) == sort_directions.size()) { return comp_str; } // clear the contents of stringstream @@ -736,56 +608,12 @@ extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, ss << "if (" << x_num_value << " == " << y_num_value << ") {"; } } - ss << GetCompFunction_Without_Null_(cur_key_index + 1, projected, sort_key_index_list, - key_field_list, projected_types, sort_directions) + ss << GetCompFunction_Without_Null_(cur_key_idx + 1, projected, key_field_list, + projected_types, sort_directions) << "} else { " << comp_str << "}"; return ss.str(); } - std::string GetPreSortValid() { - if (nulls_first_) { - return R"( - (indices_begin + nulls_total_ + indices_i)->array_id = array_id; - (indices_begin + nulls_total_ + indices_i)->id = i;)"; - } else { - return R"( - (indices_begin + indices_i)->array_id = array_id; - (indices_begin + indices_i)->id = i;)"; - } - } - std::string GetPreSortNull() { - if (nulls_first_) { - return R"( - (indices_begin + indices_null)->array_id = array_id; - (indices_begin + indices_null)->id = i;)"; - } else { - return R"( - (indices_end - nulls_total_ + indices_null)->array_id = array_id; - (indices_end - nulls_total_ + indices_null)->id = i;)"; - } - } - std::string GetSortFunction() { - std::stringstream ss; - ss << "if (has_null_) {\n" - << "gfx::timsort(indices_begin, indices_begin + items_total_, comp);} else {\n" - << "gfx::timsort(indices_begin, indices_begin + items_total_, comp_without_null);}" - << std::endl; - return ss.str(); - } - std::string GetMakeResultIter(int shuffle_size) { - std::stringstream ss; - std::stringstream params_ss; - for (int i = 0; i < shuffle_size; i++) { - if (i + 1 < shuffle_size) { - params_ss << "cached_" << i << "_,"; - } else { - params_ss << "cached_" << i << "_"; - } - } - auto params = params_ss.str(); - ss << "*out = std::make_shared(ctx_, indices_out, " << params - << ");"; - return ss.str(); - } + std::string GetCachedVariablesDefine( std::vector> shuffle_typed_codegen_list) { std::stringstream ss; @@ -794,143 +622,98 @@ extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, } return ss.str(); } - std::string GetResultIterParamsDefine(int shuffle_size) { - std::stringstream ss; - for (int i = 0; i < shuffle_size; i++) { - if (i + 1 < shuffle_size) { - ss << "std::vector> cached_" << i << "," - << std::endl; - } else { - ss << "std::vector> cached_" << i; - } + + class SortRelationResultIterator : public ResultIterator { + public: + SortRelationResultIterator(std::shared_ptr sort_relation) + : sort_relation_(sort_relation) {} + arrow::Status Next(std::shared_ptr* out) { + *out = sort_relation_; + return arrow::Status::OK(); } - return ss.str(); - } - std::string GetResultIterDefine( - std::vector> shuffle_typed_codegen_list) { - std::stringstream ss; - std::stringstream field_define_ss; - for (auto codegen : shuffle_typed_codegen_list) { - ss << codegen->GetResultIterDefine() << std::endl; - if (codegen != *(shuffle_typed_codegen_list.end() - 1)) { - field_define_ss << codegen->GetFieldDefine() << ","; - } else { - field_define_ss << codegen->GetFieldDefine(); + private: + std::shared_ptr sort_relation_; + }; + + class SorterResultIterator : public ResultIterator { + public: + SorterResultIterator(arrow::compute::FunctionContext* ctx, + std::shared_ptr schema, + std::shared_ptr indices_in, + std::vector& cached) + : ctx_(ctx), + schema_(schema), + indices_in_cache_(indices_in), + total_length_(indices_in->length()), + cached_in_(cached) { + col_num_ = schema->num_fields(); + indices_begin_ = (ArrayItemIndexS*)indices_in->value_data(); + // appender_type won't be used + AppenderBase::AppenderType appender_type = AppenderBase::left; + for (int i = 0; i < col_num_; i++) { + auto field = schema->field(i); + std::shared_ptr appender; + MakeAppender(ctx_, field->type(), appender_type, &appender); + appender_list_.push_back(appender); } - } - ss << "result_schema_ = arrow::schema({" << field_define_ss.str() << "});\n" - << std::endl; - return ss.str(); - } - std::string GetTypedBuild(int shuffle_size) { - std::stringstream ss; - for (int i = 0; i < shuffle_size; i++) { - ss << "if (!cached_" << i << "_[item->array_id]->IsNull(item->id)) {\n" - << " RETURN_NOT_OK(builder_" << i << "_->Append(cached_" << i - << "_[item->array_id]->GetView(item->id)));\n" - << "} else {\n" - << " RETURN_NOT_OK(builder_" << i << "_->AppendNull());\n" - << "}" << std::endl; - } - return ss.str(); - } - std::string GetTypedResArrayBuild(int shuffle_size) { - std::stringstream ss; - for (int i = 0; i < shuffle_size; i++) { - ss << "std::shared_ptr out_" << i << ";\n" - << "RETURN_NOT_OK(builder_" << i << "_->Finish(&out_" << i << "));\n" - << "builder_" << i << "_->Reset();" << std::endl; - } - return ss.str(); - } - std::string GetTypedResArray(int shuffle_size) { - std::stringstream ss; - for (int i = 0; i < shuffle_size; i++) { - if (i + 1 < shuffle_size) { - ss << "out_" << i << ", "; - } else { - ss << "out_" << i; + for (int i = 0; i < col_num_; i++) { + arrow::ArrayVector array_vector = cached_in_[i]; + int array_num = array_vector.size(); + for (int array_id = 0; array_id < array_num; array_id++) { + auto arr = array_vector[array_id]; + appender_list_[i]->AddArray(arr); + } } + batch_size_ = GetBatchSize(); } - return ss.str(); - } - std::string GetResultIterVariables( - std::vector> shuffle_typed_codegen_list) { - std::stringstream ss; - for (auto codegen : shuffle_typed_codegen_list) { - ss << codegen->GetResultIterVariables() << std::endl; - } - return ss.str(); - } - std::string GetProjectedVariables( - const std::shared_ptr& key_projector, - const std::vector>& projected_types) { - std::stringstream ss; - if (key_projector) { - for (int i = 0; i < projected_types.size(); i++) { - ss << "using ProjectedType_" << i - << " = " + GetTypeString(projected_types[i], "Array") << ";" << std::endl; - ss << "std::vector> projected_" << i - << "_;" << std::endl; - ss << std::endl; + ~SorterResultIterator(){} + + std::string ToString() override { return "SortArraysToIndicesResultIterator"; } + + bool HasNext() override { + if (offset_ >= total_length_) { + return false; } + return true; } - return ss.str(); - } - std::string GetCurCol(int first_key_id, - const std::shared_ptr& key_projector) { - std::stringstream ss; - std::string array; - if (key_projector) { - array = "projected_"; - ss << "auto cur = " + array << "0_[" + array << "0_.size() - 1];" << std::endl; - } else { - array = "cached_"; - ss << "auto cur = " + array << first_key_id << "_[" + array << first_key_id - << "_.size() - 1];" << std::endl; - } - return ss.str(); - } - std::string GetFirstCmpCol(int first_key_id, - const std::shared_ptr& key_projector) { - std::stringstream ss; - std::string array; - if (key_projector) { - array = "projected_"; - ss << array << "0_[array_id]"; - } else { - array = "cached_"; - ss << array << first_key_id << "_[array_id]"; - } - return ss.str(); - } - std::string GetKeyRelationList(std::vector key_index_list) { - std::vector relation_col_list; - for (auto idx : key_index_list) { - relation_col_list.push_back("sort_relation_list[" + std::to_string(idx) + "]"); - } - return GetParameterList(relation_col_list, false); - } + arrow::Status Next(std::shared_ptr* out) { + auto length = (total_length_ - offset_) > batch_size_ ? batch_size_ + : (total_length_ - offset_); + uint64_t count = 0; + while (count < length) { + auto item = indices_begin_ + offset_ + count++; + for (int i = 0; i < col_num_; i++) { + RETURN_NOT_OK(appender_list_[i]->Append(item->array_id, item->id)); + } + } + offset_ += length; + ArrayList arrays; + for (int i = 0; i < col_num_; i++) { + std::shared_ptr out_array; + RETURN_NOT_OK(appender_list_[i]->Finish(&out_array)); + arrays.push_back(out_array); + appender_list_[i]->Reset(); + } - std::string GetMakeKeyRelation( - std::vector> shuffle_typed_codegen_list) { - std::stringstream ss; - int numFields = shuffle_typed_codegen_list.size(); - ss << "std::shared_ptr sort_relation_out;" << std::endl; - for (int i = 0; i < numFields; i++) { - auto array_vector_name = "cached_" + std::to_string(i) + "_"; - ss << "RETURN_NOT_OK(MakeRelationColumn(" - << shuffle_typed_codegen_list[i]->GetTypeNameString() - << "::type_id, &sort_relation_out));" << std::endl; - ss << "for (auto arr : " << array_vector_name << ") {" << std::endl; - ss << " RETURN_NOT_OK(sort_relation_out->AppendColumn(arr->cache_));" << std::endl; - ss << "}" << std::endl; - ss << "sort_relation_list.push_back(sort_relation_out);" << std::endl; - ss << std::endl; + *out = arrow::RecordBatch::Make(schema_, length, arrays); + return arrow::Status::OK(); } - return ss.str(); - } + + private: + uint64_t offset_ = 0; + const uint64_t total_length_; + std::shared_ptr schema_; + arrow::compute::FunctionContext* ctx_; + uint64_t batch_size_; + int col_num_; + ArrayItemIndexS* indices_begin_; + std::vector cached_in_; + std::vector> type_list_; + std::vector> appender_list_; + std::vector> array_list_; + std::shared_ptr indices_in_cache_; + }; }; /////////////// SortArraysInPlace //////////////// @@ -1574,7 +1357,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { }); } } else { - auto comp = [this](ArrayItemIndexS x, ArrayItemIndexS y) { + auto comp = [this](ArrayItemIndexS& x, ArrayItemIndexS& y) { return cached_key_[x.array_id]->GetView(x.id) > cached_key_[y.array_id]->GetView(y.id); }; @@ -1592,7 +1375,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { auto Sort(ArrayItemIndexS* indices_begin, ArrayItemIndexS* indices_end, int64_t num_nan) -> typename std::enable_if_t::value> { if (asc_) { - auto comp = [this](ArrayItemIndexS x, ArrayItemIndexS y) { + auto comp = [this](ArrayItemIndexS& x, ArrayItemIndexS& y) { return cached_key_[x.array_id]->GetString(x.id) < cached_key_[y.array_id]->GetString(y.id); }; @@ -1602,7 +1385,7 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { std::sort(indices_begin, indices_begin + items_total_ - nulls_total_, comp); } } else { - auto comp = [this](ArrayItemIndexS x, ArrayItemIndexS y) { + auto comp = [this](ArrayItemIndexS& x, ArrayItemIndexS& y) { return cached_key_[x.array_id]->GetString(x.id) > cached_key_[y.array_id]->GetString(y.id); }; @@ -1658,97 +1441,6 @@ class SortOnekeyKernel : public SortArraysToIndicesKernel::Impl { uint64_t nulls_total_ = 0; int col_num_; int key_id_; - class SortRelationResultIterator : public ResultIterator { - public: - SortRelationResultIterator(std::shared_ptr sort_relation) - : sort_relation_(sort_relation) {} - arrow::Status Next(std::shared_ptr* out) { - *out = sort_relation_; - return arrow::Status::OK(); - } - - private: - std::shared_ptr sort_relation_; - }; - class SorterResultIterator : public ResultIterator { - public: - SorterResultIterator(arrow::compute::FunctionContext* ctx, - std::shared_ptr schema, - std::shared_ptr indices_in, - std::vector& cached) - : ctx_(ctx), - schema_(schema), - indices_in_cache_(indices_in), - total_length_(indices_in->length()), - cached_in_(cached) { - col_num_ = schema->num_fields(); - indices_begin_ = (ArrayItemIndexS*)indices_in->value_data(); - // appender_type won't be used - AppenderBase::AppenderType appender_type = AppenderBase::left; - for (int i = 0; i < col_num_; i++) { - auto field = schema->field(i); - std::shared_ptr appender; - MakeAppender(ctx_, field->type(), appender_type, &appender); - appender_list_.push_back(appender); - } - for (int i = 0; i < col_num_; i++) { - arrow::ArrayVector array_vector = cached_in_[i]; - int array_num = array_vector.size(); - for (int array_id = 0; array_id < array_num; array_id++) { - auto arr = array_vector[array_id]; - appender_list_[i]->AddArray(arr); - } - } - batch_size_ = GetBatchSize(); - } - ~SorterResultIterator() {} - - std::string ToString() override { return "SortArraysToIndicesResultIterator"; } - - bool HasNext() override { - if (offset_ >= total_length_) { - return false; - } - return true; - } - - arrow::Status Next(std::shared_ptr* out) { - auto length = (total_length_ - offset_) > batch_size_ ? batch_size_ - : (total_length_ - offset_); - uint64_t count = 0; - while (count < length) { - auto item = indices_begin_ + offset_ + count++; - for (int i = 0; i < col_num_; i++) { - RETURN_NOT_OK(appender_list_[i]->Append(item->array_id, item->id)); - } - } - offset_ += length; - ArrayList arrays; - for (int i = 0; i < col_num_; i++) { - std::shared_ptr out_array; - RETURN_NOT_OK(appender_list_[i]->Finish(&out_array)); - arrays.push_back(out_array); - appender_list_[i]->Reset(); - } - - *out = arrow::RecordBatch::Make(schema_, length, arrays); - return arrow::Status::OK(); - } - - private: - uint64_t offset_ = 0; - const uint64_t total_length_; - std::shared_ptr schema_; - arrow::compute::FunctionContext* ctx_; - uint64_t batch_size_; - int col_num_; - ArrayItemIndexS* indices_begin_; - std::vector cached_in_; - std::vector> type_list_; - std::vector> appender_list_; - std::vector> array_list_; - std::shared_ptr indices_in_cache_; - }; }; /////////////// SortArraysMultipleKeys //////////////// @@ -1922,698 +1614,8 @@ class SortMultiplekeyKernel : public SortArraysToIndicesKernel::Impl { uint64_t num_batches_ = 0; uint64_t items_total_ = 0; int col_num_; - std::vector> cmp_functions_; - - class SorterResultIterator : public ResultIterator { - public: - SorterResultIterator(arrow::compute::FunctionContext* ctx, - std::shared_ptr schema, - std::shared_ptr indices_in, - std::vector& cached) - : ctx_(ctx), - schema_(schema), - indices_in_cache_(indices_in), - total_length_(indices_in->length()), - cached_in_(cached) { - col_num_ = schema->num_fields(); - indices_begin_ = (ArrayItemIndexS*)indices_in->value_data(); - // appender_type won't be used - AppenderBase::AppenderType appender_type = AppenderBase::left; - for (int i = 0; i < col_num_; i++) { - auto field = schema->field(i); - std::shared_ptr appender; - MakeAppender(ctx_, field->type(), appender_type, &appender); - appender_list_.push_back(appender); - } - for (int i = 0; i < col_num_; i++) { - arrow::ArrayVector array_vector = cached_in_[i]; - int array_num = array_vector.size(); - for (int array_id = 0; array_id < array_num; array_id++) { - auto arr = array_vector[array_id]; - appender_list_[i]->AddArray(arr); - } - } - batch_size_ = GetBatchSize(); - } - ~SorterResultIterator(){} - - std::string ToString() override { return "SortArraysToIndicesResultIterator"; } - - bool HasNext() override { - if (offset_ >= total_length_) { - return false; - } - return true; - } - - arrow::Status Next(std::shared_ptr* out) { - auto length = (total_length_ - offset_) > batch_size_ ? batch_size_ - : (total_length_ - offset_); - uint64_t count = 0; - while (count < length) { - auto item = indices_begin_ + offset_ + count++; - for (int i = 0; i < col_num_; i++) { - RETURN_NOT_OK(appender_list_[i]->Append(item->array_id, item->id)); - } - } - offset_ += length; - ArrayList arrays; - for (int i = 0; i < col_num_; i++) { - std::shared_ptr out_array; - RETURN_NOT_OK(appender_list_[i]->Finish(&out_array)); - arrays.push_back(out_array); - appender_list_[i]->Reset(); - } - - *out = arrow::RecordBatch::Make(schema_, length, arrays); - return arrow::Status::OK(); - } - - private: - uint64_t offset_ = 0; - const uint64_t total_length_; - std::shared_ptr schema_; - arrow::compute::FunctionContext* ctx_; - uint64_t batch_size_; - int col_num_; - ArrayItemIndexS* indices_begin_; - std::vector cached_in_; - std::vector> type_list_; - std::vector> appender_list_; - std::vector> array_list_; - std::shared_ptr indices_in_cache_; - }; -}; - -/////////////// SortMultiplekeyCodegenKernel //////////////// -class SortMultiplekeyCodegenKernel : public SortArraysToIndicesKernel::Impl { - public: - SortMultiplekeyCodegenKernel(arrow::compute::FunctionContext* ctx, - std::shared_ptr result_schema, - std::shared_ptr key_projector, - std::vector> projected_types, - std::vector> key_field_list, - std::vector sort_directions, - std::vector nulls_order, - bool NaN_check) - : ctx_(ctx), - nulls_order_(nulls_order), - sort_directions_(sort_directions), - result_schema_(result_schema), - key_projector_(key_projector), - key_field_list_(key_field_list), - projected_types_(projected_types), - NaN_check_(NaN_check) { - #ifdef DEBUG - std::cout << "use SortMultiplekeyCodegenKernel" << std::endl; - #endif - for (auto field : key_field_list) { - auto indices = result_schema->GetAllFieldIndices(field->name()); - if (indices.size() != 1) { - std::cout << "[ERROR] SortMultiplekeyCodegenKernel can't find key " - << field->ToString() << " from " << result_schema->ToString() - << std::endl; - throw; - } - key_index_list_.push_back(indices[0]); - } - col_num_ = result_schema->num_fields(); - if (!key_projector) { - auto status = LoadJITFunction(key_field_list); - if (!status.ok()) { - std::cout << "LoadJITFunction failed, msg is " << status.message() << std::endl; - throw; - } - } else { - int i = 0; - for (auto type : projected_types) { - auto field = arrow::field(std::to_string(i), type); - projected_field_list_.push_back(field); - i++; - } - auto status = LoadJITFunction(projected_field_list_); - if (!status.ok()) { - std::cout << "LoadJITFunction failed, msg is " << status.message() << std::endl; - throw; - } - } - } - ~SortMultiplekeyCodegenKernel(){} - - arrow::Status LoadJITFunction( - std::vector> key_field_list) { - // generate ddl signature - std::stringstream func_args_ss; - for (int i = 0; i < sort_directions_.size(); i++) { - func_args_ss << "[Sorter]" << (nulls_order_[i] ? "nulls_first" : "nulls_last") - << "|" << (sort_directions_[i] ? "asc" : "desc"); - } - for (int i = 0; i < key_field_list.size(); i++) { - auto field = key_field_list[i]; - func_args_ss << "[Type]" << field->type()->ToString(); - } - -#ifdef DEBUG - std::cout << "func_args_ss is " << func_args_ss.str() << std::endl; -#endif - - std::stringstream signature_ss; - signature_ss << std::hex << std::hash{}(func_args_ss.str()); - signature_ = signature_ss.str(); - - auto file_lock = FileSpinLock(); - auto status = LoadLibrary(signature_, ctx_, &sorter_); - if (!status.ok()) { - // process - auto codes = ProduceCodes(); - // compile codes - RETURN_NOT_OK(CompileCodes(codes, signature_)); - RETURN_NOT_OK(LoadLibrary(signature_, ctx_, &sorter_)); - } - FileSpinUnLock(file_lock); - return arrow::Status::OK(); - } - - arrow::Status Evaluate(const ArrayList& in) override { - num_batches_++; - if (cached_.size() <= col_num_) { - cached_.resize(col_num_ + 1); - } - for (int i = 0; i < col_num_; i++) { - cached_[i].push_back(in[i]); - } - if (!key_projector_) { - ArrayList key_cols; - for (auto idx : key_index_list_) { - key_cols.push_back(in[idx]); - } - sorter_->Evaluate(key_cols); - } else { - std::vector> projected_batch; - // do projection here, and the projected arrays are used for comparison - auto length = in.size() > 0 ? in[0]->length() : 0; - auto in_batch = arrow::RecordBatch::Make(result_schema_, length, in); - RETURN_NOT_OK( - key_projector_->Evaluate(*in_batch, ctx_->memory_pool(), &projected_batch)); - sorter_->Evaluate(projected_batch); - } - items_total_ += in[0]->length(); - length_list_.push_back(in[0]->length()); - return arrow::Status::OK(); - } - - arrow::Status MakeResultIterator( - std::shared_ptr schema, - std::shared_ptr>* out) override { - std::shared_ptr indices_out; - RETURN_NOT_OK(sorter_->FinishInternal(&indices_out)); - *out = std::make_shared(ctx_, schema, indices_out, cached_); - return arrow::Status::OK(); - } - - private: - std::shared_ptr sorter_; - std::vector cached_; - arrow::compute::FunctionContext* ctx_; - std::shared_ptr result_schema_; - std::shared_ptr key_projector_; - std::vector> key_field_list_; - std::vector> projected_types_; - std::vector> projected_field_list_; - std::vector length_list_; - uint64_t num_batches_ = 0; - uint64_t items_total_ = 0; - std::vector key_index_list_; - std::vector nulls_order_; - std::vector sort_directions_; - bool NaN_check_; - int col_num_; - - class TypedSorterCodeGenImpl { - public: - TypedSorterCodeGenImpl(std::string indice, std::shared_ptr data_type) - : indice_(indice), data_type_(data_type) {} - std::string GetCachedVariablesDefine() { - std::stringstream ss; - ss << "using ArrayType_" << indice_ << " = " << GetTypeString(data_type_, "Array") - << ";" << std::endl; - ss << "std::vector> cached_" << indice_ - << "_;" << std::endl; - return ss.str(); - } - - private: - std::string indice_; - std::shared_ptr data_type_; - }; - - std::string ProduceCodes() { - int indice = 0; - std::vector> key_typed_codegen_list; - if (key_projector_) { - for (auto field : projected_field_list_) { - auto codegen = std::make_shared( - std::to_string(indice), field->type()); - key_typed_codegen_list.push_back(codegen); - indice++; - } - } else { - for (auto field : key_field_list_) { - auto codegen = std::make_shared( - std::to_string(indice), field->type()); - key_typed_codegen_list.push_back(codegen); - indice++; - } - } - - std::string cached_insert_str = GetCachedInsert(); - - std::string comp_func_str = GetCompFunction(true); - - std::string comp_func_str_without_null = GetCompFunction(false); - - std::string sort_func_str = GetSortFunction(); - - std::string cached_variables_define_str = - GetCachedVariablesDefine(key_typed_codegen_list); - - return BaseCodes() + R"( -#include - -#include -#include - -#include "codegen/arrow_compute/ext/array_item_index.h" -#include "codegen/common/sort_relation.h" -#include "precompile/builder.h" -#include "precompile/type.h" -#include "third_party/ska_sort.hpp" -#include "third_party/timsort.hpp" -using namespace sparkcolumnarplugin::precompile; - -class TypedSorterImpl : public CodeGenBase { - public: - TypedSorterImpl(arrow::compute::FunctionContext* ctx) : ctx_(ctx) {} - - arrow::Status Evaluate(const ArrayList& in) override { - num_batches_++; - )" + cached_insert_str + - R"( - auto cur = cached_0_[cached_0_.size() - 1]; - items_total_ += cur->length(); - length_list_.push_back(cur->length()); - - return arrow::Status::OK(); - } - - arrow::Status FinishInternal(std::shared_ptr* out) { - // we should support nulls first and nulls last here - // we should also support desc and asc here - )" + comp_func_str + - comp_func_str_without_null + - R"( - // initiate buffer for all arrays - std::shared_ptr indices_buf; - int64_t buf_size = items_total_ * sizeof(ArrayItemIndexS); - RETURN_NOT_OK(arrow::AllocateBuffer(ctx_->memory_pool(), buf_size, &indices_buf)); - - ArrayItemIndexS* indices_begin = - reinterpret_cast(indices_buf->mutable_data()); - ArrayItemIndexS* indices_end = indices_begin + items_total_; - - int64_t indices_i = 0; - for (int array_id = 0; array_id < num_batches_; array_id++) { - for (int64_t i = 0; i < length_list_[array_id]; i++) { - (indices_begin + indices_i)->array_id = array_id; - (indices_begin + indices_i)->id = i; - indices_i++; - } - } - - )" + sort_func_str + - R"( - std::shared_ptr out_type; - RETURN_NOT_OK(MakeFixedSizeBinaryType(sizeof(ArrayItemIndexS) / sizeof(int32_t), &out_type)); - RETURN_NOT_OK(MakeFixedSizeBinaryArray(out_type, items_total_, indices_buf, out)); - return arrow::Status::OK(); - } - - private: - )" + cached_variables_define_str + - R"( - std::vector length_list_; - arrow::compute::FunctionContext* ctx_; - uint64_t num_batches_ = 0; - uint64_t items_total_ = 0; - // If all batches has no null value, has_null_ will remain false - bool has_null_ = false; -}; - -extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, - std::shared_ptr* out) { - *out = std::make_shared(ctx); -} - - )"; - } - std::string GetCachedInsert() { - std::stringstream ss; - for (int i = 0; i < key_index_list_.size(); i++) { - ss << "cached_" << i << "_.push_back(std::make_shared(in[" << i << "]));\n" - // update has_null_ - << "if (!has_null_ && cached_" << i << "_[cached_" << i - << "_.size() - 1]->null_count() > 0) {" - << "has_null_ = true;}" << std::endl; - } - return ss.str(); - } - std::string GetSortFunction() { - std::stringstream ss; - ss << "if (has_null_) {\n" - << "gfx::timsort(indices_begin, indices_begin + items_total_, comp);} else {\n" - << "gfx::timsort(indices_begin, indices_begin + items_total_, comp_without_null);}" - << std::endl; - return ss.str(); - - } - std::string GetCompFunction(bool has_null) { - std::stringstream ss; - bool projected; - if (key_projector_) { - projected = true; - } else { - projected = false; - } - if (has_null) { - ss << "auto comp = [this](ArrayItemIndexS x, ArrayItemIndexS y) {" - << GetCompFunction_(0, projected, key_field_list_, - projected_types_, sort_directions_, nulls_order_); - } else { - ss << "auto comp_without_null = [this](ArrayItemIndexS x, ArrayItemIndexS y) {" - << GetCompFunction_Without_Null_(0, projected, key_field_list_, - projected_types_, sort_directions_); - } - ss << "};\n"; - return ss.str(); - } - std::string GetCompFunction_( - int cur_key_idx, bool projected, - const std::vector>& key_field_list, - const std::vector>& projected_types, - const std::vector& sort_directions, const std::vector& nulls_order) { - std::string comp_str; - bool asc = sort_directions[cur_key_idx]; - bool nulls_first = nulls_order[cur_key_idx]; - std::shared_ptr data_type; - std::string array; - // if projected, use projected batch to compare, and use projected type - array = "cached_"; - if (projected) { - data_type = projected_types[cur_key_idx]; - } else { - data_type = key_field_list[cur_key_idx]->type(); - } - - auto x_num_value = - array + std::to_string(cur_key_idx) + "_[x.array_id]->GetView(x.id)"; - auto x_str_value = - array + std::to_string(cur_key_idx) + "_[x.array_id]->GetString(x.id)"; - auto y_num_value = - array + std::to_string(cur_key_idx) + "_[y.array_id]->GetView(y.id)"; - auto y_str_value = - array + std::to_string(cur_key_idx) + "_[y.array_id]->GetString(y.id)"; - auto is_x_null = array + std::to_string(cur_key_idx) + "_[x.array_id]->IsNull(x.id)"; - auto is_y_null = array + std::to_string(cur_key_idx) + "_[y.array_id]->IsNull(y.id)"; - auto x_null_count = - array + std::to_string(cur_key_idx) + "_[x.array_id]->null_count() > 0"; - auto y_null_count = - array + std::to_string(cur_key_idx) + "_[y.array_id]->null_count() > 0"; - auto x_null = "(" + x_null_count + " && " + is_x_null + " )"; - auto y_null = "(" + y_null_count + " && " + is_y_null + " )"; - auto is_x_nan = "std::isnan(" + x_num_value + ")"; - auto is_y_nan = "std::isnan(" + y_num_value + ")"; - - // Multiple keys sorting w/ nulls first/last is supported. - std::stringstream ss; - // We need to determine the position of nulls. - ss << "if (" << x_null << ") {\n"; - // If value accessed from x is null, return true to make nulls first. - if (nulls_first) { - ss << "return true;\n}"; - } else { - ss << "return false;\n}"; - } - // If value accessed from y is null, return false to make nulls first. - ss << " else if (" << y_null << ") {\n"; - if (nulls_first) { - ss << "return false;\n}"; - } else { - ss << "return true;\n}"; - } - // If datatype is floating, we need to do partition for NaN if NaN check is enabled - if (data_type->id() == arrow::Type::DOUBLE || data_type->id() == arrow::Type::FLOAT) { - if (NaN_check_) { - ss << "else if (" << is_x_nan << ") {\n"; - if (asc) { - ss << "return false;\n}"; - } else { - ss << "return true;\n}"; - } - ss << "else if (" << is_y_nan << ") {\n"; - if (asc) { - ss << "return true;\n}"; - } else { - ss << "return false;\n}"; - } - } - } - - // If values accessed from x and y are both not null - ss << " else {\n"; - - // Multiple keys sorting w/ different ordering is supported. - // For string type of data, GetString should be used instead of GetView. - if (asc) { - if (data_type->id() == arrow::Type::STRING) { - ss << "return " << x_str_value << " < " << y_str_value << ";\n}\n"; - } else { - ss << "return " << x_num_value << " < " << y_num_value << ";\n}\n"; - } - } else { - if (data_type->id() == arrow::Type::STRING) { - ss << "return " << x_str_value << " > " << y_str_value << ";\n}\n"; - } else { - ss << "return " << x_num_value << " > " << y_num_value << ";\n}\n"; - } - } - comp_str = ss.str(); - if ((cur_key_idx + 1) == sort_directions.size()) { - return comp_str; - } - // clear the contents of stringstream - ss.str(std::string()); - if (data_type->id() == arrow::Type::STRING) { - ss << "if ((" << x_null << " && " << y_null << ") || (" << x_str_value - << " == " << y_str_value << ")) {"; - } else { - if (NaN_check_ && (data_type->id() == arrow::Type::DOUBLE || - data_type->id() == arrow::Type::FLOAT)) { - // need to check NaN - ss << "if ((" << x_null << " && " << y_null << ") || (" << is_x_nan - << " && " << is_y_nan << ") || (" << x_num_value << " == " << y_num_value - << ")) {"; - } else { - ss << "if ((" << x_null << " && " << y_null << ") || (" << x_num_value - << " == " << y_num_value << ")) {"; - } - } - ss << GetCompFunction_(cur_key_idx + 1, projected, key_field_list, - projected_types, sort_directions, nulls_order) - << "} else { " << comp_str << "}"; - return ss.str(); - } - std::string GetCompFunction_Without_Null_( - int cur_key_idx, bool projected, - const std::vector>& key_field_list, - const std::vector>& projected_types, - const std::vector& sort_directions) { - std::string comp_str; - bool asc = sort_directions[cur_key_idx]; - std::shared_ptr data_type; - std::string array; - // if projected, use projected batch to compare, and use projected type - array = "cached_"; - if (projected) { - data_type = projected_types[cur_key_idx]; - } else { - data_type = key_field_list[cur_key_idx]->type(); - } - - auto x_num_value = - array + std::to_string(cur_key_idx) + "_[x.array_id]->GetView(x.id)"; - auto x_str_value = - array + std::to_string(cur_key_idx) + "_[x.array_id]->GetString(x.id)"; - auto y_num_value = - array + std::to_string(cur_key_idx) + "_[y.array_id]->GetView(y.id)"; - auto y_str_value = - array + std::to_string(cur_key_idx) + "_[y.array_id]->GetString(y.id)"; - auto is_x_nan = "std::isnan(" + x_num_value + ")"; - auto is_y_nan = "std::isnan(" + y_num_value + ")"; - - // Multiple keys sorting w/ nulls first/last is supported. - std::stringstream ss; - // If datatype is floating, we need to do partition for NaN if NaN check is enabled - if (NaN_check_ && (data_type->id() == arrow::Type::DOUBLE || - data_type->id() == arrow::Type::FLOAT)) { - ss << "if (" << is_x_nan << ") {\n"; - if (asc) { - ss << "return false;\n}"; - } else { - ss << "return true;\n}"; - } - ss << "else if (" << is_y_nan << ") {\n"; - if (asc) { - ss << "return true;\n}"; - } else { - ss << "return false;\n}"; - } - // If values accessed from x and y are both not nan - ss << " else {\n"; - } - - // Multiple keys sorting w/ different ordering is supported. - // For string type of data, GetString should be used instead of GetView. - if (asc) { - if (data_type->id() == arrow::Type::STRING) { - ss << "return " << x_str_value << " < " << y_str_value << ";\n"; - } else { - ss << "return " << x_num_value << " < " << y_num_value << ";\n"; - } - } else { - if (data_type->id() == arrow::Type::STRING) { - ss << "return " << x_str_value << " > " << y_str_value << ";\n"; - } else { - ss << "return " << x_num_value << " > " << y_num_value << ";\n"; - } - } - if (NaN_check_ && (data_type->id() == arrow::Type::DOUBLE || - data_type->id() == arrow::Type::FLOAT)) { - ss << "}" << std::endl; - } - comp_str = ss.str(); - if ((cur_key_idx + 1) == sort_directions.size()) { - return comp_str; - } - // clear the contents of stringstream - ss.str(std::string()); - if (data_type->id() == arrow::Type::STRING) { - ss << "if (" << x_str_value << " == " << y_str_value << ") {"; - } else { - if (NaN_check_ && (data_type->id() == arrow::Type::DOUBLE || - data_type->id() == arrow::Type::FLOAT)) { - // need to check NaN - ss << "if ((" << is_x_nan << " && " << is_y_nan << ") || (" - << x_num_value << " == " << y_num_value << ")) {"; - } else { - ss << "if (" << x_num_value << " == " << y_num_value << ") {"; - } - } - ss << GetCompFunction_Without_Null_(cur_key_idx + 1, projected, key_field_list, - projected_types, sort_directions) - << "} else { " << comp_str << "}"; - return ss.str(); - } - std::string GetCachedVariablesDefine( - std::vector> shuffle_typed_codegen_list) { - std::stringstream ss; - for (auto codegen : shuffle_typed_codegen_list) { - ss << codegen->GetCachedVariablesDefine() << std::endl; - } - return ss.str(); - } - - class SorterResultIterator : public ResultIterator { - public: - SorterResultIterator(arrow::compute::FunctionContext* ctx, - std::shared_ptr schema, - std::shared_ptr indices_in, - std::vector& cached) - : ctx_(ctx), - schema_(schema), - indices_in_cache_(indices_in), - total_length_(indices_in->length()), - cached_in_(cached) { - col_num_ = schema->num_fields(); - indices_begin_ = (ArrayItemIndexS*)indices_in->value_data(); - // appender_type won't be used - AppenderBase::AppenderType appender_type = AppenderBase::left; - for (int i = 0; i < col_num_; i++) { - auto field = schema->field(i); - std::shared_ptr appender; - MakeAppender(ctx_, field->type(), appender_type, &appender); - appender_list_.push_back(appender); - } - for (int i = 0; i < col_num_; i++) { - arrow::ArrayVector array_vector = cached_in_[i]; - int array_num = array_vector.size(); - for (int array_id = 0; array_id < array_num; array_id++) { - auto arr = array_vector[array_id]; - appender_list_[i]->AddArray(arr); - } - } - batch_size_ = GetBatchSize(); - } - ~SorterResultIterator(){} - - std::string ToString() override { return "SortArraysToIndicesResultIterator"; } - - bool HasNext() override { - if (offset_ >= total_length_) { - return false; - } - return true; - } - - arrow::Status Next(std::shared_ptr* out) { - auto length = (total_length_ - offset_) > batch_size_ ? batch_size_ - : (total_length_ - offset_); - uint64_t count = 0; - for (int i = 0; i < col_num_; i++) { - while (count < length) { - auto item = indices_begin_ + offset_ + count++; - RETURN_NOT_OK(appender_list_[i]->Append(item->array_id, item->id)); - } - count = 0; - } - offset_ += length; - ArrayList arrays; - for (int i = 0; i < col_num_; i++) { - std::shared_ptr out_array; - RETURN_NOT_OK(appender_list_[i]->Finish(&out_array)); - arrays.push_back(out_array); - appender_list_[i]->Reset(); - } - - *out = arrow::RecordBatch::Make(schema_, length, arrays); - return arrow::Status::OK(); - } - - private: - uint64_t offset_ = 0; - const uint64_t total_length_; - std::shared_ptr schema_; - arrow::compute::FunctionContext* ctx_; - uint64_t batch_size_; - int col_num_; - ArrayItemIndexS* indices_begin_; - std::vector cached_in_; - std::vector> type_list_; - std::vector> appender_list_; - std::vector> array_list_; - std::shared_ptr indices_in_cache_; - }; -}; + std::vector> cmp_functions_; +}; arrow::Status SortArraysToIndicesKernel::Make( arrow::compute::FunctionContext* ctx, @@ -2655,7 +1657,7 @@ SortArraysToIndicesKernel::SortArraysToIndicesKernel( bool NaN_check, bool do_codegen, int result_type) { - // sort_key_node may need to do projection + // represents whether need to projection for sort keys bool pre_processed_key_ = false; gandiva::NodePtr key_project; gandiva::ExpressionVector key_project_exprs; @@ -2664,6 +1666,7 @@ SortArraysToIndicesKernel::SortArraysToIndicesKernel( std::shared_ptr node_visitor; THROW_NOT_OK(MakeTypedNodeVisitor(node, &node_visitor)); if (node_visitor->GetResultType() != TypedNodeVisitor::FieldNode) { + // if projection is needed pre_processed_key_ = true; break; } @@ -2758,9 +1761,9 @@ SortArraysToIndicesKernel::SortArraysToIndicesKernel( } } else { if (do_codegen) { - // Will use Sort Codegen for multiple-key sort - impl_.reset(new SortMultiplekeyCodegenKernel(ctx, result_schema, key_projector, - projected_types, key_field_list, sort_directions, nulls_order, NaN_check)); + // Will use Sort with Codegen for multiple-key sort + impl_.reset(new Impl(ctx, result_schema, key_projector, projected_types, + key_field_list, sort_directions, nulls_order, NaN_check)); } else { // Will use Sort without Codegen for multiple-key sort impl_.reset(new SortMultiplekeyKernel(ctx, result_schema, key_projector,