diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index bc1a24976a71..814bdef03a26 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -16,6 +16,7 @@ #pragma once #include "velox/common/config/Config.h" +#include "velox/vector/TypeAliases.h" namespace facebook::velox::core { @@ -439,12 +440,17 @@ class QueryConfig { return get(kPreferredOutputBatchBytes, kDefault); } - uint32_t preferredOutputBatchRows() const { - return get(kPreferredOutputBatchRows, 1024); + vector_size_t preferredOutputBatchRows() const { + const uint32_t batchRows = get(kPreferredOutputBatchRows, 1024); + VELOX_USER_CHECK_LE(batchRows, std::numeric_limits::max()); + return batchRows; } - uint32_t maxOutputBatchRows() const { - return get(kMaxOutputBatchRows, 10'000); + vector_size_t maxOutputBatchRows() const { + const uint32_t maxBatchRows = get(kMaxOutputBatchRows, 10'000); + VELOX_USER_CHECK_LE( + maxBatchRows, std::numeric_limits::max()); + return maxBatchRows; } uint32_t tableScanGetOutputTimeLimitMs() const { diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index e488dc68ca15..0243db048bbc 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -21,7 +21,7 @@ namespace facebook::velox::dwio::common { SortingWriter::SortingWriter( std::unique_ptr writer, std::unique_ptr sortBuffer, - uint32_t maxOutputRowsConfig, + vector_size_t maxOutputRowsConfig, uint64_t maxOutputBytesConfig) : outputWriter_(std::move(writer)), maxOutputRowsConfig_(maxOutputRowsConfig), @@ -111,12 +111,18 @@ uint64_t SortingWriter::reclaim( stats); } -uint32_t SortingWriter::outputBatchRows() { - uint32_t estimatedMaxOutputRows = UINT_MAX; +vector_size_t SortingWriter::outputBatchRows() { + vector_size_t estimatedMaxOutputRows = + std::numeric_limits::max(); if (sortBuffer_->estimateOutputRowSize().has_value() && sortBuffer_->estimateOutputRowSize().value() != 0) { - estimatedMaxOutputRows = + const uint64_t maxOutputRows = maxOutputBytesConfig_ / sortBuffer_->estimateOutputRowSize().value(); + if (UNLIKELY(maxOutputRows > std::numeric_limits::max())) { + return maxOutputRowsConfig_; + } + + estimatedMaxOutputRows = maxOutputRows; } return std::min(estimatedMaxOutputRows, maxOutputRowsConfig_); } diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index c73574b334f9..d7b70f09032d 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -28,7 +28,7 @@ class SortingWriter : public Writer { SortingWriter( std::unique_ptr writer, std::unique_ptr sortBuffer, - uint32_t maxOutputRowsConfig, + vector_size_t maxOutputRowsConfig, uint64_t maxOutputBytesConfig); ~SortingWriter() override; @@ -73,10 +73,10 @@ class SortingWriter : public Writer { uint64_t reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats); - uint32_t outputBatchRows(); + vector_size_t outputBatchRows(); const std::unique_ptr outputWriter_; - const uint32_t maxOutputRowsConfig_; + const vector_size_t maxOutputRowsConfig_; const uint64_t maxOutputBytesConfig_; memory::MemoryPool* const sortPool_; const bool canReclaim_; diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 4149f0c68276..5c8dab9b8b3a 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -311,7 +311,7 @@ class HashProbe : public Operator { void clearBuffers(); // TODO: Define batch size as bytes based on RowContainer row sizes. - const uint32_t outputBatchSize_; + const vector_size_t outputBatchSize_; const std::shared_ptr joinNode_; diff --git a/velox/exec/Merge.h b/velox/exec/Merge.h index 2ff364097f40..b33780613fb7 100644 --- a/velox/exec/Merge.h +++ b/velox/exec/Merge.h @@ -59,7 +59,7 @@ class Merge : public SourceOperator { void initializeTreeOfLosers(); /// Maximum number of rows in the output batch. - const uint32_t outputBatchSize_; + const vector_size_t outputBatchSize_; std::vector> sortingKeys_; diff --git a/velox/exec/MergeJoin.cpp b/velox/exec/MergeJoin.cpp index d006fd527bba..6daed9fa9c72 100644 --- a/velox/exec/MergeJoin.cpp +++ b/velox/exec/MergeJoin.cpp @@ -30,7 +30,7 @@ MergeJoin::MergeJoin( operatorId, joinNode->id(), "MergeJoin"), - outputBatchSize_{static_cast(outputBatchRows())}, + outputBatchSize_{outputBatchRows()}, joinType_{joinNode->joinType()}, numKeys_{joinNode->leftKeys().size()}, joinNode_(joinNode) { diff --git a/velox/exec/NestedLoopJoinProbe.h b/velox/exec/NestedLoopJoinProbe.h index f98fb646dd2f..b59457b118aa 100644 --- a/velox/exec/NestedLoopJoinProbe.h +++ b/velox/exec/NestedLoopJoinProbe.h @@ -273,7 +273,7 @@ class NestedLoopJoinProbe : public Operator { // Output buffer members. // Maximum number of rows in the output batch. - const uint32_t outputBatchSize_; + const vector_size_t outputBatchSize_; // The current output batch being populated. RowVectorPtr output_; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 57d2a1fd8c8b..0798785a031d 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -257,22 +257,23 @@ OperatorStats Operator::stats(bool clear) { return stats; } -uint32_t Operator::outputBatchRows( +vector_size_t Operator::outputBatchRows( std::optional averageRowSize) const { const auto& queryConfig = operatorCtx_->task()->queryCtx()->queryConfig(); - if (!averageRowSize.has_value()) { return queryConfig.preferredOutputBatchRows(); } - const uint64_t rowSize = averageRowSize.value(); + if (averageRowSize.value() == 0) { + return queryConfig.maxOutputBatchRows(); + } - if (rowSize * queryConfig.maxOutputBatchRows() < - queryConfig.preferredOutputBatchBytes()) { + const uint64_t batchSize = + queryConfig.preferredOutputBatchBytes() / averageRowSize.value(); + if (batchSize > queryConfig.maxOutputBatchRows()) { return queryConfig.maxOutputBatchRows(); } - return std::max( - queryConfig.preferredOutputBatchBytes() / rowSize, 1); + return std::max(batchSize, 1); } void Operator::recordBlockingTime(uint64_t start, BlockingReason reason) { diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 1ead59019a73..051ce55c02c8 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -739,7 +739,7 @@ class Operator : public BaseRuntimeStatWriter { /// must not be negative. If the averageRowSize is 0 which is not advised, /// returns maxOutputBatchRows. If the averageRowSize is not given, returns /// preferredOutputBatchRows. - uint32_t outputBatchRows( + vector_size_t outputBatchRows( std::optional averageRowSize = std::nullopt) const; /// Invoked to record spill stats in operator stats. diff --git a/velox/exec/OrderBy.h b/velox/exec/OrderBy.h index cbf6f1e57eac..fe8e2e61e9b2 100644 --- a/velox/exec/OrderBy.h +++ b/velox/exec/OrderBy.h @@ -65,6 +65,6 @@ class OrderBy : public Operator { private: std::unique_ptr sortBuffer_; bool finished_ = false; - uint32_t maxOutputRows_; + vector_size_t maxOutputRows_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index cae316d19086..0d7b5e00ec99 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -130,7 +130,7 @@ void SortBuffer::noMoreInput() { pool_->release(); } -RowVectorPtr SortBuffer::getOutput(uint32_t maxOutputRows) { +RowVectorPtr SortBuffer::getOutput(vector_size_t maxOutputRows) { VELOX_CHECK(noMoreInput_); if (numOutputRows_ == numInputRows_) { @@ -284,7 +284,7 @@ void SortBuffer::spillOutput() { finishSpill(); } -void SortBuffer::prepareOutput(uint32_t maxOutputRows) { +void SortBuffer::prepareOutput(vector_size_t maxOutputRows) { VELOX_CHECK_GT(maxOutputRows, 0); VELOX_CHECK_GT(numInputRows_, numOutputRows_); diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index 7b03dc7ea186..cd02f966f9df 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -50,7 +50,7 @@ class SortBuffer { void noMoreInput(); /// Returns the sorted output rows in batch. - RowVectorPtr getOutput(uint32_t maxOutputRows); + RowVectorPtr getOutput(vector_size_t maxOutputRows); /// Indicates if this sort buffer can spill or not. bool canSpill() const { @@ -71,7 +71,7 @@ class SortBuffer { void ensureInputFits(const VectorPtr& input); void updateEstimatedOutputRowSize(); // Invoked to initialize or reset the reusable output buffer to get output. - void prepareOutput(uint32_t maxOutputRows); + void prepareOutput(vector_size_t maxOutputRows); void getOutputWithoutSpill(); void getOutputWithSpill(); // Spill during input stage. diff --git a/velox/exec/StreamingAggregation.h b/velox/exec/StreamingAggregation.h index 953ffd0a33ce..ee70a1e8e627 100644 --- a/velox/exec/StreamingAggregation.h +++ b/velox/exec/StreamingAggregation.h @@ -84,7 +84,7 @@ class StreamingAggregation : public Operator { void initializeAggregates(uint32_t numKeys); /// Maximum number of rows in the output batch. - const uint32_t outputBatchSize_; + const vector_size_t outputBatchSize_; // Used at initialize() and gets reset() afterward. std::shared_ptr aggregationNode_; diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index 92fcac85a650..deec109b02c1 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -105,8 +105,8 @@ class TableScan : public SourceOperator { // Count of splits that finished preloading before being read. int32_t numReadyPreloadedSplits_{0}; - int32_t readBatchSize_; - int32_t maxReadBatchSize_; + vector_size_t readBatchSize_; + vector_size_t maxReadBatchSize_; // Exits getOutput() method after this many milliseconds. Zero means 'no // limit'. diff --git a/velox/exec/tests/OperatorUtilsTest.cpp b/velox/exec/tests/OperatorUtilsTest.cpp index dbc3d635d049..5d4bab633b4b 100644 --- a/velox/exec/tests/OperatorUtilsTest.cpp +++ b/velox/exec/tests/OperatorUtilsTest.cpp @@ -133,6 +133,56 @@ class OperatorUtilsTest : public OperatorTestBase { } } + void setTaskOutputBatchConfig( + uint32_t preferredBatchSize, + uint32_t maxRows, + uint64_t preferredBytes) { + std::unordered_map configs; + configs[core::QueryConfig::kPreferredOutputBatchRows] = + std::to_string(preferredBatchSize); + configs[core::QueryConfig::kMaxOutputBatchRows] = std::to_string(maxRows); + configs[core::QueryConfig::kPreferredOutputBatchBytes] = + std::to_string(preferredBytes); + task_->queryCtx()->testingOverrideConfigUnsafe(std::move(configs)); + } + + class MockOperator : public Operator { + public: + MockOperator( + DriverCtx* driverCtx, + RowTypePtr rowType, + std::string operatorType = "MockType") + : Operator( + driverCtx, + std::move(rowType), + 0, + "MockOperator", + operatorType) {} + + bool needsInput() const override { + return false; + } + + void addInput(RowVectorPtr input) override {} + + RowVectorPtr getOutput() override { + return nullptr; + } + + BlockingReason isBlocked(ContinueFuture* future) override { + return BlockingReason::kNotBlocked; + } + + bool isFinished() override { + return false; + } + + vector_size_t outputRows( + std::optional averageRowSize = std::nullopt) const { + return outputBatchRows(averageRowSize); + } + }; + std::shared_ptr executor_; std::shared_ptr task_; std::shared_ptr driver_; @@ -392,35 +442,6 @@ TEST_F(OperatorUtilsTest, projectChildren) { } TEST_F(OperatorUtilsTest, reclaimableSectionGuard) { - class MockOperator : public Operator { - public: - MockOperator(DriverCtx* driverCtx, RowTypePtr rowType) - : Operator( - driverCtx, - std::move(rowType), - 0, - "MockOperator", - "MockType") {} - - bool needsInput() const override { - return false; - } - - void addInput(RowVectorPtr input) override {} - - RowVectorPtr getOutput() override { - return nullptr; - } - - BlockingReason isBlocked(ContinueFuture* future) override { - return BlockingReason::kNotBlocked; - } - - bool isFinished() override { - return false; - } - }; - RowTypePtr rowType = ROW({"c0"}, {INTEGER()}); MockOperator mockOp(driverCtx_.get(), rowType); @@ -491,3 +512,22 @@ TEST_F(OperatorUtilsTest, dynamicFilterStats) { dynamicFilterStats.clear(); ASSERT_TRUE(dynamicFilterStats.empty()); } + +TEST_F(OperatorUtilsTest, outputBatchRows) { + RowTypePtr rowType = ROW({"c0"}, {INTEGER()}); + { + setTaskOutputBatchConfig(10, 20, 234); + MockOperator mockOp(driverCtx_.get(), rowType, "MockType1"); + ASSERT_EQ(10, mockOp.outputRows(std::nullopt)); + ASSERT_EQ(20, mockOp.outputRows(1)); + ASSERT_EQ(20, mockOp.outputRows(0)); + ASSERT_EQ(1, mockOp.outputRows(UINT64_MAX)); + ASSERT_EQ(1, mockOp.outputRows(1000)); + ASSERT_EQ(234 / 40, mockOp.outputRows(40)); + } + { + setTaskOutputBatchConfig(10, INT32_MAX, 3'000'000'000'000); + MockOperator mockOp(driverCtx_.get(), rowType, "MockType2"); + ASSERT_EQ(1000, mockOp.outputRows(3'000'000'000)); + } +} diff --git a/velox/experimental/wave/exec/TableScan.h b/velox/experimental/wave/exec/TableScan.h index e4b3ff23a073..7c12321ffbf1 100644 --- a/velox/experimental/wave/exec/TableScan.h +++ b/velox/experimental/wave/exec/TableScan.h @@ -145,8 +145,8 @@ class TableScan : public WaveSourceOperator { // Count of splits that finished preloading before being read. int32_t numReadyPreloadedSplits_{0}; - int32_t readBatchSize_; - int32_t maxReadBatchSize_; + vector_size_t readBatchSize_; + vector_size_t maxReadBatchSize_; // Exits getOutput() method after this many milliseconds. // Zero means 'no limit'.