diff --git a/Cargo.lock b/Cargo.lock
index 228c7ed19..c6de1ec7e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -616,6 +616,7 @@ dependencies = [
  "anyhow",
  "backtrace",
  "bitmaps",
+ "byteorder",
  "bytes",
  "chrono",
  "ddcommon",
diff --git a/profiling/Cargo.toml b/profiling/Cargo.toml
index 0079bee98..49242b99a 100644
--- a/profiling/Cargo.toml
+++ b/profiling/Cargo.toml
@@ -42,4 +42,5 @@ serde = {version = "1.0", features = ["derive"]}
 serde_json = {version = "1.0"}
 tokio = {version = "1.23", features = ["rt", "macros"]}
 tokio-util = "0.7.1"
+byteorder = { version = "1.5", features = ["std"] }
 uuid = { version = "1.4.1", features = ["v4", "serde"] }
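Aside, not part of the patch: the new `byteorder` dependency (with the `std` feature) is what lets the compressed timestamped buffer introduced below be read back as typed integers from any `Read` impl. A minimal sketch of that write/read pattern, with made-up values:

```rust
use byteorder::{NativeEndian, ReadBytesExt};
use std::io::Cursor;

fn main() -> std::io::Result<()> {
    // Write integers as native-endian bytes, the same way the new
    // timestamped_observations module does via `to_ne_bytes`.
    let mut buf = Vec::new();
    buf.extend_from_slice(&42u32.to_ne_bytes());
    buf.extend_from_slice(&(-7i64).to_ne_bytes());

    // Read them back as typed values through byteorder's ReadBytesExt.
    let mut cursor = Cursor::new(buf);
    assert_eq!(cursor.read_u32::<NativeEndian>()?, 42);
    assert_eq!(cursor.read_i64::<NativeEndian>()?, -7);
    Ok(())
}
```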
initialized" + ); // SAFETY: we just ensured it has an item above. let observations = unsafe { self.inner.as_mut().unwrap_unchecked() }; let obs_len = observations.obs_len; + anyhow::ensure!( + obs_len.eq(values.len()), + "Observation length mismatch, expected {obs_len:?} values, got {} instead", + values.len() + ); + if let Some(ts) = timestamp { - let trimmed = TrimmedObservation::new(values, obs_len); - observations.timestamped_data.push((sample, ts, trimmed)); + observations.timestamped_data.add(sample, ts, values)?; + observations.timestamped_samples_count += 1; } else if let Some(v) = observations.aggregated_data.get_mut(&sample) { // SAFETY: This method is only way to build one of these, and at // the top we already checked the length matches. @@ -55,31 +71,27 @@ impl Observations { let trimmed = TrimmedObservation::new(values, obs_len); observations.aggregated_data.insert(sample, trimmed); } + + Ok(()) } pub fn is_empty(&self) -> bool { self.inner.is_none() + || (self.aggregated_samples_count() == 0 && self.timestamped_samples_count() == 0) + } + + pub fn aggregated_samples_count(&self) -> usize { + self.inner + .as_ref() + .map(|o| o.aggregated_data.len()) + .unwrap_or(0) } - pub fn iter(&self) -> impl Iterator, &[i64])> { - self.inner.iter().flat_map(|observations| { - let obs_len = observations.obs_len; - let aggregated_data = observations - .aggregated_data - .iter() - .map(move |(sample, obs)| (sample, None, obs)); - let timestamped_data = observations - .timestamped_data - .iter() - .map(move |(sample, ts, obs)| (sample, Some(*ts), obs)); - aggregated_data - .chain(timestamped_data) - .map(move |(sample, ts, obs)| { - // SAFETY: The only way to build one of these is through - // [Self::add], which already checked that the length was correct. - (*sample, ts, unsafe { obs.as_slice(obs_len) }) - }) - }) + pub fn timestamped_samples_count(&self) -> usize { + self.inner + .as_ref() + .map(|o| o.timestamped_samples_count) + .unwrap_or(0) } } @@ -100,15 +112,17 @@ impl IntoIterator for Observations { fn into_iter(self) -> Self::IntoIter { let it = self.inner.into_iter().flat_map(|mut observations| { - let timestamped_data_it = std::mem::take(&mut observations.timestamped_data) - .into_iter() - .map(|(s, t, o)| (s, Some(t), o)); + let timestamped_data_it = std::mem::replace( + &mut observations.timestamped_data, + TimestampedObservations::with_no_backing_store(), + ) + .into_iter() + .map(|(s, t, o)| (s, Some(t), o)); let aggregated_data_it = std::mem::take(&mut observations.aggregated_data) .into_iter() - .map(|(s, o)| (s, None, o)); - timestamped_data_it - .chain(aggregated_data_it) - .map(move |(s, t, o)| (s, t, unsafe { o.into_vec(observations.obs_len) })) + .map(|(s, o)| (s, None, o)) + .map(move |(s, t, o)| (s, t, unsafe { o.into_vec(observations.obs_len) })); + timestamped_data_it.chain(aggregated_data_it) }); ObservationsIntoIter { it: Box::new(it) } } @@ -122,11 +136,6 @@ impl Drop for NonEmptyObservations { // [Self::add], which already checked that the length was correct. unsafe { v.consume(o) }; }); - self.timestamped_data.drain(..).for_each(|(_, _, v)| { - // SAFETY: The only way to build one of these is through - // [Self::add], which already checked that the length was correct. - unsafe { v.consume(o) }; - }); } } @@ -139,7 +148,7 @@ mod test { #[test] fn add_and_iter_test() { - let mut o = Observations::default(); + let mut o = Observations::new(3); // These are only for test purposes. 
The only thing that matters is that // they differ let s1 = Sample { @@ -157,53 +166,23 @@ mod test { let t1 = Some(Timestamp::new(1).unwrap()); let t2 = Some(Timestamp::new(2).unwrap()); - o.add(s1, None, vec![1, 2, 3]); - o.add(s1, None, vec![4, 5, 6]); - o.add(s2, None, vec![7, 8, 9]); - o.iter().for_each(|(k, ts, v)| { - assert!(ts.is_none()); - if k == s1 { - assert_eq!(v, vec![5, 7, 9]); - } else if k == s2 { - assert_eq!(v, vec![7, 8, 9]); - } else { - panic!("Unexpected key"); - } - }); - // Iter twice to make sure there are no issues doing that - o.iter().for_each(|(k, ts, v)| { - assert!(ts.is_none()); - if k == s1 { - assert_eq!(v, vec![5, 7, 9]); - } else if k == s2 { - assert_eq!(v, vec![7, 8, 9]); - } else { - panic!("Unexpected key"); - } - }); - o.add(s3, t1, vec![10, 11, 12]); + o.add(s1, None, vec![1, 2, 3]).unwrap(); + o.add(s1, None, vec![4, 5, 6]).unwrap(); + o.add(s2, None, vec![7, 8, 9]).unwrap(); + o.add(s3, t1, vec![10, 11, 12]).unwrap(); + o.add(s2, t2, vec![13, 14, 15]).unwrap(); - o.iter().for_each(|(k, ts, v)| { - if k == s1 { - assert_eq!(v, vec![5, 7, 9]); - assert!(ts.is_none()); - } else if k == s2 { - assert_eq!(v, vec![7, 8, 9]); - assert!(ts.is_none()); - } else if k == s3 { - assert_eq!(v, vec![10, 11, 12]); - assert_eq!(ts, t1); - } else { - panic!("Unexpected key"); - } - }); + // 2 because they aggregate together + assert_eq!(2, o.aggregated_samples_count()); + + assert_eq!(2, o.timestamped_samples_count()); - o.add(s2, t2, vec![13, 14, 15]); - o.iter().for_each(|(k, ts, v)| { + o.into_iter().for_each(|(k, ts, v)| { if k == s1 { + // Observations without timestamp, these are aggregated together assert_eq!(v, vec![5, 7, 9]); - assert!(ts.is_none()); } else if k == s2 { + // Same stack with and without timestamp if ts.is_some() { assert_eq!(v, vec![13, 14, 15]); assert_eq!(ts, t2); @@ -212,6 +191,7 @@ mod test { assert!(ts.is_none()); } } else if k == s3 { + // Observation with timestamp assert_eq!(v, vec![10, 11, 12]); assert_eq!(ts, t1); } else { @@ -221,7 +201,6 @@ mod test { } #[test] - #[should_panic] fn different_lengths_panic_different_key_no_ts() { // These are only for test purposes. The only thing that matters is that // they differ @@ -234,28 +213,24 @@ mod test { stacktrace: StackTraceId::from_offset(2), }; - let mut o = Observations::default(); - o.add(s1, None, vec![1, 2, 3]); - // This should panic - o.add(s2, None, vec![4, 5]); + let mut o = Observations::new(3); + o.add(s1, None, vec![1, 2, 3]).unwrap(); + o.add(s2, None, vec![4, 5]).unwrap_err(); } #[test] - #[should_panic] fn different_lengths_panic_same_key_no_ts() { let s1 = Sample { labels: LabelSetId::from_offset(1), stacktrace: StackTraceId::from_offset(1), }; - let mut o = Observations::default(); - o.add(s1, None, vec![1, 2, 3]); - // This should panic - o.add(s1, None, vec![4, 5]); + let mut o = Observations::new(3); + o.add(s1, None, vec![1, 2, 3]).unwrap(); + o.add(s1, None, vec![4, 5]).unwrap_err(); } #[test] - #[should_panic] fn different_lengths_panic_different_key_ts() { // These are only for test purposes. 
The only thing that matters is that // they differ @@ -268,30 +243,26 @@ mod test { stacktrace: StackTraceId::from_offset(2), }; - let mut o = Observations::default(); + let mut o = Observations::new(3); let ts = NonZeroI64::new(1).unwrap(); - o.add(s1, Some(ts), vec![1, 2, 3]); - // This should panic - o.add(s2, Some(ts), vec![4, 5]); + o.add(s1, Some(ts), vec![1, 2, 3]).unwrap(); + o.add(s2, Some(ts), vec![4, 5]).unwrap_err(); } #[test] - #[should_panic] fn different_lengths_panic_same_key_ts() { let s1 = Sample { labels: LabelSetId::from_offset(1), stacktrace: StackTraceId::from_offset(1), }; - let mut o = Observations::default(); + let mut o = Observations::new(3); let ts = NonZeroI64::new(1).unwrap(); - o.add(s1, Some(ts), vec![1, 2, 3]); - // This should panic - o.add(s1, Some(ts), vec![4, 5]); + o.add(s1, Some(ts), vec![1, 2, 3]).unwrap(); + o.add(s1, Some(ts), vec![4, 5]).unwrap_err(); } #[test] - #[should_panic] fn different_lengths_panic_different_key_mixed() { // These are only for test purposes. The only thing that matters is that // they differ @@ -304,11 +275,10 @@ mod test { stacktrace: StackTraceId::from_offset(2), }; - let mut o = Observations::default(); + let mut o = Observations::new(3); let ts = NonZeroI64::new(1).unwrap(); - o.add(s1, None, vec![1, 2, 3]); - // This should panic - o.add(s2, Some(ts), vec![4, 5]); + o.add(s1, None, vec![1, 2, 3]).unwrap(); + o.add(s2, Some(ts), vec![4, 5]).unwrap_err(); } #[test] @@ -319,16 +289,16 @@ mod test { stacktrace: StackTraceId::from_offset(1), }; - let mut o = Observations::default(); + let mut o = Observations::new(3); let ts = NonZeroI64::new(1).unwrap(); - o.add(s1, Some(ts), vec![1, 2, 3]); + o.add(s1, Some(ts), vec![1, 2, 3]).unwrap(); // This should panic - o.add(s1, None, vec![4, 5]); + o.add(s1, None, vec![4, 5]).unwrap(); } #[test] fn into_iter_test() { - let mut o = Observations::default(); + let mut o = Observations::new(3); // These are only for test purposes. The only thing that matters is that // they differ let s1 = Sample { @@ -345,10 +315,10 @@ mod test { }; let t1 = Some(Timestamp::new(1).unwrap()); - o.add(s1, None, vec![1, 2, 3]); - o.add(s1, None, vec![4, 5, 6]); - o.add(s2, None, vec![7, 8, 9]); - o.add(s3, t1, vec![1, 1, 2]); + o.add(s1, None, vec![1, 2, 3]).unwrap(); + o.add(s1, None, vec![4, 5, 6]).unwrap(); + o.add(s2, None, vec![7, 8, 9]).unwrap(); + o.add(s3, t1, vec![1, 1, 2]).unwrap(); let mut count = 0; o.into_iter().for_each(|(k, ts, v)| { diff --git a/profiling/src/internal/observation/timestamped_observations.rs b/profiling/src/internal/observation/timestamped_observations.rs new file mode 100644 index 000000000..49886c890 --- /dev/null +++ b/profiling/src/internal/observation/timestamped_observations.rs @@ -0,0 +1,110 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present Datadog, Inc. + +//! Used to store timestamped observations in a compressed buffer. Assumption is that we don't need this data until +// serialization, so it's better to pack it in while we're holding it. 
+
+use super::super::LabelSetId;
+use super::super::Sample;
+use super::super::StackTraceId;
+use crate::collections::identifiable::Id;
+use crate::internal::Timestamp;
+use byteorder::{NativeEndian, ReadBytesExt};
+use lz4_flex::frame::FrameDecoder;
+use lz4_flex::frame::FrameEncoder;
+use std::io::Cursor;
+use std::io::Write;
+
+#[derive(Debug)]
+pub struct TimestampedObservations {
+    compressed_timestamped_data: FrameEncoder<Vec<u8>>,
+    sample_types_len: usize,
+}
+
+impl TimestampedObservations {
+    // As documented in the internal Datadog doc "Ruby timeline memory fragmentation impact investigation",
+    // allowing the timeline storage vec to slowly expand creates A LOT of memory fragmentation for apps that
+    // employ multiple threads.
+    // To avoid this, we've picked a default buffer size of 1MB that very rarely needs to grow, and when it does,
+    // is expected to grow in larger steps.
+    const DEFAULT_BUFFER_SIZE: usize = 1_048_576;
+
+    pub fn new(sample_types_len: usize) -> Self {
+        Self {
+            compressed_timestamped_data: FrameEncoder::new(Vec::with_capacity(
+                Self::DEFAULT_BUFFER_SIZE,
+            )),
+            sample_types_len,
+        }
+    }
+
+    pub fn with_no_backing_store() -> Self {
+        Self {
+            compressed_timestamped_data: FrameEncoder::new(vec![]),
+            sample_types_len: 0,
+        }
+    }
+
+    pub fn add(&mut self, sample: Sample, ts: Timestamp, values: Vec<i64>) -> anyhow::Result<()> {
+        // We explicitly turn the data into a stream of bytes, feeding it to the compressor.
+        // @ivoanjo: I played with introducing a structure to serialize it all-at-once, but it seems to be a lot of
+        // boilerplate (of which cost I'm not sure) to basically do the same as these few lines so in the end I came
+        // back to this.
+
+        let stack_trace_id: u32 = sample.stacktrace.into();
+        let labels_id: u32 = sample.labels.into();
+        let timestamp = i64::from(ts);
+
+        self.compressed_timestamped_data
+            .write_all(&stack_trace_id.to_ne_bytes())?;
+        self.compressed_timestamped_data
+            .write_all(&labels_id.to_ne_bytes())?;
+        self.compressed_timestamped_data
+            .write_all(&timestamp.to_ne_bytes())?;
+
+        for v in values {
+            self.compressed_timestamped_data
+                .write_all(&(v).to_ne_bytes())?;
+        }
+
+        Ok(())
+    }
+
+    pub fn into_iter(self) -> TimestampedObservationsIter {
+        TimestampedObservationsIter {
+            decoder: FrameDecoder::new(Cursor::new(
+                self.compressed_timestamped_data.finish().unwrap(),
+            )),
+            sample_types_len: self.sample_types_len,
+        }
+    }
+}
+
+pub struct TimestampedObservationsIter {
+    decoder: FrameDecoder<Cursor<Vec<u8>>>,
+    sample_types_len: usize,
+}
+
+impl Iterator for TimestampedObservationsIter {
+    type Item = (Sample, Timestamp, Vec<i64>);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // In here we read the bytes in the same order as in add above
+
+        let stacktrace = self.decoder.read_u32::<NativeEndian>().ok()?;
+        let labels = self.decoder.read_u32::<NativeEndian>().ok()?;
+        let ts = self.decoder.read_i64::<NativeEndian>().ok()?;
+        let mut values = Vec::with_capacity(self.sample_types_len);
+        for _ in 0..self.sample_types_len {
+            values.push(self.decoder.read_i64::<NativeEndian>().ok()?);
+        }
+        Some((
+            Sample {
+                stacktrace: StackTraceId::from_offset(stacktrace as usize),
+                labels: LabelSetId::from_offset(labels as usize),
+            },
+            std::num::NonZeroI64::new(ts)?,
+            values,
+        ))
+    }
+}
diff --git a/profiling/src/internal/observation/trimmed_observation.rs b/profiling/src/internal/observation/trimmed_observation.rs
index 9536292aa..1fd5cbb8b 100644
--- a/profiling/src/internal/observation/trimmed_observation.rs
+++ b/profiling/src/internal/observation/trimmed_observation.rs
@@ -11,10 +11,14 @@ use std::mem;
 /// these.
This helps to ensure that the lengths given when we rehydrate a /// slice are the same as when we trimmed it. #[repr(transparent)] -#[derive(Copy, Clone, Default)] +#[derive(Copy, Clone, Default, Debug)] pub(super) struct ObservationLength(usize); impl ObservationLength { + pub fn eq(&self, other: usize) -> bool { + self.0 == other + } + pub fn assert_eq(&self, other: usize) { assert_eq!(self.0, other, "Expected observation lengths to be the same"); } @@ -49,11 +53,6 @@ impl TrimmedObservation { unsafe { std::slice::from_raw_parts_mut(self.data, len.0) } } - /// Safety: the ObservationLength must have come from the same profile as the Observation - pub unsafe fn as_slice(&self, len: ObservationLength) -> &[i64] { - unsafe { std::slice::from_raw_parts(self.data, len.0) } - } - /// Consumes self, ensuring that the memory behind it is dropped. /// It is an error to drop a TrimmedObservation without consuming it first. /// Safety: the ObservationLength must have come from the same profile as the Observation @@ -140,17 +139,6 @@ mod test { } } - #[test] - fn as_ref_test() { - let v = vec![1, 2]; - let o = ObservationLength::new(2); - let t = TrimmedObservation::new(v, o); - unsafe { - assert_eq!(t.as_slice(o), &vec![1, 2]); - t.consume(o); - } - } - #[test] fn drop_after_emptying_test() { let v = vec![1, 2]; @@ -176,9 +164,9 @@ mod test { fn into_boxed_slice_test() { let v = vec![1, 2]; let o = ObservationLength::new(2); - let t = TrimmedObservation::new(v, o); + let mut t = TrimmedObservation::new(v, o); unsafe { - assert_eq!(t.as_slice(o), &vec![1, 2]); + assert_eq!(t.as_mut_slice(o), &vec![1, 2]); let b = t.into_boxed_slice(o); assert_eq!(*b, vec![1, 2]); } @@ -188,9 +176,9 @@ mod test { fn into_vec_test() { let v = vec![1, 2]; let o = ObservationLength::new(2); - let t = TrimmedObservation::new(v, o); + let mut t = TrimmedObservation::new(v, o); unsafe { - assert_eq!(t.as_slice(o), &vec![1, 2]); + assert_eq!(t.as_mut_slice(o), &vec![1, 2]); let b = t.into_vec(o); assert_eq!(*b, vec![1, 2]); } diff --git a/profiling/src/internal/profile.rs b/profiling/src/internal/profile.rs index a5eb44566..420b66e32 100644 --- a/profiling/src/internal/profile.rs +++ b/profiling/src/internal/profile.rs @@ -94,7 +94,7 @@ impl Profile { let stacktrace = self.add_stacktrace(locations); self.observations - .add(Sample::new(labels, stacktrace), timestamp, sample.values); + .add(Sample::new(labels, stacktrace), timestamp, sample.values)?; Ok(()) } @@ -145,7 +145,7 @@ impl Profile { label_sets: Default::default(), locations: Default::default(), mappings: Default::default(), - observations: Default::default(), + observations: Observations::new(sample_types.len()), period: None, sample_types: vec![], stack_traces: Default::default(), @@ -527,17 +527,11 @@ impl Profile { // code, which would break if we did so. We could try to do something with // a test "feature", but this naming scheme is sufficient for now. 
     pub fn only_for_testing_num_aggregated_samples(&self) -> usize {
-        self.observations
-            .iter()
-            .filter(|(_, ts, _)| ts.is_none())
-            .count()
+        self.observations.aggregated_samples_count()
     }
 
     pub fn only_for_testing_num_timestamped_samples(&self) -> usize {
-        use std::collections::HashSet;
-        let sample_set: HashSet<Timestamp> =
-            HashSet::from_iter(self.observations.iter().filter_map(|(_, ts, _)| ts));
-        sample_set.len()
+        self.observations.timestamped_samples_count()
     }
 }
diff --git a/profiling/src/internal/stack_trace.rs b/profiling/src/internal/stack_trace.rs
index 8afe96087..5327afe81 100644
--- a/profiling/src/internal/stack_trace.rs
+++ b/profiling/src/internal/stack_trace.rs
@@ -30,3 +30,9 @@ impl Id for StackTraceId {
         self.0 as Self::RawId
     }
 }
+
+impl From<StackTraceId> for u32 {
+    fn from(value: StackTraceId) -> Self {
+        value.0
+    }
+}
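Aside, not part of the patch: taken together, the `From<StackTraceId> for u32` / `From<LabelSetId> for u32` conversions, `byteorder`, and `lz4_flex`'s frame encoder amount to a simple fixed-width record format. The sketch below mirrors the round trip that `TimestampedObservations::add` and its iterator perform, but it is only an illustration: it uses a hypothetical `Record` struct in place of the crate's internal `Sample`/`Timestamp` types, and keeps the value count outside the stream the way `sample_types_len` is kept.

```rust
use byteorder::{NativeEndian, ReadBytesExt};
use lz4_flex::frame::{FrameDecoder, FrameEncoder};
use std::io::{Cursor, Write};

// Hypothetical stand-in for (Sample, Timestamp, values); illustration only.
#[derive(Debug, PartialEq)]
struct Record {
    stacktrace: u32,
    labels: u32,
    timestamp: i64,
    values: Vec<i64>,
}

fn main() -> std::io::Result<()> {
    let record = Record {
        stacktrace: 3,
        labels: 5,
        timestamp: 1234,
        values: vec![10, 20, 30],
    };
    // The real module stores this count as sample_types_len instead of
    // writing it into the stream.
    let values_len = record.values.len();

    // Encode: fixed-width native-endian fields streamed into an lz4 frame,
    // mirroring TimestampedObservations::add.
    let mut encoder = FrameEncoder::new(Vec::new());
    encoder.write_all(&record.stacktrace.to_ne_bytes())?;
    encoder.write_all(&record.labels.to_ne_bytes())?;
    encoder.write_all(&record.timestamp.to_ne_bytes())?;
    for v in &record.values {
        encoder.write_all(&v.to_ne_bytes())?;
    }
    let compressed: Vec<u8> = encoder.finish().expect("flushing the lz4 frame failed");

    // Decode: read the fields back in the same order, as the iterator does.
    let mut decoder = FrameDecoder::new(Cursor::new(compressed));
    let decoded = Record {
        stacktrace: decoder.read_u32::<NativeEndian>()?,
        labels: decoder.read_u32::<NativeEndian>()?,
        timestamp: decoder.read_i64::<NativeEndian>()?,
        values: (0..values_len)
            .map(|_| decoder.read_i64::<NativeEndian>())
            .collect::<Result<_, _>>()?,
    };
    assert_eq!(decoded, record);
    Ok(())
}
```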