Skip to content

Commit

Permalink
chore(datadog_metrics sink): Set partial Origin Metrics in edge cases (
Browse files Browse the repository at this point in the history
…vectordotdev#18677)

* chore(datadog_metrics): Set partial Origin Metrics in edge cases

* Add note to docs
  • Loading branch information
neuronull authored Sep 26, 2023
1 parent ca9e5b4 commit 3dab239
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 20 deletions.
9 changes: 8 additions & 1 deletion docs/REVIEWING.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,17 @@ following items should also be checked:
- [ ] Does it comply with [component spec](specs/component.md)?
- [ ] Does it comply with the [instrumentation spec](specs/instrumentation.md)?

### Checklist - new source

This checklist is specific for Vector's sources.

- [ ] Does the source handle metrics? If it does, the Datadog Origin Metadata function (`sinks::datadog::metrics::encoder::source_type_to_service`),
which maps the source to the correct Service value, needs to be updated. If this source is an Agent role and thus is the true origin of it's
metrics, this will need to be a follow-up PR by a member of the Vector team.

### Checklist - new sink

This checklist is specific for Vector's sink code.
This checklist is specific for Vector's sinks.

#### Logic

Expand Down
57 changes: 38 additions & 19 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{

use bytes::{BufMut, Bytes};
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use prost::Message;
use snafu::{ResultExt, Snafu};
use vector_common::request_metadata::GroupedCountByteSize;
Expand All @@ -32,9 +33,19 @@ const SERIES_PAYLOAD_HEADER: &[u8] = b"{\"series\":[";
const SERIES_PAYLOAD_FOOTER: &[u8] = b"]}";
const SERIES_PAYLOAD_DELIMITER: &[u8] = b",";

const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14;
const ORIGIN_CATEGORY_VALUE: u32 = 11;

const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14;

static ORIGIN_PRODUCT_VALUE: Lazy<u32> = Lazy::new(|| {
option_env!("DD_ORIGIN_PRODUCT")
.map(|p| {
p.parse::<u32>()
.expect("Env var DD_ORIGIN_PRODUCT must be an unsigned 32 bit integer.")
})
.unwrap_or(DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
});

#[allow(warnings, clippy::pedantic, clippy::nursery)]
mod ddmetric_proto {
include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
Expand Down Expand Up @@ -189,20 +200,11 @@ impl DatadogMetricsEncoder {
compressed_limit,
state: EncoderState::default(),
log_schema: log_schema(),
origin_product_value: determine_origin_product_value(),
origin_product_value: *ORIGIN_PRODUCT_VALUE,
})
}
}

fn determine_origin_product_value() -> u32 {
option_env!("DD_ORIGIN_PRODUCT")
.map(|p| {
p.parse::<u32>()
.expect("Env var DD_ORIGIN_PRODUCT must be an unsigned 32 bit integer.")
})
.unwrap_or(DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
}

impl DatadogMetricsEncoder {
fn reset_state(&mut self) -> EncoderState {
mem::take(&mut self.state)
Expand Down Expand Up @@ -564,13 +566,14 @@ fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
}
}

// Given the vector source type, return the OriginService value associated with that integration.
//
// Some sources such as `kafka`, `nats`, `redis` for example, are only capable of receiving metrics
// with the `native` or `native_json` codec. In such cases we intentionally do not set the origin
// metadata here, because the true origin will have already been determined to be a pass-through.
// Given the vector source type, return the OriginService value associated with that integration, if any.
fn source_type_to_service(source_type: &'static str) -> Option<u32> {
match source_type {
// In order to preserve consistent behavior, we intentionally don't set origin metadata
// for the case where the Datadog Agent did not set it.
"datadog_agent" => None,

// These are the sources for which metrics truly originated from this Vector instance.
"apache_metrics" => Some(17),
"aws_ecs_metrics" => Some(209),
"eventstoredb_metrics" => Some(210),
Expand All @@ -583,7 +586,23 @@ fn source_type_to_service(source_type: &'static str) -> Option<u32> {
"prometheus_remote_write" => Some(214),
"prometheus_scrape" => Some(215),
"statsd" => Some(153),
_ => None,

// These sources are only capable of receiving metrics with the `native` or `native_json` codec.
// Generally that means the Origin Metadata will have been set as a pass through.
// However, if the upstream Vector instance did not set Origin Metadata (for example if it is an
// older version version), we will at least set the OriginProduct and OriginCategory.
"kafka" | "nats" | "redis" | "gcp_pubsub" | "http_client" | "http_server" | "vector" => {
Some(0)
}

// This scenario should not occur- if it does it means we added a source that deals with metrics,
// and did not update this function.
// But if it does occur, by setting the Service value to be undefined, we at least populate the
// OriginProduct and OriginCategory.
_ => {
debug!("Source {source_type} OriginService value is undefined! This source needs to be properly mapped to a Service value.");
Some(0)
}
}
}

Expand Down Expand Up @@ -872,7 +891,7 @@ mod tests {
common::datadog::DatadogMetricType,
sinks::datadog::metrics::{
config::DatadogMetricsEndpoint,
encoder::{determine_origin_product_value, DEFAULT_DD_ORIGIN_PRODUCT_VALUE},
encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE},
},
};

Expand Down Expand Up @@ -1082,7 +1101,7 @@ mod tests {

#[test]
fn encode_origin_metadata_vector_sourced() {
let product = determine_origin_product_value();
let product = *ORIGIN_PRODUCT_VALUE;

let category = 11;
let service = 153;
Expand Down

0 comments on commit 3dab239

Please sign in to comment.