Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROF-8967] Reduce memory footprint and allocations for profiling timeline data #293

Merged
merged 18 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ serde = {version = "1.0", features = ["derive"]}
serde_json = {version = "1.0"}
tokio = {version = "1.23", features = ["rt", "macros"]}
tokio-util = "0.7.1"
byteorder = { version = "1.5", features = ["std"] }
uuid = { version = "1.4.1", features = ["v4", "serde"] }
6 changes: 6 additions & 0 deletions profiling/src/internal/label.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,9 @@ impl LabelSetId {
self.0 as usize
}
}

/// Converts a [`LabelSetId`] into the `u32` held in its first tuple field.
impl From<LabelSetId> for u32 {
    fn from(value: LabelSetId) -> Self {
        // Pull the wrapped value out by field access; no arithmetic or
        // narrowing is involved.
        let raw_id = value.0;
        raw_id
    }
}
1 change: 1 addition & 0 deletions profiling/src/internal/observation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! single pointer of overhead.

mod observations;
mod timestamped_observations;
mod trimmed_observation;

// We keep trimmed_observation private, to ensure that only maps can make and
Expand Down
195 changes: 85 additions & 110 deletions profiling/src/internal/observation/observations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,62 @@
//! See the mod.rs file comment for why this module and file exists.

use super::super::Sample;
use super::timestamped_observations::TimestampedObservations;
use super::trimmed_observation::{ObservationLength, TrimmedObservation};
use crate::internal::Timestamp;
use std::collections::HashMap;

struct NonEmptyObservations {
// Samples with no timestamps are aggregated in-place as each observation is added
aggregated_data: HashMap<Sample, TrimmedObservation>,
ivoanjo marked this conversation as resolved.
Show resolved Hide resolved
timestamped_data: Vec<TrimmedTimestampedObservation>,
// Samples with timestamps are all separately kept (so we can know the exact values at the given timestamp)
timestamped_data: TimestampedObservations,
obs_len: ObservationLength,
timestamped_samples_count: usize,
}

// Timestamp and TrimmedObservation are both 64bit values
// Using a 32 bit SampleId would still take 64 bits due to padding
// So just put the Sample in here
type TrimmedTimestampedObservation = (Sample, Timestamp, TrimmedObservation);

#[derive(Default)]
pub struct Observations {
inner: Option<NonEmptyObservations>,
}

/// Public API
impl Observations {
pub fn add(&mut self, sample: Sample, timestamp: Option<Timestamp>, values: Vec<i64>) {
if let Some(inner) = &self.inner {
inner.obs_len.assert_eq(values.len());
} else {
self.inner = Some(NonEmptyObservations {
pub fn new(observations_len: usize) -> Self {
Observations {
inner: Some(NonEmptyObservations {
aggregated_data: Default::default(),
timestamped_data: vec![],
obs_len: ObservationLength::new(values.len()),
});
};
timestamped_data: TimestampedObservations::new(observations_len),
obs_len: ObservationLength::new(observations_len),
timestamped_samples_count: 0,
}),
}
}

pub fn add(
&mut self,
sample: Sample,
timestamp: Option<Timestamp>,
values: Vec<i64>,
) -> anyhow::Result<()> {
anyhow::ensure!(
self.inner.is_some(),
"Use of add on Observations that were not initialized"
);

// SAFETY: we just ensured it has an item above.
let observations = unsafe { self.inner.as_mut().unwrap_unchecked() };
let obs_len = observations.obs_len;

anyhow::ensure!(
obs_len.eq(values.len()),
"Observation length mismatch, expected {obs_len:?} values, got {} instead",
values.len()
);

if let Some(ts) = timestamp {
let trimmed = TrimmedObservation::new(values, obs_len);
observations.timestamped_data.push((sample, ts, trimmed));
observations.timestamped_data.add(sample, ts, values)?;
observations.timestamped_samples_count += 1;
} else if let Some(v) = observations.aggregated_data.get_mut(&sample) {
// SAFETY: This method is the only way to build one of these, and at
// the top we already checked the length matches.
Expand All @@ -55,31 +71,27 @@ impl Observations {
let trimmed = TrimmedObservation::new(values, obs_len);
observations.aggregated_data.insert(sample, trimmed);
}

Ok(())
}

/// Returns `true` when no samples are held.
///
/// That is the case when there is no inner storage at all, or when both
/// the aggregated and the timestamped sample counts are zero.
pub fn is_empty(&self) -> bool {
    if self.inner.is_none() {
        return true;
    }
    // The aggregated count is checked first; the timestamped count is only
    // consulted when there are no aggregated samples.
    self.aggregated_samples_count() == 0 && self.timestamped_samples_count() == 0
}

/// Returns the number of entries in the aggregated-sample map, or `0` when
/// there is no inner storage.
pub fn aggregated_samples_count(&self) -> usize {
    match &self.inner {
        Some(observations) => observations.aggregated_data.len(),
        None => 0,
    }
}

pub fn iter(&self) -> impl Iterator<Item = (Sample, Option<Timestamp>, &[i64])> {
self.inner.iter().flat_map(|observations| {
let obs_len = observations.obs_len;
let aggregated_data = observations
.aggregated_data
.iter()
.map(move |(sample, obs)| (sample, None, obs));
let timestamped_data = observations
.timestamped_data
.iter()
.map(move |(sample, ts, obs)| (sample, Some(*ts), obs));
aggregated_data
.chain(timestamped_data)
.map(move |(sample, ts, obs)| {
// SAFETY: The only way to build one of these is through
// [Self::add], which already checked that the length was correct.
(*sample, ts, unsafe { obs.as_slice(obs_len) })
})
})
/// Returns the stored `timestamped_samples_count` of the inner storage, or
/// `0` when there is no inner storage.
pub fn timestamped_samples_count(&self) -> usize {
    let inner = self.inner.as_ref();
    inner.map_or(0, |observations| observations.timestamped_samples_count)
}
}

Expand All @@ -100,15 +112,17 @@ impl IntoIterator for Observations {

fn into_iter(self) -> Self::IntoIter {
let it = self.inner.into_iter().flat_map(|mut observations| {
let timestamped_data_it = std::mem::take(&mut observations.timestamped_data)
.into_iter()
.map(|(s, t, o)| (s, Some(t), o));
let timestamped_data_it = std::mem::replace(
&mut observations.timestamped_data,
TimestampedObservations::with_no_backing_store(),
)
.into_iter()
.map(|(s, t, o)| (s, Some(t), o));
let aggregated_data_it = std::mem::take(&mut observations.aggregated_data)
.into_iter()
.map(|(s, o)| (s, None, o));
timestamped_data_it
.chain(aggregated_data_it)
.map(move |(s, t, o)| (s, t, unsafe { o.into_vec(observations.obs_len) }))
.map(|(s, o)| (s, None, o))
.map(move |(s, t, o)| (s, t, unsafe { o.into_vec(observations.obs_len) }));
timestamped_data_it.chain(aggregated_data_it)
});
ObservationsIntoIter { it: Box::new(it) }
}
Expand All @@ -122,11 +136,6 @@ impl Drop for NonEmptyObservations {
// [Self::add], which already checked that the length was correct.
unsafe { v.consume(o) };
});
self.timestamped_data.drain(..).for_each(|(_, _, v)| {
// SAFETY: The only way to build one of these is through
// [Self::add], which already checked that the length was correct.
unsafe { v.consume(o) };
});
}
}

Expand All @@ -139,7 +148,7 @@ mod test {

#[test]
fn add_and_iter_test() {
let mut o = Observations::default();
let mut o = Observations::new(3);
// These are only for test purposes. The only thing that matters is that
// they differ
let s1 = Sample {
Expand All @@ -157,53 +166,18 @@ mod test {
let t1 = Some(Timestamp::new(1).unwrap());
let t2 = Some(Timestamp::new(2).unwrap());

o.add(s1, None, vec![1, 2, 3]);
o.add(s1, None, vec![4, 5, 6]);
o.add(s2, None, vec![7, 8, 9]);
o.iter().for_each(|(k, ts, v)| {
assert!(ts.is_none());
if k == s1 {
assert_eq!(v, vec![5, 7, 9]);
} else if k == s2 {
assert_eq!(v, vec![7, 8, 9]);
} else {
panic!("Unexpected key");
}
});
// Iter twice to make sure there are no issues doing that
o.iter().for_each(|(k, ts, v)| {
assert!(ts.is_none());
if k == s1 {
assert_eq!(v, vec![5, 7, 9]);
} else if k == s2 {
assert_eq!(v, vec![7, 8, 9]);
} else {
panic!("Unexpected key");
}
});
o.add(s3, t1, vec![10, 11, 12]);
o.add(s1, None, vec![1, 2, 3]).unwrap();
o.add(s1, None, vec![4, 5, 6]).unwrap();
o.add(s2, None, vec![7, 8, 9]).unwrap();
o.add(s3, t1, vec![10, 11, 12]).unwrap();
o.add(s2, t2, vec![13, 14, 15]).unwrap();

ivoanjo marked this conversation as resolved.
Show resolved Hide resolved
o.iter().for_each(|(k, ts, v)| {
if k == s1 {
assert_eq!(v, vec![5, 7, 9]);
assert!(ts.is_none());
} else if k == s2 {
assert_eq!(v, vec![7, 8, 9]);
assert!(ts.is_none());
} else if k == s3 {
assert_eq!(v, vec![10, 11, 12]);
assert_eq!(ts, t1);
} else {
panic!("Unexpected key");
}
});

o.add(s2, t2, vec![13, 14, 15]);
o.iter().for_each(|(k, ts, v)| {
o.into_iter().for_each(|(k, ts, v)| {
if k == s1 {
// Observations without timestamp, these are aggregated together
assert_eq!(v, vec![5, 7, 9]);
assert!(ts.is_none());
} else if k == s2 {
// Same stack with and without timestamp
if ts.is_some() {
assert_eq!(v, vec![13, 14, 15]);
assert_eq!(ts, t2);
Expand All @@ -212,6 +186,7 @@ mod test {
assert!(ts.is_none());
}
} else if k == s3 {
// Observation with timestamp
assert_eq!(v, vec![10, 11, 12]);
assert_eq!(ts, t1);
} else {
Expand All @@ -235,9 +210,9 @@ mod test {
};

let mut o = Observations::default();
o.add(s1, None, vec![1, 2, 3]);
o.add(s1, None, vec![1, 2, 3]).unwrap();
// This should panic
o.add(s2, None, vec![4, 5]);
o.add(s2, None, vec![4, 5]).unwrap();
ivoanjo marked this conversation as resolved.
Show resolved Hide resolved
}

#[test]
Expand All @@ -249,9 +224,9 @@ mod test {
};

let mut o = Observations::default();
o.add(s1, None, vec![1, 2, 3]);
o.add(s1, None, vec![1, 2, 3]).unwrap();
// This should panic
o.add(s1, None, vec![4, 5]);
o.add(s1, None, vec![4, 5]).unwrap();
}

#[test]
Expand All @@ -270,9 +245,9 @@ mod test {

let mut o = Observations::default();
let ts = NonZeroI64::new(1).unwrap();
o.add(s1, Some(ts), vec![1, 2, 3]);
o.add(s1, Some(ts), vec![1, 2, 3]).unwrap();
// This should panic
o.add(s2, Some(ts), vec![4, 5]);
o.add(s2, Some(ts), vec![4, 5]).unwrap();
ivoanjo marked this conversation as resolved.
Show resolved Hide resolved
}

#[test]
Expand All @@ -285,9 +260,9 @@ mod test {

let mut o = Observations::default();
let ts = NonZeroI64::new(1).unwrap();
o.add(s1, Some(ts), vec![1, 2, 3]);
o.add(s1, Some(ts), vec![1, 2, 3]).unwrap();
// This should panic
o.add(s1, Some(ts), vec![4, 5]);
o.add(s1, Some(ts), vec![4, 5]).unwrap();
}

#[test]
Expand All @@ -306,9 +281,9 @@ mod test {

let mut o = Observations::default();
let ts = NonZeroI64::new(1).unwrap();
o.add(s1, None, vec![1, 2, 3]);
o.add(s1, None, vec![1, 2, 3]).unwrap();
// This should panic
o.add(s2, Some(ts), vec![4, 5]);
o.add(s2, Some(ts), vec![4, 5]).unwrap();
}

#[test]
Expand All @@ -319,16 +294,16 @@ mod test {
stacktrace: StackTraceId::from_offset(1),
};

let mut o = Observations::default();
let mut o = Observations::new(3);
let ts = NonZeroI64::new(1).unwrap();
o.add(s1, Some(ts), vec![1, 2, 3]);
o.add(s1, Some(ts), vec![1, 2, 3]).unwrap();
// This should panic
o.add(s1, None, vec![4, 5]);
o.add(s1, None, vec![4, 5]).unwrap();
}

#[test]
fn into_iter_test() {
let mut o = Observations::default();
let mut o = Observations::new(3);
// These are only for test purposes. The only thing that matters is that
// they differ
let s1 = Sample {
Expand All @@ -345,10 +320,10 @@ mod test {
};
let t1 = Some(Timestamp::new(1).unwrap());

o.add(s1, None, vec![1, 2, 3]);
o.add(s1, None, vec![4, 5, 6]);
o.add(s2, None, vec![7, 8, 9]);
o.add(s3, t1, vec![1, 1, 2]);
o.add(s1, None, vec![1, 2, 3]).unwrap();
o.add(s1, None, vec![4, 5, 6]).unwrap();
o.add(s2, None, vec![7, 8, 9]).unwrap();
o.add(s3, t1, vec![1, 1, 2]).unwrap();

let mut count = 0;
o.into_iter().for_each(|(k, ts, v)| {
Expand Down
Loading
Loading