Allocate window row-pointer vectors from the operator memory pool.

What the hunks below do:
  * WindowBuild stores the MemoryPool it is constructed with in a new
    'pool_' member.
  * PartitionStreamingWindowBuild::sortedRows_ / inputRows_,
    RowsStreamingWindowBuild::inputRows_ and WindowPartition::rows_ become
    std::vector<char*, memory::StlAllocator<char*>> constructed from that pool.
  * Every WindowPartition constructor takes the pool as a new first argument;
    the Partition/Rows/Sort window builds pass 'pool_' at each call site.
  * WindowPartition::addRows() takes the pool-allocated vector type.

Review notes:
  * Temporary std::cout tracing ("xgbtck ...") and the <iostream> includes that
    only served it were removed from RowsStreamingWindowBuild.cpp and
    Window.cpp; Window.cpp therefore no longer appears in this patch.
  * The 'index' blob ids are the ones recorded by the earlier revision of this
    patch and were not regenerated.
  * Lines longer than 80 columns still need a clang-format pass.

diff --git a/velox/exec/PartitionStreamingWindowBuild.cpp b/velox/exec/PartitionStreamingWindowBuild.cpp
index f8deecf99194..ad58879655ff 100644
--- a/velox/exec/PartitionStreamingWindowBuild.cpp
+++ b/velox/exec/PartitionStreamingWindowBuild.cpp
@@ -23,7 +23,9 @@ PartitionStreamingWindowBuild::PartitionStreamingWindowBuild(
     velox::memory::MemoryPool* pool,
     const common::SpillConfig* spillConfig,
     tsan_atomic<bool>* nonReclaimableSection)
-    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}
+    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection),
+      sortedRows_(0, memory::StlAllocator<char*>(*pool)),
+      inputRows_(0, memory::StlAllocator<char*>(*pool)) {}
 
 void PartitionStreamingWindowBuild::buildNextPartition() {
   partitionStartRows_.push_back(sortedRows_.size());
@@ -93,7 +95,7 @@ PartitionStreamingWindowBuild::nextPartition() {
       partitionSize);
 
   return std::make_shared<WindowPartition>(
-      data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
+      pool_, data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
 }
 
 bool PartitionStreamingWindowBuild::hasNextPartition() {
diff --git a/velox/exec/PartitionStreamingWindowBuild.h b/velox/exec/PartitionStreamingWindowBuild.h
index bb5cb352d24f..13c169017ff1 100644
--- a/velox/exec/PartitionStreamingWindowBuild.h
+++ b/velox/exec/PartitionStreamingWindowBuild.h
@@ -61,10 +61,10 @@ class PartitionStreamingWindowBuild : public WindowBuild {
   // Vector of pointers to each input row in the data_ RowContainer.
   // Rows are erased from data_ when they are output from the
   // Window operator.
-  std::vector<char*> sortedRows_;
+  std::vector<char*, memory::StlAllocator<char*>> sortedRows_;
 
   // Holds input rows within the current partition.
-  std::vector<char*> inputRows_;
+  std::vector<char*, memory::StlAllocator<char*>> inputRows_;
 
   // Indices of the start row (in sortedRows_) of each partition in
   // the RowContainer data_. This auxiliary structure helps demarcate
diff --git a/velox/exec/RowsStreamingWindowBuild.cpp b/velox/exec/RowsStreamingWindowBuild.cpp
index 81d4a4f8d00a..4bb1fa6c5515 100644
--- a/velox/exec/RowsStreamingWindowBuild.cpp
+++ b/velox/exec/RowsStreamingWindowBuild.cpp
@@ -24,7 +24,8 @@ RowsStreamingWindowBuild::RowsStreamingWindowBuild(
     velox::memory::MemoryPool* pool,
     const common::SpillConfig* spillConfig,
     tsan_atomic<bool>* nonReclaimableSection)
-    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {
+    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection),
+      inputRows_(0, memory::StlAllocator<char*>(*pool)) {
   velox::common::testutil::TestValue::adjust(
       "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
       this);
@@ -37,7 +38,7 @@ void RowsStreamingWindowBuild::addPartitionInputs(bool finished) {
 
   if (windowPartitions_.size() <= inputPartition_) {
     windowPartitions_.push_back(std::make_shared<WindowPartition>(
-        data_.get(), inversedInputChannels_, sortKeyInfo_));
+        pool_, data_.get(), inversedInputChannels_, sortKeyInfo_));
   }
 
   windowPartitions_[inputPartition_]->addRows(inputRows_);
diff --git a/velox/exec/RowsStreamingWindowBuild.h b/velox/exec/RowsStreamingWindowBuild.h
index c003f1e6a3f9..938771a35a98 100644
--- a/velox/exec/RowsStreamingWindowBuild.h
+++ b/velox/exec/RowsStreamingWindowBuild.h
@@ -64,7 +64,7 @@ class RowsStreamingWindowBuild : public WindowBuild {
   void addPartitionInputs(bool finished);
 
   // Points to the input rows in the current partition.
-  std::vector<char*> inputRows_;
+  std::vector<char*, memory::StlAllocator<char*>> inputRows_;
 
   // Used to compare rows based on partitionKeys.
   char* previousRow_ = nullptr;
diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp
index 400b1edbe636..60cd8444e3cc 100644
--- a/velox/exec/SortWindowBuild.cpp
+++ b/velox/exec/SortWindowBuild.cpp
@@ -299,7 +299,7 @@ std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
     VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
     auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
     return std::make_shared<WindowPartition>(
-        data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
+        pool_, data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
   }
 
   VELOX_CHECK(!partitionStartRows_.empty(), "No window partitions available")
@@ -317,7 +317,7 @@ std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
       sortedRows_.data() + partitionStartRows_[currentPartition_],
       partitionSize);
   return std::make_shared<WindowPartition>(
-      data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
+      pool_, data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
 }
 
 bool SortWindowBuild::hasNextPartition() {
diff --git a/velox/exec/WindowBuild.cpp b/velox/exec/WindowBuild.cpp
index 2c5096be472c..e6030287b714 100644
--- a/velox/exec/WindowBuild.cpp
+++ b/velox/exec/WindowBuild.cpp
@@ -90,7 +90,8 @@ WindowBuild::WindowBuild(
     tsan_atomic<bool>* nonReclaimableSection)
     : spillConfig_{spillConfig},
       nonReclaimableSection_{nonReclaimableSection},
-      decodedInputVectors_(windowNode->inputType()->size()) {
+      decodedInputVectors_(windowNode->inputType()->size()),
+      pool_{pool} {
   std::tie(inputChannels_, inversedInputChannels_, inputType_) =
       reorderInputChannels(
           windowNode->inputType(),
diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h
index 01c470803ed7..29a79c493a7c 100644
--- a/velox/exec/WindowBuild.h
+++ b/velox/exec/WindowBuild.h
@@ -118,6 +118,9 @@ class WindowBuild {
 
   // The maximum number of rows that can fit into an output block.
   vector_size_t numRowsPerOutput_;
+
+  // Memory pool handed to the WindowPartition objects created by this build.
+  velox::memory::MemoryPool* pool_;
 };
 
 } // namespace facebook::velox::exec
diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp
index 162817e80d4d..978093e7a890 100644
--- a/velox/exec/WindowPartition.cpp
+++ b/velox/exec/WindowPartition.cpp
@@ -18,6 +18,7 @@
 namespace facebook::velox::exec {
 
 WindowPartition::WindowPartition(
+    velox::memory::MemoryPool* pool,
     RowContainer* data,
     const folly::Range<char**>& rows,
     const std::vector<column_index_t>& inputMapping,
@@ -26,6 +27,7 @@ WindowPartition::WindowPartition(
     bool complete)
     : partial_(partial),
       data_(data),
+      rows_(0, memory::StlAllocator<char*>(*pool)),
       partition_(rows),
       complete_(complete),
       inputMapping_(inputMapping),
@@ -39,19 +41,21 @@ WindowPartition::WindowPartition(
 }
 
 WindowPartition::WindowPartition(
+    velox::memory::MemoryPool* pool,
     RowContainer* data,
     const folly::Range<char**>& rows,
     const std::vector<column_index_t>& inputMapping,
     const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
-    : WindowPartition(data, rows, inputMapping, sortKeyInfo, false, true) {}
+    : WindowPartition(pool, data, rows, inputMapping, sortKeyInfo, false, true) {}
 
 WindowPartition::WindowPartition(
+    velox::memory::MemoryPool* pool,
     RowContainer* data,
     const std::vector<column_index_t>& inputMapping,
     const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
-    : WindowPartition(data, {}, inputMapping, sortKeyInfo, true, false) {}
+    : WindowPartition(pool, data, {}, inputMapping, sortKeyInfo, true, false) {}
 
-void WindowPartition::addRows(const std::vector<char*>& rows) {
+void WindowPartition::addRows(const std::vector<char*, memory::StlAllocator<char*>>& rows) {
   checkPartial();
   rows_.insert(rows_.end(), rows.begin(), rows.end());
   partition_ = folly::Range(rows_.data(), rows_.size());
diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h
index 7948611a9829..5c0cf64d54b1 100644
--- a/velox/exec/WindowPartition.h
+++ b/velox/exec/WindowPartition.h
@@ -17,4 +17,5 @@
 
+#include "velox/common/memory/MemoryPool.h"
 #include "velox/exec/RowContainer.h"
 #include "velox/vector/BaseVector.h"
 
@@ -41,6 +42,7 @@ class WindowPartition {
   /// 'sortKeyInfo' : Order by columns used by the the Window operator. Used to
   /// get peer rows from the input partition.
   WindowPartition(
+      velox::memory::MemoryPool* pool,
       RowContainer* data,
       const folly::Range<char**>& rows,
       const std::vector<column_index_t>& inputMapping,
@@ -51,13 +53,14 @@ class WindowPartition {
   /// start data processing with a subset of partition rows. 'partial_' flag is
   /// set for the constructed window partition.
   WindowPartition(
+      velox::memory::MemoryPool* pool,
       RowContainer* data,
       const std::vector<column_index_t>& inputMapping,
       const std::vector<std::pair<column_index_t, core::SortOrder>>&
           sortKeyInfo);
 
   /// Adds remaining input 'rows' for a partial window partition.
-  void addRows(const std::vector<char*>& rows);
+  void addRows(const std::vector<char*, memory::StlAllocator<char*>>& rows);
 
   /// Removes the first 'numRows' in 'rows_' from a partial window partition
   /// after been processed.
@@ -169,6 +172,7 @@ class WindowPartition {
 
  private:
   WindowPartition(
+      velox::memory::MemoryPool* pool,
       RowContainer* data,
       const folly::Range<char**>& rows,
       const std::vector<column_index_t>& inputMapping,
@@ -236,7 +240,7 @@ class WindowPartition {
   RowContainer* const data_;
 
   // Points to the input rows for partial partition.
-  std::vector<char*> rows_;
+  std::vector<char*, memory::StlAllocator<char*>> rows_;
 
   // folly::Range is for the partition rows iterator provided by the
   // Window operator. The pointers are to rows from a RowContainer owned