Skip to content

Commit

Permalink
Fix Operator outputBatchRows may overflow (facebookincubator#10868)
Browse files Browse the repository at this point in the history
Summary:
The computation of function outputBatchRows() may overflow, fix it. And refactor the relevant output batch size config from uint32_t to vector_size_t(int32_t) because the RowVector numRows type is vector_size_t.

Pull Request resolved: facebookincubator#10868

Reviewed By: gggrace14

Differential Revision: D62013297

Pulled By: xiaoxmeng

fbshipit-source-id: 087b603967ff3666624e8d4c8b1a23c6130846f9
  • Loading branch information
jinchengchenghh authored and facebook-github-bot committed Aug 30, 2024
1 parent 205cbdf commit 4499332
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 62 deletions.
14 changes: 10 additions & 4 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "velox/common/config/Config.h"
#include "velox/vector/TypeAliases.h"

namespace facebook::velox::core {

Expand Down Expand Up @@ -439,12 +440,17 @@ class QueryConfig {
return get<uint64_t>(kPreferredOutputBatchBytes, kDefault);
}

uint32_t preferredOutputBatchRows() const {
return get<uint32_t>(kPreferredOutputBatchRows, 1024);
vector_size_t preferredOutputBatchRows() const {
const uint32_t batchRows = get<uint32_t>(kPreferredOutputBatchRows, 1024);
VELOX_USER_CHECK_LE(batchRows, std::numeric_limits<vector_size_t>::max());
return batchRows;
}

uint32_t maxOutputBatchRows() const {
return get<uint32_t>(kMaxOutputBatchRows, 10'000);
vector_size_t maxOutputBatchRows() const {
const uint32_t maxBatchRows = get<uint32_t>(kMaxOutputBatchRows, 10'000);
VELOX_USER_CHECK_LE(
maxBatchRows, std::numeric_limits<vector_size_t>::max());
return maxBatchRows;
}

uint32_t tableScanGetOutputTimeLimitMs() const {
Expand Down
14 changes: 10 additions & 4 deletions velox/dwio/common/SortingWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace facebook::velox::dwio::common {
SortingWriter::SortingWriter(
std::unique_ptr<Writer> writer,
std::unique_ptr<exec::SortBuffer> sortBuffer,
uint32_t maxOutputRowsConfig,
vector_size_t maxOutputRowsConfig,
uint64_t maxOutputBytesConfig)
: outputWriter_(std::move(writer)),
maxOutputRowsConfig_(maxOutputRowsConfig),
Expand Down Expand Up @@ -111,12 +111,18 @@ uint64_t SortingWriter::reclaim(
stats);
}

uint32_t SortingWriter::outputBatchRows() {
uint32_t estimatedMaxOutputRows = UINT_MAX;
vector_size_t SortingWriter::outputBatchRows() {
vector_size_t estimatedMaxOutputRows =
std::numeric_limits<vector_size_t>::max();
if (sortBuffer_->estimateOutputRowSize().has_value() &&
sortBuffer_->estimateOutputRowSize().value() != 0) {
estimatedMaxOutputRows =
const uint64_t maxOutputRows =
maxOutputBytesConfig_ / sortBuffer_->estimateOutputRowSize().value();
if (UNLIKELY(maxOutputRows > std::numeric_limits<vector_size_t>::max())) {
return maxOutputRowsConfig_;
}

estimatedMaxOutputRows = maxOutputRows;
}
return std::min(estimatedMaxOutputRows, maxOutputRowsConfig_);
}
Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/common/SortingWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SortingWriter : public Writer {
SortingWriter(
std::unique_ptr<Writer> writer,
std::unique_ptr<exec::SortBuffer> sortBuffer,
uint32_t maxOutputRowsConfig,
vector_size_t maxOutputRowsConfig,
uint64_t maxOutputBytesConfig);

~SortingWriter() override;
Expand Down Expand Up @@ -73,10 +73,10 @@ class SortingWriter : public Writer {

uint64_t reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats);

uint32_t outputBatchRows();
vector_size_t outputBatchRows();

const std::unique_ptr<Writer> outputWriter_;
const uint32_t maxOutputRowsConfig_;
const vector_size_t maxOutputRowsConfig_;
const uint64_t maxOutputBytesConfig_;
memory::MemoryPool* const sortPool_;
const bool canReclaim_;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class HashProbe : public Operator {
void clearBuffers();

// TODO: Define batch size as bytes based on RowContainer row sizes.
const uint32_t outputBatchSize_;
const vector_size_t outputBatchSize_;

const std::shared_ptr<const core::HashJoinNode> joinNode_;

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Merge : public SourceOperator {
void initializeTreeOfLosers();

/// Maximum number of rows in the output batch.
const uint32_t outputBatchSize_;
const vector_size_t outputBatchSize_;

std::vector<std::pair<column_index_t, CompareFlags>> sortingKeys_;

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ MergeJoin::MergeJoin(
operatorId,
joinNode->id(),
"MergeJoin"),
outputBatchSize_{static_cast<vector_size_t>(outputBatchRows())},
outputBatchSize_{outputBatchRows()},
joinType_{joinNode->joinType()},
numKeys_{joinNode->leftKeys().size()},
joinNode_(joinNode) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/NestedLoopJoinProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class NestedLoopJoinProbe : public Operator {
// Output buffer members.

// Maximum number of rows in the output batch.
const uint32_t outputBatchSize_;
const vector_size_t outputBatchSize_;

// The current output batch being populated.
RowVectorPtr output_;
Expand Down
15 changes: 8 additions & 7 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,22 +257,23 @@ OperatorStats Operator::stats(bool clear) {
return stats;
}

uint32_t Operator::outputBatchRows(
vector_size_t Operator::outputBatchRows(
std::optional<uint64_t> averageRowSize) const {
const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig();

if (!averageRowSize.has_value()) {
return queryConfig.preferredOutputBatchRows();
}

const uint64_t rowSize = averageRowSize.value();
if (averageRowSize.value() == 0) {
return queryConfig.maxOutputBatchRows();
}

if (rowSize * queryConfig.maxOutputBatchRows() <
queryConfig.preferredOutputBatchBytes()) {
const uint64_t batchSize =
queryConfig.preferredOutputBatchBytes() / averageRowSize.value();
if (batchSize > queryConfig.maxOutputBatchRows()) {
return queryConfig.maxOutputBatchRows();
}
return std::max<uint32_t>(
queryConfig.preferredOutputBatchBytes() / rowSize, 1);
return std::max<vector_size_t>(batchSize, 1);
}

void Operator::recordBlockingTime(uint64_t start, BlockingReason reason) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ class Operator : public BaseRuntimeStatWriter {
/// must not be negative. If the averageRowSize is 0 which is not advised,
/// returns maxOutputBatchRows. If the averageRowSize is not given, returns
/// preferredOutputBatchRows.
uint32_t outputBatchRows(
vector_size_t outputBatchRows(
std::optional<uint64_t> averageRowSize = std::nullopt) const;

/// Invoked to record spill stats in operator stats.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/OrderBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@ class OrderBy : public Operator {
private:
std::unique_ptr<SortBuffer> sortBuffer_;
bool finished_ = false;
uint32_t maxOutputRows_;
vector_size_t maxOutputRows_;
};
} // namespace facebook::velox::exec
4 changes: 2 additions & 2 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void SortBuffer::noMoreInput() {
pool_->release();
}

RowVectorPtr SortBuffer::getOutput(uint32_t maxOutputRows) {
RowVectorPtr SortBuffer::getOutput(vector_size_t maxOutputRows) {
VELOX_CHECK(noMoreInput_);

if (numOutputRows_ == numInputRows_) {
Expand Down Expand Up @@ -284,7 +284,7 @@ void SortBuffer::spillOutput() {
finishSpill();
}

void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
void SortBuffer::prepareOutput(vector_size_t maxOutputRows) {
VELOX_CHECK_GT(maxOutputRows, 0);
VELOX_CHECK_GT(numInputRows_, numOutputRows_);

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class SortBuffer {
void noMoreInput();

/// Returns the sorted output rows in batch.
RowVectorPtr getOutput(uint32_t maxOutputRows);
RowVectorPtr getOutput(vector_size_t maxOutputRows);

/// Indicates if this sort buffer can spill or not.
bool canSpill() const {
Expand All @@ -71,7 +71,7 @@ class SortBuffer {
void ensureInputFits(const VectorPtr& input);
void updateEstimatedOutputRowSize();
// Invoked to initialize or reset the reusable output buffer to get output.
void prepareOutput(uint32_t maxOutputRows);
void prepareOutput(vector_size_t maxOutputRows);
void getOutputWithoutSpill();
void getOutputWithSpill();
// Spill during input stage.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/StreamingAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class StreamingAggregation : public Operator {
void initializeAggregates(uint32_t numKeys);

/// Maximum number of rows in the output batch.
const uint32_t outputBatchSize_;
const vector_size_t outputBatchSize_;

// Used at initialize() and gets reset() afterward.
std::shared_ptr<const core::AggregationNode> aggregationNode_;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class TableScan : public SourceOperator {
// Count of splits that finished preloading before being read.
int32_t numReadyPreloadedSplits_{0};

int32_t readBatchSize_;
int32_t maxReadBatchSize_;
vector_size_t readBatchSize_;
vector_size_t maxReadBatchSize_;

// Exits getOutput() method after this many milliseconds. Zero means 'no
// limit'.
Expand Down
98 changes: 69 additions & 29 deletions velox/exec/tests/OperatorUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,56 @@ class OperatorUtilsTest : public OperatorTestBase {
}
}

void setTaskOutputBatchConfig(
uint32_t preferredBatchSize,
uint32_t maxRows,
uint64_t preferredBytes) {
std::unordered_map<std::string, std::string> configs;
configs[core::QueryConfig::kPreferredOutputBatchRows] =
std::to_string(preferredBatchSize);
configs[core::QueryConfig::kMaxOutputBatchRows] = std::to_string(maxRows);
configs[core::QueryConfig::kPreferredOutputBatchBytes] =
std::to_string(preferredBytes);
task_->queryCtx()->testingOverrideConfigUnsafe(std::move(configs));
}

class MockOperator : public Operator {
public:
MockOperator(
DriverCtx* driverCtx,
RowTypePtr rowType,
std::string operatorType = "MockType")
: Operator(
driverCtx,
std::move(rowType),
0,
"MockOperator",
operatorType) {}

bool needsInput() const override {
return false;
}

void addInput(RowVectorPtr input) override {}

RowVectorPtr getOutput() override {
return nullptr;
}

BlockingReason isBlocked(ContinueFuture* future) override {
return BlockingReason::kNotBlocked;
}

bool isFinished() override {
return false;
}

vector_size_t outputRows(
std::optional<uint64_t> averageRowSize = std::nullopt) const {
return outputBatchRows(averageRowSize);
}
};

std::shared_ptr<folly::CPUThreadPoolExecutor> executor_;
std::shared_ptr<Task> task_;
std::shared_ptr<Driver> driver_;
Expand Down Expand Up @@ -392,35 +442,6 @@ TEST_F(OperatorUtilsTest, projectChildren) {
}

TEST_F(OperatorUtilsTest, reclaimableSectionGuard) {
class MockOperator : public Operator {
public:
MockOperator(DriverCtx* driverCtx, RowTypePtr rowType)
: Operator(
driverCtx,
std::move(rowType),
0,
"MockOperator",
"MockType") {}

bool needsInput() const override {
return false;
}

void addInput(RowVectorPtr input) override {}

RowVectorPtr getOutput() override {
return nullptr;
}

BlockingReason isBlocked(ContinueFuture* future) override {
return BlockingReason::kNotBlocked;
}

bool isFinished() override {
return false;
}
};

RowTypePtr rowType = ROW({"c0"}, {INTEGER()});

MockOperator mockOp(driverCtx_.get(), rowType);
Expand Down Expand Up @@ -491,3 +512,22 @@ TEST_F(OperatorUtilsTest, dynamicFilterStats) {
dynamicFilterStats.clear();
ASSERT_TRUE(dynamicFilterStats.empty());
}

TEST_F(OperatorUtilsTest, outputBatchRows) {
RowTypePtr rowType = ROW({"c0"}, {INTEGER()});
{
setTaskOutputBatchConfig(10, 20, 234);
MockOperator mockOp(driverCtx_.get(), rowType, "MockType1");
ASSERT_EQ(10, mockOp.outputRows(std::nullopt));
ASSERT_EQ(20, mockOp.outputRows(1));
ASSERT_EQ(20, mockOp.outputRows(0));
ASSERT_EQ(1, mockOp.outputRows(UINT64_MAX));
ASSERT_EQ(1, mockOp.outputRows(1000));
ASSERT_EQ(234 / 40, mockOp.outputRows(40));
}
{
setTaskOutputBatchConfig(10, INT32_MAX, 3'000'000'000'000);
MockOperator mockOp(driverCtx_.get(), rowType, "MockType2");
ASSERT_EQ(1000, mockOp.outputRows(3'000'000'000));
}
}
4 changes: 2 additions & 2 deletions velox/experimental/wave/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ class TableScan : public WaveSourceOperator {
// Count of splits that finished preloading before being read.
int32_t numReadyPreloadedSplits_{0};

int32_t readBatchSize_;
int32_t maxReadBatchSize_;
vector_size_t readBatchSize_;
vector_size_t maxReadBatchSize_;

// Exits getOutput() method after this many milliseconds.
// Zero means 'no limit'.
Expand Down

0 comments on commit 4499332

Please sign in to comment.