Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shrinkPool API to MemoryManager/MemoryArbitrator #6102

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,12 @@ bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
if (arbitrator_ == nullptr) {
return false;
}
// Holds a shared reference to each alive memory pool in 'pools_' to keep
// their aliveness during the memory arbitration process.
std::vector<std::shared_ptr<MemoryPool>> candidates = getAlivePools();
return arbitrator_->growMemory(pool, candidates, incrementBytes);
return arbitrator_->growMemory(pool, getAlivePools(), incrementBytes);
}

uint64_t MemoryManager::shrinkPools(uint64_t targetBytes) {
VELOX_CHECK_NOT_NULL(arbitrator_);
return arbitrator_->shrinkMemory(getAlivePools(), targetBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getAlivePools() holds the memory pool while shrinking memory. So let's change to the following (as what growPool does)

  // Holds a shared reference to each alive memory pool in 'pools_' to keep
  // their aliveness during the memory arbitration process.
  std::vector<std::shared_ptr<MemoryPool>> candidates = getAlivePools();
  return arbitrator_->shrinkMemory(candidates, targetBytes);

Copy link
Contributor Author

@tanjialiang tanjialiang Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference? Calling getAlivePools() will pass a copy of every shared_ptr to shrinkMemory() and hence keeping them alive. growPools() should make them a single line for simplicity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Could you make a change to growMemory? Thanks!

}

void MemoryManager::dropPool(MemoryPool* pool) {
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ class MemoryManager {
/// 'incrementBytes'. The function returns true on success, otherwise false.
bool growPool(MemoryPool* pool, uint64_t incrementBytes);

/// Invoked to shrink alive pools to free 'targetBytes' capacity. The function
/// returns the actual freed memory capacity in bytes.
uint64_t shrinkPools(uint64_t targetBytes);

/// Default unmanaged leaf pool with no threadsafe stats support. Libraries
/// using this method can get a pool that is shared with other threads. The
/// goal is to minimize lock contention while supporting such use cases.
Expand Down
7 changes: 7 additions & 0 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ class MemoryArbitrator {
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) = 0;

/// Invoked by the memory manager to shrink memory from a given list of memory
/// pools. The freed memory capacity is given back to the arbitrator. The
/// function returns the actual freed memory capacity in bytes.
virtual uint64_t shrinkMemory(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) = 0;

/// The internal execution stats of the memory arbitrator.
struct Stats {
/// The number of arbitration requests.
Expand Down
6 changes: 6 additions & 0 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class SharedArbitrator : public MemoryArbitrator {
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) final;

uint64_t shrinkMemory(
const std::vector<std::shared_ptr<MemoryPool>>& /*unused*/,
uint64_t /*unused*/) override final {
VELOX_NYI("shrinkMemory is not supported by SharedArbitrator");
}

Stats stats() const final;

std::string kind() override;
Expand Down
22 changes: 17 additions & 5 deletions velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ class FakeTestArbitrator : public MemoryArbitrator {
.retryArbitrationFailure = config.retryArbitrationFailure}) {}

void reserveMemory(MemoryPool* pool, uint64_t bytes) override {
VELOX_NYI()
VELOX_NYI();
}

void releaseMemory(MemoryPool* pool) override{VELOX_NYI()}
void releaseMemory(MemoryPool* pool) override {
VELOX_NYI();
}

std::string kind() override {
return "USER";
Expand All @@ -93,12 +95,22 @@ class FakeTestArbitrator : public MemoryArbitrator {
bool growMemory(
MemoryPool* pool,
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) override{VELOX_NYI()}
uint64_t targetBytes) override {
VELOX_NYI();
}

Stats stats() const override{VELOX_NYI()}
uint64_t shrinkMemory(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) override {
VELOX_NYI();
}

Stats stats() const override {
VELOX_NYI();
}

std::string toString() const override {
VELOX_NYI()
VELOX_NYI();
}
};
} // namespace
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ class FakeTestArbitrator : public MemoryArbitrator {
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) override{VELOX_NYI()}

uint64_t shrinkMemory(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) override{VELOX_NYI()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


Stats stats() const override{VELOX_NYI()}

std::string toString() const override{VELOX_NYI()}
Expand Down
5 changes: 5 additions & 0 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,11 @@ TEST_F(MockSharedArbitrationTest, arbitrationFailsTask) {
growOp->freeAll();
}

TEST_F(MockSharedArbitrationTest, shrinkMemory) {
std::vector<std::shared_ptr<MemoryPool>> pools;
ASSERT_THROW(arbitrator_->shrinkMemory(pools, 128), VeloxException);
}

TEST_F(MockSharedArbitrationTest, singlePoolGrowWithoutArbitration) {
auto* memOp = addMemoryOp();
const int allocateSize = 1 * MB;
Expand Down