Skip to content

Commit

Permalink
ref(metrics): Normalize metrics in the processor instead of aggregator (
Browse files Browse the repository at this point in the history
#3903)

Normalizing metrics is quite costly, especially since we always allocate
a new metric name. We should do this in the processor with all the other
normalizations and checks. This will take quite a bit of load off the
aggregator.
  • Loading branch information
Dav1dde authored Aug 7, 2024
1 parent 3f5e6b9 commit 5e3cf56
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 110 deletions.
99 changes: 8 additions & 91 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,14 @@ use thiserror::Error;
use tokio::time::Instant;

use crate::bucket::{Bucket, BucketValue};
use crate::protocol::{self, MetricNamespace, MetricResourceIdentifier};
use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
use crate::{BucketMetadata, FiniteF64, MetricName};
use crate::{BucketMetadata, FiniteF64, MetricName, MetricNamespace};

use hashbrown::HashMap;

/// Any error that may occur during aggregation.
#[derive(Debug, Error, PartialEq)]
pub enum AggregateMetricsError {
/// A metric bucket had invalid characters in the metric name.
#[error("found invalid characters: {0}")]
InvalidCharacters(MetricName),
/// A metric bucket had an unknown namespace in the metric name.
#[error("found unsupported namespace: {0}")]
UnsupportedNamespace(MetricNamespace),
/// A metric bucket's timestamp was out of the configured acceptable range.
#[error("found invalid timestamp: {0}")]
InvalidTimestamp(UnixTimestamp),
Expand Down Expand Up @@ -847,11 +840,11 @@ fn validate_bucket_key(
Ok(key)
}

/// Removes invalid characters from metric names.
/// Validates metric name against [`AggregatorConfig`].
///
/// Returns `Err` if the metric must be dropped.
fn validate_metric_name(
mut key: BucketKey,
key: BucketKey,
aggregator_config: &AggregatorConfig,
) -> Result<BucketKey, AggregateMetricsError> {
let metric_name_length = key.metric_name.len();
Expand All @@ -864,35 +857,12 @@ fn validate_metric_name(
return Err(AggregateMetricsError::InvalidStringLength(key.metric_name));
}

normalize_metric_name(&mut key)?;

Ok(key)
}

/// Parses the metric name of `key` and replaces it with the parsed identifier's string form.
///
/// The name is parsed with `MetricResourceIdentifier::parse`. On success,
/// `key.metric_name` is overwritten with `mri.to_string()`, which allocates a new name.
///
/// # Errors
///
/// - [`AggregateMetricsError::UnsupportedNamespace`] if the name parses but its namespace is
///   `MetricNamespace::Unsupported`.
/// - [`AggregateMetricsError::InvalidCharacters`] if the name cannot be parsed at all; the
///   error carries a clone of the offending name.
///
/// In both error cases `key.metric_name` is left untouched.
fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsError> {
key.metric_name = match MetricResourceIdentifier::parse(&key.metric_name) {
Ok(mri) => {
// The name is syntactically valid, but the namespace is not one we accept.
if matches!(mri.namespace, MetricNamespace::Unsupported) {
relay_log::debug!("invalid metric namespace {:?}", &key.metric_name);
return Err(AggregateMetricsError::UnsupportedNamespace(mri.namespace));
}

// Re-serialize the parsed identifier; this becomes the new metric name.
mri.to_string().into()
}
Err(_) => {
// The parse error itself is discarded; only the offending name is reported.
relay_log::debug!("invalid metric name {:?}", &key.metric_name);
return Err(AggregateMetricsError::InvalidCharacters(
key.metric_name.clone(),
));
}
};

Ok(())
}

/// Removes tags with invalid characters in the key, and validates tag values.
/// Validates metric tags against [`AggregatorConfig`].
///
/// Tag values are validated with `protocol::validate_tag_value`.
/// Invalid tags are removed.
fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig) -> BucketKey {
key.tags.retain(|tag_key, tag_value| {
if tag_key.len() > aggregator_config.max_tag_key_length {
Expand All @@ -904,16 +874,8 @@ fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig
return false;
}

if protocol::is_valid_tag_key(tag_key) {
true
} else {
relay_log::debug!("invalid metric tag key {tag_key:?}");
false
}
true
});
for (_, tag_value) in key.tags.iter_mut() {
protocol::validate_tag_value(tag_value);
}
key
}

Expand Down Expand Up @@ -946,7 +908,7 @@ mod tests {
Bucket {
timestamp,
width: 0,
name: "c:transactions/foo".into(),
name: "c:transactions/foo@none".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(timestamp),
Expand Down Expand Up @@ -1449,52 +1411,7 @@ mod tests {
}

/// Checks how `validate_bucket_key` treats NUL bytes in tags and metric names.
///
/// Expectations encoded by the assertions below:
/// - NUL bytes inside a tag *value* are stripped and the tag is kept,
/// - a tag whose *key* contains a NUL byte is removed entirely,
/// - a metric name containing a NUL byte makes validation fail.
#[test]
fn test_validate_bucket_key_chars() {
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let bucket_key = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:transactions/hergus.bergus".into(),
tags: {
let mut tags = BTreeMap::new();
// Some SDKs mess up content encodings and interpret the raw bytes of a UTF-16
// string as UTF-8, which leads to ASCII strings getting NUL bytes interleaved.
//
// Somehow those values end up as release tag in sessions, while in error events we
// haven't observed this malformed encoding. We believe it's slightly better to
// strip out NUL-bytes instead of dropping the tag such that those values line up
// again across sessions and events. Should that cause too high cardinality we'll
// have to drop tags.
//
// Note that releases are validated separately against much stricter character set,
// but the above idea should still apply to other tags.
tags.insert(
"is_it_garbage".to_owned(),
"a\0b\0s\0o\0l\0u\0t\0e\0l\0y".to_owned(),
);
tags.insert("another\0garbage".to_owned(), "bye".to_owned());
tags
},
extracted_from_indexed: false,
};

let mut bucket_key = validate_bucket_key(bucket_key, &test_config()).unwrap();

// Only the tag with a valid key survives, with the NUL bytes removed from its value.
assert_eq!(bucket_key.tags.len(), 1);
assert_eq!(
bucket_key.tags.get("is_it_garbage"),
Some(&"absolutely".to_owned())
);
assert_eq!(bucket_key.tags.get("another\0garbage"), None);

// A metric name containing a NUL byte must be rejected outright.
bucket_key.metric_name = "hergus\0bergus".into();
validate_bucket_key(bucket_key, &test_config()).unwrap_err();
}

#[test]
fn test_validate_bucket_key_str_lens() {
fn test_validate_bucket_key_str_length() {
relay_test::setup();
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();

Expand Down
11 changes: 7 additions & 4 deletions relay-metrics/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use crate::{FiniteF64, MetricNamespace, ParseMetricError};

const VALUE_SEPARATOR: char = ':';

/// Type of [`Bucket::tags`].
pub type MetricTags = BTreeMap<String, String>;

/// A snapshot of values within a [`Bucket`].
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct GaugeValue {
Expand Down Expand Up @@ -401,8 +404,8 @@ fn parse_gauge(string: &str) -> Option<GaugeValue> {
/// Parses tags in the format `tag1,tag2:value`.
///
/// Tag values are optional. For tags with missing values, an empty `""` value is assumed.
fn parse_tags(string: &str) -> Option<BTreeMap<String, String>> {
let mut map = BTreeMap::new();
fn parse_tags(string: &str) -> Option<MetricTags> {
let mut map = MetricTags::new();

for pair in string.split(',') {
let mut name_value = pair.splitn(2, ':');
Expand Down Expand Up @@ -606,8 +609,8 @@ pub struct Bucket {
/// ```text
/// endpoint.hits:1|c|#route:user_index,environment:production,release:1.4.0
/// ```
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub tags: BTreeMap<String, String>,
#[serde(default, skip_serializing_if = "MetricTags::is_empty")]
pub tags: MetricTags,

/// Relay internal metadata for a metric bucket.
///
Expand Down
Loading

0 comments on commit 5e3cf56

Please sign in to comment.