diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index daabd7d0d1de..ad239a2d1551 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -175,10 +175,12 @@ bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) { if (arbitrator_ == nullptr) { return false; } - // Holds a shared reference to each alive memory pool in 'pools_' to keep - // their aliveness during the memory arbitration process. - std::vector> candidates = getAlivePools(); - return arbitrator_->growMemory(pool, candidates, incrementBytes); + return arbitrator_->growMemory(pool, getAlivePools(), incrementBytes); +} + +uint64_t MemoryManager::shrinkPools(uint64_t targetBytes) { + VELOX_CHECK_NOT_NULL(arbitrator_); + return arbitrator_->shrinkMemory(getAlivePools(), targetBytes); } void MemoryManager::dropPool(MemoryPool* pool) { diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index 46ef048b048d..678e355d58b8 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -157,6 +157,10 @@ class MemoryManager { /// 'incrementBytes'. The function returns true on success, otherwise false. bool growPool(MemoryPool* pool, uint64_t incrementBytes); + /// Invoked to shrink alive pools to free 'targetBytes' capacity. The function + /// returns the actual freed memory capacity in bytes. + uint64_t shrinkPools(uint64_t targetBytes); + /// Default unmanaged leaf pool with no threadsafe stats support. Libraries /// using this method can get a pool that is shared with other threads. The /// goal is to minimize lock contention while supporting such use cases. diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index d6232ba04c8d..505b70321300 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -140,6 +140,13 @@ class MemoryArbitrator { const std::vector>& candidatePools, uint64_t targetBytes) = 0; + /// Invoked by the memory manager to shrink memory from a given list of memory + /// pools. The freed memory capacity is given back to the arbitrator. The + /// function returns the actual freed memory capacity in bytes. + virtual uint64_t shrinkMemory( + const std::vector>& pools, + uint64_t targetBytes) = 0; + /// The internal execution stats of the memory arbitrator. struct Stats { /// The number of arbitration requests. diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index b0aed37b7e9f..7ba6c60a1377 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -50,6 +50,12 @@ class SharedArbitrator : public MemoryArbitrator { const std::vector>& candidatePools, uint64_t targetBytes) final; + uint64_t shrinkMemory( + const std::vector>& /*unused*/, + uint64_t /*unused*/) override final { + VELOX_NYI("shrinkMemory is not supported by SharedArbitrator"); + } + Stats stats() const final; std::string kind() override; diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index a5345eb82ac2..fb546cfb5998 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -81,10 +81,12 @@ class FakeTestArbitrator : public MemoryArbitrator { .retryArbitrationFailure = config.retryArbitrationFailure}) {} void reserveMemory(MemoryPool* pool, uint64_t bytes) override { - VELOX_NYI() + VELOX_NYI(); } - void releaseMemory(MemoryPool* pool) override{VELOX_NYI()} + void releaseMemory(MemoryPool* pool) override { + VELOX_NYI(); + } std::string kind() override { return "USER"; @@ -93,12 +95,22 @@ class FakeTestArbitrator : public MemoryArbitrator { bool growMemory( MemoryPool* pool, const std::vector>& candidatePools, - uint64_t targetBytes) override{VELOX_NYI()} + uint64_t targetBytes) override { + VELOX_NYI(); + } - Stats stats() const override{VELOX_NYI()} + uint64_t shrinkMemory( + const std::vector>& pools, + uint64_t targetBytes) override { + VELOX_NYI(); + } + + Stats stats() const override { + VELOX_NYI(); + } std::string toString() const override { - VELOX_NYI() + VELOX_NYI(); } }; } // namespace diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index c6bc7c935e3f..b94f17d84c08 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -113,6 +113,10 @@ class FakeTestArbitrator : public MemoryArbitrator { const std::vector>& candidatePools, uint64_t targetBytes) override{VELOX_NYI()} + uint64_t shrinkMemory( + const std::vector>& pools, + uint64_t targetBytes) override{VELOX_NYI()} + Stats stats() const override{VELOX_NYI()} std::string toString() const override{VELOX_NYI()} diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 1f636a374a17..1724ee3fc3ec 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -535,6 +535,11 @@ TEST_F(MockSharedArbitrationTest, arbitrationFailsTask) { growOp->freeAll(); } +TEST_F(MockSharedArbitrationTest, shrinkMemory) { + std::vector> pools; + ASSERT_THROW(arbitrator_->shrinkMemory(pools, 128), VeloxException); +} + TEST_F(MockSharedArbitrationTest, singlePoolGrowWithoutArbitration) { auto* memOp = addMemoryOp(); const int allocateSize = 1 * MB;