Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC Memory Tracking of Potentially Shared Buffer #6590

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions arrow-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@ include = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }

[package.metadata.docs.rs]
features = ["pool"]

[lib]
name = "arrow_buffer"
path = "src/lib.rs"
bench = false

[features]
pool = []
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is very hard to guage what if any performance implications there are of this, I think it is important that it is gated by a feature flag at least initially.


[dependencies]
bytes = { version = "1.4" }
num = { version = "0.4", default-features = false, features = ["std"] }
Expand Down
6 changes: 6 additions & 0 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ impl Buffer {
pub fn ptr_eq(&self, other: &Self) -> bool {
self.ptr == other.ptr && self.length == other.length
}

/// Register this [`Buffer`] with the provided [`MemoryPool`], replacing any prior assignment
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn crate::MemoryPool) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could plumb this claim API through the various arrays and RecordBatch. Consumers like DF could then claim RecordBatch they wish to buffer, and know they aren't double-counting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is interesting -- so the idea is that this would support cooperatively assigning / tracking usage (rather than relying on some global allocator to do so).

That aligns pretty nicely with the "no overhead unless asked" and "everything being explicit" principles

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this API would be easier to use / manage if it also returned the prior reservation, if any

Copy link
Contributor Author

@tustvold tustvold Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would make it harder to chain through to Arrays and RecordBatch that may consist of multiple buffers and therefore reservations. It would also mean potentially keeping around "duplicate" reservations for longer than necessary, which would become even more problematic if the same buffer is used multiple times.

What would be the use-case for the returned reservation?

Copy link
Contributor

@alamb alamb Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking like if only some Arrays in a RecordBatch have a reservation but some didn't and then a system like the DataFusion memory pool added an entirely new reservation for all the arrays wiping out the old reservations

In that case I would want the previous reservations to be unregistered

Although maybe Drop that did the unregistering would be good enough for that case 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although maybe Drop that did the unregistering would be good enough for that case 🤔

That's what is intended, and follows RAII best-practices

self.data.claim(pool)
}
}

/// Note that here we deliberately do not implement
Expand Down
13 changes: 13 additions & 0 deletions arrow-buffer/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub struct Bytes {

/// how to deallocate this region
deallocation: Deallocation,

#[cfg(feature = "pool")]
reservation: std::sync::Mutex<Option<Box<dyn crate::MemoryReservation>>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally used parking_lot::Mutex but this would have made Buffer not RewindSafe, which would be a breaking change (and given there is a test for this I presume this is important).

}

impl Bytes {
Expand All @@ -65,6 +68,8 @@ impl Bytes {
ptr,
len,
deallocation,
#[cfg(feature = "pool")]
reservation: std::sync::Mutex::new(None),
}
}

Expand Down Expand Up @@ -96,6 +101,12 @@ impl Bytes {
}
}

/// Register this [`Bytes`] with the provided [`MemoryPool`], replacing any prior assignment
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn crate::MemoryPool) {
*self.reservation.lock().unwrap() = Some(pool.register(self.capacity()));
Copy link
Contributor Author

@tustvold tustvold Oct 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two implications of this formulation:

  • We always allocate a new memory reservation which could be wasteful
  • We allocate from the new pool before freeing the memory from the previous reservation

}

#[inline]
pub(crate) fn deallocation(&self) -> &Deallocation {
&self.deallocation
Expand Down Expand Up @@ -152,6 +163,8 @@ impl From<bytes::Bytes> for Bytes {
len,
ptr: NonNull::new(value.as_ptr() as _).unwrap(),
deallocation: Deallocation::Custom(std::sync::Arc::new(value), len),
#[cfg(feature = "pool")]
reservation: std::sync::Mutex::new(None),
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions arrow-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ mod interval;
pub use interval::*;

mod arith;

#[cfg(feature = "pool")]
mod pool;
#[cfg(feature = "pool")]
pub use pool::*;
81 changes: 81 additions & 0 deletions arrow-buffer/src/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// A [`MemoryPool`] can be used to track memory usage by [`Buffer`](crate::Buffer)
pub trait MemoryPool {
/// Return a memory reservation of `size` bytes
fn register(&self, size: usize) -> Box<dyn MemoryReservation>;
}

/// A memory reservation within a [`MemoryPool`] that is freed on drop
pub trait MemoryReservation {
/// Resize this reservation to `new` bytes
fn resize(&mut self, new: usize);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't actually have a use-case for this atm, but this seemed sensible to add now to avoid it being difficult later

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mutable buffer might need this. It may resize its reservation dynamically

}

/// A simple [`MemoryPool`] that reports the total memory usage
#[derive(Debug, Default)]
pub struct TrackingMemoryPool(Arc<AtomicUsize>);

impl TrackingMemoryPool {
/// Returns the total allocated size
pub fn allocated(&self) -> usize {
self.0.load(Ordering::Relaxed)
}
}

impl MemoryPool for TrackingMemoryPool {
fn register(&self, size: usize) -> Box<dyn MemoryReservation> {
self.0.fetch_add(size, Ordering::Relaxed);
Box::new(Tracker {
size,
shared: Arc::clone(&self.0),
})
}
}

#[derive(Debug)]
struct Tracker {
size: usize,
shared: Arc<AtomicUsize>,
}

impl Drop for Tracker {
fn drop(&mut self) {
self.shared.fetch_sub(self.size, Ordering::Relaxed);
}
}

impl MemoryReservation for Tracker {
fn resize(&mut self, new: usize) {
match self.size < new {
true => self.shared.fetch_add(new - self.size, Ordering::Relaxed),
false => self.shared.fetch_sub(self.size - new, Ordering::Relaxed),
};
self.size = new;
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::Buffer;

#[test]
fn test_memory_pool() {
let pool = TrackingMemoryPool::default();
let b1 = Buffer::from(vec![0_i64, 1, 2]);
let b2 = Buffer::from(vec![3_u16, 4, 5]);

let buffers = [b1.clone(), b1.slice(12), b1.clone(), b2.clone()];
buffers.iter().for_each(|x| x.claim(&pool));

assert_eq!(pool.allocated(), b1.capacity() + b2.capacity());
drop(buffers);
assert_eq!(pool.allocated(), b1.capacity() + b2.capacity());
drop(b2);
assert_eq!(pool.allocated(), b1.capacity());
drop(b1);
assert_eq!(pool.allocated(), 0);
}
}
Loading