From 4a0c250d61d507f39ec40c8ec6cbf6d72d1c622c Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Mon, 22 Jul 2024 09:12:43 +0100 Subject: [PATCH] (Fix) telemetry: allow overriding of service.version (#5689) Co-authored-by: bryn --- .changesets/fix_bryn_service_version.md | 17 + .../src/plugins/telemetry/resource.rs | 37 +- .../tests/integration/telemetry/datadog.rs | 699 +++++++++--------- .../telemetry/fixtures/datadog.router.yaml | 3 + 4 files changed, 388 insertions(+), 368 deletions(-) create mode 100644 .changesets/fix_bryn_service_version.md diff --git a/.changesets/fix_bryn_service_version.md b/.changesets/fix_bryn_service_version.md new file mode 100644 index 0000000000..dc36d77894 --- /dev/null +++ b/.changesets/fix_bryn_service_version.md @@ -0,0 +1,17 @@ +### Allow overriding of service version ([PR #5689](https://github.com/apollographql/router/pull/5689)) + +Previously `service.version` was not overridable via yaml and was ignored. It is now possible to set this explicitly which can be useful for users producing custom builds of the Router. + +For example: +```yaml +telemetry: + exporters: + tracing: + common: + resource: + service.version: 1.0 +``` + +Overrides the version to `1.0`. + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/5689 diff --git a/apollo-router/src/plugins/telemetry/resource.rs b/apollo-router/src/plugins/telemetry/resource.rs index 90e5b1608c..580bd25e38 100644 --- a/apollo-router/src/plugins/telemetry/resource.rs +++ b/apollo-router/src/plugins/telemetry/resource.rs @@ -11,6 +11,28 @@ use crate::plugins::telemetry::config::AttributeValue; const UNKNOWN_SERVICE: &str = "unknown_service"; const OTEL_SERVICE_NAME: &str = "OTEL_SERVICE_NAME"; +/// This resource detector fills out things like the default service version and executable name. +/// Users can always override them via config. +struct StaticResourceDetector; +impl ResourceDetector for StaticResourceDetector { + fn detect(&self, _timeout: Duration) -> Resource { + let mut config_resources = vec![]; + config_resources.push(KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_VERSION, + std::env!("CARGO_PKG_VERSION"), + )); + + // Some other basic resources + if let Some(executable_name) = executable_name() { + config_resources.push(KeyValue::new( + opentelemetry_semantic_conventions::resource::PROCESS_EXECUTABLE_NAME, + executable_name, + )); + } + Resource::new(config_resources) + } +} + struct EnvServiceNameDetector; // Used instead of SdkProvidedResourceDetector impl ResourceDetector for EnvServiceNameDetector { @@ -42,6 +64,7 @@ pub(crate) trait ConfigResource { let resource = Resource::from_detectors( Duration::from_secs(0), vec![ + Box::new(StaticResourceDetector), Box::new(config_resource_detector), Box::new(EnvResourceDetector::new()), Box::new(EnvServiceNameDetector), @@ -84,24 +107,10 @@ impl ResourceDetector for ConfigResourceDetector { let mut config_resources = vec![]; // For config resources last entry wins - - // Add any other resources from config for (key, value) in self.resources.iter() { config_resources.push(KeyValue::new(key.clone(), value.clone())); } - // Some other basic resources - config_resources.push(KeyValue::new( - opentelemetry_semantic_conventions::resource::SERVICE_VERSION, - std::env!("CARGO_PKG_VERSION"), - )); - if let Some(executable_name) = executable_name() { - config_resources.push(KeyValue::new( - opentelemetry_semantic_conventions::resource::PROCESS_EXECUTABLE_NAME, - executable_name, - )); - } - // Service namespace if let Some(service_namespace) = self.service_namespace.clone() { config_resources.push(KeyValue::new( diff --git a/apollo-router/tests/integration/telemetry/datadog.rs b/apollo-router/tests/integration/telemetry/datadog.rs index 6e8d9be3d4..9a13512e79 100644 --- a/apollo-router/tests/integration/telemetry/datadog.rs +++ b/apollo-router/tests/integration/telemetry/datadog.rs @@ -14,6 +14,16 @@ use crate::integration::common::Telemetry; use crate::integration::IntegrationTest; use crate::integration::ValueExt; +#[derive(buildstructor::Builder)] +struct TraceSpec { + operation_name: Option, + version: Option, + services: HashSet<&'static str>, + span_names: HashSet<&'static str>, + measured_spans: HashSet<&'static str>, + unmeasured_spans: HashSet<&'static str>, +} + #[tokio::test(flavor = "multi_thread")] async fn test_default_span_names() -> Result<(), BoxError> { if !graph_os_enabled() { @@ -41,29 +51,27 @@ async fn test_default_span_names() -> Result<(), BoxError> { .unwrap(), id.to_datadog() ); - validate_trace( - id, - &query, - Some("ExampleQuery"), - &["client", "router", "subgraph"], - false, - &[ - "query_planning", - "client_request", - "subgraph_request", - "subgraph", - "fetch", - "supergraph", - "execution", - "query ExampleQuery", - "subgraph server", - "http_request", - "parse_query", - ], - &[], - &[], - ) - .await?; + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .span_names( + [ + "query_planning", + "client_request", + "subgraph_request", + "subgraph", + "fetch", + "supergraph", + "execution", + "query ExampleQuery", + "subgraph server", + "http_request", + "parse_query", + ] + .into(), + ) + .build() + .validate_trace(id) + .await?; router.graceful_shutdown().await; Ok(()) } @@ -95,29 +103,27 @@ async fn test_override_span_names() -> Result<(), BoxError> { .unwrap(), id.to_datadog() ); - validate_trace( - id, - &query, - Some("ExampleQuery"), - &["client", "router", "subgraph"], - false, - &[ - "query_planning", - "client_request", - "subgraph_request", - "subgraph", - "fetch", - "supergraph", - "execution", - "overridden", - "subgraph server", - "http_request", - "parse_query", - ], - &[], - &[], - ) - .await?; + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .span_names( + [ + "query_planning", + "client_request", + "subgraph_request", + "subgraph", + "fetch", + "supergraph", + "execution", + "overridden", + "subgraph server", + "http_request", + "parse_query", + ] + .into(), + ) + .build() + .validate_trace(id) + .await?; router.graceful_shutdown().await; Ok(()) } @@ -149,29 +155,27 @@ async fn test_override_span_names_late() -> Result<(), BoxError> { .unwrap(), id.to_datadog() ); - validate_trace( - id, - &query, - Some("ExampleQuery"), - &["client", "router", "subgraph"], - false, - &[ - "query_planning", - "client_request", - "subgraph_request", - "subgraph", - "fetch", - "supergraph", - "execution", - "ExampleQuery", - "subgraph server", - "http_request", - "parse_query", - ], - &[], - &[], - ) - .await?; + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .span_names( + [ + "query_planning", + "client_request", + "subgraph_request", + "subgraph", + "fetch", + "supergraph", + "execution", + "ExampleQuery", + "subgraph server", + "http_request", + "parse_query", + ] + .into(), + ) + .build() + .validate_trace(id) + .await?; router.graceful_shutdown().await; Ok(()) } @@ -201,37 +205,40 @@ async fn test_basic() -> Result<(), BoxError> { .unwrap(), id.to_datadog() ); - validate_trace( - id, - &query, - Some("ExampleQuery"), - &["client", "router", "subgraph"], - false, - &[ - "query_planning", - "client_request", - "ExampleQuery__products__0", - "products", - "fetch", - "/", - "execution", - "ExampleQuery", - "subgraph server", - "parse_query", - ], - &[ - "query_planning", - "subgraph", - "http_request", - "subgraph_request", - "router", - "execution", - "supergraph", - "parse_query", - ], - &[], - ) - .await?; + TraceSpec::builder() + .operation_name("ExampleQuery") + .services(["client", "router", "subgraph"].into()) + .span_names( + [ + "query_planning", + "client_request", + "ExampleQuery__products__0", + "products", + "fetch", + "/", + "execution", + "ExampleQuery", + "subgraph server", + "parse_query", + ] + .into(), + ) + .measured_spans( + [ + "query_planning", + "subgraph", + "http_request", + "subgraph_request", + "router", + "execution", + "supergraph", + "parse_query", + ] + .into(), + ) + .build() + .validate_trace(id) + .await?; router.graceful_shutdown().await; Ok(()) } @@ -259,28 +266,27 @@ async fn test_resource_mapping_default() -> Result<(), BoxError> { .get("apollo-custom-trace-id") .unwrap() .is_empty()); - validate_trace( - id, - &query, - Some("ExampleQuery"), - &["client", "router", "subgraph"], - false, - &[ - "parse_query", - "/", - "ExampleQuery", - "client_request", - "execution", - "query_planning", - "products", - "fetch", - "subgraph server", - "ExampleQuery__products__0", - ], - &[], - &[], - ) - .await?; + TraceSpec::builder() + .operation_name("ExampleQuery") + .services(["client", "router", "subgraph"].into()) + .span_names( + [ + "parse_query", + "/", + "ExampleQuery", + "client_request", + "execution", + "query_planning", + "products", + "fetch", + "subgraph server", + "ExampleQuery__products__0", + ] + .into(), + ) + .build() + .validate_trace(id) + .await?; router.graceful_shutdown().await; Ok(()) } @@ -308,28 +314,26 @@ async fn test_resource_mapping_override() -> Result<(), BoxError> { .get("apollo-custom-trace-id") .unwrap() .is_empty()); - validate_trace( - id, - &query, - Some("ExampleQuery"), - &["client", "router", "subgraph"], - false, - &[ - "parse_query", - "ExampleQuery", - "client_request", - "execution", - "query_planning", - "products", - "fetch", - "subgraph server", - "overridden", - "ExampleQuery__products__0", - ], - &[], - &[], - ) - .await?; + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .span_names( + [ + "parse_query", + "ExampleQuery", + "client_request", + "execution", + "query_planning", + "products", + "fetch", + "subgraph server", + "overridden", + "ExampleQuery__products__0", + ] + .into(), + ) + .build() + .validate_trace(id) + .await?; router.graceful_shutdown().await; Ok(()) } @@ -355,234 +359,221 @@ async fn test_span_metrics() -> Result<(), BoxError> { .get("apollo-custom-trace-id") .unwrap() .is_empty()); - validate_trace( - id, - &query, - Some("ExampleQuery"), - &["client", "router", "subgraph"], - false, - &[ - "parse_query", - "ExampleQuery", - "client_request", - "execution", - "query_planning", - "products", - "fetch", - "subgraph server", - "ExampleQuery__products__0", - ], - &["subgraph"], - &["supergraph"], - ) - .await?; + TraceSpec::builder() + .operation_name("ExampleQuery") + .services(["client", "router", "subgraph"].into()) + .span_names( + [ + "parse_query", + "ExampleQuery", + "client_request", + "execution", + "query_planning", + "products", + "fetch", + "subgraph server", + "ExampleQuery__products__0", + ] + .into(), + ) + .measured_span("subgraph") + .unmeasured_span("supergraph") + .build() + .validate_trace(id) + .await?; router.graceful_shutdown().await; Ok(()) } -#[allow(clippy::too_many_arguments)] -async fn validate_trace( - id: TraceId, - query: &Value, - operation_name: Option<&str>, - services: &[&'static str], - custom_span_instrumentation: bool, - expected_span_names: &[&'static str], - expected_measured: &[&'static str], - unexpected_measured: &[&'static str], -) -> Result<(), BoxError> { - let datadog_id = id.to_datadog(); - let url = format!("http://localhost:8126/test/traces?trace_ids={datadog_id}"); - for _ in 0..10 { - if find_valid_trace( - &url, - query, - operation_name, - services, - custom_span_instrumentation, - expected_span_names, - expected_measured, - unexpected_measured, - ) - .await - .is_ok() - { - return Ok(()); - } - tokio::time::sleep(Duration::from_millis(100)).await; +pub(crate) trait DatadogId { + fn to_datadog(&self) -> String; +} +impl DatadogId for TraceId { + fn to_datadog(&self) -> String { + let bytes = &self.to_bytes()[std::mem::size_of::()..std::mem::size_of::()]; + u64::from_be_bytes(bytes.try_into().unwrap()).to_string() } - find_valid_trace( - &url, - query, - operation_name, - services, - custom_span_instrumentation, - expected_span_names, - expected_measured, - unexpected_measured, - ) - .await?; - Ok(()) } -#[allow(clippy::too_many_arguments)] -async fn find_valid_trace( - url: &str, - _query: &Value, - operation_name: Option<&str>, - services: &[&'static str], - _custom_span_instrumentation: bool, - expected_span_names: &[&'static str], - expected_measured: &[&'static str], - unexpected_measured: &[&'static str], -) -> Result<(), BoxError> { - // A valid trace has: - // * All three services - // * The correct spans - // * All spans are parented - // * Required attributes of 'router' span has been set - - // For now just validate service name. - let trace: Value = reqwest::get(url) - .await - .map_err(|e| anyhow!("failed to contact datadog; {}", e))? - .json() - .await?; - tracing::debug!("{}", serde_json::to_string_pretty(&trace)?); - verify_trace_participants(&trace, services)?; - verify_spans_present(&trace, operation_name, services, expected_span_names)?; - validate_span_kinds(&trace)?; - validate_measured_spans(&trace, expected_measured, unexpected_measured)?; - Ok(()) -} +impl TraceSpec { + #[allow(clippy::too_many_arguments)] + async fn validate_trace(&self, id: TraceId) -> Result<(), BoxError> { + let datadog_id = id.to_datadog(); + let url = format!("http://localhost:8126/test/traces?trace_ids={datadog_id}"); + for _ in 0..10 { + if self.find_valid_trace(&url).await.is_ok() { + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + self.find_valid_trace(&url).await?; + Ok(()) + } -fn validate_measured_spans( - trace: &Value, - expected: &[&'static str], - unexpected: &[&'static str], -) -> Result<(), BoxError> { - for expected in expected { - assert!( - measured_span(trace, expected)?, - "missing measured span {}", - expected - ); + #[allow(clippy::too_many_arguments)] + async fn find_valid_trace(&self, url: &str) -> Result<(), BoxError> { + // A valid trace has: + // * All three services + // * The correct spans + // * All spans are parented + // * Required attributes of 'router' span has been set + + // For now just validate service name. + let trace: Value = reqwest::get(url) + .await + .map_err(|e| anyhow!("failed to contact datadog; {}", e))? + .json() + .await?; + tracing::debug!("{}", serde_json::to_string_pretty(&trace)?); + self.verify_trace_participants(&trace)?; + self.verify_operation_name(&trace)?; + self.verify_version(&trace)?; + self.verify_spans_present(&trace)?; + self.validate_span_kinds(&trace)?; + self.validate_measured_spans(&trace)?; + Ok(()) } - for unexpected in unexpected { - assert!( - !measured_span(trace, unexpected)?, - "unexpected measured span {}", - unexpected - ); + + fn verify_version(&self, trace: &Value) -> Result<(), BoxError> { + if let Some(expected_version) = &self.version { + let binding = trace.select_path("$..version")?; + let version = binding.first(); + assert_eq!( + version + .expect("version expected") + .as_str() + .expect("version must be a string"), + expected_version + ); + } + Ok(()) } - Ok(()) -} -fn measured_span(trace: &Value, name: &&str) -> Result { - let binding1 = trace.select_path(&format!( - "$..[?(@.meta.['otel.original_name'] == '{}')].metrics.['_dd.measured']", - name - ))?; - let binding2 = trace.select_path(&format!( - "$..[?(@.name == '{}')].metrics.['_dd.measured']", - name - ))?; - Ok(binding1 - .first() - .or(binding2.first()) - .and_then(|v| v.as_f64()) - .map(|v| v == 1.0) - .unwrap_or_default()) -} + fn validate_measured_spans(&self, trace: &Value) -> Result<(), BoxError> { + for expected in &self.measured_spans { + assert!( + self.measured_span(trace, expected)?, + "missing measured span {}", + expected + ); + } + for unexpected in &self.unmeasured_spans { + assert!( + !self.measured_span(trace, unexpected)?, + "unexpected measured span {}", + unexpected + ); + } + Ok(()) + } -fn validate_span_kinds(trace: &Value) -> Result<(), BoxError> { - // Validate that the span.kind has been propagated. We can just do this for a selection of spans. - validate_span_kind(trace, "router", "server")?; - validate_span_kind(trace, "supergraph", "internal")?; - validate_span_kind(trace, "http_request", "client")?; - Ok(()) -} + fn measured_span(&self, trace: &Value, name: &str) -> Result { + let binding1 = trace.select_path(&format!( + "$..[?(@.meta.['otel.original_name'] == '{}')].metrics.['_dd.measured']", + name + ))?; + let binding2 = trace.select_path(&format!( + "$..[?(@.name == '{}')].metrics.['_dd.measured']", + name + ))?; + Ok(binding1 + .first() + .or(binding2.first()) + .and_then(|v| v.as_f64()) + .map(|v| v == 1.0) + .unwrap_or_default()) + } -fn verify_trace_participants(trace: &Value, services: &[&'static str]) -> Result<(), BoxError> { - let actual_services: HashSet = trace - .select_path("$..service")? - .into_iter() - .filter_map(|service| service.as_string()) - .collect(); - tracing::debug!("found services {:?}", actual_services); - - let expected_services = services - .iter() - .map(|s| s.to_string()) - .collect::>(); - if actual_services != expected_services { - return Err(BoxError::from(format!( - "incomplete traces, got {actual_services:?} expected {expected_services:?}" - ))); + fn validate_span_kinds(&self, trace: &Value) -> Result<(), BoxError> { + // Validate that the span.kind has been propagated. We can just do this for a selection of spans. + self.validate_span_kind(trace, "router", "server")?; + self.validate_span_kind(trace, "supergraph", "internal")?; + self.validate_span_kind(trace, "http_request", "client")?; + Ok(()) } - Ok(()) -} -fn verify_spans_present( - trace: &Value, - _operation_name: Option<&str>, - services: &[&'static str], - expected_span_names: &[&'static str], -) -> Result<(), BoxError> { - let operation_names: HashSet = trace - .select_path("$..resource")? - .into_iter() - .filter_map(|span_name| span_name.as_string()) - .collect(); - let mut expected_span_names: HashSet = - expected_span_names.iter().map(|s| s.to_string()).collect(); - if services.contains(&"client") { - expected_span_names.insert("client_request".into()); + fn verify_trace_participants(&self, trace: &Value) -> Result<(), BoxError> { + let actual_services: HashSet = trace + .select_path("$..service")? + .into_iter() + .filter_map(|service| service.as_string()) + .collect(); + tracing::debug!("found services {:?}", actual_services); + + let expected_services = self + .services + .iter() + .map(|s| s.to_string()) + .collect::>(); + if actual_services != expected_services { + return Err(BoxError::from(format!( + "incomplete traces, got {actual_services:?} expected {expected_services:?}" + ))); + } + Ok(()) } - tracing::debug!("found spans {:?}", operation_names); - let missing_operation_names: Vec<_> = expected_span_names - .iter() - .filter(|o| !operation_names.contains(*o)) - .collect(); - if !missing_operation_names.is_empty() { - return Err(BoxError::from(format!( - "spans did not match, got {operation_names:?}, missing {missing_operation_names:?}" - ))); + + fn verify_spans_present(&self, trace: &Value) -> Result<(), BoxError> { + let operation_names: HashSet = trace + .select_path("$..resource")? + .into_iter() + .filter_map(|span_name| span_name.as_string()) + .collect(); + let mut span_names: HashSet<&str> = self.span_names.clone(); + if self.services.contains("client") { + span_names.insert("client_request"); + } + tracing::debug!("found spans {:?}", operation_names); + let missing_operation_names: Vec<_> = span_names + .iter() + .filter(|o| !operation_names.contains(**o)) + .collect(); + if !missing_operation_names.is_empty() { + return Err(BoxError::from(format!( + "spans did not match, got {operation_names:?}, missing {missing_operation_names:?}" + ))); + } + Ok(()) } - Ok(()) -} -fn validate_span_kind(trace: &Value, name: &str, kind: &str) -> Result<(), BoxError> { - let binding1 = trace.select_path(&format!( - "$..[?(@.meta.['otel.original_name'] == '{}')].meta.['span.kind']", - name - ))?; - let binding2 = - trace.select_path(&format!("$..[?(@.name == '{}')].meta.['span.kind']", name))?; - let binding = binding1.first().or(binding2.first()); - - assert!( - binding.is_some(), - "span.kind missing or incorrect {}, {}", - name, - trace - ); - assert_eq!( - binding - .expect("expected binding") - .as_str() - .expect("expected string"), - kind - ); - Ok(()) -} + fn validate_span_kind(&self, trace: &Value, name: &str, kind: &str) -> Result<(), BoxError> { + let binding1 = trace.select_path(&format!( + "$..[?(@.meta.['otel.original_name'] == '{}')].meta.['span.kind']", + name + ))?; + let binding2 = + trace.select_path(&format!("$..[?(@.name == '{}')].meta.['span.kind']", name))?; + let binding = binding1.first().or(binding2.first()); -pub(crate) trait DatadogId { - fn to_datadog(&self) -> String; -} -impl DatadogId for TraceId { - fn to_datadog(&self) -> String { - let bytes = &self.to_bytes()[std::mem::size_of::()..std::mem::size_of::()]; - u64::from_be_bytes(bytes.try_into().unwrap()).to_string() + assert!( + binding.is_some(), + "span.kind missing or incorrect {}, {}", + name, + trace + ); + assert_eq!( + binding + .expect("expected binding") + .as_str() + .expect("expected string"), + kind + ); + Ok(()) + } + + fn verify_operation_name(&self, trace: &Value) -> Result<(), BoxError> { + if let Some(expected_operation_name) = &self.operation_name { + let binding = + trace.select_path("$..[?(@.name == 'supergraph')]..['graphql.operation.name']")?; + let operation_name = binding.first(); + assert_eq!( + operation_name + .expect("graphql.operation.name expected") + .as_str() + .expect("graphql.operation.name must be a string"), + expected_operation_name + ); + } + Ok(()) } } diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog.router.yaml index 18294a77ff..f95964ae6d 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog.router.yaml @@ -7,6 +7,9 @@ telemetry: format: datadog common: service_name: router + resource: + env: local1 + service.version: router_version_override datadog: enabled: true batch_processor: