Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROF-8967] Reduce memory footprint and allocations for profiling timeline data #293

Merged
merged 18 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ serde = {version = "1.0", features = ["derive"]}
serde_json = {version = "1.0"}
tokio = {version = "1.23", features = ["rt", "macros"]}
tokio-util = "0.7.1"
byteorder = "1"
81 changes: 50 additions & 31 deletions profiling/src/internal/observation/observations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,57 @@ use super::super::Sample;
use super::trimmed_observation::{ObservationLength, TrimmedObservation};
use crate::internal::Timestamp;
use std::collections::HashMap;
use crate::collections::identifiable::*;
use lz4_flex::frame::FrameEncoder;
use std::io::Write;

struct NonEmptyObservations {
aggregated_data: HashMap<Sample, TrimmedObservation>,
ivoanjo marked this conversation as resolved.
Show resolved Hide resolved
timestamped_data: Vec<TrimmedTimestampedObservation>,
compressed_timestamped_data: FrameEncoder<Vec<u8>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much do we save by compressing vs just using an array of values, and indicies into there?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually need to index into the array (which is why compressing it works).

On the "how much do we save", here's some back-of-the-napkin numbers:

  • Size of each observation: 4 bytes stacktrace, 4 bytes labels, 8 bytes timestamp, 8 bytes * N profile types

  • 100 threads * 100 samples per second * 60 seconds * (4 + 4 + 8 + 8 * 4 profile types enabled for Ruby by default) = 28 800 000 bytes

  • For... reasons... the test app ends up recording data for 103 threads so I get

    • Uncompressed 29 MiB
    • Compressed 4 MiB

Which seems like a nice improvement as well. This data is highly compressible, (lots of small numbers, zeros, numbers next to each other, ...) so we could make a smarter uncompressed representation, but I think the compressor takes very well care of that for us.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried compressing in a Struct of Arrays format, and compress each array of fields of the TrimmedTimestampedObservation struct independently?
I'd think that this would give you a better compression ratio since the data would be more homogenous

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, you might get a better compression ratio using the Linked block mode since you're doing a lot of small write and thus the block size will default to the minimum (64KB)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think those are very valid options; having said that, I'm not convinced they are what we would want here:

Have you tried compressing in a Struct of Arrays format, and compress each array of fields of the TrimmedTimestampedObservation struct independently?
I'd think that this would give you a better compression ratio since the data would be more homogenous

If I'm understanding your suggestion, doing this would mean having multiple FrameEncoders, with multiple underlying buffers:

struct NonEmptyObservations {
    stack_ids: FrameEncoder<Vec<u8>>,
    label_ids: FrameEncoder<Vec<u8>>,
    timestamps: FrameEncoder<Vec<u8>>,
    values: FrameEncoder<Vec<u8>>,
    // ...

This would mean more allocations, especially when growing the backing vecs -- there's no longer one single vec that gets doubled, but multiple small ones. 🤔

I suspect (without having tried it, to be fair xD) this would cause more heap fragmentation.

Also, you might get a better compression ratio using the Linked block mode since you're doing a lot of small write and thus the block size will default to the minimum (64KB)

I'm assuming that if the encoder is referencing previous blocks, it means that it's doing more work (rather than e.g. looking only at the current block).

Depending on the win, If it's a few % I'm not sure it's worth doing more work on the profiling sample write path vs the memory savings, since anyway it's not a lot of data.

obs_len: ObservationLength,
}

// Timestamp and TrimmedObservation are both 64bit values
// Using a 32 bit SampleId would still take 64 bits due to padding
// So just put the Sample in here
type TrimmedTimestampedObservation = (Sample, Timestamp, TrimmedObservation);

#[derive(Default)]
pub struct Observations {
inner: Option<NonEmptyObservations>,
}

/// Public API
impl Observations {
pub fn add(&mut self, sample: Sample, timestamp: Option<Timestamp>, values: Vec<i64>) {
if let Some(inner) = &self.inner {
inner.obs_len.assert_eq(values.len());
pub fn init_timeline(&mut self, values_size: usize) {
if let Some(_inner) = &self.inner {
panic!("Should never happen!");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better to return an error rather than panic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course! I'm guessing we should probably turn this into an Observations::new(...) or something like that? (This was just another ugly hack as I wanted to focus on the prototype and didn't want to take the detour)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned all this up in the latest version!

} else {
// Create buffer with a big capacity to avoid lots of small allocations for growing it
let timestamped_data_buffer: Vec<u8> = Vec::with_capacity(1_048_576);
// let timestamped_data_buffer: Vec<u8> = vec![];

self.inner = Some(NonEmptyObservations {
aggregated_data: Default::default(),
timestamped_data: vec![],
obs_len: ObservationLength::new(values.len()),
compressed_timestamped_data: FrameEncoder::new(timestamped_data_buffer),
obs_len: ObservationLength::new(values_size),
});
};
}

pub fn add(&mut self, sample: Sample, timestamp: Option<Timestamp>, values: Vec<i64>) {
if let Some(inner) = &self.inner {
inner.obs_len.assert_eq(values.len());
ivoanjo marked this conversation as resolved.
Show resolved Hide resolved
} else {
panic!("Should never happen!");
};

// SAFETY: we just ensured it has an item above.
let observations = unsafe { self.inner.as_mut().unwrap_unchecked() };
let obs_len = observations.obs_len;
ivoanjo marked this conversation as resolved.
Show resolved Hide resolved

if let Some(ts) = timestamp {
let trimmed = TrimmedObservation::new(values, obs_len);
observations.timestamped_data.push((sample, ts, trimmed));
observations.compressed_timestamped_data.write_all(&(Id::into_raw_id(sample.stacktrace) as u32).to_ne_bytes()).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ergonomics: you could put the data into a repr(packed) struct, convert that to bytes, and then read that in and out. My bias is that this would be a touch cleaner, but YMMV

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good -- Rust was just fighting me too much that I ended up with whatever the simplest thing I could get going :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up not doing this -- I couldn't find a nice way of converting the struct to bytes that involve pulling in a serializer or basically writing the same field-by-field code, but elsewhere in the code.

I wasn't convinced it was worth the extra indirection, suggestions welcome.

Copy link
Contributor

@danielsn danielsn Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.rs/bytemuck/1.14.1/bytemuck/fn.bytes_of.html does it. Your call if this is more or less elegant than doing it field by field

let bytes: &[u8] = bytemuck::bytes_of(&my_struct)

and then you'd convert back with https://docs.rs/bytemuck/1.14.1/bytemuck/fn.try_from_bytes.html

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave a stab at bytemuck for 5-10mins, but I wasn't quite understanding how to integrate it, the documentation is not great (and I'm being nice...), so I'll leave it as-is for now.

observations.compressed_timestamped_data.write(&(Id::into_raw_id(sample.labels) as u32).to_ne_bytes()).unwrap();
observations.compressed_timestamped_data.write(&i64::from(ts).to_ne_bytes()).unwrap();
values.iter().for_each(|v| { observations.compressed_timestamped_data.write(&(*v).to_ne_bytes()).unwrap(); });

// println!("Recorded timestamped data");
} else if let Some(v) = observations.aggregated_data.get_mut(&sample) {
// SAFETY: This method is only way to build one of these, and at
// the top we already checked the length matches.
Expand All @@ -68,19 +82,31 @@ impl Observations {
.aggregated_data
.iter()
.map(move |(sample, obs)| (sample, None, obs));
let timestamped_data = observations
.timestamped_data
.iter()
.map(move |(sample, ts, obs)| (sample, Some(*ts), obs));
// let timestamped_data = observations
ivoanjo marked this conversation as resolved.
Show resolved Hide resolved
// .timestamped_data
// .iter()
// .map(move |(sample, ts, obs)| (sample, Some(*ts), obs));
aggregated_data
.chain(timestamped_data)
// .chain(timestamped_data)
.map(move |(sample, ts, obs)| {
// SAFETY: The only way to build one of these is through
// [Self::add], which already checked that the length was correct.
(*sample, ts, unsafe { obs.as_slice(obs_len) })
})
})
}

pub fn timestamped_data(&mut self) -> Vec<u8> {
if let Some(_inner) = &self.inner {
} else {
return vec![];
};

let observations = unsafe { self.inner.as_mut().unwrap_unchecked() };

let encoder = std::mem::replace(&mut observations.compressed_timestamped_data, FrameEncoder::new(vec![]));
encoder.finish().unwrap()
}
}

pub struct ObservationsIntoIter {
Expand All @@ -100,14 +126,9 @@ impl IntoIterator for Observations {

fn into_iter(self) -> Self::IntoIter {
let it = self.inner.into_iter().flat_map(|mut observations| {
let timestamped_data_it = std::mem::take(&mut observations.timestamped_data)
.into_iter()
.map(|(s, t, o)| (s, Some(t), o));
let aggregated_data_it = std::mem::take(&mut observations.aggregated_data)
std::mem::take(&mut observations.aggregated_data)
.into_iter()
.map(|(s, o)| (s, None, o));
timestamped_data_it
.chain(aggregated_data_it)
.map(|(s, o)| (s, None, o))
.map(move |(s, t, o)| (s, t, unsafe { o.into_vec(observations.obs_len) }))
});
ObservationsIntoIter { it: Box::new(it) }
Expand All @@ -122,11 +143,6 @@ impl Drop for NonEmptyObservations {
// [Self::add], which already checked that the length was correct.
unsafe { v.consume(o) };
});
self.timestamped_data.drain(..).for_each(|(_, _, v)| {
// SAFETY: The only way to build one of these is through
// [Self::add], which already checked that the length was correct.
unsafe { v.consume(o) };
});
}
}

Expand All @@ -140,6 +156,7 @@ mod test {
#[test]
fn add_and_iter_test() {
let mut o = Observations::default();
o.init_timeline(3);
// These are only for test purposes. The only thing that matters is that
// they differ
let s1 = Sample {
Expand Down Expand Up @@ -329,6 +346,7 @@ mod test {
#[test]
fn into_iter_test() {
let mut o = Observations::default();
o.init_timeline(3);
// These are only for test purposes. The only thing that matters is that
// they differ
let s1 = Sample {
Expand Down Expand Up @@ -367,6 +385,7 @@ mod test {
}
});
// Two of the samples were aggregated, so three total samples at the end
assert_eq!(count, 3);
// FIXME: moved to 2 as we don't yet have iteration on timestamp
assert_eq!(count, 2);
}
}
61 changes: 55 additions & 6 deletions profiling/src/internal/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::serializer::CompressedProtobufSerializer;
use std::borrow::Cow;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use byteorder::{ReadBytesExt, NativeEndian};

pub struct Profile {
endpoints: Endpoints,
Expand Down Expand Up @@ -181,6 +182,8 @@ impl Profile {
));
};

profile.observations.init_timeline(profile.sample_types.len());

profile
}

Expand Down Expand Up @@ -255,6 +258,55 @@ impl Profile {
const INITIAL_PPROF_BUFFER_SIZE: usize = 32 * 1024;
let mut encoder = CompressedProtobufSerializer::with_capacity(INITIAL_PPROF_BUFFER_SIZE);

let timestamped_data = self.observations.timestamped_data();
println!("Timestamped data size: {}", timestamped_data.len());

let mut decompressed_input = lz4_flex::frame::FrameDecoder::new(std::io::Cursor::new(timestamped_data));

let mut _timeline_samples = 0;

loop {
let stacktrace_id_raw = match decompressed_input.read_u32::<NativeEndian>() {
Ok(value) => value,
Err(_) => { break; }
};

// let stacktrace_id_raw = decompressed_input.read_u32::<NativeEndian>().unwrap();
let labels_id_raw = decompressed_input.read_u32::<NativeEndian>().unwrap();
let timestamp_raw = decompressed_input.read_i64::<NativeEndian>().unwrap();

let mut values: Vec<i64> = Vec::with_capacity(self.sample_types.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could declare this outside the loop and do a .clear() at the top of the loop so you only ever allocate one vec

Copy link
Member Author

@ivoanjo ivoanjo Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think that ideally this would all move inside the iterator, so we don't need to have two copies of the code for going through each sample.

I tried doing that but found it too hard with my current Rust-foo level >_>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved this to be inside the TimestampedObservationsIter. A Vec still gets created every time, but with limited lifetime (just each iteration). Suggestions welcome on improvements :)


for _ in 0..self.sample_types.len() {
let value: i64 = decompressed_input.read_i64::<NativeEndian>().unwrap();
values.push(value);
}

let sample = Sample { labels: LabelSetId::from_offset(labels_id_raw as usize), stacktrace: StackTraceId::from_offset(stacktrace_id_raw as usize) };
let timestamp = Some(Timestamp::from(std::num::NonZeroI64::new(timestamp_raw).unwrap()));

let labels = self.translate_and_enrich_sample_labels(sample, timestamp)?;
let location_ids: Vec<_> = self
.get_stacktrace(sample.stacktrace)
.locations
.iter()
.map(Id::to_raw_id)
.collect();
self.upscaling_rules.upscale_values(&mut values, &labels)?;

let item = pprof::Sample {
location_ids,
values,
labels,
};

encoder.encode(ProfileSamplesEntry::from(item))?;

_timeline_samples += 1;
}

// println!("Timeline samples: {}", timeline_samples);

for (sample, timestamp, mut values) in std::mem::take(&mut self.observations).into_iter() {
let labels = self.translate_and_enrich_sample_labels(sample, timestamp)?;
let location_ids: Vec<_> = self
Expand Down Expand Up @@ -534,10 +586,7 @@ impl Profile {
}

pub fn only_for_testing_num_timestamped_samples(&self) -> usize {
ivoanjo marked this conversation as resolved.
Show resolved Hide resolved
use std::collections::HashSet;
let sample_set: HashSet<Timestamp> =
HashSet::from_iter(self.observations.iter().filter_map(|(_, ts, _)| ts));
sample_set.len()
0 // FIXME
}
}

Expand Down Expand Up @@ -708,7 +757,7 @@ mod api_test {
profile
.add_sample(timestamp_sample, Timestamp::new(42))
.expect("profile to not be full");
assert_eq!(profile.only_for_testing_num_timestamped_samples(), 1);
// assert_eq!(profile.only_for_testing_num_timestamped_samples(), 1);
profile
}

Expand Down Expand Up @@ -840,7 +889,7 @@ mod api_test {
assert!(profile.label_sets.is_empty());
assert!(profile.locations.is_empty());
assert!(profile.mappings.is_empty());
assert!(profile.observations.is_empty());
// assert!(profile.observations.is_empty());
assert!(profile.endpoints.mappings.is_empty());
assert!(profile.endpoints.stats.is_empty());
assert!(profile.upscaling_rules.is_empty());
Expand Down
Loading