Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Mar 19, 2024
1 parent b4a5d3e commit 194ebc3
Show file tree
Hide file tree
Showing 16 changed files with 298 additions and 135 deletions.
66 changes: 37 additions & 29 deletions velox/exec/RankLikeWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,30 @@ RankLikeWindowBuild::RankLikeWindowBuild(
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {
partitionOffsets_.push_back(0);
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}

// Hands the rows currently held in inputRows_ to the window partition that
// the input side is building (windowPartitions_[inputCurrentPartition_]).
//
// A copy of inputRows_ is appended to sortedRows_ first. If no partition
// exists yet at inputCurrentPartition_, a new WindowPartition is created over
// that copy; otherwise the copy is appended to the existing partition via
// insertNewBatch().
//
// @param isFinished When true, the partition is additionally closed: its
// total row count is set, it is marked finished, inputRows_ is cleared and
// the input cursor advances to the next partition.
void RankLikeWindowBuild::buildNextInputOrPartition(bool isFinished) {
  sortedRows_.push_back(inputRows_);
  auto& batchRows = sortedRows_.back();

  const bool partitionExists =
      inputCurrentPartition_ < windowPartitions_.size();
  if (partitionExists) {
    windowPartitions_[inputCurrentPartition_]->insertNewBatch(batchRows);
  } else {
    auto rowRange = folly::Range(batchRows.data(), batchRows.size());
    windowPartitions_.push_back(std::make_shared<WindowPartition>(
        data_.get(), rowRange, inputColumns_, sortKeyInfo_, true));
  }

  if (!isFinished) {
    return;
  }

  auto& finishedPartition = windowPartitions_[inputCurrentPartition_];
  // The counter is decremented by one before being recorded as the total.
  finishedPartition->setTotalNum(currentPartitionNum_ - 1);
  finishedPartition->setFinished();

  // Start over for the next partition; the row counter restarts at 1.
  inputRows_.clear();
  inputCurrentPartition_++;
  currentPartitionNum_ = 1;
}

void RankLikeWindowBuild::addInput(RowVectorPtr input) {
Expand All @@ -33,6 +55,7 @@ void RankLikeWindowBuild::addInput(RowVectorPtr input) {
}

for (auto row = 0; row < input->size(); ++row) {
currentPartitionNum_++;
char* newRow = data_->newRow();

for (auto col = 0; col < input->childrenSize(); ++col) {
Expand All @@ -41,49 +64,34 @@ void RankLikeWindowBuild::addInput(RowVectorPtr input) {

if (previousRow_ != nullptr &&
compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) {
sortedRows_.push_back(inputRows_);
partitionOffsets_.push_back(0);
inputRows_.clear();
buildNextInputOrPartition(true);
}

inputRows_.push_back(newRow);
previousRow_ = newRow;
}
partitionOffsets_.push_back(inputRows_.size());
sortedRows_.push_back(inputRows_);

buildNextInputOrPartition(false);

inputRows_.clear();
}

void RankLikeWindowBuild::noMoreInput() {
isFinished_ = true;
windowPartitions_[outputCurrentPartition_]->setTotalNum(
currentPartitionNum_ - 1);
windowPartitions_[outputCurrentPartition_]->setFinished();
inputRows_.clear();
}

std::unique_ptr<WindowPartition> RankLikeWindowBuild::nextPartition() {
currentPartition_++;

if (currentPartition_ > 0) {
// Erase data_ and sortedRows;
data_->eraseRows(folly::Range<char**>(
sortedRows_[currentPartition_ - 1].data(),
sortedRows_[currentPartition_ - 1].size()));
sortedRows_[currentPartition_ - 1].clear();
}

auto partition = folly::Range(
sortedRows_[currentPartition_].data(),
sortedRows_[currentPartition_].size());

auto offset = 0;
for (auto i = currentPartition_; partitionOffsets_[i] != 0; i--) {
offset += partitionOffsets_[i];
}
return std::make_unique<WindowPartition>(
data_.get(), partition, inputColumns_, sortKeyInfo_, offset);
std::shared_ptr<WindowPartition> RankLikeWindowBuild::nextPartition() {
outputCurrentPartition_++;
return windowPartitions_[outputCurrentPartition_];
}

bool RankLikeWindowBuild::hasNextPartition() {
return sortedRows_.size() > 0 && currentPartition_ != sortedRows_.size() - 1;
return windowPartitions_.size() > 0 &&
outputCurrentPartition_ != windowPartitions_.size() - 1;
}

} // namespace facebook::velox::exec
25 changes: 14 additions & 11 deletions velox/exec/RankLikeWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,37 @@ class RankLikeWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

// Reports whether this build still accepts input: returns true for as long as
// isFinished_ has not been set.
bool needsInput() override {
return !isFinished_;
}

private:
void buildNextInputOrPartition(bool isFinished);

// Vector of pointers to each input row in the data_ RowContainer.
// Rows are erased from data_ when they are output from the
// Window operator.
// Rows are erased from data_ when they are processed in WindowPartition.
std::vector<std::vector<char*>> sortedRows_;

// Holds input rows within the current partition.
std::vector<char*> inputRows_;

// Indices of the start row (in sortedRows_) of each partition in
// the RowContainer data_. This auxiliary structure helps demarcate
// partitions.
std::vector<vector_size_t> partitionOffsets_;

// Used to compare rows based on partitionKeys.
char* previousRow_ = nullptr;

// Current partition being output. Used to construct WindowPartitions
// during resetPartition.
vector_size_t currentPartition_ = -1;
// Current partition being output. Used to return the WindowPartitions.
vector_size_t outputCurrentPartition_ = -1;

bool isFinished_ = false;

// Current partition when adding input. Used to construct WindowPartitions.
vector_size_t inputCurrentPartition_ = 0;

std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;

// Row counter for the partition currently being built; it is reset each time
// a partition is finished.
vector_size_t currentPartitionNum_ = 0;
};

} // namespace facebook::velox::exec
4 changes: 2 additions & 2 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ void SortWindowBuild::loadNextPartitionFromSpill() {
}
}

std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
if (merge_ != nullptr) {
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
Expand All @@ -310,7 +310,7 @@ std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
auto partition = folly::Range(
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inputColumns_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class SortWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

private:
void ensureInputFits(const RowVectorPtr& input);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/StreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() {
partitionStartRows_.push_back(sortedRows_.size());
}

std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
VELOX_CHECK_GT(
partitionStartRows_.size(), 0, "No window partitions available")

Expand Down Expand Up @@ -89,7 +89,7 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);

return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inputColumns_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/StreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
Expand Down
140 changes: 77 additions & 63 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/Window.h"
#include <iostream>
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/RankLikeWindowBuild.h"
#include "velox/exec/SortWindowBuild.h"
Expand All @@ -22,6 +23,48 @@

namespace facebook::velox::exec {

// Constructs the Window operator for 'windowNode' and selects the WindowBuild
// implementation that will accumulate its input:
//  - input not sorted                               -> SortWindowBuild
//  - input sorted and supportRankWindowBuild()      -> RankLikeWindowBuild
//  - input sorted otherwise                         -> StreamingWindowBuild
// A spill config is only passed on when the node reports that it can spill.
Window::Window(
    int32_t operatorId,
    DriverCtx* driverCtx,
    const std::shared_ptr<const core::WindowNode>& windowNode)
    : Operator(
          driverCtx,
          windowNode->outputType(),
          operatorId,
          windowNode->id(),
          "Window",
          windowNode->canSpill(driverCtx->queryConfig())
              ? driverCtx->makeSpillConfig(operatorId)
              : std::nullopt),
      numInputColumns_(windowNode->inputType()->size()),
      windowNode_(windowNode),
      currentPartition_(nullptr),
      stringAllocator_(pool()) {
  // Null when spilling is not configured for this operator.
  auto* spillConfigPtr =
      spillConfig_.has_value() ? &spillConfig_.value() : nullptr;

  if (!windowNode->inputsSorted()) {
    windowBuild_ = std::make_unique<SortWindowBuild>(
        windowNode, pool(), spillConfigPtr, &nonReclaimableSection_);
  } else if (supportRankWindowBuild()) {
    // supportRankWindowBuild() reads windowNode_, which is already set by the
    // initializer list above.
    windowBuild_ = std::make_unique<RankLikeWindowBuild>(
        windowNode_, pool(), spillConfigPtr, &nonReclaimableSection_);
  } else {
    windowBuild_ = std::make_unique<StreamingWindowBuild>(
        windowNode, pool(), spillConfigPtr, &nonReclaimableSection_);
  }
}

// Second-phase setup of the Window operator. Runs the base Operator
// initialization, then creates the window functions and the peer/frame
// buffers, and finally drops this operator's reference to the plan node.
void Window::initialize() {
Operator::initialize();
// The plan node is released at the end of this method, so it must still be
// set when initialization starts.
VELOX_CHECK_NOT_NULL(windowNode_);
// NOTE(review): presumably both helpers read windowNode_ -- their bodies are
// not visible here.
createWindowFunctions();
createPeerAndFrameBuffers();
// The plan node is not retained past initialization.
windowNode_.reset();
}

namespace {
void checkRowFrameBounds(const core::WindowNode::Frame& frame) {
auto frameBoundCheck = [&](const core::TypedExprPtr& frameValue) -> void {
Expand Down Expand Up @@ -71,70 +114,8 @@ void checkKRangeFrameBounds(
frameBoundCheck(frame.endValue);
}

// Decides whether RankLikeWindowBuild can be used for 'windowNode'.
// Returns true only when every window function of the node is named "rank"
// or "row_number" and uses the default frame (unbounded preceding to current
// row). Returns false as soon as one function fails either test. A node
// without window functions yields true.
bool checkRankLikeWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode) {
for (const auto& windowNodeFunction : windowNode->windowFunctions()) {
const auto& functionName = windowNodeFunction.functionCall->name();
const auto& frame = windowNodeFunction.frame;

// Only these two function names are accepted.
bool isRankLikeFunction =
(functionName == "rank" || functionName == "row_number");
// Default frame: starts at unbounded preceding and ends at the current row.
bool isDefaultFrame =
(frame.startType == core::WindowNode::BoundType::kUnboundedPreceding &&
frame.endType == core::WindowNode::BoundType::kCurrentRow);

if (!isRankLikeFunction || !isDefaultFrame) {
return false;
}
}
return true;
}

} // namespace

// Constructs the Window operator for 'windowNode' and selects the WindowBuild
// implementation that will accumulate its input:
//  - input not sorted                                    -> SortWindowBuild
//  - input sorted and checkRankLikeWindowBuild(node)     -> RankLikeWindowBuild
//  - input sorted otherwise                              -> StreamingWindowBuild
// A spill config is only passed on when the node reports that it can spill.
Window::Window(
    int32_t operatorId,
    DriverCtx* driverCtx,
    const std::shared_ptr<const core::WindowNode>& windowNode)
    : Operator(
          driverCtx,
          windowNode->outputType(),
          operatorId,
          windowNode->id(),
          "Window",
          windowNode->canSpill(driverCtx->queryConfig())
              ? driverCtx->makeSpillConfig(operatorId)
              : std::nullopt),
      numInputColumns_(windowNode->inputType()->size()),
      windowNode_(windowNode),
      currentPartition_(nullptr),
      stringAllocator_(pool()) {
  // Null when spilling is not configured for this operator.
  auto* spillConfigPtr =
      spillConfig_.has_value() ? &spillConfig_.value() : nullptr;

  if (!windowNode->inputsSorted()) {
    windowBuild_ = std::make_unique<SortWindowBuild>(
        windowNode, pool(), spillConfigPtr, &nonReclaimableSection_);
  } else if (checkRankLikeWindowBuild(windowNode)) {
    windowBuild_ = std::make_unique<RankLikeWindowBuild>(
        windowNode, pool(), spillConfigPtr, &nonReclaimableSection_);
  } else {
    windowBuild_ = std::make_unique<StreamingWindowBuild>(
        windowNode, pool(), spillConfigPtr, &nonReclaimableSection_);
  }
}

// Second-phase setup of the Window operator. Runs the base Operator
// initialization, then creates the window functions and the peer/frame
// buffers, and finally drops this operator's reference to the plan node.
void Window::initialize() {
Operator::initialize();
// The plan node is released at the end of this method, so it must still be
// set when initialization starts.
VELOX_CHECK_NOT_NULL(windowNode_);
// NOTE(review): presumably both helpers read windowNode_ -- their bodies are
// not visible here.
createWindowFunctions();
createPeerAndFrameBuffers();
// The plan node is not retained past initialization.
windowNode_.reset();
}

Window::WindowFrame Window::createWindowFrame(
const std::shared_ptr<const core::WindowNode>& windowNode,
const core::WindowNode::Frame& frame,
Expand Down Expand Up @@ -214,6 +195,28 @@ void Window::createWindowFunctions() {
}
}

// Decides whether RankLikeWindowBuild can be used for the window functions of
// windowNode_ (intended for functions such as 'rank' and 'row_number' with a
// default frame).
//
// Returns true only when, for every window function of the node,
//  - the first signature registered under the function's name reports
//    streaming(), and
//  - the frame is the default one (unbounded preceding to current row).
// Returns false as soon as one function fails either test, and also when no
// signature is registered for a function name, so the caller falls back to
// another WindowBuild instead of hitting an unchecked optional/vector access.
// A node without window functions yields true.
bool Window::supportRankWindowBuild() {
  for (const auto& windowNodeFunction : windowNode_->windowFunctions()) {
    const auto signatures = exec::getWindowFunctionSignatures(
        windowNodeFunction.functionCall->name());
    // Unknown function name or empty signature list: not eligible.
    if (!signatures.has_value() || signatures->empty()) {
      return false;
    }

    // Only the first registered signature is consulted.
    const bool isStreamingFunction = signatures->front()->streaming();

    const auto& frame = windowNodeFunction.frame;
    const bool isDefaultFrame =
        (frame.startType == core::WindowNode::BoundType::kUnboundedPreceding &&
         frame.endType == core::WindowNode::BoundType::kCurrentRow);

    if (!isStreamingFunction || !isDefaultFrame) {
      return false;
    }
  }

  return true;
}

void Window::addInput(RowVectorPtr input) {
windowBuild_->addInput(input);
numRows_ += input->size();
Expand Down Expand Up @@ -574,7 +577,14 @@ vector_size_t Window::callApplyLoop(
result);
resultIndex += rowsForCurrentPartition;
numOutputRowsLeft -= rowsForCurrentPartition;
callResetPartition();
if (currentPartition_->isFinished()) {
callResetPartition();
} else {
// Break until the next getOutput call to handle the remaining data in
// currentPartition_.
break;
}

if (!currentPartition_) {
// The WindowBuild doesn't have any more partitions to process right
// now. So break until the next getOutput call.
Expand Down Expand Up @@ -616,6 +626,10 @@ RowVectorPtr Window::getOutput() {
}
}

if (!currentPartition_->isFinished()) {
currentPartition_->buildNextBatch();
}

auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft);
auto result = BaseVector::create<RowVector>(
outputType_, numOutputRows, operatorCtx_->pool());
Expand Down
Loading

0 comments on commit 194ebc3

Please sign in to comment.