Fix hash build not able to reclaim in finishHashBuild (#11016)
Summary:
This is a regression caused by a previous change. While hash build is executing finishHashBuild(), it ought to be reclaimable during the call to ensureTableFits(). However, ensureTableFits() was called after ownership of spiller_ and table_ had been transferred, which left the whole operator in a non-reclaimable state. This PR fixes the issue and adds tests to prevent similar regressions.

Pull Request resolved: #11016

Reviewed By: xiaoxmeng

Differential Revision: D62783315

Pulled By: tanjialiang

fbshipit-source-id: f9d4cc1c1e07fd64b3f006ee231d967a5d033493
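For readers outside the codebase, a toy model of the regression may help. Everything below is a hypothetical sketch (ToyTable and ToyHashBuild are stand-ins, not Velox classes); it illustrates only the ordering constraint the fix restores: the memory reservation must run while the operator still owns its table, because reclaim can only spill state the operator still holds.

#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

// Stand-in for BaseHashTable; only the row count matters here.
struct ToyTable {
  uint64_t numRows{1000};
};

struct ToyHashBuild {
  std::mutex mutex;
  std::unique_ptr<ToyTable> table{std::make_unique<ToyTable>()};

  // Arbitration can only reclaim state the operator still owns; once
  // 'table' has been moved out, there is nothing left to spill.
  bool reclaimable() {
    std::lock_guard<std::mutex> l(mutex);
    return table != nullptr;
  }

  // Placeholder for the real reservation (pool()->maybeReserve() under an
  // Operator::ReclaimableSectionGuard), which may itself trigger arbitration.
  void ensureTableFits(uint64_t numRows) {
    if (numRows == 0) {
      return;  // nothing to build, so no reservation is needed
    }
    std::cout << "reserving for " << numRows
              << " rows, reclaimable=" << reclaimable() << "\n";
  }

  void finishHashBuild(std::vector<std::unique_ptr<ToyTable>>& merged) {
    uint64_t numRows = 0;
    {
      std::lock_guard<std::mutex> l(mutex);
      numRows += table->numRows;
    }
    // Fixed ordering: reserve first, then transfer ownership. The regression
    // effectively did the std::move() first, so the operator was not
    // reclaimable while the reservation ran.
    ensureTableFits(numRows);
    std::lock_guard<std::mutex> l(mutex);
    merged.push_back(std::move(table));
  }
};

int main() {
  ToyHashBuild build;
  std::vector<std::unique_ptr<ToyTable>> merged;
  build.finishHashBuild(merged);  // prints: reserving for 1000 rows, reclaimable=1
  return 0;
}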
tanjialiang authored and facebook-github-bot committed Sep 17, 2024
1 parent 0a9c481 commit 6d3fbfe
Showing 3 changed files with 31 additions and 63 deletions.
40 changes: 12 additions & 28 deletions velox/exec/HashBuild.cpp
@@ -678,6 +678,11 @@ bool HashBuild::finishHashBuild() {
 
   std::vector<HashBuild*> otherBuilds;
   otherBuilds.reserve(peers.size());
+  uint64_t numRows{0};
+  {
+    std::lock_guard<std::mutex> l(mutex_);
+    numRows += table_->rows()->numRows();
+  }
   for (auto& peer : peers) {
     auto op = peer->findOperator(planNodeId());
     HashBuild* build = dynamic_cast<HashBuild*>(op);
@@ -695,10 +700,13 @@ bool HashBuild::finishHashBuild() {
           !build->stateCleared_,
           "Internal state for a peer is empty. It might have already"
           " been closed.");
+      numRows += build->table_->rows()->numRows();
     }
     otherBuilds.push_back(build);
   }
 
+  ensureTableFits(numRows);
+
   std::vector<std::unique_ptr<BaseHashTable>> otherTables;
   otherTables.reserve(peers.size());
   SpillPartitionSet spillPartitions;
@@ -730,7 +738,6 @@ bool HashBuild::finishHashBuild() {
   // it might decide it is not going to trigger parallel join build.
   const bool allowParallelJoinBuild =
       !otherTables.empty() && spillPartitions.empty();
-  ensureTableFits(otherBuilds, otherTables, allowParallelJoinBuild);
 
   SCOPE_EXIT {
     // Make a guard to release the unused memory reservation since we have
@@ -773,16 +780,13 @@ bool HashBuild::finishHashBuild() {
   return true;
 }
 
-void HashBuild::ensureTableFits(
-    const std::vector<HashBuild*>& otherBuilds,
-    const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
-    bool isParallelJoin) {
+void HashBuild::ensureTableFits(uint64_t numRows) {
   // NOTE: we don't need memory reservation if all the partitions have been
   // spilled as nothing need to be built.
-  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
+  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled() ||
+      numRows == 0) {
     return;
   }
-  VELOX_CHECK_EQ(otherBuilds.size(), otherTables.size());
 
   // Test-only spill path.
   if (testingTriggerSpill(pool()->name())) {
@@ -793,32 +797,12 @@ void HashBuild::ensureTableFits(
 
   TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this);
 
-  uint64_t totalNumRows{0};
-  {
-    std::lock_guard<std::mutex> l(mutex_);
-    totalNumRows += table_->rows()->numRows();
-  }
-
-  for (auto i = 0; i < otherTables.size(); ++i) {
-    auto& otherTable = otherTables[i];
-    VELOX_CHECK_NOT_NULL(otherTable);
-    auto& otherBuild = otherBuilds[i];
-    {
-      std::lock_guard<std::mutex> l(otherBuild->mutex_);
-      totalNumRows += otherTable->rows()->numRows();
-    }
-  }
-
-  if (totalNumRows == 0) {
-    return;
-  }
-
   // NOTE: reserve a bit more memory to consider the extra memory used for
   // parallel table build operation.
   //
   // TODO: make this query configurable.
   const uint64_t memoryBytesToReserve =
-      table_->estimateHashTableSize(totalNumRows) * 1.1;
+      table_->estimateHashTableSize(numRows) * 1.1;
   {
     Operator::ReclaimableSectionGuard guard(this);
     if (pool()->maybeReserve(memoryBytesToReserve)) {
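Two details in the last hunk above are worth noting. The reservation is sized at table_->estimateHashTableSize(numRows) * 1.1, i.e. about 10% headroom for the extra scratch memory of a parallel table build (the accompanying NOTE and TODO explain this), and maybeReserve() runs inside an Operator::ReclaimableSectionGuard because reserving memory can itself trigger arbitration, during which this operator must remain reclaimable. The failure branch of maybeReserve() is not shown in the truncated hunk.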
5 changes: 1 addition & 4 deletions velox/exec/HashBuild.h
@@ -143,10 +143,7 @@ class HashBuild final : public Operator {
 
   // Invoked to ensure there is sufficient memory to build the join table. The
   // function throws to fail the query if the memory reservation fails.
-  void ensureTableFits(
-      const std::vector<HashBuild*>& otherBuilds,
-      const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
-      bool isParallelJoin);
+  void ensureTableFits(uint64_t numRows);
 
   // Invoked to compute spill partitions numbers for each row 'input' and spill
   // rows to spiller directly if the associated partition(s) is spilling. The
49 changes: 18 additions & 31 deletions velox/exec/tests/HashJoinTest.cpp
@@ -7316,42 +7316,29 @@ DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredDuringParallelJoinBuild) {
 }
 
 DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredByEnsureJoinTableFit) {
-  std::atomic<bool> injectOnce{true};
+  // Use manual spill injection other than spill injection framework. This is
+  // because spill injection framework does not allow fine grain spill within a
+  // single operator (We do not want to spill during addInput() but only during
+  // finishHashBuild()).
   SCOPED_TESTVALUE_SET(
       "facebook::velox::exec::HashBuild::ensureTableFits",
-      std::function<void(HashBuild*)>([&](HashBuild* buildOp) {
-        // Inject the allocation once to ensure the merged table allocation will
-        // trigger memory arbitration.
-        if (!injectOnce.exchange(false)) {
-          return;
-        }
-        testingRunArbitration(buildOp->pool());
-      }));
-
-  fuzzerOpts_.vectorSize = 128;
-  auto probeVectors = createVectors(10, probeType_, fuzzerOpts_);
-  auto buildVectors = createVectors(20, buildType_, fuzzerOpts_);
-  const auto spillDirectory = exec::test::TempDirectoryPath::create();
+      std::function<void(Operator*)>(([&](Operator* op) {
+        Operator::ReclaimableSectionGuard guard(op);
+        memory::testingRunArbitration(op->pool());
+      })));
+  auto tempDirectory = exec::test::TempDirectoryPath::create();
   HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
-      .numDrivers(1)
-      .spillDirectory(spillDirectory->getPath())
-      .probeKeys({"t_k1"})
-      .probeVectors(std::move(probeVectors))
-      .buildKeys({"u_k1"})
-      .buildVectors(std::move(buildVectors))
-      .config(core::QueryConfig::kJoinSpillEnabled, "true")
-      .joinType(core::JoinType::kRight)
-      .joinOutputLayout({"t_k1", "t_k2", "u_k1", "t_v1"})
-      .referenceQuery(
-          "SELECT t.t_k1, t.t_k2, u.u_k1, t.t_v1 FROM t RIGHT JOIN u ON t.t_k1 = u.u_k1")
+      .numDrivers(numDrivers_)
+      .injectSpill(false)
+      .spillDirectory(tempDirectory->getPath())
+      .keyTypes({BIGINT()})
+      .probeVectors(1600, 5)
+      .buildVectors(1500, 5)
+      .referenceQuery(
+          "SELECT t_k0, t_data, u_k0, u_data FROM t, u WHERE t.t_k0 = u.u_k0")
       .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
-        auto opStats = toOperatorStats(task->taskStats());
-        ASSERT_GT(
-            opStats.at("HashBuild")
-                .runtimeStats["memoryArbitrationWallNanos"]
-                .count,
-            0);
+        const auto statsPair = taskSpilledStats(*task);
         ASSERT_GT(statsPair.first.spilledBytes, 0);
       })
       .run();
 }
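The rewritten test drives arbitration from inside ensureTableFits() itself, via the TestValue hook that function already exposes. Because the hook now fires before ownership of table_ and spiller_ is transferred, the injected memory::testingRunArbitration() exercises exactly the window the regression broke; the verifier's spilledBytes > 0 assertion passes only if the operator was still reclaimable, and hence able to spill, at that point.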
