Skip to content

Commit

Permalink
Do not suspend task if probe and build operator exceeds spill limit
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 2, 2024
1 parent 42940c8 commit bf18946
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 76 deletions.
2 changes: 1 addition & 1 deletion velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ class QueryConfig {

/// Returns the number of bits used to calculate the spill partition number
/// for hash join and RowNumber. The number of spill partitions will be power
/// of tow.
/// of two.
/// NOTE: as for now, we only support up to 8-way spill partitioning.
uint8_t spillNumPartitionBits() const {
constexpr uint8_t kDefaultBits = 3;
Expand Down
45 changes: 30 additions & 15 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
VELOX_CHECK_NULL(spiller_);
VELOX_CHECK_NULL(spillInputReader_);

if (!spillEnabled()) {
if (!canSpill()) {
return;
}
if (spillType_ == nullptr) {
Expand Down Expand Up @@ -428,15 +428,15 @@ void HashBuild::addInput(RowVectorPtr input) {
void HashBuild::ensureInputFits(RowVectorPtr& input) {
// NOTE: we don't need memory reservation if all the partitions are spilling
// as we spill all the input rows to disk directly.
if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
if (!canSpill() || spiller_ == nullptr || spiller_->isAllSpilled()) {
return;
}

// NOTE: we simply reserve memory all inputs even though some of them are
// spilling directly. It is okay as we will accumulate the extra reservation
// in the operator's memory pool, and won't make any new reservation if there
// is already sufficient reservations.
VELOX_CHECK(spillEnabled());
VELOX_CHECK(canSpill());

auto* rows = table_->rows();
const auto numRows = rows->numRows();
Expand Down Expand Up @@ -515,7 +515,7 @@ void HashBuild::ensureInputFits(RowVectorPtr& input) {
void HashBuild::spillInput(const RowVectorPtr& input) {
VELOX_CHECK_EQ(input->size(), activeRows_.size());

if (!spillEnabled() || spiller_ == nullptr || !spiller_->isAnySpilled() ||
if (!canSpill() || spiller_ == nullptr || !spiller_->isAnySpilled() ||
!activeRows_.hasSelections()) {
return;
}
Expand Down Expand Up @@ -615,7 +615,7 @@ void HashBuild::spillPartition(
vector_size_t size,
const BufferPtr& indices,
const RowVectorPtr& input) {
VELOX_DCHECK(spillEnabled());
VELOX_DCHECK(canSpill());

if (isInputFromSpill()) {
spiller_->spill(partition, wrap(size, indices, input));
Expand Down Expand Up @@ -780,7 +780,7 @@ bool HashBuild::finishHashBuild() {
addRuntimeStats();
joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_);
if (spillEnabled()) {
if (canSpill()) {
stateCleared_ = true;
}
return true;
Expand All @@ -789,7 +789,7 @@ bool HashBuild::finishHashBuild() {
void HashBuild::ensureTableFits(uint64_t numRows) {
// NOTE: we don't need memory reservation if all the partitions have been
// spilled as nothing need to be built.
if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled() ||
if (!canSpill() || spiller_ == nullptr || spiller_->isAllSpilled() ||
numRows == 0) {
return;
}
Expand Down Expand Up @@ -832,7 +832,7 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
void HashBuild::postHashBuildProcess() {
checkRunning();

if (!spillEnabled()) {
if (!canSpill()) {
setState(State::kFinish);
return;
}
Expand Down Expand Up @@ -997,7 +997,7 @@ void HashBuild::checkStateTransition(State state) {
VELOX_CHECK_NE(state_, state);
switch (state) {
case State::kRunning:
if (!spillEnabled()) {
if (!canSpill()) {
VELOX_CHECK_EQ(state_, State::kWaitForBuild);
} else {
VELOX_CHECK_NE(state_, State::kFinish);
Expand Down Expand Up @@ -1037,21 +1037,36 @@ std::string HashBuild::stateName(State state) {
}
}

bool HashBuild::canSpill() const {
return Operator::canSpill() &&
!operatorCtx_->task()->hasMixedExecutionGroup();
}

bool HashBuild::canReclaim() const {
return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup();
return canSpill() && !exceededMaxSpillLevelLimit_;
}

void HashBuild::reclaim(
uint64_t /*unused*/,
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);
VELOX_CHECK(canSpill());
auto* driver = operatorCtx_->driver();
VELOX_CHECK_NOT_NULL(driver);
VELOX_CHECK(!nonReclaimableSection_);

TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);

if (exceededMaxSpillLevelLimit_) {
if (UNLIKELY(exceededMaxSpillLevelLimit_)) {
// 'canReclaim()' already checks the spill limit is not exceeding max, there
// is only a small chance from the time 'canReclaim()' is checked to the
// actual reclaim happens that the operator has spilled such that the spill
// level exceeds max.
const auto* config = spillConfig();
VELOX_CHECK_NOT_NULL(config);
LOG(WARNING)
<< "Can't reclaim from hash build operator, exceeded maximum spill "
"level of "
<< config->maxSpillLevel << ", " << pool()->name() << ", usage "
<< succinctBytes(pool()->usedBytes());
return;
}

Expand Down Expand Up @@ -1080,7 +1095,7 @@ void HashBuild::reclaim(
for (auto* op : operators) {
HashBuild* buildOp = dynamic_cast<HashBuild*>(op);
VELOX_CHECK_NOT_NULL(buildOp);
VELOX_CHECK(buildOp->canReclaim());
VELOX_CHECK(buildOp->canSpill());
if (buildOp->nonReclaimableState()) {
// TODO: reduce the log frequency if it is too verbose.
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
Expand Down
4 changes: 1 addition & 3 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ class HashBuild final : public Operator {
// process which will be set by the join probe side.
void postHashBuildProcess();

bool spillEnabled() const {
return canReclaim();
}
bool canSpill() const override;

// Indicates if the input is read from spill data or not.
bool isInputFromSpill() const;
Expand Down
46 changes: 29 additions & 17 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ bool HashProbe::isSpillInput() const {

void HashProbe::prepareForSpillRestore() {
checkRunning();
VELOX_CHECK(spillEnabled());
VELOX_CHECK(canSpill());
VELOX_CHECK(hasMoreSpillData());

// Reset the internal states which are relevant to the previous probe run.
Expand Down Expand Up @@ -512,7 +512,7 @@ void HashProbe::spillInput(RowVectorPtr& input) {
void HashProbe::prepareInputIndicesBuffers(
vector_size_t numInput,
const folly::F14FastSet<uint32_t>& spillPartitions) {
VELOX_DCHECK(spillEnabled());
VELOX_DCHECK(canSpill());
const auto maxIndicesBufferBytes = numInput * sizeof(vector_size_t);
if (nonSpillInputIndicesBuffer_ == nullptr ||
!nonSpillInputIndicesBuffer_->isMutable() ||
Expand Down Expand Up @@ -548,7 +548,7 @@ BlockingReason HashProbe::isBlocked(ContinueFuture* future) {
}
break;
case ProbeOperatorState::kWaitForPeers:
VELOX_CHECK(spillEnabled());
VELOX_CHECK(canSpill());
if (!future_.valid()) {
setRunning();
}
Expand Down Expand Up @@ -873,17 +873,18 @@ bool HashProbe::skipProbeOnEmptyBuild() const {
isRightSemiProjectJoin(joinType_);
}

bool HashProbe::spillEnabled() const {
return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup();
bool HashProbe::canSpill() const {
return Operator::canSpill() &&
!operatorCtx_->task()->hasMixedExecutionGroup();
}

bool HashProbe::hasMoreSpillData() const {
VELOX_CHECK(spillPartitionSet_.empty() || spillEnabled());
VELOX_CHECK(spillPartitionSet_.empty() || canSpill());
return !spillPartitionSet_.empty() || needSpillInput();
}

bool HashProbe::needSpillInput() const {
VELOX_CHECK(spillInputPartitionIds_.empty() || spillEnabled());
VELOX_CHECK(spillInputPartitionIds_.empty() || canSpill());
VELOX_CHECK_EQ(spillInputPartitionIds_.empty(), inputSpiller_ == nullptr);

return !spillInputPartitionIds_.empty();
Expand All @@ -898,7 +899,7 @@ void HashProbe::checkStateTransition(ProbeOperatorState state) {
VELOX_CHECK_NE(state_, state);
switch (state) {
case ProbeOperatorState::kRunning:
if (!spillEnabled()) {
if (!canSpill()) {
VELOX_CHECK_EQ(state_, ProbeOperatorState::kWaitForBuild);
} else {
VELOX_CHECK(
Expand All @@ -907,7 +908,7 @@ void HashProbe::checkStateTransition(ProbeOperatorState state) {
}
break;
case ProbeOperatorState::kWaitForPeers:
VELOX_CHECK(spillEnabled());
VELOX_CHECK(canSpill());
[[fallthrough]];
case ProbeOperatorState::kWaitForBuild:
[[fallthrough]];
Expand Down Expand Up @@ -965,7 +966,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
prepareForSpillRestore();
asyncWaitForHashTable();
} else {
if (lastProber_ && spillEnabled()) {
if (lastProber_ && canSpill()) {
joinBridge_->probeFinished();
wakeupPeerOperators();
}
Expand Down Expand Up @@ -1568,7 +1569,7 @@ void HashProbe::noMoreInputInternal() {
VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeNanos, 0);
}

const bool hasSpillEnabled = spillEnabled();
const bool hasSpillEnabled = canSpill();
std::vector<ContinuePromise> promises;
std::vector<std::shared_ptr<Driver>> peers;
// The last operator to finish processing inputs is responsible for
Expand Down Expand Up @@ -1651,19 +1652,30 @@ void HashProbe::ensureOutputFits() {
}

bool HashProbe::canReclaim() const {
return spillEnabled();
return canSpill() && !exceededMaxSpillLevelLimit_;
}

void HashProbe::reclaim(
uint64_t /*unused*/,
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
TestValue::adjust("facebook::velox::exec::HashProbe::reclaim", this);
VELOX_CHECK(canSpill());
auto* driver = operatorCtx_->driver();
VELOX_CHECK_NOT_NULL(driver);
VELOX_CHECK(!nonReclaimableSection_);

if (exceededMaxSpillLevelLimit_) {
// NOTE: we might have reached to the max spill limit.
if (UNLIKELY(exceededMaxSpillLevelLimit_)) {
// 'canReclaim()' already checks the spill limit is not exceeding max, there
// is only a small chance from the time 'canReclaim()' is checked to the
// actual reclaim happens that the operator has spilled such that the spill
// level exceeds max.
const auto* config = spillConfig();
VELOX_CHECK_NOT_NULL(config);
LOG(WARNING)
<< "Can't reclaim from hash probe operator, exceeded maximum spill "
"level of "
<< config->maxSpillLevel << ", " << pool()->name() << ", usage "
<< succinctBytes(pool()->usedBytes());
return;
}

Expand Down Expand Up @@ -1693,7 +1705,7 @@ void HashProbe::reclaim(
bool hasMoreProbeInput{false};
for (auto* probeOp : probeOps) {
VELOX_CHECK_NOT_NULL(probeOp);
VELOX_CHECK(probeOp->canReclaim());
VELOX_CHECK(probeOp->canSpill());
if (probeOp->nonReclaimableState()) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
Expand Down Expand Up @@ -1932,7 +1944,7 @@ std::unique_ptr<Spiller> HashProbe::spillTable(RowContainer* subTableRows) {

void HashProbe::prepareTableSpill(
const std::optional<SpillPartitionId>& restoredPartitionId) {
if (!spillEnabled()) {
if (!canSpill()) {
return;
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class HashProbe : public Operator {
void prepareTableSpill(
const std::optional<SpillPartitionId>& restoredPartitionId);

bool spillEnabled() const;
bool canSpill() const override;

// Indicates if the probe input is read from spilled data or not.
bool isSpillInput() const;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ class Operator : public BaseRuntimeStatWriter {
void maybeSetReclaimer();

/// Returns true if this is a spillable operator and has configured spilling.
FOLLY_ALWAYS_INLINE bool canSpill() const {
FOLLY_ALWAYS_INLINE virtual bool canSpill() const {
return spillConfig_.has_value();
}

Expand Down
Loading

0 comments on commit bf18946

Please sign in to comment.