Skip to content

Commit

Permalink
feat: track memory consumers for GreedyMemoryPoolState
Browse files Browse the repository at this point in the history
  • Loading branch information
asimsedhain committed Jan 27, 2024
1 parent 4d02cc0 commit c39dc03
Showing 1 changed file with 108 additions and 18 deletions.
126 changes: 108 additions & 18 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_common::{DataFusionError, Result};
use log::debug;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
collections::HashMap,
fmt::Display,
sync::atomic::{AtomicUsize, Ordering},
};

/// A [`MemoryPool`] that enforces no limit
#[derive(Debug, Default)]
Expand Down Expand Up @@ -54,7 +58,12 @@ impl MemoryPool for UnboundedMemoryPool {
#[derive(Debug)]
pub struct GreedyMemoryPool {
// Capacity that `try_grow` compares the summed per-consumer usage against.
pool_size: usize,
// NOTE(review): this listing is a rendered diff without +/- markers; `used` appears to be
// the pre-commit atomic counter that `state` replaces — confirm against the repository.
used: AtomicUsize,
// Per-consumer usage bookkeeping, guarded by a mutex.
state: Mutex<GreedyMemoryPoolState>,
}

/// Bookkeeping that [`GreedyMemoryPool`] keeps behind its mutex.
#[derive(Debug)]
struct GreedyMemoryPoolState {
/// Bytes currently recorded for each consumer, keyed by the consumer's name.
// NOTE(review): consumers that share a name share one entry — confirm names are unique.
pool_members: HashMap<String, usize>,
}

impl GreedyMemoryPool {
Expand All @@ -63,38 +72,89 @@ impl GreedyMemoryPool {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
used: AtomicUsize::new(0),
state: Mutex::new(GreedyMemoryPoolState {
pool_members: HashMap::new(),
}),
}
}
}

impl MemoryPool for GreedyMemoryPool {
/// Records `additional` bytes against the reservation's consumer.
///
/// No comparison against `pool_size` is made here, so the recorded usage may exceed it.
// NOTE(review): `_reservation` is read below, so the leading underscore is misleading.
fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
// NOTE(review): this listing is a rendered diff without +/- markers; the next statement
// appears to be the removed pre-commit implementation — confirm against the repository.
self.used.fetch_add(additional, Ordering::Relaxed);
let mut state = self.state.lock();
// Look up the consumer's entry by name, starting from 0 when it is not present yet.
let used = state
.pool_members
.entry(_reservation.consumer().name().into())
.or_insert(0);
// Saturate instead of overflowing.
*used = used.saturating_add(additional);
}

/// Subtracts `shrink` bytes from the usage recorded for the reservation's consumer.
// NOTE(review): `_reservation` is read below, so the leading underscore is misleading.
fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
// NOTE(review): this listing is a rendered diff without +/- markers; the next statement
// appears to be the removed pre-commit implementation — confirm against the repository.
self.used.fetch_sub(shrink, Ordering::Relaxed);
let mut state = self.state.lock();
// NOTE(review): `entry(..).or_insert(0)` inserts a zero entry (and allocates the key) when
// the consumer has no entry yet; a `get_mut` lookup would avoid both — confirm intent.
let used = state
.pool_members
.entry(_reservation.consumer().name().into())
.or_insert(0);
// Saturate at zero instead of underflowing.
*used = used.saturating_sub(shrink);
}

/// Records `additional` bytes for the reservation's consumer, but only when the summed usage
/// of all consumers plus `additional` stays within `pool_size`.
///
/// Returns the error built by `insufficient_capacity_err` otherwise, leaving the recorded
/// usage untouched.
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
// NOTE(review): this listing is a rendered diff without +/- markers; the statement from
// `self.used` down to `})?;` appears to be the removed pre-commit implementation —
// confirm against the repository.
self.used
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
let new_used = used + additional;
(new_used <= self.pool_size).then_some(new_used)
})
.map_err(|used| {
insufficient_capacity_err(
reservation,
additional,
self.pool_size.saturating_sub(used),
)
})?;
// The lock is held across both the check and the update below.
let mut state = self.state.lock();
// Total usage is the sum over every consumer's entry.
let used: usize = state.pool_members.values().sum();
if used.saturating_add(additional) > self.pool_size {
// dropping the mutex so that the display trait method does not deadlock
drop(state);

// NOTE(review): the name is reached through `reservation.registration.consumer` here but
// through `reservation.consumer()` elsewhere in this function — consider unifying.
debug!("Pool Exhausted while trying to allocate {additional} bytes for {}:\n{self}", reservation.registration.consumer.name());
return Err(insufficient_capacity_err(
reservation,
additional,
self.pool_size.saturating_sub(used),
));
}
// Enough capacity: add the bytes to this consumer's entry, creating it at 0 if absent.
let entry = state
.pool_members
.entry(reservation.consumer().name().into())
.or_insert(0);
*entry = entry.saturating_add(additional);
Ok(())
}

/// Returns the total recorded usage, i.e. the sum of every consumer's entry.
fn reserved(&self) -> usize {
// NOTE(review): this listing is a rendered diff without +/- markers; the next line appears
// to be the removed pre-commit implementation — confirm against the repository.
self.used.load(Ordering::Relaxed)
let state = self.state.lock();
state.pool_members.values().sum()
}

/// Forgets whatever usage is recorded under `consumer`'s name.
fn unregister(&self, consumer: &MemoryConsumer) {
    // Lock the bookkeeping, then remove the entry keyed by this consumer's name
    // (a no-op when there is none).
    let mut guard = self.state.lock();
    let members = &mut guard.pool_members;
    members.remove(consumer.name());
}
}

impl Display for GreedyMemoryPool {
    /// Writes a one-line summary (entry count, used, free and capacity) followed by one
    /// `\t<bytes>: <consumer>` line per entry, smallest allocation first.
    ///
    /// This locks the pool state, so it must not be called while the caller already holds
    /// that lock.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let state = self.state.lock();

        // Entries are maintained with saturating arithmetic, so total them the same way
        // rather than risking an overflow in a diagnostic path.
        let used = state
            .pool_members
            .values()
            .fold(0usize, |total, &bytes| total.saturating_add(bytes));
        let free = self.pool_size.saturating_sub(used);

        // Order by size, breaking ties by name: `HashMap` iteration order is arbitrary, and
        // without the tie-break equal-sized entries would be listed in a different order on
        // every call.
        let mut allocations: Vec<(&String, &usize)> = state.pool_members.iter().collect();
        allocations.sort_unstable_by(|(name_a, bytes_a), (name_b, bytes_b)| {
            bytes_a.cmp(bytes_b).then_with(|| name_a.cmp(name_b))
        });

        writeln!(
            f,
            "GreedyPool {} allocations, {} used, {} free, {} capacity",
            allocations.len(),
            used,
            free,
            self.pool_size
        )?;

        // Stream each line straight into the formatter instead of rebuilding an intermediate
        // `String` for every entry.
        for (member, bytes) in allocations {
            writeln!(f, "\t{bytes}: {member}")?;
        }
        Ok(())
    }
}

Expand Down Expand Up @@ -310,4 +370,34 @@ mod tests {
let err = r4.try_grow(30).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20");
}

/// Exercises the greedy pool: unchecked `grow`, checked `try_grow`, and `shrink`, with the
/// pool total verified after each step.
#[test]
fn test_greedy() {
    let pool = Arc::new(GreedyMemoryPool::new(100)) as _;
    let mut r1 = MemoryConsumer::new("r1").register(&pool);

    // `grow` is not checked against the limit, so one consumer can exceed the capacity.
    r1.grow(2000);
    assert_eq!(pool.reserved(), 2000);

    let mut r2 = MemoryConsumer::new("r2")
        .with_can_spill(true)
        .register(&pool);
    // A second consumer can over-allocate as well; the pool reports the sum of both.
    r2.grow(2000);
    assert_eq!(pool.reserved(), 4000);

    // `try_grow` is checked: with the pool over capacity, even one byte is refused.
    let err = r1.try_grow(1).unwrap_err().strip_backtrace();
    assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r1 with 2000 bytes already allocated - maximum available is 0");

    let err = r2.try_grow(1).unwrap_err().strip_backtrace();
    assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");

    // Release everything except 10 bytes held by r1.
    r1.shrink(1990);
    r2.shrink(2000);

    assert_eq!(pool.reserved(), 10);

    // Back under capacity, so a checked allocation succeeds again.
    r1.try_grow(10).unwrap();
    assert_eq!(pool.reserved(), 20);
}
}

0 comments on commit c39dc03

Please sign in to comment.