Skip to content

Commit

Permalink
Let operator not shrink pool after reclaim (facebookincubator#9895)
Browse files Browse the repository at this point in the history
Summary:
Currently operator shrinks pool after reclaim. Moved this operation up to arbitrator level to achieve a better global pool accounting consistency.

Pull Request resolved: facebookincubator#9895

Reviewed By: xiaoxmeng

Differential Revision: D57708523

Pulled By: tanjialiang

fbshipit-source-id: e998b579b183064348c46e8e9ea5e9665d893834
  • Loading branch information
tanjialiang authored and facebook-github-bot committed May 26, 2024
1 parent fe65ed0 commit 20afe68
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 76 deletions.
13 changes: 11 additions & 2 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,17 @@ void registerVeloxMetrics() {
DEFINE_HISTOGRAM_METRIC(
kMetricMemoryReclaimExecTimeMs, 30'000, 0, 600'000, 50, 90, 99, 100);

// Tracks op memory reclaim bytes.
DEFINE_METRIC(kMetricMemoryReclaimedBytes, facebook::velox::StatType::SUM);
// Tracks op memory reclaim bytes distribution in range of [0, 4GB] with 64
// buckets and reports P50, P90, P99, and P100
DEFINE_HISTOGRAM_METRIC(
kMetricMemoryReclaimedBytes,
67'108'864,
0,
4'294'967'296,
50,
90,
99,
100);

// Tracks the memory reclaim count on an operator.
DEFINE_METRIC(
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ uint64_t MemoryReclaimer::run(
stats.reclaimedBytes += reclaimedBytes;
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricMemoryReclaimExecTimeMs, execTimeUs / 1'000);
RECORD_METRIC_VALUE(kMetricMemoryReclaimedBytes, reclaimedBytes);
RECORD_HISTOGRAM_METRIC_VALUE(kMetricMemoryReclaimedBytes, reclaimedBytes);
RECORD_METRIC_VALUE(kMetricMemoryReclaimCount);
addThreadLocalRuntimeStat(
"memoryReclaimWallNanos",
Expand Down
3 changes: 1 addition & 2 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -758,10 +758,9 @@ uint64_t SharedArbitrator::reclaim(
VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool "
<< pool->name() << ", aborting it: " << e.what();
abort(pool, std::current_exception());
// Free up all the free capacity from the aborted pool as the associated
// query has failed at this point.
pool->shrink();
}
pool->shrink(bytesToReclaim);
const uint64_t newCapacity = pool->capacity();
VELOX_CHECK_GE(oldCapacity, newCapacity);
reclaimedBytes = oldCapacity - newCapacity;
Expand Down
10 changes: 7 additions & 3 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,13 @@ class MockMemoryOperator {
allocIt = allocations_.erase(allocIt);
}
totalBytes_ -= bytesReclaimed;
const auto oldReservedBytes = pool_->reservedBytes();
for (const auto& allocation : allocationsToFree) {
pool_->free(allocation.buffer, allocation.size);
}
return pool_->shrink(targetBytes);
const auto newReservedBytes = pool_->reservedBytes();
VELOX_CHECK_GE(oldReservedBytes, newReservedBytes);
return newReservedBytes - oldReservedBytes;
}

void abort(MemoryPool* pool) {
Expand Down Expand Up @@ -676,11 +679,12 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) {
std::string debugString() const {
std::stringstream tasksOss;
for (const auto& testTask : testTasks) {
tasksOss << "[";
tasksOss << testTask.debugString();
tasksOss << ",";
tasksOss << "], ";
}
return fmt::format(
"taskTests: [{}], targetBytes: {}, expectedFreedBytes: {}, expectedFreeCapacity: {}, expectedReservedFreeCapacity: {}, allowSpill: {}, allowAbort: {}",
"testTasks: [{}], targetBytes: {}, expectedFreedBytes: {}, expectedFreeCapacity: {}, expectedReservedFreeCapacity: {}, allowSpill: {}, allowAbort: {}",
tasksOss.str(),
succinctBytes(targetBytes),
succinctBytes(expectedFreedBytes),
Expand Down
5 changes: 3 additions & 2 deletions velox/docs/monitoring/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ Memory Management
with 20 buckets. It is configured to report latency at P50, P90, P99, and
P100 percentiles.
* - memory_reclaim_bytes
- Sum
- The sum of reclaimed memory bytes.
- Histogram
- The distribution of reclaimed bytes in range of [0, 4GB] with 64 buckets
and reports P50, P90, P99, and P100.
* - task_memory_reclaim_count
- Count
- The count of task memory reclaims.
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,11 @@ uint64_t Operator::MemoryReclaimer::reclaim(

auto reclaimBytes = memory::MemoryReclaimer::run(
[&]() {
const auto reservedBytesBeforeReclaim = pool->reservedBytes();
op_->reclaim(targetBytes, stats);
return pool->shrink(targetBytes);
const auto reservedBytesAfterReclaim = pool->reservedBytes();
VELOX_CHECK_GE(reservedBytesBeforeReclaim, reservedBytesAfterReclaim);
return reservedBytesBeforeReclaim - reservedBytesAfterReclaim;
},
stats);

Expand Down
28 changes: 9 additions & 19 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,16 +376,6 @@ class AggregationTest : public OperatorTestBase {
pool_.get());
}

static void reclaimAndRestoreCapacity(
const Operator* op,
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& reclaimerStats) {
const auto oldCapacity = op->pool()->capacity();
op->pool()->reclaim(targetBytes, 0, reclaimerStats);
dynamic_cast<memory::MemoryPoolImpl*>(op->pool())
->testingSetCapacity(oldCapacity);
}

RowTypePtr rowType_{
ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6"},
{BIGINT(),
Expand Down Expand Up @@ -2109,9 +2099,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {

if (testData.expectedReclaimable) {
const auto usedMemory = op->pool()->usedBytes();
reclaimAndRestoreCapacity(
op,
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
Expand Down Expand Up @@ -2233,12 +2223,12 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) {
ASSERT_GT(reclaimableBytes, 0);

const auto usedMemory = op->pool()->usedBytes();
reclaimAndRestoreCapacity(
op,
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GE(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
// The hash table itself in the grouping set is not cleared so it still
// uses some memory.
Expand Down Expand Up @@ -2476,9 +2466,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) {
if (enableSpilling) {
ASSERT_GT(reclaimableBytes, 0);
const auto usedMemory = op->pool()->usedBytes();
reclaimAndRestoreCapacity(
op,
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0);
ASSERT_GT(usedMemory, op->pool()->usedBytes());
Expand Down Expand Up @@ -2764,9 +2754,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimWithEmptyAggregationTable) {
if (enableSpilling) {
ASSERT_EQ(reclaimableBytes, 0);
const auto usedMemory = op->pool()->usedBytes();
reclaimAndRestoreCapacity(
op,
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{});
// No reclaim as the operator has started output processing.
Expand Down
59 changes: 30 additions & 29 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -897,17 +897,6 @@ class HashJoinTest : public HiveConnectorTestBase {
joinNode->outputType());
}

static void reclaimAndRestoreCapacity(
const Operator* op,
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& reclaimerStats) {
memory::ScopedMemoryArbitrationContext ctx(op->pool());
const auto oldCapacity = op->pool()->capacity();
op->pool()->reclaim(targetBytes, 0, reclaimerStats);
dynamic_cast<memory::MemoryPoolImpl*>(op->pool())
->testingSetCapacity(oldCapacity);
}

const int32_t numDrivers_;

// The default left and right table types used for test.
Expand Down Expand Up @@ -5502,10 +5491,13 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) {
}

if (testData.expectedReclaimable) {
reclaimAndRestoreCapacity(
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_);
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
0,
reclaimerStats_);
}
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
Expand Down Expand Up @@ -5639,10 +5631,13 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) {
ASSERT_TRUE(reclaimable);
ASSERT_GT(reclaimableBytes, 0);

reclaimAndRestoreCapacity(
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_);
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
0,
reclaimerStats_);
}
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_EQ(op->pool()->usedBytes(), 0);
Expand Down Expand Up @@ -5891,11 +5886,14 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) {
if (enableSpilling) {
ASSERT_GT(reclaimableBytes, 0);
const auto usedMemoryBytes = op->pool()->usedBytes();
reclaimAndRestoreCapacity(
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
0,
reclaimerStats_);
}
ASSERT_GE(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
// No reclaim as the operator has started output processing.
ASSERT_EQ(usedMemoryBytes, op->pool()->usedBytes());
Expand Down Expand Up @@ -6036,11 +6034,14 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) {

const auto usedMemoryBytes = op->pool()->usedBytes();
reclaimerStats_.reset();
reclaimAndRestoreCapacity(
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
0,
reclaimerStats_);
}
ASSERT_GE(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
// No reclaim as the build operator is not in building table state.
ASSERT_EQ(usedMemoryBytes, op->pool()->usedBytes());
Expand Down
25 changes: 8 additions & 17 deletions velox/exec/tests/OrderByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,6 @@ class OrderByTest : public OperatorTestBase {
}
}

static void reclaimAndRestoreCapacity(
const Operator* op,
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& reclaimerStats) {
const auto oldCapacity = op->pool()->capacity();
op->pool()->reclaim(targetBytes, 0, reclaimerStats);
dynamic_cast<memory::MemoryPoolImpl*>(op->pool())
->testingSetCapacity(oldCapacity);
}

std::vector<RowVectorPtr> makeVectors(
const RowTypePtr& rowType,
int32_t numVectors,
Expand Down Expand Up @@ -661,9 +651,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) {
}

if (testData.expectedReclaimable) {
reclaimAndRestoreCapacity(
op,
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
Expand Down Expand Up @@ -784,9 +774,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) {
ASSERT_TRUE(reclaimable);
ASSERT_GT(reclaimableBytes, 0);

reclaimAndRestoreCapacity(
op,
op->pool()->reclaim(
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
0,
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
Expand Down Expand Up @@ -939,7 +929,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {
constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
auto rowType = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
VectorFuzzer fuzzer({.vectorSize = 1000}, pool());
const int32_t numBatches = 10;
const int32_t numBatches = 200;
std::vector<RowVectorPtr> batches;
for (int32_t i = 0; i < numBatches; ++i) {
batches.push_back(fuzzer.fuzzRow(rowType));
Expand Down Expand Up @@ -1032,8 +1022,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {
if (enableSpilling) {
ASSERT_GT(reclaimableBytes, 0);
reclaimerStats_.reset();
reclaimAndRestoreCapacity(op, reclaimableBytes, reclaimerStats_);
ASSERT_EQ(reclaimerStats_.reclaimedBytes, reclaimableBytes);
op->pool()->reclaim(reclaimableBytes, 0, reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_LE(reclaimerStats_.reclaimedBytes, reclaimableBytes);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
} else {
ASSERT_EQ(reclaimableBytes, 0);
Expand Down

0 comments on commit 20afe68

Please sign in to comment.