Skip to content

Commit

Permalink
Move freedBytes inside group reclaiming method (facebookincubator#10375)
Browse files Browse the repository at this point in the history
Summary:
Move the freedBytes condition inside reclaiming method in shared arbitrator. At the same time rename targetBytes to requestBytes for future PR.

Pull Request resolved: facebookincubator#10375

Reviewed By: kewang1024

Differential Revision: D59285074

Pulled By: tanjialiang

fbshipit-source-id: cca6f4e586aef7a402db10ae0af695953dad39df
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Jul 8, 2024
1 parent 770f2ad commit 562a7dc
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 76 deletions.
118 changes: 55 additions & 63 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ int64_t SharedArbitrator::minGrowCapacity(const MemoryPool& pool) const {

uint64_t SharedArbitrator::growCapacity(
MemoryPool* pool,
uint64_t targetBytes) {
uint64_t requestBytes) {
std::lock_guard<std::mutex> l(mutex_);
++numReserves_;
const int64_t maxBytesToReserve =
std::min<int64_t>(maxGrowCapacity(*pool), targetBytes);
std::min<int64_t>(maxGrowCapacity(*pool), requestBytes);
const int64_t minBytesToReserve = minGrowCapacity(*pool);
uint64_t reservedBytes =
decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve);
Expand Down Expand Up @@ -283,45 +283,44 @@ uint64_t SharedArbitrator::decrementFreeCapacityLocked(

uint64_t SharedArbitrator::shrinkCapacity(
MemoryPool* pool,
uint64_t targetBytes) {
uint64_t requestBytes) {
std::lock_guard<std::mutex> l(mutex_);
++numReleases_;
const uint64_t freedBytes = shrinkPool(pool, targetBytes);
const uint64_t freedBytes = shrinkPool(pool, requestBytes);
incrementFreeCapacityLocked(freedBytes);
return freedBytes;
}

uint64_t SharedArbitrator::shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes,
uint64_t requestBytes,
bool allowSpill,
bool allowAbort) {
incrementGlobalArbitrationCount();
ArbitrationOperation op(targetBytes, pools);
requestBytes = requestBytes == 0 ? capacity_ : requestBytes;
ArbitrationOperation op(requestBytes, pools);
ScopedArbitration scopedArbitration(this, &op);
if (targetBytes == 0) {
targetBytes = capacity_;
} else {
targetBytes = std::max(memoryPoolTransferCapacity_, targetBytes);
}

uint64_t fastReclaimTargetBytes =
std::max(memoryPoolTransferCapacity_, requestBytes);

std::lock_guard<std::shared_mutex> exclusiveLock(arbitrationLock_);
getCandidateStats(&op);
uint64_t freedBytes =
reclaimFreeMemoryFromCandidates(&op, targetBytes, false);
reclaimFreeMemoryFromCandidates(&op, fastReclaimTargetBytes, false);
auto freeGuard = folly::makeGuard([&]() {
// Returns the freed memory capacity back to the arbitrator.
if (freedBytes > 0) {
incrementFreeCapacity(freedBytes);
}
});
if (freedBytes >= targetBytes) {
if (freedBytes >= op.requestBytes) {
return freedBytes;
}
RECORD_METRIC_VALUE(kMetricArbitratorSlowGlobalArbitrationCount);
if (allowSpill) {
freedBytes +=
reclaimUsedMemoryFromCandidatesBySpill(&op, targetBytes - freedBytes);
if (freedBytes >= targetBytes) {
reclaimUsedMemoryFromCandidatesBySpill(&op, freedBytes);
if (freedBytes >= op.requestBytes) {
return freedBytes;
}
if (allowAbort) {
Expand All @@ -330,8 +329,7 @@ uint64_t SharedArbitrator::shrinkCapacity(
}
}
if (allowAbort) {
freedBytes +=
reclaimUsedMemoryFromCandidatesByAbort(&op, targetBytes - freedBytes);
reclaimUsedMemoryFromCandidatesByAbort(&op, freedBytes);
}
return freedBytes;
}
Expand All @@ -348,8 +346,8 @@ uint64_t SharedArbitrator::testingNumRequests() const {
bool SharedArbitrator::growCapacity(
MemoryPool* pool,
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) {
ArbitrationOperation op(pool, targetBytes, candidatePools);
uint64_t requestBytes) {
ArbitrationOperation op(pool, requestBytes, candidatePools);
ScopedArbitration scopedArbitration(this, &op);

bool needGlobalArbitration{false};
Expand Down Expand Up @@ -390,12 +388,12 @@ bool SharedArbitrator::runLocalArbitration(
VELOX_MEM_LOG(ERROR) << "Can't grow " << op->requestRoot->name()
<< " capacity to "
<< succinctBytes(
op->requestRoot->capacity() + op->targetBytes)
op->requestRoot->capacity() + op->requestBytes)
<< " which exceeds its max capacity "
<< succinctBytes(op->requestRoot->maxCapacity())
<< ", current capacity "
<< succinctBytes(op->requestRoot->capacity())
<< ", request " << succinctBytes(op->targetBytes);
<< ", request " << succinctBytes(op->requestBytes);
return false;
}
VELOX_CHECK(!op->requestRoot->aborted());
Expand All @@ -415,8 +413,8 @@ bool SharedArbitrator::runLocalArbitration(
incrementFreeCapacity(freedBytes);
}
});
if (freedBytes >= op->targetBytes) {
checkedGrow(op->requestRoot, freedBytes, op->targetBytes);
if (freedBytes >= op->requestBytes) {
checkedGrow(op->requestRoot, freedBytes, op->requestBytes);
freedBytes = 0;
return true;
}
Expand All @@ -425,9 +423,9 @@ bool SharedArbitrator::runLocalArbitration(
getCandidateStats(op, true);
freedBytes +=
reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, true);
if (freedBytes >= op->targetBytes) {
if (freedBytes >= op->requestBytes) {
const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes);
freedBytes -= bytesToGrow;
return true;
}
Expand All @@ -438,9 +436,9 @@ bool SharedArbitrator::runLocalArbitration(
}
checkIfAborted(op);

if (freedBytes >= op->targetBytes) {
if (freedBytes >= op->requestBytes) {
const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes);
freedBytes -= bytesToGrow;
return true;
}
Expand Down Expand Up @@ -482,7 +480,7 @@ bool SharedArbitrator::runGlobalArbitration(ArbitrationOperation* op) {
VELOX_MEM_LOG(ERROR)
<< "Failed to arbitrate sufficient memory for memory pool "
<< op->requestRoot->name() << ", request "
<< succinctBytes(op->targetBytes) << " after " << attempts
<< succinctBytes(op->requestBytes) << " after " << attempts
<< " attempts, Arbitrator state: " << toString();
updateArbitrationFailureStats();
return false;
Expand All @@ -494,7 +492,7 @@ void SharedArbitrator::getGrowTargets(
uint64_t& minGrowTarget) {
maxGrowTarget = std::min(
maxGrowCapacity(*op->requestRoot),
std::max(memoryPoolTransferCapacity_, op->targetBytes));
std::max(memoryPoolTransferCapacity_, op->requestBytes));
minGrowTarget = minGrowCapacity(*op->requestRoot);
}

Expand All @@ -506,30 +504,30 @@ void SharedArbitrator::checkIfAborted(ArbitrationOperation* op) {
}

bool SharedArbitrator::maybeGrowFromSelf(ArbitrationOperation* op) {
if (op->requestRoot->freeBytes() >= op->targetBytes) {
if (growPool(op->requestRoot, 0, op->targetBytes)) {
if (op->requestRoot->freeBytes() >= op->requestBytes) {
if (growPool(op->requestRoot, 0, op->requestBytes)) {
return true;
}
}
return false;
}

bool SharedArbitrator::checkCapacityGrowth(ArbitrationOperation* op) const {
return (maxGrowCapacity(*op->requestRoot) >= op->targetBytes) &&
(capacityAfterGrowth(*op->requestRoot, op->targetBytes) <= capacity_);
return (maxGrowCapacity(*op->requestRoot) >= op->requestBytes) &&
(capacityAfterGrowth(*op->requestRoot, op->requestBytes) <= capacity_);
}

bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) {
if ((op->targetBytes > capacity_) ||
(op->targetBytes > op->requestRoot->maxCapacity())) {
if ((op->requestBytes > capacity_) ||
(op->requestBytes > op->requestRoot->maxCapacity())) {
return false;
}
if (checkCapacityGrowth(op)) {
return true;
}

const uint64_t reclaimedBytes =
reclaim(op->requestRoot, op->targetBytes, true);
reclaim(op->requestRoot, op->requestBytes, true);
// NOTE: return the reclaimed bytes back to the arbitrator and let the memory
// arbitration process to grow the requestor's memory capacity accordingly.
incrementFreeCapacity(reclaimedBytes);
Expand All @@ -543,7 +541,7 @@ bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) {

bool SharedArbitrator::handleOOM(ArbitrationOperation* op) {
MemoryPool* victim = findCandidateWithLargestCapacity(
op->requestRoot, op->targetBytes, op->candidates)
op->requestRoot, op->requestBytes, op->candidates)
.pool;
if (op->requestRoot == victim) {
VELOX_MEM_LOG(ERROR)
Expand All @@ -557,10 +555,10 @@ bool SharedArbitrator::handleOOM(ArbitrationOperation* op) {
try {
if (victim == op->requestRoot) {
VELOX_MEM_POOL_CAP_EXCEEDED(
memoryPoolAbortMessage(victim, op->requestRoot, op->targetBytes));
memoryPoolAbortMessage(victim, op->requestRoot, op->requestBytes));
} else {
VELOX_MEM_POOL_ABORTED(
memoryPoolAbortMessage(victim, op->requestRoot, op->targetBytes));
memoryPoolAbortMessage(victim, op->requestRoot, op->requestBytes));
}
} catch (VeloxRuntimeError&) {
abort(victim, std::current_exception());
Expand Down Expand Up @@ -597,8 +595,8 @@ bool SharedArbitrator::arbitrateMemory(ArbitrationOperation* op) {
incrementFreeCapacity(freedBytes);
}
});
if (freedBytes >= op->targetBytes) {
checkedGrow(op->requestRoot, freedBytes, op->targetBytes);
if (freedBytes >= op->requestBytes) {
checkedGrow(op->requestRoot, freedBytes, op->requestBytes);
freedBytes = 0;
return true;
}
Expand All @@ -609,31 +607,30 @@ bool SharedArbitrator::arbitrateMemory(ArbitrationOperation* op) {

freedBytes +=
reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, false);
if (freedBytes >= op->targetBytes) {
if (freedBytes >= op->requestBytes) {
const uint64_t bytesToGrow = std::min(maxGrowTarget, freedBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes);
freedBytes -= bytesToGrow;
return true;
}
VELOX_CHECK_LT(freedBytes, maxGrowTarget);

RECORD_METRIC_VALUE(kMetricArbitratorSlowGlobalArbitrationCount);
freedBytes +=
reclaimUsedMemoryFromCandidatesBySpill(op, maxGrowTarget - freedBytes);
reclaimUsedMemoryFromCandidatesBySpill(op, freedBytes);
checkIfAborted(op);

if (freedBytes < op->targetBytes) {
if (freedBytes < op->requestBytes) {
VELOX_MEM_LOG(WARNING)
<< "Failed to arbitrate sufficient memory for memory pool "
<< op->requestRoot->name() << ", request "
<< succinctBytes(op->targetBytes) << ", only "
<< succinctBytes(op->requestBytes) << ", only "
<< succinctBytes(freedBytes)
<< " has been freed, Arbitrator state: " << toString();
return false;
}

const uint64_t bytesToGrow = std::min(freedBytes, maxGrowTarget);
checkedGrow(op->requestRoot, bytesToGrow, op->targetBytes);
checkedGrow(op->requestRoot, bytesToGrow, op->requestBytes);
freedBytes -= bytesToGrow;
return true;
}
Expand Down Expand Up @@ -676,36 +673,32 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
return reclaimedBytes;
}

uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill(
void SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill(
ArbitrationOperation* op,
uint64_t reclaimTargetBytes) {
uint64_t& freedBytes) {
// Sort candidate memory pools based on their reclaimable used capacity.
sortCandidatesByReclaimableUsedCapacity(op->candidates);

uint64_t reclaimedBytes{0};
for (const auto& candidate : op->candidates) {
VELOX_CHECK_LT(reclaimedBytes, reclaimTargetBytes);
VELOX_CHECK_LT(freedBytes, op->requestBytes);
if (candidate.reclaimableBytes == 0) {
break;
}
reclaimedBytes +=
reclaim(candidate.pool, reclaimTargetBytes - reclaimedBytes, false);
if ((reclaimedBytes >= reclaimTargetBytes) ||
freedBytes += reclaim(candidate.pool, op->requestBytes - freedBytes, false);
if ((freedBytes >= op->requestBytes) ||
(op->requestRoot != nullptr && op->requestRoot->aborted())) {
break;
}
}
return reclaimedBytes;
}

uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
void SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
ArbitrationOperation* op,
uint64_t reclaimTargetBytes) {
uint64_t& freedBytes) {
sortCandidatesByUsage(op->candidates);

uint64_t freedBytes{0};
for (const auto& candidate : op->candidates) {
VELOX_CHECK_LT(freedBytes, reclaimTargetBytes);
VELOX_CHECK_LT(freedBytes, op->requestBytes);
if (candidate.pool->capacity() == 0) {
break;
}
Expand All @@ -720,11 +713,10 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
abort(candidate.pool, std::current_exception());
}
freedBytes += shrinkPool(candidate.pool, 0);
if (freedBytes >= reclaimTargetBytes) {
if (freedBytes >= op->requestBytes) {
break;
}
}
return freedBytes;
}

uint64_t SharedArbitrator::reclaim(
Expand Down
Loading

0 comments on commit 562a7dc

Please sign in to comment.