Hash join changes for probe side spilling support (facebookincubator#8926)

Summary:
The hash join related changes made to support probe side spilling:
1. The hash build operator now always waits for the hash probe to finish,
   whether or not there is a pending spilled partition to restore. The reason is
   that the hash probe might trigger spilling as well. This requires a change in
   the hash build operator and a corresponding API change in the hash join
   bridge: setHashTable no longer reports whether there is spill data to
   restore.
2. Extend the hash join bridge with a setSpilledHashTable API, which is used by
   the hash probe operator to save the spilled table partitions in the hash join
   bridge. The function also clears the spilled hash table cached in the build
   result in the hash join bridge to release the held memory resources.
3. Extend the hash join node reclaimer to reclaim from both the build and probe
   sides, once from each.
4. Extend the hash table clear() method to support freeing the table; this is
   used by the probe side to free up the memory held by the spilled build table.

The actual probe side spilling is in a followup.
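To make the new handshake concrete, here is a minimal sketch of how the two
bridge calls fit together. The bridge types and method signatures come from the
diff in this commit; the two wrapper functions and their names are hypothetical
illustrations, not code from this PR.

#include "velox/exec/HashJoinBridge.h"

using namespace facebook::velox::exec;

// Hypothetical helper for the last hash build operator to publish its result.
// setHashTable() no longer reports pending spill data; the build side now
// always waits on spillInputOrFuture() afterwards, because the probe side may
// queue spilled partitions of its own via setSpilledHashTable().
void publishBuildResult(
    HashJoinBridge& bridge,
    std::unique_ptr<BaseHashTable> table,
    SpillPartitionSet buildSpillPartitions,
    bool hasNullKeys) {
  bridge.setHashTable(
      std::move(table), std::move(buildSpillPartitions), hasNullKeys);
}

// Hypothetical helper for the probe side: hand spilled table partitions back
// to the bridge, which queues them for the next restore round.
void publishProbeSpill(HashJoinBridge& bridge, SpillPartitionSet partitions) {
  bridge.setSpilledHashTable(std::move(partitions));
}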

Pull Request resolved: facebookincubator#8926

Reviewed By: mbasmanova

Differential Revision: D54403110

Pulled By: xiaoxmeng

fbshipit-source-id: bae9509e7de996ed2c368ee74b572f69df4802ce
xiaoxmeng authored and facebook-github-bot committed Mar 5, 2024
1 parent ccb464b commit eba938b
Showing 9 changed files with 237 additions and 275 deletions.
15 changes: 8 additions & 7 deletions velox/exec/HashBuild.cpp
@@ -844,8 +844,9 @@ bool HashBuild::finishHashBuild() {
       isInputFromSpill() ? spillConfig()->startPartitionBit
                          : BaseHashTable::kNoSpillInputStartPartitionBit);
   addRuntimeStats();
-  if (joinBridge_->setHashTable(
-          std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
+  joinBridge_->setHashTable(
+      std::move(table_), std::move(spillPartitions), joinHasNullKeys_);
+  if (spillEnabled()) {
     intermediateStateCleared_ = true;
     spillGroup_->restart();
   }
@@ -1212,11 +1213,11 @@ void HashBuild::reclaim(
 }
 
 bool HashBuild::nonReclaimableState() const {
-  // Apart from being in the nonReclaimable section,
-  // its also not reclaimable if:
-  // 1) the hash table has been built by the last build thread (inidicated
-  // by state_)
+  // Apart from being in the nonReclaimable section, it's also not reclaimable
+  // if:
+  // 1) the hash table has been built by the last build thread (indicated by
+  // state_)
   // 2) the last build operator has transferred ownership of 'this' operator's
   // intermediate state (table_ and spiller_) to itself
   // 3) it has completed spilling before reaching either of the previous
   // two states.
8 changes: 4 additions & 4 deletions velox/exec/HashBuild.h
@@ -261,8 +261,8 @@ class HashBuild final : public Operator {
   // The row type used for hash table build and disk spilling.
   RowTypePtr tableType_;
 
-  // Used to serialize access to intermediate state variables (like 'table_' and
-  // 'spiller_'). This is only required when variables are accessed
+  // Used to serialize access to internal state including 'table_' and
+  // 'spiller_'. This is only required when variables are accessed
   // concurrently, that is, when a thread tries to close the operator while
   // another thread is building the hash table. Refer to 'close()' and
   // finishHashBuild()' for more details.
@@ -309,8 +309,8 @@ class HashBuild final : public Operator {
   uint64_t numSpillBytes_{0};
 
   // This can be nullptr if either spilling is not allowed or it has been
-  // trsnaferred to the last hash build operator while in kWaitForBuild state or
-  // it has been cleared to setup a new one for recursive spilling.
+  // transferred to the last hash build operator while in kWaitForBuild state or
+  // it has been cleared to set up a new one for recursive spilling.
   std::unique_ptr<Spiller> spiller_;
 
   // Used to read input from previously spilled data for restoring.
79 changes: 57 additions & 22 deletions velox/exec/HashJoinBridge.cpp
@@ -29,15 +29,14 @@ void HashJoinBridge::addBuilder() {
   ++numBuilders_;
 }
 
-bool HashJoinBridge::setHashTable(
+void HashJoinBridge::setHashTable(
     std::unique_ptr<BaseHashTable> table,
     SpillPartitionSet spillPartitionSet,
     bool hasNullKeys) {
   VELOX_CHECK_NOT_NULL(table, "setHashTable called with null table");
 
   auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet);
 
-  bool hasSpillData;
   std::vector<ContinuePromise> promises;
   {
     std::lock_guard<std::mutex> l(mutex_);
@@ -64,12 +63,25 @@ bool HashJoinBridge::setHashTable(
         std::move(spillPartitionIdSet),
         hasNullKeys);
     restoringSpillPartitionId_.reset();
-
-    hasSpillData = !spillPartitionSets_.empty();
     promises = std::move(promises_);
   }
   notify(std::move(promises));
-  return hasSpillData;
 }
 
+void HashJoinBridge::setSpilledHashTable(SpillPartitionSet spillPartitionSet) {
+  VELOX_CHECK(
+      !spillPartitionSet.empty(), "Spilled table partitions can't be empty");
+  std::lock_guard<std::mutex> l(mutex_);
+  VELOX_CHECK(started_);
+  VELOX_CHECK(buildResult_.has_value());
+  VELOX_CHECK(restoringSpillShards_.empty());
+  VELOX_CHECK(!restoringSpillPartitionId_.has_value());
+
+  for (auto& partitionEntry : spillPartitionSet) {
+    const auto id = partitionEntry.first;
+    VELOX_CHECK_EQ(spillPartitionSets_.count(id), 0);
+    spillPartitionSets_.emplace(id, std::move(partitionEntry.second));
+  }
+}
+
 void HashJoinBridge::setAntiJoinHasNullKeys() {
@@ -131,10 +143,8 @@ bool HashJoinBridge::probeFinished() {
           spillPartitionSets_.begin()->second->split(numBuilders_);
       VELOX_CHECK_EQ(restoringSpillShards_.size(), numBuilders_);
       spillPartitionSets_.erase(spillPartitionSets_.begin());
-      promises = std::move(promises_);
-    } else {
-      VELOX_CHECK(promises_.empty());
     }
+    promises = std::move(promises_);
   }
   notify(std::move(promises));
   return hasSpillInput;
@@ -148,15 +158,23 @@ std::optional<HashJoinBridge::SpillInput> HashJoinBridge::spillInputOrFuture(
   VELOX_DCHECK(
       !restoringSpillPartitionId_.has_value() || !buildResult_.has_value());
 
+  // If 'buildResult_' is set, then the probe side is under processing. The
+  // build shall just wait.
+  if (buildResult_.has_value()) {
+    VELOX_CHECK(!restoringSpillPartitionId_.has_value());
+    promises_.emplace_back("HashJoinBridge::spillInputOrFuture");
+    *future = promises_.back().getSemiFuture();
+    return std::nullopt;
+  }
+
+  // If 'restoringSpillPartitionId_' is not set after probe side is done, then
+  // the join processing is all done.
   if (!restoringSpillPartitionId_.has_value()) {
-    if (spillPartitionSets_.empty()) {
-      return HashJoinBridge::SpillInput{};
-    } else {
-      promises_.emplace_back("HashJoinBridge::spillInputOrFuture");
-      *future = promises_.back().getSemiFuture();
-      return std::nullopt;
-    }
+    VELOX_CHECK(spillPartitionSets_.empty());
+    VELOX_CHECK(restoringSpillShards_.empty());
+    return HashJoinBridge::SpillInput{};
   }
 
+  VELOX_CHECK(!restoringSpillShards_.empty());
   auto spillShard = std::move(restoringSpillShards_.back());
   restoringSpillShards_.pop_back();
@@ -175,22 +193,39 @@ uint64_t HashJoinMemoryReclaimer::reclaim(
     uint64_t targetBytes,
     uint64_t maxWaitMs,
     memory::MemoryReclaimer::Stats& stats) {
+  // The flags to track if we have reclaimed from both the build and probe
+  // operators under a hash join node.
+  bool hasReclaimedFromBuild{false};
+  bool hasReclaimedFromProbe{false};
   uint64_t reclaimedBytes{0};
   pool->visitChildren([&](memory::MemoryPool* child) {
     VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf);
-    // The hash probe operator do not support memory reclaim.
-    if (!isHashBuildMemoryPool(*child)) {
-      return true;
+    const bool isBuild = isHashBuildMemoryPool(*child);
+    if (isBuild) {
+      if (!hasReclaimedFromBuild) {
+        // We just need to reclaim from any one of the hash build operators.
+        hasReclaimedFromBuild = true;
+        reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats);
+      }
+      return !hasReclaimedFromProbe;
     }
-    // We only need to reclaim from any one of the hash build operators
-    // which will reclaim from all the peer hash build operators.
-    reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats);
-    return false;
+
+    if (!hasReclaimedFromProbe) {
+      // As with the build side, we only need to reclaim from any one of the
+      // hash probe operators.
+      hasReclaimedFromProbe = true;
+      reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats);
+    }
+    return !hasReclaimedFromBuild;
  });
   return reclaimedBytes;
 }
 
 bool isHashBuildMemoryPool(const memory::MemoryPool& pool) {
   return folly::StringPiece(pool.name()).endsWith("HashBuild");
 }
+
+bool isHashProbeMemoryPool(const memory::MemoryPool& pool) {
+  return folly::StringPiece(pool.name()).endsWith("HashProbe");
+}
 } // namespace facebook::velox::exec
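The reclaimer change boils down to a small traversal pattern: visit the child
pools once, reclaim from the first build pool and the first probe pool
encountered, and stop early once both sides are covered, since one build (or
probe) operator reclaims on behalf of all its peers. Below is a self-contained
sketch of that control flow using a simplified, hypothetical Pool type. It
assumes, as under a hash join node, that every non-build child pool belongs to
a probe operator; it illustrates the pattern only and is not Velox code.

#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Simplified stand-in for a leaf memory pool (hypothetical type).
struct Pool {
  std::string name;
  std::function<uint64_t(uint64_t)> reclaim;
};

// Mirrors the early-exit logic above: reclaim once per join side, then stop.
uint64_t reclaimOncePerSide(std::vector<Pool>& children, uint64_t targetBytes) {
  bool reclaimedFromBuild = false;
  bool reclaimedFromProbe = false;
  uint64_t reclaimedBytes = 0;
  for (auto& child : children) {
    // The pool-name suffix check matches isHashBuildMemoryPool() above (C++20).
    const bool isBuild = child.name.ends_with("HashBuild");
    if (isBuild && !reclaimedFromBuild) {
      // Any one build operator reclaims for all of its peers.
      reclaimedFromBuild = true;
      reclaimedBytes = child.reclaim(targetBytes);
    } else if (!isBuild && !reclaimedFromProbe) {
      reclaimedFromProbe = true;
      reclaimedBytes = child.reclaim(targetBytes);
    }
    if (reclaimedFromBuild && reclaimedFromProbe) {
      break; // Both sides covered; stop iterating.
    }
  }
  return reclaimedBytes;
}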
29 changes: 21 additions & 8 deletions velox/exec/HashJoinBridge.h
@@ -22,6 +22,10 @@
 
 namespace facebook::velox::exec {
 
+namespace test {
+class HashJoinBridgeTestHelper;
+}
+
 /// Hands over a hash table from a multi-threaded build pipeline to a
 /// multi-threaded probe pipeline. This is owned by shared_ptr by all the build
 /// and probe Operator instances concerned. Corresponds to the Presto concept of
@@ -35,15 +39,20 @@ class HashJoinBridge : public JoinBridge {
   /// HashBuild operators to parallelize the restoring operation.
   void addBuilder();
 
   /// Invoked by the build operator to set the built hash table.
   /// 'spillPartitionSet' contains the spilled partitions while building
-  /// 'table'. The function returns true if there is spill data to restore
-  /// after HashProbe operators process 'table', otherwise false. This only
-  /// applies if the disk spilling is enabled.
-  bool setHashTable(
+  /// 'table', which only applies if the disk spilling is enabled.
+  void setHashTable(
       std::unique_ptr<BaseHashTable> table,
       SpillPartitionSet spillPartitionSet,
       bool hasNullKeys);
+
+  /// Invoked by the probe operator to set the spilled hash table while
+  /// probing. The function puts the spilled table partitions into the
+  /// 'spillPartitionSets_' stack. This only applies if the disk spilling is
+  /// enabled.
+  void setSpilledHashTable(SpillPartitionSet spillPartitionSet);
 
   void setAntiJoinHasNullKeys();
 
   /// Represents the result of HashBuild operators: a hash table, an optional
@@ -75,8 +84,7 @@ class HashJoinBridge : public JoinBridge {
   /// HashBuild operators. If HashProbe operator calls this early, 'future' will
   /// be set to wait asynchronously, otherwise the built table along with
   /// optional spilling related information will be returned in HashBuildResult.
-  std::optional<HashBuildResult> tableOrFuture(
-      ContinueFuture* FOLLY_NONNULL future);
+  std::optional<HashBuildResult> tableOrFuture(ContinueFuture* future);
 
   /// Invoked by HashProbe operator after finishes probing the built table to
   /// set one of the previously spilled partition to restore. The HashBuild
@@ -102,8 +110,7 @@ class HashJoinBridge : public JoinBridge {
   /// If HashBuild operator calls this early, 'future' will be set to wait
   /// asynchronously. If there is no more spill data to restore, then
   /// 'spillPartition' will be set to null in the returned SpillInput.
-  std::optional<SpillInput> spillInputOrFuture(
-      ContinueFuture* FOLLY_NONNULL future);
+  std::optional<SpillInput> spillInputOrFuture(ContinueFuture* future);
 
  private:
  uint32_t numBuilders_{0};
@@ -129,6 +136,8 @@ class HashJoinBridge : public JoinBridge {
   // This set can grow if HashBuild operator cannot load full partition in
   // memory and engages in recursive spilling.
   SpillPartitionSet spillPartitionSets_;
+
+  friend test::HashJoinBridgeTestHelper;
 };
 
 // Indicates if 'joinNode' is null-aware anti or left semi project join type and
@@ -156,4 +165,8 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer {
 /// Returns true if 'pool' is a hash build operator's memory pool. The check is
 /// currently based on the pool name.
 bool isHashBuildMemoryPool(const memory::MemoryPool& pool);
+
+/// Returns true if 'pool' is a hash probe operator's memory pool. The check is
+/// currently based on the pool name.
+bool isHashProbeMemoryPool(const memory::MemoryPool& pool);
 } // namespace facebook::velox::exec
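For context on the handshake behind these signatures: tableOrFuture() and
spillInputOrFuture() either return a value immediately or set 'future' for the
caller to wait on. Below is a purely illustrative probe-side sketch; a real
HashProbe operator hands the future back to the driver and resumes when it
completes rather than blocking inline, and the ContinueFuture::makeEmpty()
initialization is an assumption about the underlying folly future alias, not
code from this commit.

// Illustrative only: synchronous wait on the bridge from the probe side.
ContinueFuture future = ContinueFuture::makeEmpty();
auto buildResult = bridge->tableOrFuture(&future);
if (!buildResult.has_value()) {
  // Table not published yet; 'future' completes once the last build operator
  // calls setHashTable(). Real operators report blocked instead of waiting.
  future.wait();
  buildResult = bridge->tableOrFuture(&future);
}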
11 changes: 8 additions & 3 deletions velox/exec/HashTable.cpp
@@ -726,11 +726,16 @@ void HashTable<ignoreNullKeys>::allocateTables(uint64_t size) {
 }
 
 template <bool ignoreNullKeys>
-void HashTable<ignoreNullKeys>::clear() {
+void HashTable<ignoreNullKeys>::clear(bool freeTable) {
   rows_->clear();
   if (table_) {
-    // All modes have 8 bytes per slot.
-    memset(table_, 0, capacity_ * sizeof(char*));
+    if (!freeTable) {
+      // All modes have 8 bytes per slot.
+      ::memset(table_, 0, capacity_ * sizeof(char*));
+    } else {
+      rows_->pool()->freeContiguous(tableAllocation_);
+      table_ = nullptr;
+    }
   }
   numDistinct_ = 0;
   numTombstones_ = 0;
8 changes: 4 additions & 4 deletions velox/exec/HashTable.h
@@ -258,9 +258,9 @@ class BaseHashTable {
   /// owned by 'this'.
   virtual int64_t allocatedBytes() const = 0;
 
-  /// Deletes any content of 'this' but does not free the memory. Can
-  /// be used for flushing a partial group by, for example.
-  virtual void clear() = 0;
+  /// Deletes any content of 'this'. If 'freeTable' is false, the hash table
+  /// itself is not freed, which can be used for flushing a partial group by.
+  virtual void clear(bool freeTable = false) = 0;
 
   /// Returns the capacity of the internal hash table which is number of rows
   /// it can stores in a group by or hash join build.
@@ -502,7 +502,7 @@ class HashTable : public BaseHashTable {
       int32_t maxRows,
       char** rows) override;
 
-  void clear() override;
+  void clear(bool freeTable = false) override;
 
   int64_t allocatedBytes() const override {
     // For each row: sizeof(char*) per table entry + memory
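The new 'freeTable' flag separates two lifetimes: a group-by flush wants to
keep the table allocation for reuse, while the probe side discarding a spilled
build table wants the memory returned to the pool immediately. A minimal
illustration of the two call sites ('table' stands for any BaseHashTable
pointer; hypothetical snippet):

// Flush a partial group by: drop contents but keep the table allocation
// (the pre-existing behavior of clear()).
table->clear();

// Probe side releasing a spilled build table: drop contents and free the
// table memory as well.
table->clear(/*freeTable=*/true);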
