diff --git a/Cargo.lock b/Cargo.lock index a479b7a94..c9380db6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1989,6 +1989,7 @@ dependencies = [ "mockito", "proptest", "protobuf", + "regex", "reqwest", "serde", "serde_json", @@ -4333,9 +4334,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.4" +version = "1.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", diff --git a/LICENSE-3rdparty.yml b/LICENSE-3rdparty.yml index 75940d939..277bba420 100644 --- a/LICENSE-3rdparty.yml +++ b/LICENSE-3rdparty.yml @@ -22213,7 +22213,7 @@ third_party_libraries: END OF TERMS AND CONDITIONS - package_name: regex - package_version: 1.10.4 + package_version: 1.10.6 repository: https://github.com/rust-lang/regex license: MIT OR Apache-2.0 licenses: diff --git a/dogstatsd/Cargo.toml b/dogstatsd/Cargo.toml index 1540ae9a7..30f3b7cc6 100644 --- a/dogstatsd/Cargo.toml +++ b/dogstatsd/Cargo.toml @@ -22,6 +22,7 @@ thiserror = { version = "1.0.58", default-features = false } tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1.40", default-features = false } +regex = { version = "1.10.6", default-features = false } [dev-dependencies] mockito = { version = "1.5.0", default-features = false } diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs index 506aadd42..380d93017 100644 --- a/dogstatsd/src/aggregator.rs +++ b/dogstatsd/src/aggregator.rs @@ -6,7 +6,7 @@ use crate::constants; use crate::datadog::{self, Metric as MetricToShip, Series}; use crate::errors; -use crate::metric::{self, Metric as DogstatsdMetric, Type}; +use crate::metric::{self, Metric, MetricValue, SortedTags}; use std::time; use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload}; @@ -16,29 +16,16 @@ use protobuf::Message; use tracing::{error, warn}; use ustr::Ustr; -#[derive(Debug, Clone)] -pub struct Entry { - id: u64, - name: Ustr, - tags: Option, - pub metric_value: MetricValue, -} - -#[derive(Debug, Clone)] -pub enum MetricValue { - Count(f64), - Gauge(f64), - Distribution(DDSketch), -} - impl MetricValue { - fn insert_metric(&mut self, metric: &DogstatsdMetric) { + fn aggregate(&mut self, metric: Metric) { // safe because we know there's at least one value when we parse match self { - MetricValue::Count(count) => *count += metric.first_value().unwrap_or_default(), - MetricValue::Gauge(gauge) => *gauge = metric.first_value().unwrap_or_default(), + MetricValue::Count(count) => *count += metric.value.get_value().unwrap_or_default(), + MetricValue::Gauge(gauge) => *gauge = metric.value.get_value().unwrap_or_default(), MetricValue::Distribution(distribution) => { - distribution.insert(metric.first_value().unwrap_or_default()); + if let Some(value) = metric.value.get_sketch() { + distribution.merge(value); + } } } } @@ -59,40 +46,12 @@ impl MetricValue { } } -impl Entry { - fn new_from_metric(id: u64, metric: &DogstatsdMetric) -> Self { - let mut metric_value = match metric.kind { - Type::Count => MetricValue::Count(0.0), - Type::Gauge => MetricValue::Gauge(0.0), - Type::Distribution => MetricValue::Distribution(DDSketch::default()), - }; - metric_value.insert_metric(metric); - Self { - id, - name: metric.name, - tags: metric.tags, - metric_value, - } - } - - /// Return an iterator over key, value pairs - fn tag(&self) -> impl Iterator { - self.tags.into_iter().filter_map(|tags| { - let mut split = tags.split(','); - match (split.next(), split.next()) { - (Some(k), Some(v)) => Some((Ustr::from(k), Ustr::from(v))), - _ => None, // Skip tags that lack the proper format - } - }) - } -} - #[derive(Clone)] // NOTE by construction we know that intervals and contexts do not explore the // full space of usize but the type system limits how we can express this today. pub struct Aggregator { - tags: Vec, - map: hash_table::HashTable, + tags: SortedTags, + map: hash_table::HashTable, max_batch_entries_single_metric: usize, max_batch_bytes_single_metric: u64, max_batch_entries_sketch_metric: usize, @@ -109,7 +68,7 @@ impl Aggregator { /// counterparts in `constants`. This would be better as a compile-time /// issue, although leaving this open allows for runtime configuration. #[allow(clippy::cast_precision_loss)] - pub fn new(tags: Vec, max_context: usize) -> Result { + pub fn new(tags: SortedTags, max_context: usize) -> Result { if max_context > constants::MAX_CONTEXTS { return Err(errors::Creation::Contexts); } @@ -130,23 +89,22 @@ impl Aggregator { /// /// Function will return overflow error if more than /// `min(constants::MAX_CONTEXTS, CONTEXTS)` is exceeded. - pub fn insert(&mut self, metric: &DogstatsdMetric) -> Result<(), errors::Insert> { - let id = metric::id(metric.name, metric.tags); + pub fn insert(&mut self, metric: Metric) -> Result<(), errors::Insert> { + let id = metric::id(metric.name, &metric.tags); let len = self.map.len(); match self .map - .entry(id, |m| m.id == id, |m| metric::id(m.name, m.tags)) + .entry(id, |m| m.id == id, |m| metric::id(m.name, &m.tags)) { hash_table::Entry::Vacant(entry) => { if len >= self.max_context { return Err(errors::Insert::Overflow); } - let ent = Entry::new_from_metric(id, metric); - entry.insert(ent); + entry.insert(metric); } hash_table::Entry::Occupied(mut entry) => { - entry.get_mut().metric_value.insert_metric(metric); + entry.get_mut().value.aggregate(metric); } } Ok(()) @@ -168,7 +126,7 @@ impl Aggregator { self.map .iter() - .filter_map(|entry| match entry.metric_value { + .filter_map(|entry| match entry.value { MetricValue::Distribution(_) => build_sketch(now, entry, &self.tags), _ => None, }) @@ -190,7 +148,7 @@ impl Aggregator { for sketch in self .map .extract_if(|entry| { - if let MetricValue::Distribution(_) = entry.metric_value { + if let MetricValue::Distribution(_) = entry.value { return true; } false @@ -227,7 +185,7 @@ impl Aggregator { self.map .iter() - .filter_map(|entry| match entry.metric_value { + .filter_map(|entry| match entry.value { MetricValue::Distribution(_) => None, _ => build_metric(entry, &self.tags), }) @@ -245,7 +203,7 @@ impl Aggregator { for metric in self .map .extract_if(|entry| { - if let MetricValue::Distribution(_) = entry.metric_value { + if let MetricValue::Distribution(_) = entry.value { return false; } true @@ -288,14 +246,14 @@ impl Aggregator { batched_payloads } - pub fn get_entry_by_id(&self, name: Ustr, tags: Option) -> Option<&Entry> { + pub fn get_entry_by_id(&self, name: Ustr, tags: &SortedTags) -> Option<&Metric> { let id = metric::id(name, tags); self.map.find(id, |m| m.id == id) } } -fn build_sketch(now: i64, entry: &Entry, base_tag_vec: &[String]) -> Option { - let sketch = entry.metric_value.get_sketch()?; +fn build_sketch(now: i64, entry: &Metric, base_tag_vec: &SortedTags) -> Option { + let sketch = entry.value.get_sketch()?; let mut dogsketch = Dogsketch::default(); sketch.merge_to_dogsketch(&mut dogsketch); // TODO(Astuyve) allow users to specify timestamp @@ -304,28 +262,21 @@ fn build_sketch(now: i64, entry: &Entry, base_tag_vec: &[String]) -> Option Option { - let mut resources = Vec::with_capacity(constants::MAX_TAGS); - for (name, kind) in entry.tag() { - let resource = datadog::Resource { - name: name.as_str(), - kind: kind.as_str(), - }; - resources.push(resource); - } - let kind = match entry.metric_value { +fn build_metric(entry: &Metric, base_tag_vec: &SortedTags) -> Option { + let resources = entry.tags.to_resources(); + let kind = match entry.value { MetricValue::Count(_) => datadog::DdMetricKind::Count, MetricValue::Gauge(_) => datadog::DdMetricKind::Gauge, MetricValue::Distribution(_) => unreachable!(), }; let point = datadog::Point { - value: entry.metric_value.get_value()?, + value: entry.value.get_value()?, // TODO(astuyve) allow user to specify timestamp timestamp: time::SystemTime::now() .duration_since(time::UNIX_EPOCH) @@ -333,39 +284,24 @@ fn build_metric(entry: &Entry, base_tag_vec: &[String]) -> Option .as_secs(), }; - let mut final_tags = Vec::new(); - // TODO - // These tags are interned so we don't need to clone them here but we're just doing it - // because it's easier than dealing with the lifetimes. - if let Some(tags) = entry.tags { - final_tags = tags.split(',').map(ToString::to_string).collect(); - } - final_tags.extend(base_tag_vec.to_owned()); + let mut tags = entry.tags.clone(); + tags.extend(base_tag_vec); + Some(MetricToShip { metric: entry.name.as_str(), resources, kind, points: [point; 1], - tags: final_tags, + tags: tags.to_strings(), }) } -fn tags_string_to_vector(tags: Option) -> Vec { - if tags.is_none() { - return Vec::new(); - } - tags.unwrap_or_default() - .split(',') - .map(ToString::to_string) - .collect() -} - #[cfg(test)] #[allow(clippy::unwrap_used)] pub mod tests { use crate::aggregator::Aggregator; use crate::metric; - use crate::metric::Metric; + use crate::metric::{parse, SortedTags, EMPTY_TAGS}; use datadog_protos::metrics::SketchPayload; use hashbrown::hash_table; use protobuf::Message; @@ -373,18 +309,22 @@ pub mod tests { const PRECISION: f64 = 0.000_000_01; - const SINGLE_METRIC_SIZE: usize = 187; - const SINGLE_DISTRIBUTION_SIZE: u64 = 135; - const DEFAULT_TAGS: &[&str] = &[ - "dd_extension_version:63-next", - "architecture:x86_64", - "_dd.compute_stats:1", - ]; - - pub fn assert_value(aggregator_mutex: &Mutex, metric_id: &str, value: f64) { + const SINGLE_METRIC_SIZE: usize = 216; // taken from the test, size of a serialized metric with one tag and 1 digit counter value + const SINGLE_DISTRIBUTION_SIZE: u64 = 140; + const DEFAULT_TAGS: &str = + "dd_extension_version:63-next,architecture:x86_64,_dd.compute_stats:1"; + + pub fn assert_value( + aggregator_mutex: &Mutex, + metric_id: &str, + value: f64, + tags: &str, + ) { let aggregator = aggregator_mutex.lock().unwrap(); - if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), None) { - let metric = e.metric_value.get_value().unwrap(); + if let Some(e) = + aggregator.get_entry_by_id(metric_id.into(), &SortedTags::parse(tags).unwrap()) + { + let metric = e.value.get_value().unwrap(); assert!((metric - value).abs() < PRECISION); } else { panic!("{}", format!("{metric_id} not found")); @@ -393,8 +333,8 @@ pub mod tests { pub fn assert_sketch(aggregator_mutex: &Mutex, metric_id: &str, value: f64) { let aggregator = aggregator_mutex.lock().unwrap(); - if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), None) { - let metric = e.metric_value.get_sketch().unwrap(); + if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &EMPTY_TAGS) { + let metric = e.value.get_sketch().unwrap(); assert!((metric.max().unwrap() - value).abs() < PRECISION); assert!((metric.min().unwrap() - value).abs() < PRECISION); assert!((metric.sum().unwrap() - value).abs() < PRECISION); @@ -406,13 +346,13 @@ pub mod tests { #[test] fn insertion() { - let mut aggregator = Aggregator::new(Vec::new(), 2).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let metric1 = Metric::parse("test:1|c|k:v").expect("metric parse failed"); - let metric2 = Metric::parse("foo:1|c|k:v").expect("metric parse failed"); + let metric1 = parse("test:1|c|#k:v").expect("metric parse failed"); + let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed"); - assert!(aggregator.insert(&metric1).is_ok()); - assert!(aggregator.insert(&metric2).is_ok()); + assert!(aggregator.insert(metric1).is_ok()); + assert!(aggregator.insert(metric2).is_ok()); // Both unique contexts get one slot. assert_eq!(aggregator.map.len(), 2); @@ -420,13 +360,13 @@ pub mod tests { #[test] fn distribution_insertion() { - let mut aggregator = Aggregator::new(Vec::new(), 2).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let metric1 = Metric::parse("test:1|d|k:v").expect("metric parse failed"); - let metric2 = Metric::parse("foo:1|d|k:v").expect("metric parse failed"); + let metric1 = parse("test:1|d|#k:v").expect("metric parse failed"); + let metric2 = parse("foo:1|d|#k:v").expect("metric parse failed"); - assert!(aggregator.insert(&metric1).is_ok()); - assert!(aggregator.insert(&metric2).is_ok()); + assert!(aggregator.insert(metric1).is_ok()); + assert!(aggregator.insert(metric2).is_ok()); // Both unique contexts get one slot. assert_eq!(aggregator.map.len(), 2); @@ -434,52 +374,56 @@ pub mod tests { #[test] fn overflow() { - let mut aggregator = Aggregator::new(Vec::new(), 2).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let metric1 = Metric::parse("test:1|c|k:v").expect("metric parse failed"); - let metric2 = Metric::parse("foo:1|c|k:v").expect("metric parse failed"); - let metric3 = Metric::parse("bar:1|c|k:v").expect("metric parse failed"); + let metric1 = parse("test:1|c|#k:v").expect("metric parse failed"); + let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed"); + let metric3 = parse("bar:1|c|#k:v").expect("metric parse failed"); - let id1 = metric::id(metric1.name, metric1.tags); - let id2 = metric::id(metric2.name, metric2.tags); - let id3 = metric::id(metric3.name, metric3.tags); + let id1 = metric::id(metric1.name, &metric1.tags); + let id2 = metric::id(metric2.name, &metric2.tags); + let id3 = metric::id(metric3.name, &metric3.tags); assert_ne!(id1, id2); assert_ne!(id1, id3); assert_ne!(id2, id3); - assert!(aggregator.insert(&metric1).is_ok()); + assert!(aggregator.insert(metric1).is_ok()); assert_eq!(aggregator.map.len(), 1); - assert!(aggregator.insert(&metric2).is_ok()); - assert!(aggregator.insert(&metric2).is_ok()); - assert!(aggregator.insert(&metric2).is_ok()); + assert!(aggregator.insert(metric2.clone()).is_ok()); + assert!(aggregator.insert(metric2.clone()).is_ok()); + assert!(aggregator.insert(metric2).is_ok()); assert_eq!(aggregator.map.len(), 2); - assert!(aggregator.insert(&metric3).is_err()); + assert!(aggregator.insert(metric3).is_err()); assert_eq!(aggregator.map.len(), 2); } #[test] #[allow(clippy::float_cmp)] fn clear() { - let mut aggregator = Aggregator::new(Vec::new(), 2).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let metric1 = Metric::parse("test:3|c|k:v").expect("metric parse failed"); - let metric2 = Metric::parse("foo:5|c|k:v").expect("metric parse failed"); + let metric1 = parse("test:3|c|#k1:v1").expect("metric parse failed"); + let metric2 = parse("foo:5|c|#k2:v2").expect("metric parse failed"); - assert!(aggregator.insert(&metric1).is_ok()); - assert!(aggregator.insert(&metric2).is_ok()); + assert!(aggregator.insert(metric1).is_ok()); + assert!(aggregator.insert(metric2).is_ok()); assert_eq!(aggregator.map.len(), 2); - if let Some(v) = aggregator.get_entry_by_id("foo".into(), None) { - assert_eq!(v.metric_value.get_value().unwrap(), 5f64); + if let Some(v) = + aggregator.get_entry_by_id("foo".into(), &SortedTags::parse("k2:v2").unwrap()) + { + assert_eq!(v.value.get_value().unwrap(), 5f64); } else { panic!("failed to get value by id"); } - if let Some(v) = aggregator.get_entry_by_id("test".into(), None) { - assert_eq!(v.metric_value.get_value().unwrap(), 3f64); + if let Some(v) = + aggregator.get_entry_by_id("test".into(), &SortedTags::parse("k1:v1").unwrap()) + { + assert_eq!(v.value.get_value().unwrap(), 3f64); } else { panic!("failed to get value by id"); } @@ -490,14 +434,14 @@ pub mod tests { #[test] fn to_series() { - let mut aggregator = Aggregator::new(Vec::new(), 2).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let metric1 = Metric::parse("test:1|c|k:v").expect("metric parse failed"); - let metric2 = Metric::parse("foo:1|c|k:v").expect("metric parse failed"); - let metric3 = Metric::parse("bar:1|c|k:v").expect("metric parse failed"); + let metric1 = parse("test:1|c|#k:v").expect("metric parse failed"); + let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed"); + let metric3 = parse("bar:1|c|#k:v").expect("metric parse failed"); - assert!(aggregator.insert(&metric1).is_ok()); - assert!(aggregator.insert(&metric2).is_ok()); + assert!(aggregator.insert(metric1).is_ok()); + assert!(aggregator.insert(metric2).is_ok()); assert_eq!(aggregator.map.len(), 2); assert_eq!(aggregator.to_series().len(), 2); @@ -505,19 +449,19 @@ pub mod tests { assert_eq!(aggregator.to_series().len(), 2); assert_eq!(aggregator.map.len(), 2); - assert!(aggregator.insert(&metric3).is_err()); + assert!(aggregator.insert(metric3).is_err()); assert_eq!(aggregator.to_series().len(), 2); } #[test] fn distributions_to_protobuf() { - let mut aggregator = Aggregator::new(Vec::new(), 2).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let metric1 = Metric::parse("test:1|d|k:v").expect("metric parse failed"); - let metric2 = Metric::parse("foo:1|d|k:v").expect("metric parse failed"); + let metric1 = parse("test:1|d|#k:v").expect("metric parse failed"); + let metric2 = parse("foo:1|d|#k:v").expect("metric parse failed"); - assert!(aggregator.insert(&metric1).is_ok()); - assert!(aggregator.insert(&metric2).is_ok()); + assert!(aggregator.insert(metric1).is_ok()); + assert!(aggregator.insert(metric2).is_ok()); assert_eq!(aggregator.map.len(), 2); assert_eq!(aggregator.distributions_to_protobuf().sketches().len(), 2); @@ -528,18 +472,16 @@ pub mod tests { #[test] fn consume_distributions_ignore_single_metrics() { - let mut aggregator = Aggregator::new(Vec::new(), 1_000).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 1_000).unwrap(); assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); assert!(aggregator - .insert( - &Metric::parse("test1:1|d|k:v".to_string().as_str()).expect("metric parse failed") - ) + .insert(parse("test1:1|d|#k:v".to_string().as_str()).expect("metric parse failed")) .is_ok()); assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 1); assert!(aggregator - .insert(&Metric::parse("foo:1|c|k:v").expect("metric parse failed")) + .insert(parse("foo:1|c|#k:v").expect("metric parse failed")) .is_ok()); assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 1); } @@ -549,7 +491,7 @@ pub mod tests { let max_batch = 5; let tot = 12; let mut aggregator = Aggregator { - tags: Vec::new(), + tags: EMPTY_TAGS, map: hash_table::HashTable::new(), max_batch_entries_single_metric: 1_000, max_batch_bytes_single_metric: 1_000, @@ -572,18 +514,14 @@ pub mod tests { fn consume_distributions_batch_bytes() { let expected_distribution_per_batch = 2; let total_number_of_distributions = 5; - let max_bytes = SINGLE_METRIC_SIZE * expected_distribution_per_batch + 11; + let max_bytes = SINGLE_DISTRIBUTION_SIZE * expected_distribution_per_batch as u64; let mut aggregator = Aggregator { - tags: DEFAULT_TAGS - .to_vec() - .iter() - .map(ToString::to_string) - .collect(), + tags: to_sorted_tags(), map: hash_table::HashTable::new(), max_batch_entries_single_metric: 1_000, max_batch_bytes_single_metric: 1_000, max_batch_entries_sketch_metric: 1_000, - max_batch_bytes_sketch_metric: max_bytes as u64, + max_batch_bytes_sketch_metric: max_bytes, max_context: 1_000, }; @@ -612,16 +550,16 @@ pub mod tests { ); } + fn to_sorted_tags() -> SortedTags { + SortedTags::parse(DEFAULT_TAGS).unwrap() + } + #[test] fn consume_distribution_one_element_bigger_than_max_size() { let max_bytes = 1; let tot = 5; let mut aggregator = Aggregator { - tags: DEFAULT_TAGS - .to_vec() - .iter() - .map(ToString::to_string) - .collect(), + tags: to_sorted_tags(), map: hash_table::HashTable::new(), max_batch_entries_single_metric: 1_000, max_batch_bytes_single_metric: 1_000, @@ -643,7 +581,7 @@ pub mod tests { for i in 1..=tot { assert!(aggregator .insert( - &Metric::parse(format!("test{i}:{i}|{counter_or_distro}|k:v").as_str()) + parse(format!("test{i}:{i}|{counter_or_distro}|#k:v").as_str()) .expect("metric parse failed") ) .is_ok()); @@ -652,26 +590,22 @@ pub mod tests { #[test] fn consume_series_ignore_distribution() { - let mut aggregator = Aggregator::new(Vec::new(), 1_000).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 1_000).unwrap(); assert_eq!(aggregator.consume_metrics().len(), 0); assert!(aggregator - .insert( - &Metric::parse("test1:1|c|k:v".to_string().as_str()).expect("metric parse failed") - ) + .insert(parse("test1:1|c|#k:v".to_string().as_str()).expect("metric parse failed")) .is_ok()); assert_eq!(aggregator.consume_distributions().len(), 0); assert_eq!(aggregator.consume_metrics().len(), 1); assert_eq!(aggregator.consume_metrics().len(), 0); assert!(aggregator - .insert( - &Metric::parse("test1:1|c|k:v".to_string().as_str()).expect("metric parse failed") - ) + .insert(parse("test1:1|c|#k:v".to_string().as_str()).expect("metric parse failed")) .is_ok()); assert!(aggregator - .insert(&Metric::parse("foo:1|d|k:v").expect("metric parse failed")) + .insert(parse("foo:1|d|#k:v").expect("metric parse failed")) .is_ok()); assert_eq!(aggregator.consume_metrics().len(), 1); assert_eq!(aggregator.consume_distributions().len(), 1); @@ -683,7 +617,7 @@ pub mod tests { let max_batch = 5; let tot = 13; let mut aggregator = Aggregator { - tags: Vec::new(), + tags: EMPTY_TAGS, map: hash_table::HashTable::new(), max_batch_entries_single_metric: max_batch, max_batch_bytes_single_metric: 10_000, @@ -707,14 +641,10 @@ pub mod tests { fn consume_metrics_batch_bytes() { let expected_metrics_per_batch = 2; let total_number_of_metrics = 5; - let two_metrics_size = 362; + let two_metrics_size = 420; let max_bytes = SINGLE_METRIC_SIZE * expected_metrics_per_batch + 13; let mut aggregator = Aggregator { - tags: DEFAULT_TAGS - .to_vec() - .iter() - .map(ToString::to_string) - .collect(), + tags: to_sorted_tags(), map: hash_table::HashTable::new(), max_batch_entries_single_metric: 1_000, max_batch_bytes_single_metric: max_bytes as u64, @@ -749,11 +679,7 @@ pub mod tests { let max_bytes = 1; let tot = 5; let mut aggregator = Aggregator { - tags: DEFAULT_TAGS - .to_vec() - .iter() - .map(ToString::to_string) - .collect(), + tags: to_sorted_tags(), map: hash_table::HashTable::new(), max_batch_entries_single_metric: 1_000, max_batch_bytes_single_metric: max_bytes, @@ -776,7 +702,7 @@ pub mod tests { #[test] fn distribution_serialized_deserialized() { - let mut aggregator = Aggregator::new(Vec::new(), 1_000).unwrap(); + let mut aggregator = Aggregator::new(EMPTY_TAGS, 1_000).unwrap(); add_metrics(10, &mut aggregator, "d".to_string()); let distribution = aggregator.distributions_to_protobuf(); diff --git a/dogstatsd/src/constants.rs b/dogstatsd/src/constants.rs index 70e17d24b..c93ad9653 100644 --- a/dogstatsd/src/constants.rs +++ b/dogstatsd/src/constants.rs @@ -4,7 +4,7 @@ /// The maximum tags that a `Metric` may hold. pub const MAX_TAGS: usize = 32; -pub const CONTEXTS: usize = 1024; +pub const CONTEXTS: usize = 10_240; pub static MAX_CONTEXTS: usize = 65_536; // 2**16, arbitrary diff --git a/dogstatsd/src/datadog.rs b/dogstatsd/src/datadog.rs index f7cec11ea..810284a9d 100644 --- a/dogstatsd/src/datadog.rs +++ b/dogstatsd/src/datadog.rs @@ -17,22 +17,6 @@ pub struct DdApi { fqdn_site: String, client: reqwest::Client, } -/// Error relating to `ship` -#[derive(thiserror::Error, Debug)] -pub enum ShipError { - #[error("Failed to push to API with status {status}: {body}")] - /// Datadog API failure - Failure { - /// HTTP status code - status: u16, - /// HTTP body that failed - body: String, - }, - - /// Json - #[error(transparent)] - Json(#[from] serde_json::Error), -} fn build_http_client( http_proxy: Option, diff --git a/dogstatsd/src/dogstatsd.rs b/dogstatsd/src/dogstatsd.rs index 4d1101bbe..7173d3283 100644 --- a/dogstatsd/src/dogstatsd.rs +++ b/dogstatsd/src/dogstatsd.rs @@ -8,7 +8,7 @@ use std::sync::{Arc, Mutex}; use tracing::{debug, error}; use crate::aggregator::Aggregator; -use crate::metric::Metric; +use crate::metric::{parse, Metric}; pub struct DogStatsD { cancel_token: tokio_util::sync::CancellationToken, @@ -87,7 +87,7 @@ impl DogStatsD { let all_valid_metrics: Vec = msg .filter(|m| !m.is_empty()) .map(|m| m.replace('\n', "")) - .filter_map(|m| match Metric::parse(m.as_str()) { + .filter_map(|m| match parse(m.as_str()) { Ok(metric) => Some(metric), Err(e) => { error!("Failed to parse metric {}: {}", m, e); @@ -98,7 +98,7 @@ impl DogStatsD { if !all_valid_metrics.is_empty() { let mut guarded_aggregator = self.aggregator.lock().expect("lock poisoned"); for a_valid_value in all_valid_metrics { - let _ = guarded_aggregator.insert(&a_valid_value); + let _ = guarded_aggregator.insert(a_valid_value); } } } @@ -111,6 +111,7 @@ mod tests { use crate::aggregator::tests::assert_value; use crate::aggregator::Aggregator; use crate::dogstatsd::{BufferReader, DogStatsD}; + use crate::metric::EMPTY_TAGS; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::{Arc, Mutex}; @@ -151,7 +152,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d #[tokio::test] async fn test_dogstatsd_multi_metric() { let locked_aggregator = setup_dogstatsd( - "metric1:1|c\nmetric2:2|c|tag2:val2\nmetric3:3|c||tag3:val3,tag4:val4\n", + "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2\n", ) .await; let aggregator = locked_aggregator.lock().expect("lock poisoned"); @@ -162,9 +163,9 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); drop(aggregator); - assert_value(&locked_aggregator, "metric1", 1.0); - assert_value(&locked_aggregator, "metric2", 2.0); - assert_value(&locked_aggregator, "metric3", 3.0); + assert_value(&locked_aggregator, "metric1", 1.0, ""); + assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2"); + assert_value(&locked_aggregator, "metric3", 3.0, "tag3:val3,tag4:val4"); } #[tokio::test] @@ -177,12 +178,12 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); drop(aggregator); - assert_value(&locked_aggregator, "metric123", 99_123.0); + assert_value(&locked_aggregator, "metric123", 99_123.0, ""); } async fn setup_dogstatsd(statsd_string: &str) -> Arc> { let aggregator_arc = Arc::new(Mutex::new( - Aggregator::new(Vec::new(), 1_024).expect("aggregator creation failed"), + Aggregator::new(EMPTY_TAGS, 1_024).expect("aggregator creation failed"), )); let cancel_token = tokio_util::sync::CancellationToken::new(); diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index 38894ef69..4ce7ac5ab 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -1,35 +1,108 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::constants; use crate::errors::ParseError; +use crate::{constants, datadog}; +use ddsketch_agent::DDSketch; use fnv::FnvHasher; +use protobuf::Chars; +use regex::Regex; use std::hash::{Hash, Hasher}; use ustr::Ustr; -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -/// Determine what kind/type of a metric has come in -pub enum Type { + +pub const EMPTY_TAGS: SortedTags = SortedTags { values: Vec::new() }; + +#[derive(Clone, Debug)] +pub enum MetricValue { /// Dogstatsd 'count' metric type, monotonically increasing counter - Count, + Count(f64), /// Dogstatsd 'gauge' metric type, point-in-time value - Gauge, + Gauge(f64), /// Dogstatsd 'distribution' metric type, histogram - Distribution, + Distribution(DDSketch), +} + +#[derive(Clone, Debug)] +pub struct SortedTags { + // We sort tags. This is in feature parity with DogStatsD and also means + // that we avoid storing the same context multiple times because users have + // passed tags in different order through time. + values: Vec<(Ustr, Ustr)>, +} + +impl SortedTags { + pub fn extend(&mut self, other: &SortedTags) { + self.values.extend_from_slice(&other.values); + self.values.dedup(); + self.values.sort_unstable(); + } + + pub fn is_empty(&self) -> bool { + self.values.is_empty() + } + + pub fn parse(tags_section: &str) -> Result { + let tag_parts = tags_section.split(','); + let mut parsed_tags = Vec::new(); + // Validate that the tags have the right form. + for (i, part) in tag_parts.filter(|s| !s.is_empty()).enumerate() { + if i >= constants::MAX_TAGS { + return Err(ParseError::Raw("Too many tags")); + } + if !part.contains(':') { + return Err(ParseError::Raw("Invalid tag format")); + } + if let Some((k, v)) = part.split_once(':') { + parsed_tags.push((Ustr::from(k), Ustr::from(v))); + } + } + parsed_tags.dedup(); + parsed_tags.sort_unstable(); + Ok(SortedTags { + values: parsed_tags, + }) + } + + pub fn to_chars(&self) -> Vec { + let mut tags_as_chars = Vec::new(); + for (k, v) in &self.values { + tags_as_chars.push(format!("{}:{}", k, v).into()); + } + tags_as_chars + } + + pub fn to_strings(&self) -> Vec { + let mut tags_as_vec = Vec::new(); + for (k, v) in &self.values { + tags_as_vec.push(format!("{}:{}", k, v)); + } + tags_as_vec + } + + pub(crate) fn to_resources(&self) -> Vec { + let mut resources = Vec::with_capacity(constants::MAX_TAGS); + for (name, kind) in &self.values { + let resource = datadog::Resource { + name: name.as_str(), + kind: kind.as_str(), + }; + resources.push(resource); + } + resources + } } /// Representation of a dogstatsd Metric /// /// For now this implementation covers only counters and gauges. We hope this is /// enough to demonstrate the impact of this program's design goals. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] pub struct Metric { /// Name of the metric. /// /// Never more bytes than `constants::MAX_METRIC_NAME_BYTES`, /// enforced by construction. Note utf8 issues. - pub(crate) name: Ustr, - /// What kind/type of metric this is. - pub(crate) kind: Type, + pub name: Ustr, /// Values of the metric. A singular value may be either a floating point or /// a integer. Although undocumented we assume 64 bit. A single metric may /// encode multiple values a time in a message. There must be at least one @@ -40,7 +113,7 @@ pub struct Metric { /// moment. /// /// Never longer than `constants::MAX_VALUE_BYTES`. Note utf8 issues. - values: Ustr, + pub value: MetricValue, /// Tags of the metric. /// /// The key is never longer than `constants::MAX_TAG_KEY_BYTES`, the value @@ -48,124 +121,70 @@ pub struct Metric { /// the parser. We assume here that tags are not sent in random order by the /// clien or that, if they are, the API will tidy that up. That is `a:1,b:2` /// is a different tagset from `b:2,a:1`. - pub(crate) tags: Option, -} + pub tags: SortedTags, -impl Metric { - #[must_use] - pub fn new(name: Ustr, kind: Type, values: Ustr, tags: Option) -> Self { - Self { - name, - kind, - values, - tags, - } - } - /// Parse a metric from given input. - /// - /// This function parses a passed `&str` into a `Metric`. We assume that - /// `DogStatsD` metrics must be utf8 and are not ascii or some other encoding. - /// - /// # Errors - /// - /// This function will return with an error if the input violates any of the - /// limits in [`constants`]. Any non-viable input will be discarded. - /// example aj-test.increment:1|c|#user:aj-test from 127.0.0.1:50983 - pub fn parse(input: &str) -> Result { - // TODO must enforce / exploit constraints given in `constants`. - let mut sections = input.split('|'); - - let nv_section = sections - .next() - .ok_or(ParseError::Raw("Missing metric name and value"))?; - - let (name, values) = nv_section - .split_once(':') - .ok_or(ParseError::Raw("Missing name, value section"))?; - - let kind_section = sections - .next() - .ok_or(ParseError::Raw("Missing metric type"))?; - let kind = match kind_section { - "c" => Type::Count, - "g" => Type::Gauge, - "d" => Type::Distribution, - _ => { - return Err(ParseError::Raw("Unsupported metric type")); - } - }; + /// ID given a name and tagset. + pub(crate) id: u64, +} - let mut tags = None; - for section in sections { - if section.starts_with('@') { - // Sample rate section, skip for now. - continue; +/// Parse a metric from given input. +/// +/// This function parses a passed `&str` into a `Metric`. We assume that +/// `DogStatsD` metrics must be utf8 and are not ascii or some other encoding. +/// +/// # Errors +/// +/// This function will return with an error if the input violates any of the +/// limits in [`constants`]. Any non-viable input will be discarded. +/// example aj-test.increment:1|c|#user:aj-test from 127.0.0.1:50983 +pub fn parse(input: &str) -> Result { + // TODO must enforce / exploit constraints given in `constants`. + if let Ok(re) = Regex::new( + r"^(?P[^:]+):(?P[^|]+)\|(?P[cgd])(?:\|@(?P[\d.]+))?(?:\|#(?P[^|]+))?$", + ) { + if let Some(caps) = re.captures(input) { + // unused for now + // let sample_rate = caps.name("sample_rate").map(|m| m.as_str()); + + let tags; + if let Some(tags_section) = caps.name("tags") { + tags = SortedTags::parse(tags_section.as_str())?; + } else { + tags = EMPTY_TAGS; } - if let Some(tags_section) = section.strip_prefix('#') { - let tag_parts = tags_section.split(','); - // Validate that the tags have the right form. - for (i, part) in tag_parts.filter(|s| !s.is_empty()).enumerate() { - if i >= constants::MAX_TAGS { - return Err(ParseError::Raw("Too many tags")); - } - if !part.contains(':') { - return Err(ParseError::Raw("Invalid tag format")); - } + let val = first_value(caps.name("values").unwrap().as_str())?; + let metric_value = match caps.name("type").unwrap().as_str() { + "c" => MetricValue::Count(val), + "g" => MetricValue::Gauge(val), + "d" => { + let sketch = &mut DDSketch::default(); + sketch.insert(val); + MetricValue::Distribution(sketch.to_owned()) } - tags = Some(tags_section); - break; - } - } - - Ok(Metric { - name: Ustr::from(name), - kind, - values: Ustr::from(values), - tags: tags.map(Ustr::from), - }) - } - /// Return an iterator over values - pub(crate) fn values( - &self, - ) -> impl Iterator> + '_ { - self.values.split(':').map(|b: &str| { - let num = b.parse::()?; - Ok(num) - }) - } - - pub(crate) fn first_value(&self) -> Result { - match self.values().next() { - Some(v) => match v { - Ok(v) => Ok(v), - Err(_e) => Err(ParseError::Raw("Failed to parse value as float")), - }, - None => Err(ParseError::Raw("No value")), + _ => { + return Err(ParseError::Raw("Unsupported metric type")); + } + }; + let name = Ustr::from(caps.name("name").unwrap().as_str()); + let id = id(name, &tags); + return Ok(Metric { + name, + value: metric_value, + tags, + id, + }); } } + Err(ParseError::Raw("Invalid metric format")) +} - #[allow(dead_code)] - pub(crate) fn tags(&self) -> Vec { - self.tags - .unwrap_or_default() - .split(',') - .map(std::string::ToString::to_string) - .collect() - } - - #[cfg(test)] - fn raw_values(&self) -> &str { - self.values.as_str() - } - - #[cfg(test)] - fn raw_name(&self) -> &str { - self.name.as_str() - } - - #[cfg(test)] - fn raw_tagset(&self) -> Option<&str> { - self.tags.map(|t| t.as_str()) +fn first_value(values: &str) -> Result { + match values.split(':').next() { + Some(v) => match v.parse::() { + Ok(v) => Ok(v), + Err(_) => Err(ParseError::Raw("Invalid value")), + }, + None => Err(ParseError::Raw("Missing value")), } } @@ -175,36 +194,21 @@ impl Metric { /// identical no matter the internal order of the tagset. That is, we consider a /// tagset like "a:1,b:2,c:3" to be idential to "b:2,c:3,a:1" to "c:3,a:1,b:2" /// etc. This implies that we must sort the tagset after parsing it, which we -/// do. Note however that we _do not_ handle duplicate tags, so "a:1,a:1" will -/// hash to a distinct ID than "a:1". +/// do. Duplicate tags are removed, so "a:1,a:1" will +/// hash to the same ID of "a:1". /// /// Note also that because we take `Ustr` arguments its possible that we've /// interned many possible combinations of a tagset, even if they are identical /// from the point of view of this function. #[inline] #[must_use] -pub fn id(name: Ustr, tagset: Option) -> u64 { +pub fn id(name: Ustr, tags: &SortedTags) -> u64 { let mut hasher = FnvHasher::default(); name.hash(&mut hasher); - // We sort tags. This is in feature parity with DogStatsD and also means - // that we avoid storing the same context multiple times because users have - // passed tags in differeing order through time. - if let Some(tagset) = tagset { - let mut tag_count = 0; - let mut scratch = [None; constants::MAX_TAGS]; - for kv in tagset.split(',') { - if let Some((k, v)) = kv.split_once(':') { - scratch[tag_count] = Some((Ustr::from(k), Ustr::from(v))); - tag_count += 1; - } - } - scratch[..tag_count].sort_unstable(); - // With the tags sorted -- note they're Copy -- we hash the whole kit. - for kv in scratch[..tag_count].iter().flatten() { - kv.0.as_bytes().hash(&mut hasher); - kv.1.as_bytes().hash(&mut hasher); - } + for kv in tags.values.iter() { + kv.0.as_bytes().hash(&mut hasher); + kv.1.as_bytes().hash(&mut hasher); } hasher.finish() } @@ -223,9 +227,9 @@ mod tests { use proptest::{collection, option, strategy::Strategy, string::string_regex}; use ustr::Ustr; - use crate::metric::id; + use crate::metric::{id, parse, MetricValue, SortedTags}; - use super::{Metric, ParseError}; + use super::ParseError; fn metric_name() -> impl Strategy { string_regex("[a-zA-Z0-9.-]{1,128}").unwrap() @@ -265,10 +269,53 @@ mod tests { } else { format!("{name}:{values}|{mtype}") }; - let metric = Metric::parse(&input).unwrap(); - assert_eq!(name, metric.raw_name()); - assert_eq!(values, metric.raw_values()); - assert_eq!(tagset, metric.raw_tagset().map(String::from)); + let metric = parse(&input).unwrap(); + assert_eq!(name, metric.name.as_str()); + + if let Some(tags) = tagset { + let parsed_metric_tags : SortedTags= metric.tags.clone(); + assert_eq!(tags.split(',').count(), parsed_metric_tags.values.len()); + tags.split(',').for_each(|kv| { + let (original_key, original_value) = kv.split_once(':').unwrap(); + let mut found = false; + for (k,v) in parsed_metric_tags.values.iter() { + // TODO not sure who to handle duplicate keys. To make the test pass, just find any match instead of first + if *k == Ustr::from(original_key) && *v == Ustr::from(original_value) { + found = true; + } + } + assert!(found); + }); + } else { + assert!(metric.tags.is_empty()); + } + + match mtype.as_str() { + "c" => { + if let MetricValue::Count(v) = metric.value { + assert_eq!(v, values.split(':').next().unwrap().parse::().unwrap()); + } else { + panic!("Expected count metric"); + } + } + "g" => { + if let MetricValue::Gauge(v) = metric.value { + assert_eq!(v, values.split(':').next().unwrap().parse::().unwrap()); + } else { + panic!("Expected gauge metric"); + } + } + "d" => { + if let MetricValue::Distribution(d) = metric.value { + assert_eq!(d.min().unwrap(), values.split(':').next().unwrap().parse::().unwrap()); + } else { + panic!("Expected distribution metric"); + } + } + _ => { + panic!("Invalid metric format"); + } + } } #[test] @@ -281,13 +328,9 @@ mod tests { } else { format!("|{mtype}") }; - let result = Metric::parse(&input); + let result = parse(&input); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err(), - ParseError::Raw("Missing name, value section") - ); + assert_eq!(result.unwrap_err(),ParseError::Raw("Invalid metric format")); } #[test] @@ -305,12 +348,11 @@ mod tests { } else { format!("{name}{value}|{mtype}") }; - let result = Metric::parse(&input); + let result = parse(&input); - assert!(result.is_err()); assert_eq!( result.unwrap_err(), - ParseError::Raw("Missing name, value section") + ParseError::Raw("Invalid metric format") ); } @@ -326,12 +368,11 @@ mod tests { } else { format!("{name}:{values}|{mtype}") }; - let result = Metric::parse(&input); + let result = parse(&input); - assert!(result.is_err()); assert_eq!( result.unwrap_err(), - ParseError::Raw("Unsupported metric type") + ParseError::Raw("Invalid metric format") ); } @@ -363,8 +404,8 @@ mod tests { tagset2.pop(); } - let id1 = id(Ustr::from(&name), Some(Ustr::from(&tagset1))); - let id2 = id(Ustr::from(&name), Some(Ustr::from(&tagset2))); + let id1 = id(Ustr::from(&name), &SortedTags::parse(&tagset1).unwrap()); + let id2 = id(Ustr::from(&name), &SortedTags::parse(&tagset2).unwrap()); assert_eq!(id1, id2); } @@ -373,21 +414,21 @@ mod tests { #[test] fn parse_too_many_tags() { // 33 - assert_eq!(Metric::parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3").unwrap_err(), + assert_eq!(parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3").unwrap_err(), ParseError::Raw("Too many tags")); // 32 - assert!(Metric::parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2").is_ok()); + assert!(parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2").is_ok()); // 31 - assert!(Metric::parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1").is_ok()); + assert!(parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1").is_ok()); // 30 - assert!(Metric::parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3").is_ok()); + assert!(parse("foo:1|g|#a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3,a:1,b:2,c:3").is_ok()); } #[test] fn invalid_dogstatsd_no_panic() { - assert!(Metric::parse("somerandomstring|c+a;slda").is_err()); + assert!(parse("somerandomstring|c+a;slda").is_err()); } } diff --git a/dogstatsd/tests/integration_test.rs b/dogstatsd/tests/integration_test.rs index e36eb3bfe..bb01466ff 100644 --- a/dogstatsd/tests/integration_test.rs +++ b/dogstatsd/tests/integration_test.rs @@ -1,6 +1,7 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use dogstatsd::metric::SortedTags; use dogstatsd::{ aggregator::Aggregator as MetricsAggregator, constants::CONTEXTS, @@ -30,7 +31,8 @@ async fn dogstatsd_server_ships_series() { .await; let metrics_aggr = Arc::new(Mutex::new( - MetricsAggregator::new(Vec::new(), CONTEXTS).expect("failed to create aggregator"), + MetricsAggregator::new(SortedTags::parse("sometkey:somevalue").unwrap(), CONTEXTS) + .expect("failed to create aggregator"), )); let _ = start_dogstatsd(&metrics_aggr).await; diff --git a/serverless/src/main.rs b/serverless/src/main.rs index 7bc5c390a..d580a9df2 100644 --- a/serverless/src/main.rs +++ b/serverless/src/main.rs @@ -18,6 +18,7 @@ use dogstatsd::{ flusher::{build_fqdn_metrics, Flusher}, }; +use dogstatsd::metric::EMPTY_TAGS; use tokio_util::sync::CancellationToken; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; @@ -116,7 +117,7 @@ async fn start_dogstatsd( dd_https_proxy: Option, ) -> (CancellationToken, Option) { let metrics_aggr = Arc::new(Mutex::new( - MetricsAggregator::new(Vec::new(), CONTEXTS).expect("Failed to create metrics aggregator"), + MetricsAggregator::new(EMPTY_TAGS, CONTEXTS).expect("Failed to create metrics aggregator"), )); let dogstatsd_config = DogStatsDConfig { diff --git a/sidecar/src/watchdog.rs b/sidecar/src/watchdog.rs index 551609c86..4b8f73a6a 100644 --- a/sidecar/src/watchdog.rs +++ b/sidecar/src/watchdog.rs @@ -1,19 +1,19 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, +}; use std::{ sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU32, AtomicUsize, Ordering}, Arc, }, time::Duration, }; -use futures::{ - future::{BoxFuture, Shared}, - FutureExt, -}; - use tokio::{select, sync::mpsc::Receiver}; +use tracing::error; pub struct Watchdog { interval: tokio::time::Interval, @@ -36,7 +36,7 @@ impl WatchdogHandle { impl Watchdog { pub fn from_receiver(shutdown_receiver: Receiver<()>) -> Self { Watchdog { - interval: tokio::time::interval(Duration::from_secs(60)), + interval: tokio::time::interval(Duration::from_secs(5)), max_memory_usage_bytes: 1024 * 1024 * 1024, // 1 GB shutdown_receiver, } @@ -46,12 +46,46 @@ impl Watchdog { let mem_usage_bytes = Arc::new(AtomicUsize::new(0)); let handle_mem_usage_bytes = mem_usage_bytes.clone(); + let still_alive = Arc::new(AtomicU32::new(0)); + let still_alive_thread = still_alive.clone(); + + const SHUTDOWN: u32 = u32::MAX; + + let interval = self.interval.period(); + std::thread::spawn(move || { + let mut maybe_stuck = false; + let mut last = 0; + loop { + std::thread::sleep(interval); + let current = still_alive_thread.load(Ordering::Relaxed); + if last != current { + if current == SHUTDOWN { + return; + } + last = current; + maybe_stuck = false; + } else { + if maybe_stuck { + std::thread::spawn(move || { + error!("Watchdog timeout: Sidecar stuck for at least {} seconds. Sending SIGABRT, possibly dumping core.", interval.as_secs()); + }); + // wait 1 seconds to give log a chance to flush - then kill the process + std::thread::sleep(Duration::from_secs(1)); + unsafe { libc::abort() }; + } + maybe_stuck = true; + } + } + }); + let join_handle = tokio::spawn(async move { mem_usage_bytes.store(0, Ordering::Relaxed); loop { select! { _ = self.interval.tick() => { + still_alive.fetch_add(1, Ordering::Relaxed); + let current_mem_usage_bytes = memory_stats::memory_stats() .map(|s| s.physical_mem) .unwrap_or(0); @@ -69,6 +103,7 @@ impl Watchdog { }, _ = self.shutdown_receiver.recv() => { + still_alive.store(SHUTDOWN, Ordering::Relaxed); return }, }