From 93535f2b02803cd67c57f22c13a47ad655f597d9 Mon Sep 17 00:00:00 2001
From: Jialiang Tan
Date: Fri, 27 Sep 2024 16:25:23 -0700
Subject: [PATCH] Do not suspend task if probe or build operator exceeds spill limit

HashBuild and HashProbe now override the virtual Operator::canSpill() to
also account for mixed execution groups, while canReclaim() additionally
checks exceededMaxSpillLevelLimit_. The memory arbitrator therefore skips
these operators instead of suspending the task; reclaim() keeps a
defensive re-check with a warning log because the spill level can cross
the limit between the canReclaim() check and the actual reclaim.

---
 velox/core/QueryConfig.h          |   2 +-
 velox/exec/HashBuild.cpp          |  45 +++++++----
 velox/exec/HashBuild.h            |   4 +-
 velox/exec/HashProbe.cpp          |  46 ++++++-----
 velox/exec/HashProbe.h            |   2 +-
 velox/exec/Operator.h             |   2 +-
 velox/exec/tests/HashJoinTest.cpp | 122 +++++++++++++++++++++---------
 7 files changed, 148 insertions(+), 75 deletions(-)

diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index ad42655b69c1..1fd734ed9dfb 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -635,7 +635,7 @@ class QueryConfig {
 
   /// Returns the number of bits used to calculate the spill partition number
   /// for hash join and RowNumber. The number of spill partitions will be power
-  /// of tow.
+  /// of two.
   /// NOTE: as for now, we only support up to 8-way spill partitioning.
   uint8_t spillNumPartitionBits() const {
     constexpr uint8_t kDefaultBits = 3;
diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index a944f6c2dcaf..d3d095c5447c 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -194,7 +194,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
   VELOX_CHECK_NULL(spiller_);
   VELOX_CHECK_NULL(spillInputReader_);
 
-  if (!spillEnabled()) {
+  if (!canSpill()) {
     return;
   }
   if (spillType_ == nullptr) {
@@ -428,7 +428,7 @@ void HashBuild::addInput(RowVectorPtr input) {
 void HashBuild::ensureInputFits(RowVectorPtr& input) {
   // NOTE: we don't need memory reservation if all the partitions are spilling
   // as we spill all the input rows to disk directly.
-  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
+  if (!canSpill() || spiller_ == nullptr || spiller_->isAllSpilled()) {
     return;
   }
 
@@ -436,7 +436,7 @@ void HashBuild::ensureInputFits(RowVectorPtr& input) {
   // spilling directly. It is okay as we will accumulate the extra reservation
   // in the operator's memory pool, and won't make any new reservation if there
   // is already sufficient reservations.
-  VELOX_CHECK(spillEnabled());
+  VELOX_CHECK(canSpill());
 
   auto* rows = table_->rows();
   const auto numRows = rows->numRows();
@@ -515,7 +515,7 @@ void HashBuild::spillInput(const RowVectorPtr& input) {
   VELOX_CHECK_EQ(input->size(), activeRows_.size());
 
-  if (!spillEnabled() || spiller_ == nullptr || !spiller_->isAnySpilled() ||
+  if (!canSpill() || spiller_ == nullptr || !spiller_->isAnySpilled() ||
       !activeRows_.hasSelections()) {
     return;
   }
@@ -615,7 +615,7 @@ void HashBuild::spillPartition(
     vector_size_t size,
     const BufferPtr& indices,
     const RowVectorPtr& input) {
-  VELOX_DCHECK(spillEnabled());
+  VELOX_DCHECK(canSpill());
 
   if (isInputFromSpill()) {
     spiller_->spill(partition, wrap(size, indices, input));
@@ -780,7 +780,7 @@ bool HashBuild::finishHashBuild() {
   addRuntimeStats();
   joinBridge_->setHashTable(
       std::move(table_), std::move(spillPartitions), joinHasNullKeys_);
-  if (spillEnabled()) {
+  if (canSpill()) {
     stateCleared_ = true;
   }
   return true;
@@ -789,7 +789,7 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
   // NOTE: we don't need memory reservation if all the partitions have been
   // spilled as nothing need to be built.
-  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled() ||
+  if (!canSpill() || spiller_ == nullptr || spiller_->isAllSpilled() ||
       numRows == 0) {
     return;
   }
@@ -832,7 +832,7 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
 void HashBuild::postHashBuildProcess() {
   checkRunning();
 
-  if (!spillEnabled()) {
+  if (!canSpill()) {
     setState(State::kFinish);
     return;
   }
@@ -997,7 +997,7 @@ void HashBuild::checkStateTransition(State state) {
   VELOX_CHECK_NE(state_, state);
   switch (state) {
     case State::kRunning:
-      if (!spillEnabled()) {
+      if (!canSpill()) {
        VELOX_CHECK_EQ(state_, State::kWaitForBuild);
       } else {
        VELOX_CHECK_NE(state_, State::kFinish);
@@ -1037,21 +1037,36 @@ std::string HashBuild::stateName(State state) {
   }
 }
 
+bool HashBuild::canSpill() const {
+  return Operator::canSpill() &&
+      !operatorCtx_->task()->hasMixedExecutionGroup();
+}
+
 bool HashBuild::canReclaim() const {
-  return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup();
+  return canSpill() && !exceededMaxSpillLevelLimit_;
 }
 
 void HashBuild::reclaim(
     uint64_t /*unused*/,
     memory::MemoryReclaimer::Stats& stats) {
-  VELOX_CHECK(canReclaim());
+  TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);
+  VELOX_CHECK(canSpill());
   auto* driver = operatorCtx_->driver();
   VELOX_CHECK_NOT_NULL(driver);
   VELOX_CHECK(!nonReclaimableSection_);
 
-  TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);
-
-  if (exceededMaxSpillLevelLimit_) {
+  if (UNLIKELY(exceededMaxSpillLevelLimit_)) {
+    // 'canReclaim()' already checked that the spill level had not exceeded
+    // the max, but there is a small window between that check and the actual
+    // reclaim during which this operator may have spilled again and pushed
+    // the spill level past the max.
+    const auto* config = spillConfig();
+    VELOX_CHECK_NOT_NULL(config);
+    LOG(WARNING)
+        << "Can't reclaim from hash build operator, exceeded maximum spill "
+           "level of "
+        << config->maxSpillLevel << ", " << pool()->name() << ", usage "
+        << succinctBytes(pool()->usedBytes());
     return;
   }
@@ -1080,7 +1095,7 @@ void HashBuild::reclaim(
   for (auto* op : operators) {
     HashBuild* buildOp = dynamic_cast<HashBuild*>(op);
     VELOX_CHECK_NOT_NULL(buildOp);
-    VELOX_CHECK(buildOp->canReclaim());
+    VELOX_CHECK(buildOp->canSpill());
     if (buildOp->nonReclaimableState()) {
       // TODO: reduce the log frequency if it is too verbose.
       RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h
index f30f79ee7d8f..2e477bfc3d29 100644
--- a/velox/exec/HashBuild.h
+++ b/velox/exec/HashBuild.h
@@ -113,9 +113,7 @@ class HashBuild final : public Operator {
   // process which will be set by the join probe side.
   void postHashBuildProcess();
 
-  bool spillEnabled() const {
-    return canReclaim();
-  }
+  bool canSpill() const override;
 
   // Indicates if the input is read from spill data or not.
   bool isInputFromSpill() const;
diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp
index e62aeda2deae..13150935518e 100644
--- a/velox/exec/HashProbe.cpp
+++ b/velox/exec/HashProbe.cpp
@@ -401,7 +401,7 @@ bool HashProbe::isSpillInput() const {
 
 void HashProbe::prepareForSpillRestore() {
   checkRunning();
-  VELOX_CHECK(spillEnabled());
+  VELOX_CHECK(canSpill());
   VELOX_CHECK(hasMoreSpillData());
 
   // Reset the internal states which are relevant to the previous probe run.
@@ -512,7 +512,7 @@ void HashProbe::spillInput(RowVectorPtr& input) {
 void HashProbe::prepareInputIndicesBuffers(
     vector_size_t numInput,
     const folly::F14FastSet<uint32_t>& spillPartitions) {
-  VELOX_DCHECK(spillEnabled());
+  VELOX_DCHECK(canSpill());
   const auto maxIndicesBufferBytes = numInput * sizeof(vector_size_t);
   if (nonSpillInputIndicesBuffer_ == nullptr ||
       !nonSpillInputIndicesBuffer_->isMutable() ||
@@ -548,7 +548,7 @@ BlockingReason HashProbe::isBlocked(ContinueFuture* future) {
       }
       break;
     case ProbeOperatorState::kWaitForPeers:
-      VELOX_CHECK(spillEnabled());
+      VELOX_CHECK(canSpill());
       if (!future_.valid()) {
         setRunning();
       }
@@ -873,17 +873,18 @@ bool HashProbe::skipProbeOnEmptyBuild() const {
       isRightSemiProjectJoin(joinType_);
 }
 
-bool HashProbe::spillEnabled() const {
-  return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup();
+bool HashProbe::canSpill() const {
+  return Operator::canSpill() &&
+      !operatorCtx_->task()->hasMixedExecutionGroup();
 }
 
 bool HashProbe::hasMoreSpillData() const {
-  VELOX_CHECK(spillPartitionSet_.empty() || spillEnabled());
+  VELOX_CHECK(spillPartitionSet_.empty() || canSpill());
   return !spillPartitionSet_.empty() || needSpillInput();
 }
 
 bool HashProbe::needSpillInput() const {
-  VELOX_CHECK(spillInputPartitionIds_.empty() || spillEnabled());
+  VELOX_CHECK(spillInputPartitionIds_.empty() || canSpill());
   VELOX_CHECK_EQ(spillInputPartitionIds_.empty(), inputSpiller_ == nullptr);
 
   return !spillInputPartitionIds_.empty();
@@ -898,7 +899,7 @@ void HashProbe::checkStateTransition(ProbeOperatorState state) {
   VELOX_CHECK_NE(state_, state);
   switch (state) {
     case ProbeOperatorState::kRunning:
-      if (!spillEnabled()) {
+      if (!canSpill()) {
         VELOX_CHECK_EQ(state_, ProbeOperatorState::kWaitForBuild);
       } else {
         VELOX_CHECK(
@@ -907,7 +908,7 @@ void HashProbe::checkStateTransition(ProbeOperatorState state) {
       }
       break;
     case ProbeOperatorState::kWaitForPeers:
-      VELOX_CHECK(spillEnabled());
+      VELOX_CHECK(canSpill());
       [[fallthrough]];
     case ProbeOperatorState::kWaitForBuild:
       [[fallthrough]];
@@ -965,7 +966,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
         prepareForSpillRestore();
         asyncWaitForHashTable();
       } else {
-        if (lastProber_ && spillEnabled()) {
+        if (lastProber_ && canSpill()) {
           joinBridge_->probeFinished();
           wakeupPeerOperators();
         }
@@ -1568,7 +1569,7 @@ void HashProbe::noMoreInputInternal() {
     VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeNanos, 0);
   }
 
-  const bool hasSpillEnabled = spillEnabled();
+  const bool hasSpillEnabled = canSpill();
   std::vector<ContinuePromise> promises;
   std::vector<std::shared_ptr<Driver>> peers;
   // The last operator to finish processing inputs is responsible for
@@ -1651,19 +1652,30 @@ void HashProbe::ensureOutputFits() {
 }
 
 bool HashProbe::canReclaim() const {
-  return spillEnabled();
+  return canSpill() && !exceededMaxSpillLevelLimit_;
 }
 
 void HashProbe::reclaim(
     uint64_t /*unused*/,
     memory::MemoryReclaimer::Stats& stats) {
-  VELOX_CHECK(canReclaim());
+  TestValue::adjust("facebook::velox::exec::HashProbe::reclaim", this);
+  VELOX_CHECK(canSpill());
   auto* driver = operatorCtx_->driver();
   VELOX_CHECK_NOT_NULL(driver);
   VELOX_CHECK(!nonReclaimableSection_);
 
-  if (exceededMaxSpillLevelLimit_) {
-    // NOTE: we might have reached to the max spill limit.
+  if (UNLIKELY(exceededMaxSpillLevelLimit_)) {
+    // 'canReclaim()' already checked that the spill level had not exceeded
+    // the max, but there is a small window between that check and the actual
+    // reclaim during which this operator may have spilled again and pushed
+    // the spill level past the max.
+    const auto* config = spillConfig();
+    VELOX_CHECK_NOT_NULL(config);
+    LOG(WARNING)
+        << "Can't reclaim from hash probe operator, exceeded maximum spill "
+           "level of "
+        << config->maxSpillLevel << ", " << pool()->name() << ", usage "
+        << succinctBytes(pool()->usedBytes());
     return;
   }
@@ -1693,7 +1705,7 @@ void HashProbe::reclaim(
   bool hasMoreProbeInput{false};
   for (auto* probeOp : probeOps) {
     VELOX_CHECK_NOT_NULL(probeOp);
-    VELOX_CHECK(probeOp->canReclaim());
+    VELOX_CHECK(probeOp->canSpill());
     if (probeOp->nonReclaimableState()) {
       RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
       ++stats.numNonReclaimableAttempts;
@@ -1932,7 +1944,7 @@ std::unique_ptr<Spiller> HashProbe::spillTable(RowContainer* subTableRows) {
 
 void HashProbe::prepareTableSpill(
     const std::optional<SpillPartitionId>& restoredPartitionId) {
-  if (!spillEnabled()) {
+  if (!canSpill()) {
     return;
   }
 
diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h
index 5c8dab9b8b3a..0868de07b6d3 100644
--- a/velox/exec/HashProbe.h
+++ b/velox/exec/HashProbe.h
@@ -202,7 +202,7 @@ class HashProbe : public Operator {
   void prepareTableSpill(
       const std::optional<SpillPartitionId>& restoredPartitionId);
 
-  bool spillEnabled() const;
+  bool canSpill() const override;
 
   // Indicates if the probe input is read from spilled data or not.
   bool isSpillInput() const;
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index 711a59da0c71..16d8d6e0f894 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -726,7 +726,7 @@ class Operator : public BaseRuntimeStatWriter {
   void maybeSetReclaimer();
 
   /// Returns true if this is a spillable operator and has configured spilling.
-  FOLLY_ALWAYS_INLINE bool canSpill() const {
+  FOLLY_ALWAYS_INLINE virtual bool canSpill() const {
     return spillConfig_.has_value();
   }
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index 9ce08a5e5e5e..78f2846fd8f6 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -6836,46 +6836,94 @@ DEBUG_ONLY_TEST_F(HashJoinTest, exceededMaxSpillLevel) {
   auto tempDirectory = exec::test::TempDirectoryPath::create();
   const int exceededMaxSpillLevelCount =
       common::globalSpillStats().spillMaxLevelExceededCount;
+
+  std::atomic_bool noMoreProbeInput{false};
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::Driver::runInternal::noMoreInput",
+      std::function<void(exec::Operator*)>(([&](exec::Operator* op) {
+        if (op->operatorType() == "HashProbe") {
+          noMoreProbeInput = true;
+        }
+      })));
+
+  std::atomic_bool lastProbeReclaimTriggered{false};
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::HashProbe::reclaim",
+      std::function<void(exec::Operator*)>(([&](exec::Operator* op) {
+        if (!lastProbeReclaimTriggered) {
+          if (noMoreProbeInput) {
+            lastProbeReclaimTriggered = true;
+          }
+        } else {
+          FAIL();
+        }
+      })));
+
+  std::atomic_bool lastBuildReclaimTriggered{false};
   SCOPED_TESTVALUE_SET(
-      "facebook::velox::exec::HashBuild::addInput",
+      "facebook::velox::exec::HashBuild::reclaim",
       std::function<void(exec::HashBuild*)>(([&](exec::HashBuild* hashBuild) {
-        Operator::ReclaimableSectionGuard guard(hashBuild);
-        testingRunArbitration(hashBuild->pool());
+        if (!lastBuildReclaimTriggered) {
+          if (noMoreProbeInput) {
+            lastBuildReclaimTriggered = true;
+          }
+        } else {
+          FAIL();
+        }
       })));
-  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
-      .numDrivers(1)
-      .planNode(plan)
-      // Always trigger spilling.
-      .injectSpill(false)
-      .maxSpillLevel(0)
-      .spillDirectory(tempDirectory->getPath())
-      .referenceQuery(
-          "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
-      .config(core::QueryConfig::kSpillStartPartitionBit, "29")
-      .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
-        auto opStats = toOperatorStats(task->taskStats());
-        ASSERT_EQ(
-            opStats.at("HashProbe")
-                .runtimeStats[Operator::kExceededMaxSpillLevel]
-                .sum,
-            8);
-        ASSERT_EQ(
-            opStats.at("HashProbe")
-                .runtimeStats[Operator::kExceededMaxSpillLevel]
-                .count,
-            1);
-        ASSERT_EQ(
-            opStats.at("HashBuild")
-                .runtimeStats[Operator::kExceededMaxSpillLevel]
-                .sum,
-            8);
-        ASSERT_EQ(
-            opStats.at("HashBuild")
-                .runtimeStats[Operator::kExceededMaxSpillLevel]
-                .count,
-            1);
-      })
-      .run();
+
+  // Always trigger spilling.
+  TestScopedSpillInjection scopedSpillInjection(100);
+  auto task =
+      AssertQueryBuilder(plan, duckDbQueryRunner_)
+          .maxDrivers(1)
+          .config(core::QueryConfig::kSpillEnabled, "true")
+          .config(core::QueryConfig::kJoinSpillEnabled, "true")
+          // Disable write buffering to ease test verification. For example, we
+          // want many spilled vectors in a spilled file to trigger recursive
+          // spilling.
+          .config(core::QueryConfig::kSpillWriteBufferSize, std::to_string(0))
+          .config(core::QueryConfig::kMaxSpillLevel, "0")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
+          .spillDirectory(tempDirectory->getPath())
+          .assertResults(
+              "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1");
+
+  uint64_t totalTaskWaitTimeUs{0};
+  while (task.use_count() != 1) {
+    constexpr uint64_t kWaitIntervalUs = 1'000;
+    std::this_thread::sleep_for(std::chrono::microseconds(kWaitIntervalUs));
+    totalTaskWaitTimeUs += kWaitIntervalUs;
+    if (totalTaskWaitTimeUs >= 5'000'000) {
+      VELOX_FAIL(
+          "Failed to wait for all the background activities of task {} to "
+          "finish, pending reference count: {}",
+          task->taskId(),
+          task.use_count());
+    }
+  }
+
+  auto opStats = toOperatorStats(task->taskStats());
+  ASSERT_EQ(
+      opStats.at("HashProbe")
+          .runtimeStats[Operator::kExceededMaxSpillLevel]
+          .sum,
+      8);
+  ASSERT_EQ(
+      opStats.at("HashProbe")
+          .runtimeStats[Operator::kExceededMaxSpillLevel]
+          .count,
+      1);
+  ASSERT_EQ(
+      opStats.at("HashBuild")
+          .runtimeStats[Operator::kExceededMaxSpillLevel]
+          .sum,
+      8);
+  ASSERT_EQ(
+      opStats.at("HashBuild")
+          .runtimeStats[Operator::kExceededMaxSpillLevel]
+          .count,
+      1);
   ASSERT_EQ(
       common::globalSpillStats().spillMaxLevelExceededCount,
       exceededMaxSpillLevelCount + 16);
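
A minimal sketch of the canSpill()/canReclaim() layering this patch introduces, for reference. The classes below are simplified stand-ins, not the real velox::exec declarations; only the member names mirror the diff, and everything else is illustrative:

// Sketch: capability (canSpill) vs. eligibility (canReclaim). Compiles
// standalone; the Operator/HashBuild classes here are illustrative stand-ins.
#include <iostream>

class Operator {
 public:
  virtual ~Operator() = default;

  // Base capability: spilling is possible iff a spill config was set.
  virtual bool canSpill() const {
    return hasSpillConfig_;
  }

  // Reclaim eligibility layers on top of the capability check.
  virtual bool canReclaim() const {
    return canSpill();
  }

 protected:
  bool hasSpillConfig_{true};
};

class HashBuild : public Operator {
 public:
  // Capability override: spilling is structurally impossible for tasks with
  // mixed execution groups, so the derived check narrows the base one.
  bool canSpill() const override {
    return Operator::canSpill() && !hasMixedExecutionGroup_;
  }

  // Eligibility override: a spill-capable operator that already hit the max
  // spill level is skipped by the arbitrator rather than suspended; reclaim()
  // still re-checks the flag because it can flip after canReclaim() returns.
  bool canReclaim() const override {
    return canSpill() && !exceededMaxSpillLevelLimit_;
  }

  bool hasMixedExecutionGroup_{false};
  bool exceededMaxSpillLevelLimit_{false};
};

int main() {
  HashBuild op;
  op.exceededMaxSpillLevelLimit_ = true;
  // Prints "1 0": still capable of spilling, but not eligible for reclaim,
  // so arbitration moves on without suspending the task.
  std::cout << op.canSpill() << " " << op.canReclaim() << "\n";
}

The split is what lets the VELOX_CHECKs in reclaim() assert only canSpill() (an invariant) while the exceeded-limit case degrades to a warning log and an early return instead of a task suspension.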