From ec532fc3e256ca0835d19970d67562fd5b9df6d3 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 9 May 2022 09:15:00 +0800 Subject: [PATCH] Revert "[DNM] some optimizations to shuffle's split function (#839)" This reverts commit 8aad39a0a24ba86c49004226f4e0e2966a580b5f. --- native-sql-engine/cpp/CMakeLists.txt | 3 - native-sql-engine/cpp/src/CMakeLists.txt | 8 +- .../cpp/src/benchmarks/CMakeLists.txt | 2 +- .../src/benchmarks/shuffle_split_benchmark.cc | 416 +++++------------- native-sql-engine/cpp/src/shuffle/splitter.cc | 285 +++++------- native-sql-engine/cpp/src/shuffle/splitter.h | 40 +- .../cpp/src/tests/shuffle_split_test.cc | 3 - 7 files changed, 244 insertions(+), 513 deletions(-) diff --git a/native-sql-engine/cpp/CMakeLists.txt b/native-sql-engine/cpp/CMakeLists.txt index a1301fd1d..fcd9d7cc1 100644 --- a/native-sql-engine/cpp/CMakeLists.txt +++ b/native-sql-engine/cpp/CMakeLists.txt @@ -1,8 +1,5 @@ cmake_minimum_required(VERSION 3.16) project(spark_columnar_plugin) -#add_compile_options(-g) -set(CMAKE_EXPORT_COMPILE_COMMANDS ON) - set(root_directory ${PROJECT_BINARY_DIR}) add_subdirectory(src) diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt index 28c3a97aa..b579bc879 100644 --- a/native-sql-engine/cpp/src/CMakeLists.txt +++ b/native-sql-engine/cpp/src/CMakeLists.txt @@ -440,7 +440,6 @@ endif() if(BENCHMARKS) find_package(GTest) - find_package(benchmark REQUIRED) add_definitions(-DBENCHMARK_FILE_PATH="file://${CMAKE_CURRENT_SOURCE_DIR}/benchmarks/source_files/") macro(package_add_benchmark TESTNAME) add_executable(${TESTNAME} ${ARGN}) @@ -452,12 +451,7 @@ macro(package_add_benchmark TESTNAME) ) set_target_properties(${TESTNAME} PROPERTIES FOLDER tests) endmacro() -macro(package_add_gbenchmark TESTNAME) - add_executable(${TESTNAME} ${ARGN}) - target_link_libraries(${TESTNAME} benchmark::benchmark spark_columnar_jni parquet ${CMAKE_THREAD_LIBS_INIT}) - target_include_directories(${TESTNAME} PUBLIC ${source_root_directory}) - set_target_properties(${TESTNAME} PROPERTIES FOLDER tests) -endmacro() + include(GoogleTest) ENABLE_TESTING() add_custom_target(benchmark ${CMAKE_CTEST_COMMAND} -R BenchmarkArrowCompute --output-on-failure) add_subdirectory(benchmarks) diff --git a/native-sql-engine/cpp/src/benchmarks/CMakeLists.txt b/native-sql-engine/cpp/src/benchmarks/CMakeLists.txt index a152034e3..42671e4da 100644 --- a/native-sql-engine/cpp/src/benchmarks/CMakeLists.txt +++ b/native-sql-engine/cpp/src/benchmarks/CMakeLists.txt @@ -4,5 +4,5 @@ package_add_benchmark(BenchmarkArrowComputeWSCG arrow_compute_benchmark_wscg.cc) package_add_benchmark(BenchmarkArrowComputeSort arrow_compute_benchmark_sort.cc) package_add_benchmark(BenchmarkArrowComputeHashAggregate arrow_compute_benchmark_hash_aggregate.cc) #package_add_benchmark(BenchmarkArrowComputeBigScale arrow_compute_benchmark_big_scale.cc) -package_add_gbenchmark(BenchmarkShuffleSplit shuffle_split_benchmark.cc) +package_add_benchmark(BenchmarkShuffleSplit shuffle_split_benchmark.cc) package_add_benchmark(BenchmarkColumnarToRow columnar_to_row_converter_benchmark.cc) diff --git a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc index cd81ef877..8c6e15668 100644 --- a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc +++ b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc @@ -19,14 +19,12 @@ #include #include #include -//#include +#include #include #include -//#include -#include 
+#include 
 #include 
 #include 
-#include 
 #include 
 #include 

 #include 
 #include 

@@ -38,40 +36,21 @@
 namespace sparkcolumnarplugin {
 namespace shuffle {

-const int batch_buffer_size = 32768;
-const int split_buffer_size = 8192;
+std::vector<std::string> input_files;
+const int num_partitions = 336;
+const int buffer_size = 20480;

-class BenchmarkShuffleSplit : public ::benchmark::Fixture {
+class BenchmarkShuffleSplit : public ::testing::Test {
  public:
-  BenchmarkShuffleSplit() {
-    file_name =
-        "/mnt/DP_disk1/lineitem/"
-        "part-00025-356249a2-c285-42b9-8a18-5b10be61e0c4-c000.snappy.parquet";
-
-    GetRecordBatchReader(file_name);
-    std::cout << schema->ToString() << std::endl;
-    const auto& fields = schema->fields();
-    for (const auto& field : fields) {
-      if (field->name() == "l_orderkey") {
-        auto node = gandiva::TreeExprBuilder::MakeField(field);
-        expr_vector.push_back(gandiva::TreeExprBuilder::MakeExpression(
-            std::move(node), arrow::field("res_" + field->name(), field->type())));
-      }
-    }
-  }
   void GetRecordBatchReader(const std::string& input_file) {
-    std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
-    std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
-
     std::shared_ptr<arrow::fs::FileSystem> fs;
     std::string file_name;
     ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name))

     ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name));

-    properties.set_batch_size(batch_buffer_size);
-    properties.set_pre_buffer(false);
-    properties.set_use_threads(false);
+    parquet::ArrowReaderProperties properties(true);
+    properties.set_batch_size(buffer_size);

     ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
         arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file),
@@ -80,304 +59,141 @@ class BenchmarkShuffleSplit : public ::testing::Test {
     ASSERT_NOT_OK(parquet_reader->GetSchema(&schema));

     auto num_rowgroups = parquet_reader->num_row_groups();
-
+    std::vector<int> row_group_indices;
     for (int i = 0; i < num_rowgroups; ++i) {
       row_group_indices.push_back(i);
     }

     auto num_columns = schema->num_fields();
+    std::vector<int> column_indices;
     for (int i = 0; i < num_columns; ++i) {
       column_indices.push_back(i);
     }
-  }

-  void SetUp(const ::benchmark::State& state) {}
-
-  void TearDown(const ::benchmark::State& state) {}
+    ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices,
+                                                       &record_batch_reader));
+  }
+  void SetUp() override {
+    // read input from parquet file
+    if (input_files.empty()) {
+      std::cout << "No input file." << std::endl;
+      std::exit(0);
+    }
+    std::cout << "Input file: " << std::endl;
+    for (const auto& file_name : input_files) {
+      std::cout << file_name << std::endl;
+    }
+    GetRecordBatchReader(input_files[0]);
+    std::cout << schema->ToString() << std::endl;

- protected:
-  long SetCPU(uint32_t cpuindex) {
-    cpu_set_t cs;
-    CPU_ZERO(&cs);
-    CPU_SET(cpuindex, &cs);
-    return sched_setaffinity(0, sizeof(cs), &cs);
+    const auto& fields = schema->fields();
+    for (const auto& field : fields) {
+      if (field->name() == "l_partkey") {
+        auto node = gandiva::TreeExprBuilder::MakeField(field);
+        expr_vector.push_back(gandiva::TreeExprBuilder::MakeExpression(
+            std::move(node), arrow::field("res_" + field->name(), field->type())));
+      }
+    }
   }
-  virtual void Do_Split(const std::shared_ptr<Splitter>& splitter, int64_t& elapse_read,
-                        int64_t& num_batches, int64_t& num_rows, int64_t& split_time,
-                        benchmark::State& state) {}
+
+  void TearDown() override {}

  protected:
-  std::string file_name;
   std::shared_ptr<arrow::io::RandomAccessFile> file;
-  std::vector<int> row_group_indices;
-  std::vector<int> column_indices;
+  std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
+  std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;

   std::shared_ptr<arrow::Schema> schema;
   std::vector<std::shared_ptr<gandiva::Expression>> expr_vector;
-  parquet::ArrowReaderProperties properties;
-};
-
-BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, CacheScan)(benchmark::State& state) {
-  SetCPU(state.thread_index());
-
-  arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1);
-
-  const int num_partitions = state.range(0);
-
-  auto options = SplitOptions::Defaults();
-  options.compression_type = compression_type;
-  options.buffer_size = split_buffer_size;
-  options.buffered_write = true;
-  options.offheap_per_task = 128 * 1024 * 1024 * 1024L;
-  options.prefer_spill = true;
-  options.write_schema = false;

   std::shared_ptr<Splitter> splitter;

-  if (!expr_vector.empty()) {
-    ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions,
-                                                   expr_vector, std::move(options)));
-  } else {
-    ARROW_ASSIGN_OR_THROW(
-        splitter, Splitter::Make("rr", schema, num_partitions, std::move(options)));
-  }
-
-  std::shared_ptr<arrow::RecordBatch> record_batch;
-
-  int64_t elapse_read = 0;
-  int64_t num_batches = 0;
-  int64_t num_rows = 0;
-  int64_t split_time = 0;
-
-  std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
-  std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
-  ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-      arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties,
-      &parquet_reader));
-
-  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-  ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices,
-                                                     &record_batch_reader));
-  do {
-    TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
-
-    if (record_batch) {
-      batches.push_back(record_batch);
-      num_batches += 1;
-      num_rows += record_batch->num_rows();
+  void DoSplit(arrow::Compression::type compression_type) {
+    auto options = SplitOptions::Defaults();
+    options.compression_type = compression_type;
+    options.buffer_size = buffer_size;
+    if (!expr_vector.empty()) {
+      ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions,
+                                                     expr_vector, std::move(options)));
+    } else {
+      ARROW_ASSIGN_OR_THROW(
+          splitter, Splitter::Make("rr", schema, num_partitions, std::move(options)));
    }
     }
-  } while (record_batch);
-
-  for (auto _ : state) {
-    for_each(batches.begin(), batches.end(),
-             [&splitter, &split_time](std::shared_ptr<arrow::RecordBatch>& record_batch) {
-               TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
-             });
-  }
-
TIME_NANO_OR_THROW(split_time, splitter->Stop()); - - auto fs = std::make_shared(); - fs->DeleteFile(splitter->DataFile()); - - state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes())); - - state.counters["rowgroups"] = - benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["columns"] = - benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["batches"] = benchmark::Counter( - num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_rows"] = benchmark::Counter( - num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_partitions"] = benchmark::Counter( - num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["batch_buffer_size"] = - benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["split_buffer_size"] = - benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_written"] = - benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_raw"] = - benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["parquet_parse"] = benchmark::Counter( - elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["compute_pid_time"] = - benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["write_time"] = - benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["spill_time"] = - benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["compress_time"] = - benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - - split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() - - splitter->TotalCompressTime() - splitter->TotalWriteTime(); - state.counters["split_time"] = benchmark::Counter( - split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); -} - -BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, IterateScan)(benchmark::State& state) { - SetCPU(state.thread_index()); - - arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1); - - const int num_partitions = state.range(0); - - auto options = SplitOptions::Defaults(); - options.compression_type = compression_type; - options.buffer_size = split_buffer_size; - options.buffered_write = true; - options.offheap_per_task = 128 * 1024 * 1024 * 1024L; - options.prefer_spill = true; - options.write_schema = false; - - std::shared_ptr splitter; - - if (!expr_vector.empty()) { - ARROW_ASSIGN_OR_THROW(splitter, 
Splitter::Make("hash", schema, num_partitions, - expr_vector, std::move(options))); - } else { - ARROW_ASSIGN_OR_THROW( - splitter, Splitter::Make("rr", schema, num_partitions, std::move(options))); - } + std::shared_ptr record_batch; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t split_time = 0; - int64_t elapse_read = 0; - int64_t num_batches = 0; - int64_t num_rows = 0; - int64_t split_time = 0; - - std::shared_ptr record_batch; - - std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; - std::shared_ptr record_batch_reader; - ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties, - &parquet_reader)); - - for (auto _ : state) { - std::vector> batches; - ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices, - &record_batch_reader)); - TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); - while (record_batch) { - num_batches += 1; - num_rows += record_batch->num_rows(); - TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); + do { TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + if (record_batch) { + TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + } while (record_batch); + std::cout << "Done " << input_files[0] << std::endl; + + for (int i = 1; i < input_files.size(); ++i) { + GetRecordBatchReader(input_files[i]); + do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + if (record_batch) { + TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + } while (record_batch); + std::cout << "Done " << input_files[i] << std::endl; } - } - TIME_NANO_OR_THROW(split_time, splitter->Stop()); - - auto fs = std::make_shared(); - fs->DeleteFile(splitter->DataFile()); - state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes())); - - state.counters["rowgroups"] = - benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["columns"] = - benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["batches"] = benchmark::Counter( - num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_rows"] = benchmark::Counter( - num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_partitions"] = benchmark::Counter( - num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["batch_buffer_size"] = - benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["split_buffer_size"] = - benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_written"] = - benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_raw"] = - benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - 
state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); + TIME_NANO_OR_THROW(split_time, splitter->Stop()); + + std::cout << "Setting num_partitions to " << num_partitions << ", buffer_size to " + << buffer_size << std::endl; + std::cout << "Total batches read: " << num_batches << ", total rows: " << num_rows + << std::endl; + +#define BYTES_TO_STRING(bytes) \ + (bytes > 1 << 20 ? (bytes * 1.0 / (1 << 20)) \ + : (bytes > 1 << 10 ? (bytes * 1.0 / (1 << 10)) : bytes)) \ + << (bytes > 1 << 20 ? "MiB" : (bytes > 1 << 10) ? "KiB" : "B") + auto bytes_spilled = splitter->TotalBytesSpilled(); + auto bytes_written = splitter->TotalBytesWritten(); + std::cout << "Total bytes spilled: " << BYTES_TO_STRING(bytes_spilled) << std::endl; + std::cout << "Total bytes written: " << BYTES_TO_STRING(bytes_written) << std::endl; +#undef BYTES_TO_STRING + + auto compute_pid_time = splitter->TotalComputePidTime(); + auto write_time = splitter->TotalWriteTime(); + auto spill_time = splitter->TotalSpillTime(); + auto compress_time = splitter->TotalCompressTime(); + split_time = split_time - spill_time - compute_pid_time - compress_time - write_time; + std::cout << "Took " << TIME_NANO_TO_STRING(elapse_read) << " to read data" + << std::endl + << "Took " << TIME_NANO_TO_STRING(compute_pid_time) << " to compute pid" + << std::endl + << "Took " << TIME_NANO_TO_STRING(split_time) << " to split" << std::endl + << "Took " << TIME_NANO_TO_STRING(spill_time) << " to spill" << std::endl + << "Took " << TIME_NANO_TO_STRING(write_time) << " to write" << std::endl + << "Took " << TIME_NANO_TO_STRING(compress_time) << " to compress" + << std::endl; + } +}; - state.counters["parquet_parse"] = benchmark::Counter( - elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["compute_pid_time"] = - benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["write_time"] = - benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["spill_time"] = - benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["compress_time"] = - benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); +TEST_F(BenchmarkShuffleSplit, LZ4) { DoSplit(arrow::Compression::LZ4_FRAME); } +TEST_F(BenchmarkShuffleSplit, FASTPFOR) { DoSplit(arrow::Compression::FASTPFOR); } - split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() - - splitter->TotalCompressTime() - splitter->TotalWriteTime(); - state.counters["split_time"] = benchmark::Counter( - split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); -} - -/*BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, CacheScan)->Iterations(1) - ->Args({96*2, arrow::Compression::FASTPFOR}) - ->Args({96*4, arrow::Compression::FASTPFOR}) - ->Args({96*8, arrow::Compression::FASTPFOR}) - ->Args({96*16, arrow::Compression::FASTPFOR}) - ->Args({96*32, arrow::Compression::FASTPFOR}) - ->Threads(1) - ->Threads(2) - ->Threads(4) - ->Threads(8) - ->Threads(16) - ->Threads(24) - ->Unit(benchmark::kSecond); -*/ -/*BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, IterateScan)->Iterations(1) - ->Args({96*2, arrow::Compression::FASTPFOR}) - ->Args({96*4, 
arrow::Compression::FASTPFOR}) - ->Args({96*8, arrow::Compression::FASTPFOR}) - ->Args({96*16, arrow::Compression::FASTPFOR}) - ->Args({96*32, arrow::Compression::FASTPFOR}) - ->Threads(1) - ->Threads(2) - ->Threads(4) - ->Threads(8) - ->Threads(16) - ->Threads(24) - ->Unit(benchmark::kSecond);*/ -BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, IterateScan) - ->Iterations(1) - ->Args({96 * 16, arrow::Compression::FASTPFOR}) - ->Threads(24) - ->ReportAggregatesOnly(false) - ->MeasureProcessCPUTime() - ->Unit(benchmark::kSecond); } // namespace shuffle } // namespace sparkcolumnarplugin -BENCHMARK_MAIN(); +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + if (argc > 1) { + for (int i = 1; i < argc; ++i) { + sparkcolumnarplugin::shuffle::input_files.emplace_back(argv[i]); + } + } + return RUN_ALL_TESTS(); +} diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index e739bd04f..f49789d2c 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include #include @@ -133,9 +132,7 @@ class Splitter::PartitionWriter { : splitter_(splitter), partition_id_(partition_id) {} arrow::Status Spill() { -#ifndef SKIPWRITE RETURN_NOT_OK(EnsureOpened()); -#endif RETURN_NOT_OK(WriteRecordBatchPayload(spilled_file_os_.get(), partition_id_)); ClearCache(); return arrow::Status::OK(); @@ -213,13 +210,11 @@ class Splitter::PartitionWriter { arrow::Status WriteRecordBatchPayload(arrow::io::OutputStream* os, int32_t partition_id) { int32_t metadata_length = 0; // unused -#ifndef SKIPWRITE for (auto& payload : splitter_->partition_cached_recordbatch_[partition_id_]) { RETURN_NOT_OK(arrow::ipc::WriteIpcPayload( *payload, splitter_->options_.ipc_write_options, os, &metadata_length)); payload = nullptr; } -#endif return arrow::Status::OK(); } @@ -278,17 +273,10 @@ arrow::Status Splitter::Init() { ARROW_ASSIGN_OR_RAISE(column_type_id_, ToSplitterTypeId(schema_->fields())); partition_writer_.resize(num_partitions_); - - // pre-computed row count for each partition after the record batch split partition_id_cnt_.resize(num_partitions_); - // pre-allocated buffer size for each partition, unit is row count partition_buffer_size_.resize(num_partitions_); - - // start index for each partition when new record batch starts to split partition_buffer_idx_base_.resize(num_partitions_); - // the offset of each partition during record batch split partition_buffer_idx_offset_.resize(num_partitions_); - partition_cached_recordbatch_.resize(num_partitions_); partition_cached_recordbatch_size_.resize(num_partitions_); partition_lengths_.resize(num_partitions_); @@ -320,15 +308,14 @@ arrow::Status Splitter::Init() { auto num_fixed_width = fixed_width_array_idx_.size(); partition_fixed_width_validity_addrs_.resize(num_fixed_width); - column_has_null_.resize(num_fixed_width, false); partition_fixed_width_value_addrs_.resize(num_fixed_width); partition_fixed_width_buffers_.resize(num_fixed_width); binary_array_empirical_size_.resize(binary_array_idx_.size()); large_binary_array_empirical_size_.resize(large_binary_array_idx_.size()); input_fixed_width_has_null_.resize(num_fixed_width, false); for (auto i = 0; i < num_fixed_width; ++i) { - partition_fixed_width_validity_addrs_[i].resize(num_partitions_, nullptr); - partition_fixed_width_value_addrs_[i].resize(num_partitions_, nullptr); + partition_fixed_width_validity_addrs_[i].resize(num_partitions_); + 
partition_fixed_width_value_addrs_[i].resize(num_partitions_); partition_fixed_width_buffers_[i].resize(num_partitions_); } partition_binary_builders_.resize(binary_array_idx_.size()); @@ -379,6 +366,29 @@ arrow::Status Splitter::Init() { return arrow::Status::OK(); } +int64_t batch_nbytes(const arrow::RecordBatch& batch) { + int64_t accumulated = 0L; + for (const auto& array : batch.columns()) { + if (array == nullptr || array->data() == nullptr) { + continue; + } + for (const auto& buf : array->data()->buffers) { + if (buf == nullptr) { + continue; + } + accumulated += buf->capacity(); + } + } + return accumulated; +} + +int64_t batch_nbytes(std::shared_ptr batch) { + if (batch == nullptr) { + return 0; + } + return batch_nbytes(*batch); +} + int64_t Splitter::CompressedSize(const arrow::RecordBatch& rb) { auto payload = std::make_shared(); arrow::Status result; @@ -455,35 +465,9 @@ arrow::Status Splitter::Stop() { EVAL_END("write", options_.thread_id, options_.task_attempt_id) return arrow::Status::OK(); } -int64_t batch_nbytes(const arrow::RecordBatch& batch) { - int64_t accumulated = 0L; - - for (const auto& array : batch.columns()) { - if (array == nullptr || array->data() == nullptr) { - continue; - } - for (const auto& buf : array->data()->buffers) { - if (buf == nullptr) { - continue; - } - accumulated += buf->size(); - } - } - return accumulated; -} - -int64_t batch_nbytes(std::shared_ptr batch) { - if (batch == nullptr) { - return 0; - } - return batch_nbytes(*batch); -} arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffers) { - static int printed = 0; - if (partition_buffer_idx_base_[partition_id] > 0) { - // already filled auto fixed_width_idx = 0; auto binary_idx = 0; auto large_binary_idx = 0; @@ -550,18 +534,6 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer } default: { auto& buffers = partition_fixed_width_buffers_[fixed_width_idx][partition_id]; - if (buffers[0] != nullptr) { - buffers[0]->Resize((num_rows >> 3) + 1, /*shrink_to_fit =*/false); - } - if (buffers[1] != nullptr) { - if (column_type_id_[i]->id() == arrow::BooleanType::type_id) - buffers[1]->Resize((num_rows >> 3) + 1, /*shrink_to_fit =*/false); - else - buffers[1]->Resize( - num_rows * (arrow::bit_width(column_type_id_[i]->id()) >> 3), - /*shrink_to_fit =*/false); - } - if (reset_buffers) { arrays[i] = arrow::MakeArray( arrow::ArrayData::Make(schema_->field(i)->type(), num_rows, @@ -579,13 +551,10 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer } } } - auto batch = arrow::RecordBatch::Make(schema_, num_rows, std::move(arrays)); int64_t raw_size = batch_nbytes(batch); - raw_partition_lengths_[partition_id] += raw_size; auto payload = std::make_shared(); -#ifndef SKIPCOMPRESS if (num_rows <= options_.batch_compress_threshold) { TIME_NANO_OR_RAISE(total_compress_time_, arrow::ipc::GetRecordBatchPayload( @@ -595,18 +564,10 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer arrow::ipc::GetRecordBatchPayload( *batch, options_.ipc_write_options, payload.get())); } -#else - // for test reason - TIME_NANO_OR_RAISE(total_compress_time_, - arrow::ipc::GetRecordBatchPayload(*batch, tiny_bach_write_options_, - payload.get())); -#endif - partition_cached_recordbatch_size_[partition_id] += payload->body_length; partition_cached_recordbatch_[partition_id].push_back(std::move(payload)); partition_buffer_idx_base_[partition_id] = 0; } - return arrow::Status::OK(); } @@ -630,8 +591,8 @@ 
arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n auto builder = std::make_shared(options_.memory_pool); assert(builder != nullptr); RETURN_NOT_OK(builder->Reserve(new_size)); - RETURN_NOT_OK(builder->ReserveData( - binary_array_empirical_size_[binary_idx] * new_size + 1024)); + RETURN_NOT_OK( + builder->ReserveData(binary_array_empirical_size_[binary_idx] * new_size)); new_binary_builders.push_back(std::move(builder)); binary_idx++; break; @@ -642,7 +603,7 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n assert(builder != nullptr); RETURN_NOT_OK(builder->Reserve(new_size)); RETURN_NOT_OK(builder->ReserveData( - large_binary_array_empirical_size_[large_binary_idx] * new_size + 1024)); + large_binary_array_empirical_size_[large_binary_idx] * new_size)); new_large_binary_builders.push_back(std::move(builder)); large_binary_idx++; break; @@ -764,7 +725,6 @@ arrow::Status Splitter::AllocateNew(int32_t partition_id, int32_t new_size) { return status; } -// call from memory management arrow::Status Splitter::SpillFixedSize(int64_t size, int64_t* actual) { int64_t current_spilled = 0L; int32_t try_count = 0; @@ -817,7 +777,6 @@ arrow::Result Splitter::SpillLargestPartition(int64_t* size) { arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) { // for the first input record batch, scan binary arrays and large binary // arrays to get their empirical sizes - uint32_t size_per_row = 0; if (!empirical_size_calculated_) { auto num_rows = rb.num_rows(); @@ -826,24 +785,21 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) { std::static_pointer_cast(rb.column(binary_array_idx_[i])); auto length = arr->value_offset(num_rows) - arr->value_offset(0); binary_array_empirical_size_[i] = length / num_rows; + size_per_row += binary_array_empirical_size_[i]; } for (int i = 0; i < large_binary_array_idx_.size(); ++i) { auto arr = std::static_pointer_cast( rb.column(large_binary_array_idx_[i])); auto length = arr->value_offset(num_rows) - arr->value_offset(0); large_binary_array_empirical_size_[i] = length / num_rows; + size_per_row += large_binary_array_empirical_size_[i]; } empirical_size_calculated_ = true; } - size_per_row = std::accumulate(binary_array_empirical_size_.begin(), - binary_array_empirical_size_.end(), 0); - size_per_row = std::accumulate(large_binary_array_empirical_size_.begin(), - large_binary_array_empirical_size_.end(), size_per_row); - for (auto col = 0; col < fixed_width_array_idx_.size(); ++col) { auto col_idx = fixed_width_array_idx_[col]; - size_per_row += arrow::bit_width(column_type_id_[col_idx]->id()) / 8; + size_per_row += arrow::bit_width(column_type_id_[col]->id()) / 8; if (rb.column_data(col_idx)->GetNullCount() != 0) { input_fixed_width_has_null_[col] = true; } @@ -853,46 +809,50 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) { options_.offheap_per_task > 0 && size_per_row > 0 ? 
options_.offheap_per_task / 4 / size_per_row / num_partitions_
          : options_.buffer_size;
-  prealloc_row_cnt = std::min(prealloc_row_cnt, (int64_t)options_.buffer_size);

   // prepare partition buffers and spill if necessary
   for (auto pid = 0; pid < num_partitions_; ++pid) {
-    if (partition_id_cnt_[pid] > 0) {
-      // make sure the size to be allocated is larger than the size to be filled
-      auto new_size = std::max((uint16_t)prealloc_row_cnt, partition_id_cnt_[pid]);
-      if (partition_buffer_size_[pid] == 0) {
-        // allocate buffer if it's not yet allocated
-        RETURN_NOT_OK(AllocatePartitionBuffers(pid, new_size));
-      } else if (partition_buffer_idx_base_[pid] + partition_id_cnt_[pid] >
-                 partition_buffer_size_[pid]) {
-        // if the size to be filled + already filled > the buffer size, need to allocate
-        // a new buffer
-        if (options_.prefer_spill) {
-          // if prefer_spill is set, spill the current record batch; we may reuse the buffers
-
-          if (new_size > partition_buffer_size_[pid]) {
-            // if the partition size after split is already larger than the allocated
-            // buffer size, need to reallocate
-            RETURN_NOT_OK(CacheRecordBatch(pid, /*reset_buffers = */ true));
-            // spill immediately
+    if (partition_id_cnt_[pid] > 0 &&
+        partition_buffer_idx_base_[pid] + partition_id_cnt_[pid] >
+            partition_buffer_size_[pid]) {
+      auto new_size = std::min((int32_t)prealloc_row_cnt, options_.buffer_size);
+      // make sure the split record batch can be filled
+      if (partition_id_cnt_[pid] > new_size) new_size = partition_id_cnt_[pid];
+      if (options_.prefer_spill) {
+        if (partition_buffer_size_[pid] == 0) {  // first allocation?
+          RETURN_NOT_OK(AllocatePartitionBuffers(pid, new_size));
+        } else {  // not the first allocation, spill
+          if (partition_id_cnt_[pid] > partition_buffer_size_[pid]) {  // need to reallocate?
+            // TODO(): CacheRecordBatch will try to reset the builder buffer;
+            // AllocatePartitionBuffers will then Reserve memory for the builder based on
+            // the last record batch, so the logic on reservation size should be cleaned up
+            RETURN_NOT_OK(CacheRecordBatch(pid, true));
            RETURN_NOT_OK(SpillPartition(pid));
            RETURN_NOT_OK(AllocatePartitionBuffers(pid, new_size));
          } else {
-            // the partition size after split is smaller than the buffer size; no need to
-            // reset the buffer, reuse it
-            RETURN_NOT_OK(CacheRecordBatch(pid, /*reset_buffers = */ false));
+            RETURN_NOT_OK(CacheRecordBatch(pid, false));
            RETURN_NOT_OK(SpillPartition(pid));
          }
-        } else {
-          // if prefer_spill is disabled, cache the record batch
-          RETURN_NOT_OK(CacheRecordBatch(pid, /*reset_buffers = */ true));
-          // allocate partition buffer with retries
-          RETURN_NOT_OK(AllocateNew(pid, new_size));
        }
+      } else {
+        RETURN_NOT_OK(CacheRecordBatch(pid, true));
+#ifdef DEBUG
+        std::cout << "Attempt to allocate partition buffer, partition id: " +
+                         std::to_string(pid) + ", old buffer size: " +
+                         std::to_string(partition_buffer_size_[pid]) +
+                         ", new buffer size: " + std::to_string(new_size) +
+                         ", input record batch size: " + std::to_string(rb.num_rows())
+                  << std::endl;
+#endif
+        RETURN_NOT_OK(AllocateNew(pid, new_size));
      }
    }
  }

-// now start to split the record batch
+#ifdef DEBUG
+  std::cout << "Total bytes allocated: " << options_.memory_pool->bytes_allocated()
+            << std::endl;
+#endif
+
 #if defined(COLUMNAR_PLUGIN_USE_AVX512)
   RETURN_NOT_OK(SplitFixedWidthValueBufferAVX(rb));
 #else
   RETURN_NOT_OK(SplitFixedWidthValueBuffer(rb));
 #endif
   RETURN_NOT_OK(SplitFixedWidthValidityBuffer(rb));
   RETURN_NOT_OK(SplitBinaryArray(rb));
   RETURN_NOT_OK(SplitLargeBinaryArray(rb));
   RETURN_NOT_OK(SplitListArray(rb));

-  // update partition buffer base after split
+  // update partition buffer base
   for (auto pid = 0; pid < num_partitions_; ++pid) {
     partition_buffer_idx_base_[pid] += partition_id_cnt_[pid];
   }
@@ -912,66 +872,68 @@
 arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) {
   const auto num_rows = rb.num_rows();
-  int64_t row;
-  std::vector<uint16_t> partition_buffer_idx_offset;
-
   for (auto col = 0; col < fixed_width_array_idx_.size(); ++col) {
-    const auto& dst_addrs = partition_fixed_width_value_addrs_[col];
-    std::copy(dst_addrs.begin(), dst_addrs.end(), partition_buffer_idx_offset_.begin());
+    std::fill(std::begin(partition_buffer_idx_offset_),
+              std::end(partition_buffer_idx_offset_), 0);
     auto col_idx = fixed_width_array_idx_[col];
     auto src_addr = const_cast<uint8_t*>(rb.column_data(col_idx)->buffers[1]->data());
-
-    switch (arrow::bit_width(column_type_id_[col_idx]->id())) {
-#define PROCESS(_CTYPE)                                                                   \
-  std::transform(partition_buffer_idx_offset_.begin(),                                   \
-                 partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \
-                 partition_buffer_idx_offset_.begin(),                                   \
-                 [](uint8_t* x, int16_t y) { return x + y * sizeof(_CTYPE); });          \
-  for (row = 0; row < num_rows; ++row) {                                                 \
-    auto pid = partition_id_[row];                                                       \
-    auto dst_pid_base = reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]);    \
-    *dst_pid_base = reinterpret_cast<_CTYPE*>(src_addr)[row];                            \
-    partition_buffer_idx_offset_[pid] += sizeof(_CTYPE);                                 \
-    _mm_prefetch(&dst_pid_base[1], _MM_HINT_T0);                                         \
-  }                                                                                      \
-  break;
-      case 8:
-        PROCESS(uint8_t)
-      case 16:
-        PROCESS(uint16_t)
-      case 32:
-        PROCESS(uint32_t)
-      case 64:
-        PROCESS(uint64_t)
+    const auto& dst_addrs = partition_fixed_width_value_addrs_[col];
+    switch (column_type_id_[col_idx]->id()) {
+#define PROCESS(SHUFFLE_TYPE, _CTYPE)                                               \
+  case SHUFFLE_TYPE::type_id:                                                       \
+    for (auto row = 0; row < num_rows; ++row) {                                     \
+      auto pid = partition_id_[row];                                                \
+      auto dst_offset =                                                             \
+          partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid];      \
+      reinterpret_cast<arrow::TypeTraits<SHUFFLE_TYPE>::CType*>(                    \
+          dst_addrs[pid])[dst_offset] =                                             \
+          reinterpret_cast<arrow::TypeTraits<SHUFFLE_TYPE>::CType*>(src_addr)[row]; \
+      partition_buffer_idx_offset_[pid]++;                                          \
+      _mm_prefetch(&reinterpret_cast<arrow::TypeTraits<SHUFFLE_TYPE>::CType*>(      \
+                       dst_addrs[pid])[dst_offset + 1],                             \
+                   _MM_HINT_T0);                                                    \
+    }                                                                               \
+    break;
+      PROCESS(arrow::Int8Type, uint8_t)
+      PROCESS(arrow::UInt8Type, uint8_t)
+      PROCESS(arrow::Int16Type, uint16_t)
+      PROCESS(arrow::UInt16Type, uint16_t)
+      PROCESS(arrow::Int32Type, uint32_t)
+      PROCESS(arrow::UInt32Type, uint32_t)
+      PROCESS(arrow::FloatType, uint32_t)
+      PROCESS(arrow::Date32Type, uint32_t)
+      PROCESS(arrow::Time32Type, uint32_t)
+      PROCESS(arrow::Int64Type, uint64_t)
+      PROCESS(arrow::UInt64Type, uint64_t)
+      PROCESS(arrow::DoubleType, uint64_t)
+      PROCESS(arrow::Date64Type, uint64_t)
+      PROCESS(arrow::Time64Type, uint64_t)
+      PROCESS(arrow::TimestampType, uint64_t)
 #undef PROCESS
-      case 128:  // arrow::Decimal128Type::type_id
-        std::transform(
-            partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(),
-            partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(),
-            [](uint8_t* x, int16_t y) { return x + y * 16; });
+      case arrow::Decimal128Type::type_id:
        for (auto row = 0; row < num_rows; ++row) {
          auto pid = partition_id_[row];
-          reinterpret_cast<int64_t*>(partition_buffer_idx_offset_[pid])[0] =
+          auto dst_offset =
+              (partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]) << 1;
+          reinterpret_cast<int64_t*>(dst_addrs[pid])[dst_offset] =
              reinterpret_cast<int64_t*>(src_addr)[row << 1];
-          reinterpret_cast<int64_t*>(partition_buffer_idx_offset_[pid])[1] =
+          reinterpret_cast<int64_t*>(dst_addrs[pid])[dst_offset | 1] =
              reinterpret_cast<int64_t*>(src_addr)[row << 1 | 1];
-          partition_buffer_idx_offset_[pid] += 16;
-          _mm_prefetch(&reinterpret_cast<int64_t*>(partition_buffer_idx_offset_[pid])[2],
+          partition_buffer_idx_offset_[pid]++;
+          _mm_prefetch(&reinterpret_cast<int64_t*>(dst_addrs[pid])[dst_offset + 2],
                       _MM_HINT_T0);
        }
        break;
-      case 1:  // arrow::BooleanType::type_id:
-        partition_buffer_idx_offset.resize(partition_buffer_idx_base_.size());
-        std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
-                  partition_buffer_idx_offset.begin());
+      case arrow::BooleanType::type_id:
        for (auto row = 0; row < num_rows; ++row) {
          auto pid = partition_id_[row];
-          uint16_t dst_offset = partition_buffer_idx_offset[pid];
+          auto dst_offset =
+              partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid];
          dst_addrs[pid][dst_offset >> 3] ^=
              (dst_addrs[pid][dst_offset >> 3] >> (dst_offset & 7) ^
               src_addr[row >> 3] >> (row & 7))
              << (dst_offset & 7);
-          partition_buffer_idx_offset[pid]++;
+          partition_buffer_idx_offset_[pid]++;
        }
        break;
      default:
@@ -1151,28 +1113,23 @@ arrow::Status Splitter::SplitFixedWidthValueBufferAVX(const arrow::RecordBatch&
 arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& rb) {
   const auto num_rows = rb.num_rows();
-  std::vector<uint16_t> partition_buffer_idx_offset;
-
   for (auto col = 0; col < fixed_width_array_idx_.size(); ++col) {
     auto col_idx = fixed_width_array_idx_[col];
     auto& dst_addrs = partition_fixed_width_validity_addrs_[col];
-    if (rb.column_data(col_idx)->GetNullCount() == 0 &&
-        column_has_null_[col_idx] == true) {
-      // if the input record batch doesn't have nulls, set the validity bits to true
+    if (rb.column_data(col_idx)->GetNullCount() == 0) {
      for (auto pid = 0; pid < num_partitions_; ++pid) {
        if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) {
          arrow::BitUtil::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid],
                                    partition_id_cnt_[pid], true);
        }
      }
-    } else if (rb.column_data(col_idx)->GetNullCount() > 0) {
-      // there is a null count
-      column_has_null_[col_idx] = true;
+    } else {
      for (auto pid = 0; pid < num_partitions_; ++pid) {
        if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] == nullptr) {
-          // init the bitmap if it's null; initialize the buffer as true
-          auto new_size =
-              std::max(partition_id_cnt_[pid], (uint16_t)options_.buffer_size);
+          // init bitmap if it's null
+          auto new_size = partition_id_cnt_[pid] > options_.buffer_size
+                              ? partition_id_cnt_[pid]
+                              : options_.buffer_size;
          ARROW_ASSIGN_OR_RAISE(
              auto validity_buffer,
              arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size),
@@ -1185,17 +1142,17 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch&
      }
      auto src_addr = const_cast<uint8_t*>(rb.column_data(col_idx)->buffers[0]->data());
-      partition_buffer_idx_offset.resize(partition_buffer_idx_base_.size());
-      std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
-                partition_buffer_idx_offset.begin());
+      std::fill(std::begin(partition_buffer_idx_offset_),
+                std::end(partition_buffer_idx_offset_), 0);
      for (auto row = 0; row < num_rows; ++row) {
        auto pid = partition_id_[row];
-        auto dst_offset = partition_buffer_idx_offset[pid];
+        auto dst_offset =
+            partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid];
        dst_addrs[pid][dst_offset >> 3] ^=
            (dst_addrs[pid][dst_offset >> 3] >> (dst_offset & 7) ^
             src_addr[row >> 3] >> (row & 7))
            << (dst_offset & 7);
-        partition_buffer_idx_offset[pid]++;
+        partition_buffer_idx_offset_[pid]++;
      }
    }
  }
@@ -1331,7 +1288,7 @@ arrow::Status RoundRobinSplitter::ComputeAndCountPartitionId(
   for (auto& pid : partition_id_) {
     pid = pid_selection_;
     partition_id_cnt_[pid_selection_]++;
-    pid_selection_ = (pid_selection_ + 1) == num_partitions_ ? 0 : (pid_selection_ + 1);
+    pid_selection_ = (pid_selection_ + 1) % num_partitions_;
   }
   return arrow::Status::OK();
 }
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h
index 0dfac2f8c..77a48bb2d 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.h
+++ b/native-sql-engine/cpp/src/shuffle/splitter.h
@@ -25,7 +25,6 @@
 #include 
 #include 
-#include 
 #include 
 #include 

@@ -103,11 +102,6 @@ class Splitter {
     return raw_partition_lengths_;
   }

-  int64_t RawPartitionBytes() const {
-    return std::accumulate(raw_partition_lengths_.begin(), raw_partition_lengths_.end(),
-                           0LL);
-  }
-
   // for testing
   const std::string& DataFile() const { return options_.data_file; }

@@ -172,62 +166,38 @@ class Splitter {

   class PartitionWriter;

-  // partid
   std::vector<int32_t> partition_buffer_size_;
-  // partid
-  std::vector partition_buffer_idx_base_;
-  // partid
-  // temp array to hold the destination pointer
-  std::vector<uint8_t*> partition_buffer_idx_offset_;
-  // partid
+  std::vector<int32_t> partition_buffer_idx_base_;
+  std::vector<int32_t> partition_buffer_idx_offset_;
+
   std::vector<std::shared_ptr<PartitionWriter>> partition_writer_;
-  // col partid
   std::vector<std::vector<uint8_t*>> partition_fixed_width_validity_addrs_;
-  // cache if the column has null so far for any reducer. To bypass the reducer check
-  std::vector<bool> column_has_null_;
-  // col partid
   std::vector<std::vector<uint8_t*>> partition_fixed_width_value_addrs_;
-  // col partid
   std::vector<std::vector<std::vector<std::shared_ptr<arrow::ResizableBuffer>>>>
       partition_fixed_width_buffers_;
-  // col partid
   std::vector<std::vector<std::shared_ptr<arrow::BinaryBuilder>>>
       partition_binary_builders_;
-  // col partid
   std::vector<std::vector<std::shared_ptr<arrow::LargeBinaryBuilder>>>
       partition_large_binary_builders_;
   std::vector<std::vector<std::shared_ptr<arrow::ArrayBuilder>>> partition_list_builders_;
-  // col partid
-  // partid
   std::vector<std::vector<std::shared_ptr<arrow::ipc::IpcPayload>>>
       partition_cached_recordbatch_;
-  // partid
   std::vector<int64_t> partition_cached_recordbatch_size_;  // in bytes
-  // col
   std::vector<int32_t> fixed_width_array_idx_;
-  // col
   std::vector<int32_t> binary_array_idx_;
-  // col
   std::vector<int32_t> large_binary_array_idx_;
-  // col
   std::vector<int32_t> list_array_idx_;
-  // col
   bool empirical_size_calculated_ = false;
-  // col
   std::vector binary_array_empirical_size_;
-  // col
   std::vector large_binary_array_empirical_size_;
-  // col
   std::vector<bool> input_fixed_width_has_null_;

   // updated for each input record batch
-  // col
-  std::vector<uint16_t> partition_id_;
-  // col
-  std::vector<uint16_t> partition_id_cnt_;
+  std::vector<int32_t> partition_id_;
+  std::vector<int32_t> partition_id_cnt_;

   int32_t num_partitions_;
   std::shared_ptr<arrow::Schema> schema_;
diff --git a/native-sql-engine/cpp/src/tests/shuffle_split_test.cc b/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
index 1f12742cd..dab44a38c 100644
--- a/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
+++ b/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
@@ -181,7 +181,6 @@ const std::vector<std::string> SplitterTest::input_data_2 = {

 TEST_F(SplitterTest, TestSingleSplitter) {
   split_options_.buffer_size = 10;
-
   ARROW_ASSIGN_OR_THROW(splitter_, Splitter::Make("rr", schema_, 1, split_options_))

   ASSERT_NOT_OK(splitter_->Split(*input_batch_1_));
@@ -214,8 +213,6 @@ TEST_F(SplitterTest, TestSingleSplitter) {
     ASSERT_EQ(rb->num_columns(), schema_->num_fields());
     for (auto j = 0; j < rb->num_columns(); ++j) {
       ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
-      ASSERT_TRUE(rb->column(j)->Equals(*expected[i]->column(j),
-                                        EqualOptions::Defaults().diff_sink(&std::cout)));
     }
     ASSERT_TRUE(rb->Equals(*expected[i]));
   }
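
For readers comparing the two harnesses: the deleted benchmark was built on Google Benchmark's fixture API and registered parameterized runs with BENCHMARK_DEFINE_F/BENCHMARK_REGISTER_F, while the restored file drives the splitter from gtest TEST_F bodies. Below is a minimal, self-contained sketch of the fixture pattern the deleted code relied on; SplitFixture, its counter, and the Args values are illustrative only and are not part of this repository.

#include <benchmark/benchmark.h>

// Minimal sketch of the Google Benchmark fixture pattern used by the
// deleted harness. SetUp/TearDown run once around each benchmark run.
class SplitFixture : public benchmark::Fixture {
 public:
  void SetUp(const benchmark::State&) override {}    // e.g. open input files
  void TearDown(const benchmark::State&) override {} // e.g. drop temp output
};

// The benchmark body reads its parameters from state.range(), the same way
// the removed CacheScan/IterateScan bodies read (num_partitions, compression).
BENCHMARK_DEFINE_F(SplitFixture, IterateScan)(benchmark::State& state) {
  const int64_t num_partitions = state.range(0);
  const int64_t compression_type = state.range(1);
  for (auto _ : state) {
    // the timed region: one full split pass over the input would go here
    benchmark::DoNotOptimize(num_partitions + compression_type);
  }
  // custom counters appear in the report next to the built-in timings
  state.counters["num_partitions"] = benchmark::Counter(num_partitions);
}

// Registration mirrors the tail of the deleted file: a single iteration,
// 24 threads, process-CPU timing, results reported in seconds.
BENCHMARK_REGISTER_F(SplitFixture, IterateScan)
    ->Iterations(1)
    ->Args({96 * 16, 2})
    ->Threads(24)
    ->MeasureProcessCPUTime()
    ->Unit(benchmark::kSecond);

BENCHMARK_MAIN();

With the revert applied, the benchmark is instead run like any gtest binary, passing the input parquet files as plain command-line arguments; the restored main() copies argv into input_files before calling RUN_ALL_TESTS().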