Relax the reclaimed bytes check for hash build (#10833)
Summary:
The join fuzzer detects a check failure on hash build operator reclaim: the operator's memory usage increases after reclamation. This happens when the parallel join build is running in the background and the memory reclamation is skipped. The parallel join build can cause additional memory usage, such as the data structures used to store the parallel join data partitioning as well as the duplicate row vectors. This PR fixes the failure by skipping the reclaimed bytes check for hash build, which only uses the spill memory pool for memory reclamation.
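
For illustration, a minimal sketch in C++ of the relaxed check, assuming a standalone helper (the function name reclaimWithRelaxedCheck and its parameters are hypothetical; the actual change lands in Operator::MemoryReclaimer::reclaim in the diff below):

#include <algorithm>
#include <cstdint>
#include <string>

// Sketch: the reclaimed-bytes delta is measured around the reclaim call.
// For hash build, a parallel build running on background threads can keep
// allocating while reclamation runs, so the delta may come out negative;
// clamp it to zero instead of failing the check.
int64_t reclaimWithRelaxedCheck(
    const std::string& operatorType,
    int64_t usedBytesBeforeReclaim,
    int64_t usedBytesAfterReclaim) {
  int64_t reclaimedBytes = usedBytesBeforeReclaim - usedBytesAfterReclaim;
  if (operatorType == "HashBuild") {
    // Hash build only reclaims through the spill memory pool, so a negative
    // delta reflects concurrent build-side allocation, not an error.
    reclaimedBytes = std::max<int64_t>(0, reclaimedBytes);
  }
  return reclaimedBytes;
}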

Pull Request resolved: #10833

Test Plan: This PR has run through the join fuzzer in opt mode for 5 hours.

Reviewed By: Yuhta

Differential Revision: D61800038

Pulled By: xiaoxmeng

fbshipit-source-id: 3135e6d908082598b4afe7e52efbb85611b57f60
xiaoxmeng authored and facebook-github-bot committed Aug 27, 2024
1 parent 3bf1a74 commit c74b5e1
Showing 4 changed files with 37 additions and 23 deletions.
23 changes: 11 additions & 12 deletions velox/exec/HashTable.cpp
@@ -1052,7 +1052,7 @@ void HashTable<ignoreNullKeys>::buildJoinPartition(
buildPartitionBounds_[partition],
buildPartitionBounds_[partition + 1],
overflow};
auto rowContainer =
auto* rowContainer =
(partition == 0 ? this : otherTables_[partition - 1].get())->rows();
for (auto i = 0; i < numPartitions; ++i) {
auto* table = i == 0 ? this : otherTables_[i - 1].get();
@@ -1138,27 +1138,26 @@ void HashTable<ignoreNullKeys>::insertForGroupBy(

template <bool ignoreNullKeys>
bool HashTable<ignoreNullKeys>::arrayPushRow(char* row, int32_t index) {
auto existing = table_[index];
if (existing) {
if (nextOffset_) {
auto* existingRow = table_[index];
if (existingRow != nullptr) {
if (nextOffset_ > 0) {
hasDuplicates_ = true;
rows_->appendNextRow(existing, row);
rows_->appendNextRow(existingRow, row);
}
return false;
}
table_[index] = row;
return !existing;
return existingRow == nullptr;
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::pushNext(
RowContainer* rows,
char* row,
char* next) {
if (nextOffset_ > 0) {
hasDuplicates_ = true;
rows->appendNextRow(row, next);
}
VELOX_CHECK_GT(nextOffset_, 0);
hasDuplicates_ = true;
rows->appendNextRow(row, next);
}

template <bool ignoreNullKeys>
@@ -1187,7 +1186,7 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
[&](char* group, int32_t /*row*/) {
if (RowContainer::normalizedKey(group) ==
RowContainer::normalizedKey(inserted)) {
if (nextOffset_) {
if (nextOffset_ > 0) {
pushNext(rows, group, inserted);
}
return true;
@@ -1809,7 +1808,7 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
(joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
} else {
auto numRows = rows->size();
const auto numRows = rows->size();
auto num =
std::min(numRows - iter.lastDuplicateRowIndex, maxOut - numOut);
std::fill_n(inputRows.begin() + numOut, num, row);
2 changes: 1 addition & 1 deletion velox/exec/HashTable.h
@@ -1002,7 +1002,7 @@ class HashTable : public BaseHashTable {
std::atomic<bool> hasDuplicates_{false};

// Offset of next row link for join build side set from 'rows_'.
int32_t nextOffset_;
int32_t nextOffset_{0};
char** table_ = nullptr;
memory::ContiguousAllocation tableAllocation_;

32 changes: 23 additions & 9 deletions velox/exec/Operator.cpp
@@ -304,25 +304,29 @@ void Operator::recordSpillStats() {
lockedStats->addRuntimeStat(
kSpillFillTime,
RuntimeCounter{
static_cast<int64_t>(lockedSpillStats->spillFillTimeNanos)});
static_cast<int64_t>(lockedSpillStats->spillFillTimeNanos),
RuntimeCounter::Unit::kNanos});
}
if (lockedSpillStats->spillSortTimeNanos != 0) {
lockedStats->addRuntimeStat(
kSpillSortTime,
RuntimeCounter{
static_cast<int64_t>(lockedSpillStats->spillSortTimeNanos)});
static_cast<int64_t>(lockedSpillStats->spillSortTimeNanos),
RuntimeCounter::Unit::kNanos});
}
if (lockedSpillStats->spillSerializationTimeNanos != 0) {
lockedStats->addRuntimeStat(
kSpillSerializationTime,
RuntimeCounter{static_cast<int64_t>(
lockedSpillStats->spillSerializationTimeNanos)});
RuntimeCounter{
static_cast<int64_t>(lockedSpillStats->spillSerializationTimeNanos),
RuntimeCounter::Unit::kNanos});
}
if (lockedSpillStats->spillFlushTimeNanos != 0) {
lockedStats->addRuntimeStat(
kSpillFlushTime,
RuntimeCounter{
static_cast<int64_t>(lockedSpillStats->spillFlushTimeNanos)});
static_cast<int64_t>(lockedSpillStats->spillFlushTimeNanos),
RuntimeCounter::Unit::kNanos});
}
if (lockedSpillStats->spillWrites != 0) {
lockedStats->addRuntimeStat(
@@ -333,7 +337,8 @@ void Operator::recordSpillStats() {
lockedStats->addRuntimeStat(
kSpillWriteTime,
RuntimeCounter{
static_cast<int64_t>(lockedSpillStats->spillWriteTimeNanos)});
static_cast<int64_t>(lockedSpillStats->spillWriteTimeNanos),
RuntimeCounter::Unit::kNanos});
}
if (lockedSpillStats->spillRuns != 0) {
lockedStats->addRuntimeStat(
@@ -369,14 +374,17 @@ void Operator::recordSpillStats() {
lockedStats->addRuntimeStat(
kSpillReadTime,
RuntimeCounter{
static_cast<int64_t>(lockedSpillStats->spillReadTimeNanos)});
static_cast<int64_t>(lockedSpillStats->spillReadTimeNanos),
RuntimeCounter::Unit::kNanos});
}

if (lockedSpillStats->spillDeserializationTimeNanos != 0) {
lockedStats->addRuntimeStat(
kSpillDeserializationTime,
RuntimeCounter{static_cast<int64_t>(
lockedSpillStats->spillDeserializationTimeNanos)});
RuntimeCounter{
static_cast<int64_t>(
lockedSpillStats->spillDeserializationTimeNanos),
RuntimeCounter::Unit::kNanos});
}
lockedSpillStats->reset();
}
@@ -648,6 +656,12 @@ uint64_t Operator::MemoryReclaimer::reclaim(
memory::ScopedReclaimedBytesRecorder recoder(pool, &reclaimedBytes);
op_->reclaim(targetBytes, stats);
}
// NOTE: the parallel hash build is running in the background thread
// pool, which won't stop during memory reclamation, so the operator's
// memory usage might increase in that case.
if (op_->operatorType() == "HashBuild") {
reclaimedBytes = std::max<int64_t>(0, reclaimedBytes);
}
return reclaimedBytes;
},
stats);
3 changes: 2 additions & 1 deletion velox/exec/RowContainer.cpp
@@ -391,8 +391,9 @@ int32_t RowContainer::findRows(folly::Range<char**> rows, char** result) {
}

void RowContainer::appendNextRow(char* current, char* nextRow) {
VELOX_CHECK(getNextRowVector(nextRow) == nullptr);
NextRowVector*& nextRowArrayPtr = getNextRowVector(current);
if (!nextRowArrayPtr) {
if (nextRowArrayPtr == nullptr) {
nextRowArrayPtr =
new (stringAllocator_->allocate(kNextRowVectorSize)->begin())
NextRowVector(StlAllocator<char*>(stringAllocator_.get()));
