diff --git a/profiling-ffi/src/profiles.rs b/profiling-ffi/src/profiles.rs index 9994bf6ea..5f0abeaff 100644 --- a/profiling-ffi/src/profiles.rs +++ b/profiling-ffi/src/profiles.rs @@ -423,7 +423,7 @@ unsafe fn ddog_prof_profile_add_impl( ) -> anyhow::Result<()> { let profile = profile_ptr_to_inner(profile_ptr)?; - match sample.try_into().map(|s| profile.add(s, timestamp)) { + match sample.try_into().map(|s| profile.add_sample(s, timestamp)) { Ok(r) => match r { Ok(_) => Ok(()), Err(err) => Err(err), diff --git a/profiling-replayer/src/main.rs b/profiling-replayer/src/main.rs index 7eae2ded3..aa80342f2 100644 --- a/profiling-replayer/src/main.rs +++ b/profiling-replayer/src/main.rs @@ -183,7 +183,7 @@ fn main() -> anyhow::Result<()> { let before = Instant::now(); for (timestamp, sample) in samples { - outprof.add(sample, timestamp)?; + outprof.add_sample(sample, timestamp)?; } let duration = before.elapsed(); diff --git a/profiling/examples/profiles.rs b/profiling/examples/profiles.rs index 2b9e9c04c..0c23bd395 100644 --- a/profiling/examples/profiles.rs +++ b/profiling/examples/profiles.rs @@ -59,7 +59,7 @@ fn main() { // Intentionally use the current time. let mut profile = Profile::new(SystemTime::now(), &sample_types, Some(period)); - match profile.add(sample, None) { + match profile.add_sample(sample, None) { Ok(_) => {} Err(_) => exit(1), } diff --git a/profiling/src/internal/profile.rs b/profiling/src/internal/profile.rs index f85797853..bfd066b18 100644 --- a/profiling/src/internal/profile.rs +++ b/profiling/src/internal/profile.rs @@ -36,24 +36,94 @@ pub struct EncodedProfile { pub endpoints_stats: ProfiledEndpointsStats, } -// For testing and debugging purposes +/// Public API impl Profile { - pub fn only_for_testing_num_aggregated_samples(&self) -> usize { - self.observations + /// Add the endpoint data to the endpoint mappings. + /// The `endpoint` string will be interned. 
+ pub fn add_endpoint(&mut self, local_root_span_id: u64, endpoint: Cow) { + let interned_endpoint = self.intern(endpoint.as_ref()); + + self.endpoints + .mappings + .insert(local_root_span_id, interned_endpoint); + } + + pub fn add_endpoint_count(&mut self, endpoint: Cow, value: i64) { + self.endpoints + .stats + .add_endpoint_count(endpoint.into_owned(), value); + } + + pub fn add_sample( + &mut self, + sample: api::Sample, + timestamp: Option, + ) -> anyhow::Result<()> { + anyhow::ensure!( + sample.values.len() == self.sample_types.len(), + "expected {} sample types, but sample had {} sample types", + self.sample_types.len(), + sample.values.len(), + ); + + self.validate_sample_labels(&sample)?; + let labels: Vec<_> = sample + .labels .iter() - .filter(|(_, ts, _)| ts.is_none()) - .count() + .map(|label| { + let key = self.intern(label.key); + let internal_label = if let Some(s) = label.str { + let str = self.intern(s); + Label::str(key, str) + } else { + let num = label.num; + let num_unit = label.num_unit.map(|s| self.intern(s)); + Label::num(key, num, num_unit) + }; + + self.labels.dedup(internal_label) + }) + .collect(); + let labels = self.label_sets.dedup(LabelSet::new(labels)); + + let locations = sample + .locations + .iter() + .map(|l| self.add_location(l)) + .collect(); + + let stacktrace = self.add_stacktrace(locations); + self.observations + .add(Sample::new(labels, stacktrace), timestamp, sample.values); + Ok(()) } - pub fn only_for_testing_num_timestamped_samples(&self) -> usize { - use std::collections::HashSet; - let sample_set: HashSet = - HashSet::from_iter(self.observations.iter().filter_map(|(_, ts, _)| ts)); - sample_set.len() + pub fn add_upscaling_rule( + &mut self, + offset_values: &[usize], + label_name: &str, + label_value: &str, + upscaling_info: UpscalingInfo, + ) -> anyhow::Result<()> { + let label_name_id = self.intern(label_name); + let label_value_id = self.intern(label_value); + self.upscaling_rules.add( + offset_values, + (label_name, label_name_id), + (label_value, label_value_id), + upscaling_info, + self.sample_types.len(), + )?; + + Ok(()) + } + + pub fn get_string(&self, id: StringId) -> &str { + self.strings + .get_index(id.to_offset()) + .expect("StringId to have a valid interned index") } -} -impl Profile { /// Creates a profile with `start_time`. /// Initializes the string table to hold: /// - "" (the empty string) @@ -114,156 +184,6 @@ impl Profile { profile } - #[cfg(test)] - fn interned_strings_count(&self) -> usize { - self.strings.len() - } - - /// Interns the `str` as a string, returning the id in the string table. - /// The empty string is guaranteed to have an id of [StringId::ZERO]. - fn intern(&mut self, item: &str) -> StringId { - // For performance, delay converting the [&str] to a [String] until - // after it has been determined to not exist in the set. This avoids - // temporary allocations. - let index = match self.strings.get_index_of(item) { - Some(index) => index, - None => { - let (index, _inserted) = self.strings.insert_full(item.into()); - // This wouldn't make any sense; the item couldn't be found so - // we try to insert it, but suddenly it exists now? 
- debug_assert!(_inserted); - index - } - }; - StringId::from_offset(index) - } - - fn add_stacktrace(&mut self, locations: Vec) -> StackTraceId { - self.stack_traces.dedup(StackTrace { locations }) - } - - fn get_stacktrace(&self, st: StackTraceId) -> &StackTrace { - self.stack_traces - .get_index(st.to_raw_id()) - .expect("StackTraceId {st} to exist in profile") - } - - fn add_function(&mut self, function: &api::Function) -> FunctionId { - let name = self.intern(function.name); - let system_name = self.intern(function.system_name); - let filename = self.intern(function.filename); - - let start_line = function.start_line; - self.functions.dedup(Function { - name, - system_name, - filename, - start_line, - }) - } - - fn add_location(&mut self, location: &api::Location) -> LocationId { - let mapping_id = self.add_mapping(&location.mapping); - let function_id = self.add_function(&location.function); - self.locations.dedup(Location { - mapping_id, - function_id, - address: location.address, - line: location.line, - }) - } - - fn add_mapping(&mut self, mapping: &api::Mapping) -> MappingId { - let filename = self.intern(mapping.filename); - let build_id = self.intern(mapping.build_id); - - self.mappings.dedup(Mapping { - memory_start: mapping.memory_start, - memory_limit: mapping.memory_limit, - file_offset: mapping.file_offset, - filename, - build_id, - }) - } - - pub fn add(&mut self, sample: api::Sample, timestamp: Option) -> anyhow::Result<()> { - anyhow::ensure!( - sample.values.len() == self.sample_types.len(), - "expected {} sample types, but sample had {} sample types", - self.sample_types.len(), - sample.values.len(), - ); - - self.validate_sample_labels(&sample)?; - let labels: Vec<_> = sample - .labels - .iter() - .map(|label| { - let key = self.intern(label.key); - let internal_label = if let Some(s) = label.str { - let str = self.intern(s); - Label::str(key, str) - } else { - let num = label.num; - let num_unit = label.num_unit.map(|s| self.intern(s)); - Label::num(key, num, num_unit) - }; - - self.labels.dedup(internal_label) - }) - .collect(); - let labels = self.label_sets.dedup(LabelSet::new(labels)); - - let locations = sample - .locations - .iter() - .map(|l| self.add_location(l)) - .collect(); - - let stacktrace = self.add_stacktrace(locations); - self.observations - .add(Sample::new(labels, stacktrace), timestamp, sample.values); - Ok(()) - } - - /// Validates labels - fn validate_sample_labels(&mut self, sample: &api::Sample) -> anyhow::Result<()> { - let mut seen: HashMap<&str, &api::Label> = HashMap::new(); - - for label in sample.labels.iter() { - if let Some(duplicate) = seen.insert(label.key, label) { - anyhow::bail!("Duplicate label on sample: {:?} {:?}", duplicate, label); - } - - if label.key == "local root span id" { - anyhow::ensure!( - label.str.is_none() && label.num != 0, - "Invalid \"local root span id\" label: {:?}", - label - ); - } - - anyhow::ensure!( - label.key != "end_timestamp_ns", - "Timestamp should not be passed as a label {:?}", - label - ); - } - Ok(()) - } - - fn extract_api_sample_types(&self) -> anyhow::Result> { - let sample_types = self - .sample_types - .iter() - .map(|sample_type| api::ValueType { - r#type: self.get_string(sample_type.r#type), - unit: self.get_string(sample_type.unit), - }) - .collect(); - Ok(sample_types) - } - /// Resets all data except the sample types and period. /// Returns the previous Profile on success. 
pub fn reset_and_return_previous( @@ -291,42 +211,6 @@ impl Profile { Ok(profile) } - /// Add the endpoint data to the endpoint mappings. - /// The `endpoint` string will be interned. - pub fn add_endpoint(&mut self, local_root_span_id: u64, endpoint: Cow) { - let interned_endpoint = self.intern(endpoint.as_ref()); - - self.endpoints - .mappings - .insert(local_root_span_id, interned_endpoint); - } - - pub fn add_endpoint_count(&mut self, endpoint: Cow, value: i64) { - self.endpoints - .stats - .add_endpoint_count(endpoint.into_owned(), value); - } - - pub fn add_upscaling_rule( - &mut self, - offset_values: &[usize], - label_name: &str, - label_value: &str, - upscaling_info: UpscalingInfo, - ) -> anyhow::Result<()> { - let label_name_id = self.intern(label_name); - let label_value_id = self.intern(label_value); - self.upscaling_rules.add( - offset_values, - (label_name, label_name_id), - (label_value, label_value_id), - upscaling_info, - self.sample_types.len(), - )?; - - Ok(()) - } - /// Serialize the aggregated profile, adding the end time and duration. /// # Arguments /// * `end_time` - Optional end time of the profile. Passing None will use the current time. @@ -440,23 +324,62 @@ impl Profile { endpoints_stats, }) } +} - pub fn get_label(&self, id: LabelId) -> &Label { - self.labels - .get_index(id.to_offset()) - .expect("LabelId to have a valid interned index") +/// Private helper functions +impl Profile { + fn add_function(&mut self, function: &api::Function) -> FunctionId { + let name = self.intern(function.name); + let system_name = self.intern(function.system_name); + let filename = self.intern(function.filename); + + let start_line = function.start_line; + self.functions.dedup(Function { + name, + system_name, + filename, + start_line, + }) } - pub fn get_label_set(&self, id: LabelSetId) -> &LabelSet { - self.label_sets - .get_index(id.to_offset()) - .expect("LabelSetId to have a valid interned index") + fn add_location(&mut self, location: &api::Location) -> LocationId { + let mapping_id = self.add_mapping(&location.mapping); + let function_id = self.add_function(&location.function); + self.locations.dedup(Location { + mapping_id, + function_id, + address: location.address, + line: location.line, + }) } - pub fn get_string(&self, id: StringId) -> &str { - self.strings - .get_index(id.to_offset()) - .expect("StringId to have a valid interned index") + fn add_mapping(&mut self, mapping: &api::Mapping) -> MappingId { + let filename = self.intern(mapping.filename); + let build_id = self.intern(mapping.build_id); + + self.mappings.dedup(Mapping { + memory_start: mapping.memory_start, + memory_limit: mapping.memory_limit, + file_offset: mapping.file_offset, + filename, + build_id, + }) + } + + fn add_stacktrace(&mut self, locations: Vec) -> StackTraceId { + self.stack_traces.dedup(StackTrace { locations }) + } + + fn extract_api_sample_types(&self) -> anyhow::Result> { + let sample_types = self + .sample_types + .iter() + .map(|sample_type| api::ValueType { + r#type: self.get_string(sample_type.r#type), + unit: self.get_string(sample_type.unit), + }) + .collect(); + Ok(sample_types) } /// Fetches the endpoint information for the label. 
There may be errors, @@ -510,6 +433,43 @@ impl Profile { } } + fn get_label(&self, id: LabelId) -> &Label { + self.labels + .get_index(id.to_offset()) + .expect("LabelId to have a valid interned index") + } + + fn get_label_set(&self, id: LabelSetId) -> &LabelSet { + self.label_sets + .get_index(id.to_offset()) + .expect("LabelSetId to have a valid interned index") + } + + fn get_stacktrace(&self, st: StackTraceId) -> &StackTrace { + self.stack_traces + .get_index(st.to_raw_id()) + .expect("StackTraceId {st} to exist in profile") + } + + /// Interns the `str` as a string, returning the id in the string table. + /// The empty string is guaranteed to have an id of [StringId::ZERO]. + fn intern(&mut self, item: &str) -> StringId { + // For performance, delay converting the [&str] to a [String] until + // after it has been determined to not exist in the set. This avoids + // temporary allocations. + let index = match self.strings.get_index_of(item) { + Some(index) => index, + None => { + let (index, _inserted) = self.strings.insert_full(item.into()); + // This wouldn't make any sense; the item couldn't be found so + // we try to insert it, but suddenly it exists now? + debug_assert!(_inserted); + index + } + }; + StringId::from_offset(index) + } + fn translate_and_enrich_sample_labels( &self, sample: Sample, @@ -528,6 +488,57 @@ impl Profile { Ok(labels) } + + /// Validates labels + fn validate_sample_labels(&mut self, sample: &api::Sample) -> anyhow::Result<()> { + let mut seen: HashMap<&str, &api::Label> = HashMap::new(); + + for label in sample.labels.iter() { + if let Some(duplicate) = seen.insert(label.key, label) { + anyhow::bail!("Duplicate label on sample: {:?} {:?}", duplicate, label); + } + + if label.key == "local root span id" { + anyhow::ensure!( + label.str.is_none() && label.num != 0, + "Invalid \"local root span id\" label: {:?}", + label + ); + } + + anyhow::ensure!( + label.key != "end_timestamp_ns", + "Timestamp should not be passed as a label {:?}", + label + ); + } + Ok(()) + } +} + +/// For testing and debugging purposes +impl Profile { + #[cfg(test)] + fn interned_strings_count(&self) -> usize { + self.strings.len() + } + + // Ideally, these would be [cfg(test)]. But it's used in other modules' test + // code, which would break if we did so. We could try to do something with + // a test "feature", but this naming scheme is sufficient for now.
+ pub fn only_for_testing_num_aggregated_samples(&self) -> usize { + self.observations + .iter() + .filter(|(_, ts, _)| ts.is_none()) + .count() + } + + pub fn only_for_testing_num_timestamped_samples(&self) -> usize { + use std::collections::HashSet; + let sample_set: HashSet = + HashSet::from_iter(self.observations.iter().filter_map(|(_, ts, _)| ts)); + sample_set.len() + } } #[cfg(test)] @@ -600,7 +611,7 @@ mod api_test { assert_eq!(profile.only_for_testing_num_aggregated_samples(), 0); profile - .add( + .add_sample( api::Sample { locations, values: vec![1, 10000], @@ -684,18 +695,18 @@ mod api_test { assert_eq!(profile.only_for_testing_num_aggregated_samples(), 0); profile - .add(main_sample, None) + .add_sample(main_sample, None) .expect("profile to not be full"); assert_eq!(profile.only_for_testing_num_aggregated_samples(), 1); profile - .add(test_sample, None) + .add_sample(test_sample, None) .expect("profile to not be full"); assert_eq!(profile.only_for_testing_num_aggregated_samples(), 2); assert_eq!(profile.only_for_testing_num_timestamped_samples(), 0); profile - .add(timestamp_sample, Timestamp::new(42)) + .add_sample(timestamp_sample, Timestamp::new(42)) .expect("profile to not be full"); assert_eq!(profile.only_for_testing_num_timestamped_samples(), 1); profile @@ -893,7 +904,7 @@ mod api_test { labels: vec![id_label], }; - assert!(profile.add(sample, None).is_err()); + assert!(profile.add_sample(sample, None).is_err()); } #[test] @@ -944,9 +955,9 @@ mod api_test { labels: vec![id2_label, other_label], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); - profile.add(sample2, None).expect("add to success"); + profile.add_sample(sample2, None).expect("add to success"); profile.add_endpoint(10, Cow::from("my endpoint")); @@ -1100,7 +1111,7 @@ mod api_test { labels, }; - profile.add(sample, None).unwrap_err(); + profile.add_sample(sample, None).unwrap_err(); } #[test] @@ -1131,7 +1142,7 @@ mod api_test { labels: vec![id_label], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let serialized_profile = pprof::roundtrip_to_pprof(profile).unwrap(); @@ -1180,7 +1191,7 @@ mod api_test { labels: vec![], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let upscaling_info = UpscalingInfo::Proportional { scale: 2.0 }; let values_offset = vec![0]; @@ -1208,7 +1219,7 @@ mod api_test { labels: vec![], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let upscaling_info = UpscalingInfo::Proportional { scale: 2.7 }; let values_offset = vec![0]; @@ -1236,7 +1247,7 @@ mod api_test { labels: vec![], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let upscaling_info = UpscalingInfo::Poisson { sum_value_offset: 1, @@ -1268,7 +1279,7 @@ mod api_test { labels: vec![], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let upscaling_info = UpscalingInfo::Poisson { sum_value_offset: 1, @@ -1300,7 +1311,7 @@ mod api_test { labels: vec![], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); // invalid sampling_distance vaue let upscaling_info = UpscalingInfo::Poisson { @@ -1370,8 +1381,8 @@ mod api_test { labels: vec![], }; 
- profile.add(sample1, None).expect("add to success"); - profile.add(sample2, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); + profile.add_sample(sample2, None).expect("add to success"); // upscale the first value and the last one let values_offset: Vec = vec![0, 2]; @@ -1426,8 +1437,8 @@ mod api_test { labels: vec![], }; - profile.add(sample1, None).expect("add to success"); - profile.add(sample2, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); + profile.add_sample(sample2, None).expect("add to success"); let mut values_offset: Vec = vec![0]; @@ -1471,7 +1482,7 @@ mod api_test { labels: vec![id_label], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let values_offset: Vec = vec![0]; @@ -1527,7 +1538,7 @@ mod api_test { labels: vec![id_label], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let upscaling_info = UpscalingInfo::Proportional { scale: 2.0 }; let values_offset: Vec = vec![0]; @@ -1584,8 +1595,8 @@ mod api_test { labels: vec![], }; - profile.add(sample1, None).expect("add to success"); - profile.add(sample2, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); + profile.add_sample(sample2, None).expect("add to success"); let upscaling_info = UpscalingInfo::Proportional { scale: 2.0 }; let values_offset: Vec = vec![0]; @@ -1655,8 +1666,8 @@ mod api_test { labels: vec![id_no_match_label, id_label2], }; - profile.add(sample1, None).expect("add to success"); - profile.add(sample2, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); + profile.add_sample(sample2, None).expect("add to success"); // add rule for the first sample on the 1st value let upscaling_info = UpscalingInfo::Proportional { scale: 2.0 }; @@ -1708,7 +1719,7 @@ mod api_test { labels: vec![id_label], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); // upscale samples and wall-time values let values_offset: Vec = vec![0, 1]; @@ -1744,7 +1755,7 @@ mod api_test { labels: vec![id_label], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let upscaling_info = UpscalingInfo::Proportional { scale: 2.0 }; let mut value_offsets: Vec = vec![0]; @@ -2016,7 +2027,7 @@ mod api_test { labels: vec![id_label], }; - profile.add(sample1, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); let mut value_offsets: Vec = vec![0, 1]; // Add by-label rule first @@ -2084,8 +2095,8 @@ mod api_test { labels: vec![id2_label], }; - profile.add(sample1, None).expect("add to success"); - profile.add(sample2, None).expect("add to success"); + profile.add_sample(sample1, None).expect("add to success"); + profile.add_sample(sample2, None).expect("add to success"); profile.add_endpoint(10, Cow::from("endpoint 10")); profile.add_endpoint(large_span_id, Cow::from("large endpoint"));
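Call-site migration sketch (illustrative only, not part of the patch above): `add_sample` keeps the signature of the old `add`, so the rename is mechanical at every call site. The `api::Sample` layout and the `Timestamp::new(42)` form are taken from the example and test code in this diff; the import paths and the lifetime parameter on `api::Location` are assumptions, not something this patch introduces.

use datadog_profiling::api; // module path assumed; mirror the imports in profiling/examples/profiles.rs
use datadog_profiling::internal::Profile; // path likewise assumed

// `locations` would be built from api::Location/Function/Mapping values just as
// the example and tests above do; their construction is unchanged by this patch.
fn record(profile: &mut Profile, locations: Vec<api::Location<'_>>) -> anyhow::Result<()> {
    let sample = api::Sample {
        locations,
        values: vec![1, 10000], // one value per configured sample type
        labels: vec![],
    };

    // Before this patch this call was `profile.add(sample, None)?`.
    profile.add_sample(sample, None)?;

    // Timestamped samples pass Some(..) exactly as the tests do, e.g.
    // profile.add_sample(other_sample, Timestamp::new(42))?;
    Ok(())
}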