Skip to content

Commit

Permalink
Add shrinkPool API to MemoryManager/MemoryArbitrator
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Aug 16, 2023
1 parent 215c784 commit 262f97a
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 9 deletions.
10 changes: 6 additions & 4 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,12 @@ bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
if (arbitrator_ == nullptr) {
return false;
}
// Holds a shared reference to each alive memory pool in 'pools_' to keep
// their aliveness during the memory arbitration process.
std::vector<std::shared_ptr<MemoryPool>> candidates = getAlivePools();
return arbitrator_->growMemory(pool, candidates, incrementBytes);
return arbitrator_->growMemory(pool, getAlivePools(), incrementBytes);
}

uint64_t MemoryManager::shrinkPools(uint64_t targetBytes) {
VELOX_CHECK_NOT_NULL(arbitrator_);
return arbitrator_->shrinkMemory(getAlivePools(), targetBytes);
}

void MemoryManager::dropPool(MemoryPool* pool) {
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ class MemoryManager {
/// 'incrementBytes'. The function returns true on success, otherwise false.
bool growPool(MemoryPool* pool, uint64_t incrementBytes);

/// Invoked to shrink alive pools to free 'targetBytes' capacity. The function
/// returns the actual freed memory capacity in bytes.
uint64_t shrinkPools(uint64_t targetBytes);

/// Default unmanaged leaf pool with no threadsafe stats support. Libraries
/// using this method can get a pool that is shared with other threads. The
/// goal is to minimize lock contention while supporting such use cases.
Expand Down
7 changes: 7 additions & 0 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ class MemoryArbitrator {
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) = 0;

/// Invoked by the memory manager to shrink memory from a given list of memory
/// pools. The freed memory capacity is given back to the arbitrator. The
/// function returns the actual freed memory capacity in bytes.
virtual uint64_t shrinkMemory(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) = 0;

/// The internal execution stats of the memory arbitrator.
struct Stats {
/// The number of arbitration requests.
Expand Down
6 changes: 6 additions & 0 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class SharedArbitrator : public MemoryArbitrator {
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) final;

uint64_t shrinkMemory(
const std::vector<std::shared_ptr<MemoryPool>>& /*unused*/,
uint64_t /*unused*/) override final {
VELOX_NYI("shrinkMemory is not supported by SharedArbitrator");
}

Stats stats() const final;

std::string kind() override;
Expand Down
22 changes: 17 additions & 5 deletions velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ class FakeTestArbitrator : public MemoryArbitrator {
.retryArbitrationFailure = config.retryArbitrationFailure}) {}

void reserveMemory(MemoryPool* pool, uint64_t bytes) override {
VELOX_NYI()
VELOX_NYI();
}

void releaseMemory(MemoryPool* pool) override{VELOX_NYI()}
void releaseMemory(MemoryPool* pool) override {
VELOX_NYI();
}

std::string kind() override {
return "USER";
Expand All @@ -93,12 +95,22 @@ class FakeTestArbitrator : public MemoryArbitrator {
bool growMemory(
MemoryPool* pool,
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) override{VELOX_NYI()}
uint64_t targetBytes) override {
VELOX_NYI();
}

Stats stats() const override{VELOX_NYI()}
uint64_t shrinkMemory(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) override {
VELOX_NYI();
}

Stats stats() const override {
VELOX_NYI();
}

std::string toString() const override {
VELOX_NYI()
VELOX_NYI();
}
};
} // namespace
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ class FakeTestArbitrator : public MemoryArbitrator {
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) override{VELOX_NYI()}

uint64_t shrinkMemory(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) override{VELOX_NYI()}

Stats stats() const override{VELOX_NYI()}

std::string toString() const override{VELOX_NYI()}
Expand Down
5 changes: 5 additions & 0 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,11 @@ TEST_F(MockSharedArbitrationTest, arbitrationFailsTask) {
growOp->freeAll();
}

TEST_F(MockSharedArbitrationTest, shrinkMemory) {
std::vector<std::shared_ptr<MemoryPool>> pools;
ASSERT_THROW(arbitrator_->shrinkMemory(pools, 128), VeloxException);
}

TEST_F(MockSharedArbitrationTest, singlePoolGrowWithoutArbitration) {
auto* memOp = addMemoryOp();
const int allocateSize = 1 * MB;
Expand Down

0 comments on commit 262f97a

Please sign in to comment.