Skip to content

Commit

Permalink
allocate vector using pool
Browse files Browse the repository at this point in the history
  • Loading branch information
FelixYBW committed Sep 20, 2024
1 parent eeb4b1e commit 2ceab3e
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 17 deletions.
6 changes: 4 additions & 2 deletions velox/exec/PartitionStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ PartitionStreamingWindowBuild::PartitionStreamingWindowBuild(
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection),
sortedRows_(0,memory::StlAllocator<char*>(*pool)),
inputRows_(0,memory::StlAllocator<char*>(*pool)) {}

void PartitionStreamingWindowBuild::buildNextPartition() {
partitionStartRows_.push_back(sortedRows_.size());
Expand Down Expand Up @@ -93,7 +95,7 @@ PartitionStreamingWindowBuild::nextPartition() {
partitionSize);

return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
pool_, data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

bool PartitionStreamingWindowBuild::hasNextPartition() {
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/PartitionStreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ class PartitionStreamingWindowBuild : public WindowBuild {
// Vector of pointers to each input row in the data_ RowContainer.
// Rows are erased from data_ when they are output from the
// Window operator.
std::vector<char*> sortedRows_;
std::vector<char*, memory::StlAllocator<char*>> sortedRows_;

// Holds input rows within the current partition.
std::vector<char*> inputRows_;
std::vector<char*, memory::StlAllocator<char*>> inputRows_;

// Indices of the start row (in sortedRows_) of each partition in
// the RowContainer data_. This auxiliary structure helps demarcate
Expand Down
9 changes: 7 additions & 2 deletions velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/common/testutil/TestValue.h"
#include <iostream>

namespace facebook::velox::exec {

Expand All @@ -24,7 +25,8 @@ RowsStreamingWindowBuild::RowsStreamingWindowBuild(
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection),
inputRows_(0, memory::StlAllocator<char*>(*pool)){
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
this);
Expand All @@ -37,7 +39,7 @@ void RowsStreamingWindowBuild::addPartitionInputs(bool finished) {

if (windowPartitions_.size() <= inputPartition_) {
windowPartitions_.push_back(std::make_shared<WindowPartition>(
data_.get(), inversedInputChannels_, sortKeyInfo_));
pool_, data_.get(), inversedInputChannels_, sortKeyInfo_));
}

windowPartitions_[inputPartition_]->addRows(inputRows_);
Expand Down Expand Up @@ -77,6 +79,9 @@ void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
}

// Called once all input has been received. Hands off whatever rows are still
// pending for the current partition by invoking addPartitionInputs() with
// 'finished' set to true.
//
// NOTE(review): the temporary "xgbtck" std::cout tracing (including the
// per-call root-pool memory usage dump) was removed; library code should not
// write unconditionally to stdout, and std::endl forces a flush on each call.
void RowsStreamingWindowBuild::noMoreInput() {
  addPartitionInputs(true);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class RowsStreamingWindowBuild : public WindowBuild {
void addPartitionInputs(bool finished);

// Points to the input rows in the current partition.
std::vector<char*> inputRows_;
std::vector<char*, memory::StlAllocator<char*>> inputRows_;

// Used to compare rows based on partitionKeys.
char* previousRow_ = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
pool_, data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

VELOX_CHECK(!partitionStartRows_.empty(), "No window partitions available")
Expand All @@ -317,7 +317,7 @@ std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
pool_, data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

bool SortWindowBuild::hasNextPartition() {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/exec/SortWindowBuild.h"
#include "velox/exec/Task.h"
#include <iostream>

namespace facebook::velox::exec {

Expand All @@ -43,13 +44,16 @@ Window::Window(
spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
if (windowNode->inputsSorted()) {
if (supportRowsStreaming()) {
std::cout << "xgbtck streaming window " << std::endl;
windowBuild_ = std::make_unique<RowsStreamingWindowBuild>(
windowNode_, pool(), spillConfig, &nonReclaimableSection_);
} else {
std::cout << "xgbtck partition window " << std::endl;
windowBuild_ = std::make_unique<PartitionStreamingWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
}
} else {
std::cout << "xgbtck sort window " << std::endl;
windowBuild_ = std::make_unique<SortWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_);
}
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/WindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ WindowBuild::WindowBuild(
tsan_atomic<bool>* nonReclaimableSection)
: spillConfig_{spillConfig},
nonReclaimableSection_{nonReclaimableSection},
decodedInputVectors_(windowNode->inputType()->size()) {
decodedInputVectors_(windowNode->inputType()->size()),
pool_{pool} {
std::tie(inputChannels_, inversedInputChannels_, inputType_) =
reorderInputChannels(
windowNode->inputType(),
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/WindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class WindowBuild {

// The maximum number of rows that can fit into an output block.
vector_size_t numRowsPerOutput_;

velox::memory::MemoryPool* pool_;
};

} // namespace facebook::velox::exec
13 changes: 9 additions & 4 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
namespace facebook::velox::exec {

WindowPartition::WindowPartition(
velox::memory::MemoryPool* pool,
RowContainer* data,
const folly::Range<char**>& rows,
const std::vector<column_index_t>& inputMapping,
Expand All @@ -26,10 +27,12 @@ WindowPartition::WindowPartition(
bool complete)
: partial_(partial),
data_(data),
rows_(0, memory::StlAllocator<char*>(*pool)),
partition_(rows),
complete_(complete),
inputMapping_(inputMapping),
sortKeyInfo_(sortKeyInfo) {
sortKeyInfo_(sortKeyInfo)
{
VELOX_CHECK_NE(partial_, complete_);
VELOX_CHECK_NE(complete_, partition_.empty());

Expand All @@ -39,19 +42,21 @@ WindowPartition::WindowPartition(
}

WindowPartition::WindowPartition(
velox::memory::MemoryPool* pool,
RowContainer* data,
const folly::Range<char**>& rows,
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
: WindowPartition(data, rows, inputMapping, sortKeyInfo, false, true) {}
: WindowPartition(pool, data, rows, inputMapping, sortKeyInfo, false, true) {}

WindowPartition::WindowPartition(
velox::memory::MemoryPool* pool,
RowContainer* data,
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
: WindowPartition(data, {}, inputMapping, sortKeyInfo, true, false) {}
: WindowPartition(pool, data, {}, inputMapping, sortKeyInfo, true, false) {}

void WindowPartition::addRows(const std::vector<char*>& rows) {
void WindowPartition::addRows(const std::vector<char*, memory::StlAllocator<char*>>& rows) {
checkPartial();
rows_.insert(rows_.end(), rows.begin(), rows.end());
partition_ = folly::Range(rows_.data(), rows_.size());
Expand Down
9 changes: 6 additions & 3 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include "velox/exec/RowContainer.h"
#include "velox/vector/BaseVector.h"

#include "velox/common/memory/MemoryPool.h"
/// Simple WindowPartition that builds over the RowContainer used for storing
/// the input rows in the Window Operator. This works completely in-memory.
/// WindowPartition supports partial window partitioning to facilitate
Expand All @@ -41,6 +41,7 @@ class WindowPartition {
/// 'sortKeyInfo' : Order by columns used by the Window operator. Used to
/// get peer rows from the input partition.
WindowPartition(
velox::memory::MemoryPool* pool,
RowContainer* data,
const folly::Range<char**>& rows,
const std::vector<column_index_t>& inputMapping,
Expand All @@ -51,13 +52,14 @@ class WindowPartition {
/// start data processing with a subset of partition rows. 'partial_' flag is
/// set for the constructed window partition.
WindowPartition(
velox::memory::MemoryPool* pool,
RowContainer* data,
const std::vector<column_index_t>& inputMapping,
const std::vector<std::pair<column_index_t, core::SortOrder>>&
sortKeyInfo);

/// Adds remaining input 'rows' for a partial window partition.
void addRows(const std::vector<char*>& rows);
void addRows(const std::vector<char*, memory::StlAllocator<char*>>& rows);

/// Removes the first 'numRows' in 'rows_' from a partial window partition
/// after they have been processed.
Expand Down Expand Up @@ -169,6 +171,7 @@ class WindowPartition {

private:
WindowPartition(
velox::memory::MemoryPool* pool,
RowContainer* data,
const folly::Range<char**>& rows,
const std::vector<column_index_t>& inputMapping,
Expand Down Expand Up @@ -236,7 +239,7 @@ class WindowPartition {
RowContainer* const data_;

// Points to the input rows for partial partition.
std::vector<char*> rows_;
std::vector<char*, memory::StlAllocator<char*>> rows_;

// folly::Range is for the partition rows iterator provided by the
// Window operator. The pointers are to rows from a RowContainer owned
Expand Down

0 comments on commit 2ceab3e

Please sign in to comment.