Skip to content

Commit

Permalink
fix(datadog_agent source, datadog_metrics sink): handle interval for …
Browse files Browse the repository at this point in the history
…non-rate series metrics (vectordotdev#18889)

* fix(datadog_agent source, datadog_metrics sink): handle interval for non-rate series metrics

* spelling

* more spelling

* validate the correct gauge

* update code comment w more details

* feedback toby
  • Loading branch information
neuronull authored and AndrooTheChen committed Sep 23, 2024
1 parent 9e0dbde commit 421f1f1
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 80 deletions.
189 changes: 116 additions & 73 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,42 +609,46 @@ fn series_to_proto_message(

let timestamp = encode_timestamp(metric.timestamp());

let (points, metric_type, interval) = match (metric.value(), metric.interval_ms()) {
(MetricValue::Counter { value }, maybe_interval_ms) => {
let (value, interval, metric_type) = match maybe_interval_ms {
None => (*value, 0, ddmetric_proto::metric_payload::MetricType::Count),
// our internal representation is in milliseconds but the expected output is in seconds
let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);

let (points, metric_type) = match metric.value() {
MetricValue::Counter { value } => {
if let Some(interval) = maybe_interval {
// When an interval is defined, it implies the value should be in a per-second form,
// so we need to get back to seconds from our milliseconds-based interval, and then
// divide our value by that amount as well.
Some(interval_ms) => (
(*value) * 1000.0 / (interval_ms.get() as f64),
interval_ms.get() as i64 / 1000,
let value = *value / (interval as f64);
(
vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }],
ddmetric_proto::metric_payload::MetricType::Rate,
),
};
let points = vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }];
(points, metric_type, interval)
)
} else {
(
vec![ddmetric_proto::metric_payload::MetricPoint {
value: *value,
timestamp,
}],
ddmetric_proto::metric_payload::MetricType::Count,
)
}
}
(MetricValue::Set { values }, _) => {
let points = vec![ddmetric_proto::metric_payload::MetricPoint {
MetricValue::Set { values } => (
vec![ddmetric_proto::metric_payload::MetricPoint {
value: values.len() as f64,
timestamp,
}];
let metric_type = ddmetric_proto::metric_payload::MetricType::Gauge;
let interval = 0;
(points, metric_type, interval)
}
(MetricValue::Gauge { value }, _) => {
let points = vec![ddmetric_proto::metric_payload::MetricPoint {
}],
ddmetric_proto::metric_payload::MetricType::Gauge,
),
MetricValue::Gauge { value } => (
vec![ddmetric_proto::metric_payload::MetricPoint {
value: *value,
timestamp,
}];
let metric_type = ddmetric_proto::metric_payload::MetricType::Gauge;
let interval = 0;
(points, metric_type, interval)
}
}],
ddmetric_proto::metric_payload::MetricType::Gauge,
),
// NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
(value, _) => {
value => {
// this case should have already been surfaced by encode_single_metric() so this should never be reached
return Err(EncoderError::InvalidMetric {
expected: "series",
Expand All @@ -662,7 +666,7 @@ fn series_to_proto_message(
// unit is omitted
unit: "".to_string(),
source_type_name,
interval,
interval: maybe_interval.unwrap_or(0) as i64,
metadata,
})
}
Expand Down Expand Up @@ -822,6 +826,9 @@ fn generate_series_metrics(
let ts = encode_timestamp(metric.timestamp());
let tags = Some(encode_tags(&tags));

// our internal representation is in milliseconds but the expected output is in seconds
let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);

let event_metadata = metric.metadata();
let metadata = generate_series_metadata(
event_metadata.datadog_origin_metadata(),
Expand All @@ -831,64 +838,43 @@ fn generate_series_metrics(

trace!(?metadata, "Generated series metadata.");

let results = match (metric.value(), metric.interval_ms()) {
(MetricValue::Counter { value }, maybe_interval_ms) => {
let (value, interval, metric_type) = match maybe_interval_ms {
None => (*value, None, DatadogMetricType::Count),
let (points, metric_type) = match metric.value() {
MetricValue::Counter { value } => {
if let Some(interval) = maybe_interval {
// When an interval is defined, it implies the value should be in a per-second form,
// so we need to get back to seconds from our milliseconds-based interval, and then
// divide our value by that amount as well.
Some(interval_ms) => (
(*value) * 1000.0 / (interval_ms.get() as f64),
Some(interval_ms.get() / 1000),
DatadogMetricType::Rate,
),
};

vec![DatadogSeriesMetric {
metric: name,
r#type: metric_type,
interval,
points: vec![DatadogPoint(ts, value)],
tags,
host,
source_type_name,
device,
metadata,
}]
let value = *value / (interval as f64);
(vec![DatadogPoint(ts, value)], DatadogMetricType::Rate)
} else {
(vec![DatadogPoint(ts, *value)], DatadogMetricType::Count)
}
}
(MetricValue::Set { values }, _) => vec![DatadogSeriesMetric {
metric: name,
r#type: DatadogMetricType::Gauge,
interval: None,
points: vec![DatadogPoint(ts, values.len() as f64)],
tags,
host,
source_type_name,
device,
metadata,
}],
(MetricValue::Gauge { value }, _) => vec![DatadogSeriesMetric {
metric: name,
r#type: DatadogMetricType::Gauge,
interval: None,
points: vec![DatadogPoint(ts, *value)],
tags,
host,
source_type_name,
device,
metadata,
}],
MetricValue::Set { values } => (
vec![DatadogPoint(ts, values.len() as f64)],
DatadogMetricType::Gauge,
),
MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge),
// NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
(value, _) => {
value => {
return Err(EncoderError::InvalidMetric {
expected: "series",
metric_value: value.as_name(),
})
}
};

Ok(results)
Ok(vec![DatadogSeriesMetric {
metric: name,
r#type: metric_type,
interval: maybe_interval,
points,
tags,
host,
source_type_name,
device,
metadata,
}])
}

fn get_compressor() -> Compressor {
Expand Down Expand Up @@ -1244,6 +1230,63 @@ mod tests {
}
}

#[test]
fn encode_non_rate_metric_with_interval() {
// It is possible that the Agent sends Gauges with an interval set. This
    // occurs when the origin of the metric is Dogstatsd, where the interval
// is set to 10.

let value = 423.1331;
let interval_ms = 10000;

let gauge = Metric::new(
"basic_gauge",
MetricKind::Incremental,
MetricValue::Gauge { value },
)
.with_timestamp(Some(ts()))
.with_interval_ms(NonZeroU32::new(interval_ms));

let expected_value = value; // For gauge, the value should not be modified by interval
let expected_interval = interval_ms / 1000;

// series v1
{
            // Encode the metric and make sure the interval is passed through
            // without altering the gauge's value (no rate conversion).
let result = generate_series_metrics(
&gauge,
&None,
log_schema(),
DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
);
assert!(result.is_ok());

let metrics = result.unwrap();
assert_eq!(metrics.len(), 1);

let actual = &metrics[0];
assert_eq!(actual.r#type, DatadogMetricType::Gauge);
assert_eq!(actual.interval, Some(expected_interval));
assert_eq!(actual.points.len(), 1);
assert_eq!(actual.points[0].1, expected_value);
}

// series v2
{
let series_proto = series_to_proto_message(
&gauge,
&None,
log_schema(),
DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
)
.unwrap();
assert_eq!(series_proto.r#type, 3);
assert_eq!(series_proto.interval, expected_interval as i64);
assert_eq!(series_proto.points.len(), 1);
assert_eq!(series_proto.points[0].value, expected_value);
}
}

#[test]
fn encode_origin_metadata_pass_through() {
let product = 10;
Expand Down
16 changes: 10 additions & 6 deletions src/sinks/datadog/metrics/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,15 @@ fn generate_counter_gauge_set() -> Vec<Event> {
let ts = Utc::now().trunc_subsecs(3);
let events = vec![
// gauge
Event::Metric(Metric::new(
"gauge",
MetricKind::Incremental,
MetricValue::Gauge { value: 5678.0 },
)),
Event::Metric(
Metric::new(
"gauge",
MetricKind::Incremental,
MetricValue::Gauge { value: 5678.0 },
)
// Dogstatsd outputs gauges with an interval
.with_interval_ms(NonZeroU32::new(10000)),
),
// counter with interval
Event::Metric(
Metric::new(
Expand Down Expand Up @@ -318,7 +322,7 @@ fn validate_protobuf_set_gauge_rate(request: &(Parts, Bytes)) {
ddmetric_proto::metric_payload::MetricType::Gauge
);
assert_eq!(gauge.points[0].value, 5678.0);
assert_eq!(gauge.interval, 0);
assert_eq!(gauge.interval, 10);
}

// validate counter w interval = rate
Expand Down
28 changes: 28 additions & 0 deletions src/sources/datadog_agent/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,33 @@ pub(crate) fn decode_ddseries_v2(

let event_metadata = get_event_metadata(serie.metadata.as_ref());

// It is possible to receive non-rate metrics from the Agent with an interval set.
// That interval can be applied with the `as_rate` function in the Datadog UI.
    // This happens when DogStatsD emits non-rate series metrics to the Agent,
    // in which case the interval is set to 10. See
// - https://github.com/DataDog/datadog-agent/blob/9f0a85c926596ec9aebe2d8e1f2a8b1af6e45635/pkg/aggregator/time_sampler.go#L49C1-L49C1
// - https://github.com/DataDog/datadog-agent/blob/209b70529caff9ec1c30b6b2eed27bce725ed153/pkg/aggregator/aggregator.go#L39
//
    // Note that DogStatsD is the only scenario in which this occurs; regular Agent checks/services do not set the
// interval for non-rate series metrics.
//
// Note that because Vector does not yet have a specific Metric type to handle Rate,
// we are distinguishing Rate from Count by setting an interval to Rate but not Count.
// Luckily, the only time a Count metric type is emitted by DogStatsD, is in the Sketch endpoint.
// (Regular Count metrics are emitted by DogStatsD as Rate metrics).
//
// In theory we should be safe to set this non-rate-interval to Count metrics below, but to be safe,
// we will only set it for Rate and Gauge. Since Rates already need an interval, the only "odd" case
// is Gauges.
//
// Ultimately if we had a unique internal representation of a Rate metric type, we wouldn't need to
// have special handling for the interval, we would just apply it to all metrics that it came in with.
let non_rate_interval = if serie.interval.is_positive() {
NonZeroU32::new(serie.interval as u32 * 1000) // incoming is seconds, convert to milliseconds
} else {
None
};

serie.resources.into_iter().for_each(|r| {
// As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
// the hostname can be found in MetricSeries::resources and that is the only value stored there.
Expand Down Expand Up @@ -323,6 +350,7 @@ pub(crate) fn decode_ddseries_v2(
))
.with_tags(Some(tags.clone()))
.with_namespace(namespace)
.with_interval_ms(non_rate_interval)
})
.collect::<Vec<_>>(),
Ok(metric_payload::MetricType::Rate) => serie
Expand Down
16 changes: 15 additions & 1 deletion src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,7 @@ async fn decode_series_endpoint_v2() {
r#type: ddmetric_proto::metric_payload::MetricType::Gauge as i32,
unit: "".to_string(),
source_type_name: "a_random_source_type_name".to_string(),
interval: 0,
interval: 10, // Dogstatsd sets Gauge interval to 10 by default
metadata: None,
},
ddmetric_proto::metric_payload::MetricSeries {
Expand Down Expand Up @@ -1982,6 +1982,13 @@ async fn decode_series_endpoint_v2() {
)
);
assert_eq!(metric.kind(), MetricKind::Absolute);
assert_eq!(
metric
.interval_ms()
.expect("should have set interval")
.get(),
10000
);
assert_eq!(*metric.value(), MetricValue::Gauge { value: 3.14 });
assert_tags(
metric,
Expand All @@ -2006,6 +2013,13 @@ async fn decode_series_endpoint_v2() {
);
assert_eq!(metric.kind(), MetricKind::Absolute);
assert_eq!(*metric.value(), MetricValue::Gauge { value: 3.1415 });
assert_eq!(
metric
.interval_ms()
.expect("should have set interval")
.get(),
10000
);
assert_tags(
metric,
metric_tags!(
Expand Down

0 comments on commit 421f1f1

Please sign in to comment.