Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RowsStreamingWindowBuild to avoid OOM in Window operator #9025

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ void registerAggregateWindowFunction(const std::string& name) {
exec::registerWindowFunction(
name,
std::move(signatures),
{exec::WindowFunction::ProcessMode::kRows, true},
[name](
const std::vector<exec::WindowFunctionArg>& args,
const TypePtr& resultType,
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ velox_add_library(
OutputBufferManager.cpp
PartitionedOutput.cpp
PartitionFunction.cpp
PartitionStreamingWindowBuild.cpp
PlanNodeStats.cpp
PrefixSort.cpp
ProbeOperatorState.cpp
RowsStreamingWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
SortBuffer.cpp
Expand All @@ -71,7 +73,6 @@ velox_add_library(
SpillFile.cpp
Spiller.cpp
StreamingAggregation.cpp
StreamingWindowBuild.cpp
Strings.cpp
TableScan.cpp
TableWriteMerge.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
* limitations under the License.
*/

#include "velox/exec/StreamingWindowBuild.h"
#include "velox/exec/PartitionStreamingWindowBuild.h"

namespace facebook::velox::exec {

// Constructs the build by forwarding every argument unchanged to the
// WindowBuild base class; the constructor body itself is empty.
StreamingWindowBuild::StreamingWindowBuild(
PartitionStreamingWindowBuild::PartitionStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}

// Closes out the rows buffered in inputRows_ as one partition: records the
// current end of sortedRows_ as the start offset of that partition, appends
// the buffered rows to sortedRows_, then empties the buffer.
void StreamingWindowBuild::buildNextPartition() {
void PartitionStreamingWindowBuild::buildNextPartition() {
// The start offset must be captured before the rows are appended.
partitionStartRows_.push_back(sortedRows_.size());
sortedRows_.insert(sortedRows_.end(), inputRows_.begin(), inputRows_.end());
inputRows_.clear();
}

void StreamingWindowBuild::addInput(RowVectorPtr input) {
void PartitionStreamingWindowBuild::addInput(RowVectorPtr input) {
for (auto i = 0; i < inputChannels_.size(); ++i) {
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
}
Expand All @@ -53,14 +53,15 @@ void StreamingWindowBuild::addInput(RowVectorPtr input) {
}
}

// Flushes the rows still buffered in inputRows_ as a final partition via
// buildNextPartition(), then appends one more offset equal to the total
// number of sorted rows.
void StreamingWindowBuild::noMoreInput() {
void PartitionStreamingWindowBuild::noMoreInput() {
buildNextPartition();

// Help for last partition related calculations.
// NOTE(review): presumably this trailing offset acts as an end marker so the
// last partition's size can be derived like the others - confirm against
// nextPartition().
partitionStartRows_.push_back(sortedRows_.size());
}

std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition>
PartitionStreamingWindowBuild::nextPartition() {
VELOX_CHECK_GT(
partitionStartRows_.size(), 0, "No window partitions available")

Expand Down Expand Up @@ -91,11 +92,11 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);

return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

// Returns true when at least one start offset has been recorded and
// currentPartition_ is still below (number of recorded offsets - 2).
// NOTE(review): the "- 2" presumably accounts for the last recorded offset
// not denoting a finished partition plus currentPartition_ pointing at the
// partition already handed out - confirm against nextPartition().
bool StreamingWindowBuild::hasNextPartition() {
bool PartitionStreamingWindowBuild::hasNextPartition() {
// The size() > 0 guard runs first so the unsigned "size() - 2" is never
// evaluated on an empty vector; the int() cast makes the comparison signed.
return partitionStartRows_.size() > 0 &&
currentPartition_ < int(partitionStartRows_.size() - 2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

namespace facebook::velox::exec {

/// The StreamingWindowBuild is used when the input data is already sorted by
/// {partition keys + order by keys}. The logic identifies partition changes
/// when receiving input rows and splits out WindowPartitions for the Window
/// operator to process.
class StreamingWindowBuild : public WindowBuild {
/// The PartitionStreamingWindowBuild is used when the input data is already
/// sorted by {partition keys + order by keys}. The logic identifies partition
/// changes when receiving input rows and splits out WindowPartitions for the
/// Window operator to process.
class PartitionStreamingWindowBuild : public WindowBuild {
public:
StreamingWindowBuild(
PartitionStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
Expand All @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
Expand Down
93 changes: 93 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/common/testutil/TestValue.h"

namespace facebook::velox::exec {

// Builds the rows-streaming window build. All arguments are handed straight to
// the WindowBuild base class; afterwards the instance is exposed to the
// TestValue hook registered under this constructor's fully qualified name.
RowsStreamingWindowBuild::RowsStreamingWindowBuild(
    const std::shared_ptr<const core::WindowNode>& windowNode,
    velox::memory::MemoryPool* pool,
    const common::SpillConfig* spillConfig,
    tsan_atomic<bool>* nonReclaimableSection)
    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {
  using velox::common::testutil::TestValue;
  TestValue::adjust(
      "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
      this);
}

// Hands the rows buffered in inputRows_ to the WindowPartition at index
// inputPartition_, creating that partition first if it does not exist yet.
//
// @param finished When true, the partition is marked complete and
// inputPartition_ advances, so rows added afterwards go to a new partition.
//
// Does nothing when no rows are buffered. The buffer is always emptied before
// returning otherwise.
void RowsStreamingWindowBuild::addPartitionInputs(bool finished) {
  if (inputRows_.empty()) {
    return;
  }

  // Lazily create the partition the first time rows arrive for it.
  // inputPartition_ starts at 0 and only ever increments, so the cast to an
  // unsigned index is value-preserving.
  if (windowPartitions_.size() <= static_cast<size_t>(inputPartition_)) {
    windowPartitions_.push_back(std::make_shared<WindowPartition>(
        data_.get(), inversedInputChannels_, sortKeyInfo_));
  }

  // No element is added to windowPartitions_ past this point, so the
  // reference stays valid for the rest of the function.
  const auto& partition = windowPartitions_[inputPartition_];
  partition->addRows(inputRows_);

  if (finished) {
    partition->setComplete();
    ++inputPartition_;
  }

  inputRows_.clear();
}

// Copies every row of `input` into the row container and groups the new rows
// into window partitions as it goes.
//
// For each stored row (after the very first one ever seen):
//   1. a truthy result from compareRowsWithKeys against the previous row is
//      treated as a partition boundary, and the buffered rows are flushed as
//      the end of the current partition;
//   2. once the buffer holds at least numRowsPerOutput_ rows, it is flushed
//      without completing the partition.
// The new row is then buffered and remembered as the previous row.
void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
  // Decode each input channel once, before any row is copied.
  for (auto channel = 0; channel < inputChannels_.size(); ++channel) {
    auto& decoded = decodedInputVectors_[channel];
    decoded.decode(*input->childAt(inputChannels_[channel]));
  }

  const auto numRows = input->size();
  const auto numColumns = input->childrenSize();
  for (auto rowIndex = 0; rowIndex < numRows; ++rowIndex) {
    char* const currentRow = data_->newRow();
    for (auto column = 0; column < numColumns; ++column) {
      data_->store(decodedInputVectors_[column], rowIndex, currentRow, column);
    }

    if (previousRow_ != nullptr) {
      // Partition boundary: close the partition that was being filled.
      if (compareRowsWithKeys(previousRow_, currentRow, partitionKeyInfo_)) {
        addPartitionInputs(true);
      }

      // Enough rows are buffered: pass them on, keeping the partition open.
      if (inputRows_.size() >= numRowsPerOutput_) {
        addPartitionInputs(false);
      }
    }

    inputRows_.push_back(currentRow);
    previousRow_ = currentRow;
  }
}

// Flushes whatever rows are still buffered and marks the partition that
// receives them as complete.
void RowsStreamingWindowBuild::noMoreInput() {
  constexpr bool kPartitionFinished = true;
  addPartitionInputs(kPartitionFinished);
}

// Advances the output cursor by one and returns the partition it now points
// at. Fails the VELOX_CHECK when hasNextPartition() is false.
std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() {
  VELOX_CHECK(hasNextPartition());
  outputPartition_ += 1;
  return windowPartitions_[outputPartition_];
}

// Reports whether a partition exists after the one the output cursor points
// at, i.e. whether nextPartition() may be called.
bool RowsStreamingWindowBuild::hasNextPartition() {
  if (windowPartitions_.empty()) {
    return false;
  }
  // outputPartition_ starts at -1, so "+ 2" is the count of partitions needed
  // for the next index (outputPartition_ + 1) to be valid.
  return outputPartition_ + 2 <= windowPartitions_.size();
}

} // namespace facebook::velox::exec
82 changes: 82 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {

/// Unlike PartitionStreamingWindowBuild, RowsStreamingWindowBuild is capable of
/// processing window functions as rows arrive within a single partition,
/// without the need to wait for the entirewindow partition to be ready. This
/// approach can significantly reduce memory usage, especially when a single
/// partition contains a large amount of data. It is particularly suited for
/// optimizing rank, dense_rank and row_number functions, as well as aggregate
/// window functions with a default frame.
class RowsStreamingWindowBuild : public WindowBuild {
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved
public:
RowsStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection);

void addInput(RowVectorPtr input) override;

void spill() override {
VELOX_UNREACHABLE();
}

std::optional<common::SpillStats> spilledStats() const override {
return std::nullopt;
}

void noMoreInput() override;

bool hasNextPartition() override;

std::shared_ptr<WindowPartition> nextPartition() override;
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
// one, so can consume input rows.
return windowPartitions_.empty() ||
outputPartition_ == windowPartitions_.size() - 1;
}

private:
// Adds input rows to the current partition, or creates a new partition if it
// does not exist.
void addPartitionInputs(bool finished);

// Points to the input rows in the current partition.
std::vector<char*> inputRows_;

// Used to compare rows based on partitionKeys.
char* previousRow_ = nullptr;

// Point to the current output partition if not -1.
vector_size_t outputPartition_ = -1;

// Current input partition that receives inputs.
vector_size_t inputPartition_ = 0;

// Holds all the built window partitions.
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;
};

} // namespace facebook::velox::exec
6 changes: 3 additions & 3 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() {
}
}

std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
if (merge_ != nullptr) {
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand All @@ -316,7 +316,7 @@ std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
auto partition = folly::Range(
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

private:
void ensureInputFits(const RowVectorPtr& input);
Expand Down
Loading
Loading