From 6d3fbfef73880a6e08743029c69a6b67be263a4c Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Tue, 17 Sep 2024 04:53:46 -0700 Subject: [PATCH] Fix hash build not able to reclaim in finishHashBuild (#11016) Summary: This is a regression caused by previous change. When hash build is in finishHashBuild() function, it ought to be reclaimable while calling ensureTableFits(). But ensureTableFits was called after spiller_ and table_ ownerships have been transferred, which caused the whole operator to be in a not able to reclaim state. This PR fixed the issue and added tests to avoid similar issues from happening again. Pull Request resolved: https://github.com/facebookincubator/velox/pull/11016 Reviewed By: xiaoxmeng Differential Revision: D62783315 Pulled By: tanjialiang fbshipit-source-id: f9d4cc1c1e07fd64b3f006ee231d967a5d033493 --- velox/exec/HashBuild.cpp | 40 ++++++++----------------- velox/exec/HashBuild.h | 5 +--- velox/exec/tests/HashJoinTest.cpp | 49 ++++++++++++------------------- 3 files changed, 31 insertions(+), 63 deletions(-) diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 3cecfa4fb72a..b7fa3eafd69f 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -678,6 +678,11 @@ bool HashBuild::finishHashBuild() { std::vector otherBuilds; otherBuilds.reserve(peers.size()); + uint64_t numRows{0}; + { + std::lock_guard l(mutex_); + numRows += table_->rows()->numRows(); + } for (auto& peer : peers) { auto op = peer->findOperator(planNodeId()); HashBuild* build = dynamic_cast(op); @@ -695,10 +700,13 @@ bool HashBuild::finishHashBuild() { !build->stateCleared_, "Internal state for a peer is empty. It might have already" " been closed."); + numRows += build->table_->rows()->numRows(); } otherBuilds.push_back(build); } + ensureTableFits(numRows); + std::vector> otherTables; otherTables.reserve(peers.size()); SpillPartitionSet spillPartitions; @@ -730,7 +738,6 @@ bool HashBuild::finishHashBuild() { // it might decide it is not going to trigger parallel join build. const bool allowParallelJoinBuild = !otherTables.empty() && spillPartitions.empty(); - ensureTableFits(otherBuilds, otherTables, allowParallelJoinBuild); SCOPE_EXIT { // Make a guard to release the unused memory reservation since we have @@ -773,16 +780,13 @@ bool HashBuild::finishHashBuild() { return true; } -void HashBuild::ensureTableFits( - const std::vector& otherBuilds, - const std::vector>& otherTables, - bool isParallelJoin) { +void HashBuild::ensureTableFits(uint64_t numRows) { // NOTE: we don't need memory reservation if all the partitions have been // spilled as nothing need to be built. - if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) { + if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled() || + numRows == 0) { return; } - VELOX_CHECK_EQ(otherBuilds.size(), otherTables.size()); // Test-only spill path. if (testingTriggerSpill(pool()->name())) { @@ -793,32 +797,12 @@ void HashBuild::ensureTableFits( TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this); - uint64_t totalNumRows{0}; - { - std::lock_guard l(mutex_); - totalNumRows += table_->rows()->numRows(); - } - - for (auto i = 0; i < otherTables.size(); ++i) { - auto& otherTable = otherTables[i]; - VELOX_CHECK_NOT_NULL(otherTable); - auto& otherBuild = otherBuilds[i]; - { - std::lock_guard l(otherBuild->mutex_); - totalNumRows += otherTable->rows()->numRows(); - } - } - - if (totalNumRows == 0) { - return; - } - // NOTE: reserve a bit more memory to consider the extra memory used for // parallel table build operation. // // TODO: make this query configurable. const uint64_t memoryBytesToReserve = - table_->estimateHashTableSize(totalNumRows) * 1.1; + table_->estimateHashTableSize(numRows) * 1.1; { Operator::ReclaimableSectionGuard guard(this); if (pool()->maybeReserve(memoryBytesToReserve)) { diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 562e57a73e0c..2ad6235a54fa 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -143,10 +143,7 @@ class HashBuild final : public Operator { // Invoked to ensure there is sufficient memory to build the join table. The // function throws to fail the query if the memory reservation fails. - void ensureTableFits( - const std::vector& otherBuilds, - const std::vector>& otherTables, - bool isParallelJoin); + void ensureTableFits(uint64_t numRows); // Invoked to compute spill partitions numbers for each row 'input' and spill // rows to spiller directly if the associated partition(s) is spilling. The diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index eac3e36b3852..b5488d14f2ab 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -7316,42 +7316,29 @@ DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredDuringParallelJoinBuild) { } DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredByEnsureJoinTableFit) { - std::atomic injectOnce{true}; + // Use manual spill injection other than spill injection framework. This is + // because spill injection framework does not allow fine grain spill within a + // single operator (We do not want to spill during addInput() but only during + // finishHashBuild()). SCOPED_TESTVALUE_SET( "facebook::velox::exec::HashBuild::ensureTableFits", - std::function([&](HashBuild* buildOp) { - // Inject the allocation once to ensure the merged table allocation will - // trigger memory arbitration. - if (!injectOnce.exchange(false)) { - return; - } - testingRunArbitration(buildOp->pool()); - })); - - fuzzerOpts_.vectorSize = 128; - auto probeVectors = createVectors(10, probeType_, fuzzerOpts_); - auto buildVectors = createVectors(20, buildType_, fuzzerOpts_); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); + std::function(([&](Operator* op) { + Operator::ReclaimableSectionGuard guard(op); + memory::testingRunArbitration(op->pool()); + }))); + auto tempDirectory = exec::test::TempDirectoryPath::create(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) - .numDrivers(1) - .spillDirectory(spillDirectory->getPath()) - .probeKeys({"t_k1"}) - .probeVectors(std::move(probeVectors)) - .buildKeys({"u_k1"}) - .buildVectors(std::move(buildVectors)) - .config(core::QueryConfig::kJoinSpillEnabled, "true") - .joinType(core::JoinType::kRight) - .joinOutputLayout({"t_k1", "t_k2", "u_k1", "t_v1"}) - .referenceQuery( - "SELECT t.t_k1, t.t_k2, u.u_k1, t.t_v1 FROM t RIGHT JOIN u ON t.t_k1 = u.u_k1") + .numDrivers(numDrivers_) .injectSpill(false) + .spillDirectory(tempDirectory->getPath()) + .keyTypes({BIGINT()}) + .probeVectors(1600, 5) + .buildVectors(1500, 5) + .referenceQuery( + "SELECT t_k0, t_data, u_k0, u_data FROM t, u WHERE t.t_k0 = u.u_k0") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { - auto opStats = toOperatorStats(task->taskStats()); - ASSERT_GT( - opStats.at("HashBuild") - .runtimeStats["memoryArbitrationWallNanos"] - .count, - 0); + const auto statsPair = taskSpilledStats(*task); + ASSERT_GT(statsPair.first.spilledBytes, 0); }) .run(); }