diff --git a/.circleci/config.yml b/.circleci/config.yml index 96c199a336..edb8a9fc57 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -41,12 +41,33 @@ commands: command: | sudo apt-get update sudo apt-get install -y libssl-dev cmake + - run: + name: Download jaeger + command: | + curl -L https://github.com/jaegertracing/jaeger/releases/download/v1.33.0/jaeger-1.33.0-linux-amd64.tar.gz --output jaeger.tar.gz + tar -xf jaeger.tar.gz + mv jaeger-1.33.0-linux-amd64 jaeger macos_install_baseline: steps: - run: echo "HOMEBREW_NO_AUTO_UPDATE=1" >> $BASH_ENV - run: echo "export OPENSSL_ROOT_DIR=/usr/local/opt/openssl@1.1" >> $BASH_ENV - run: test -e "$OPENSSL_ROOT_DIR" - run: brew install cmake + - run: + name: Download jaeger + command: | + curl -L https://github.com/jaegertracing/jaeger/releases/download/v1.33.0/jaeger-1.33.0-darwin-amd64.tar.gz --output jaeger.tar.gz + tar -xf jaeger.tar.gz + mv jaeger-1.33.0-darwin-amd64 jaeger + windows_install_baseline: + steps: + - run: + name: Download jaeger + shell: bash.exe + command: | + curl -L https://github.com/jaegertracing/jaeger/releases/download/v1.33.0/jaeger-1.33.0-windows-amd64.tar.gz --output jaeger.tar.gz + tar -xf jaeger.tar.gz + mv jaeger-1.33.0-windows-amd64 jaeger # This job sets up our nodejs dependencies, # and makes sure everything is ready to run integration tests @@ -67,6 +88,12 @@ commands: - run: name: Assert npm version command: test "$(npm --version)" = "${NPM_VERSION}" + - run: + # The jaeger exporter won't work without this + name: Increase udp packet size + command: | + sudo sysctl net.inet.udp.maxdgram=65536 + sudo sysctl net.inet.udp.maxdgram linux_prepare_node_env: steps: #TODO[igni]: check for node version before we try to install it @@ -122,33 +149,6 @@ commands: command: | if ((npm --version) -Ne "${Env:NPM_VERSION}") { exit 1 } - windows_prepare_test_env: - steps: - - restore_cache: - keys: - - subgraph-node-modules-v2-windows-{{ checksum "dockerfiles/federation-demo/federation-demo/package.json" }}-{{ checksum "dockerfiles/federation-demo/federation-demo/package-lock.json" }} - - subgraph-node-modules-v2-windows-{{ checksum "dockerfiles/federation-demo/federation-demo/package.json" }} - - run: - name: npm clean-install - working_directory: dockerfiles/federation-demo/federation-demo - command: npm clean-install - - save_cache: - key: subgraph-node-modules-v2-windows-{{ checksum "dockerfiles/federation-demo/federation-demo/package.json" }}-{{ checksum "dockerfiles/federation-demo/federation-demo/package-lock.json" }} - paths: - - dockerfiles/federation-demo/federation-demo/node_modules - - # TODO: normally xtask can run the federation by itself and it - # works on GitHub Actions on Windows. Unfortunately it - # doesn't work here on CircleCI on Windows only. - - run: - name: start federation-demo (background) - working_directory: dockerfiles/federation-demo/federation-demo - command: npm start - background: true - - run: - name: wait for federation demo to start - command: npx wait-on tcp:4001 tcp:4002 tcp:4003 tcp:4004 tcp:4100 - windows_prepare_rust_env: steps: - run: @@ -480,6 +480,7 @@ jobs: condition: equal: [*rust_windows_executor, << parameters.platform >>] steps: + - windows_install_baseline - windows_prepare_node_env - windows_prepare_rust_env - windows_build_workspace @@ -516,8 +517,8 @@ jobs: condition: equal: [*rust_windows_executor, << parameters.platform >>] steps: + - windows_install_baseline - windows_prepare_node_env - - windows_prepare_test_env - windows_prepare_rust_env - windows_test_workspace - when: diff --git a/.gitignore b/.gitignore index 8867087e35..48e029d4b2 100644 --- a/.gitignore +++ b/.gitignore @@ -22,9 +22,11 @@ crates/*/Cargo.lock # Docs /docs/node_modules/ /docs/.cache/ +/docs/public # Dynamically retrieve proto file apollo-spaceport/proto/reports.proto # # Prevent accidental commits of router.yaml in the root /router.yaml +/jaeger/ diff --git a/Cargo.lock b/Cargo.lock index c38984ee56..bc17a0b966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,6 +119,7 @@ dependencies = [ "indexmap", "insta", "itertools", + "jsonpath_lib", "jsonschema", "libc", "maplit", @@ -2296,6 +2297,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonpath_lib" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaa63191d68230cccb81c5aa23abd53ed64d83337cacbb25a7b8c7979523774f" +dependencies = [ + "log", + "serde", + "serde_json", +] + [[package]] name = "jsonschema" version = "0.15.1" diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 555a7847be..8791c6bd50 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -46,28 +46,14 @@ Use `cargo build --all-targets` to build the project. Some tests run against the existing Node.js implementation of the Apollo Router. This requires that the `federation-demo` project is running: - * If you have Docker and Docker Compose installed: - - ``` - docker-compose up -d - ``` - - **Note:** `-d` is for running into background. You can remove `-d` if you - have issues and you want to see the logs or if you want to run the service - in foreground. - - * Otherwise: +``` +docker-compose up -d +``` - You will need Node.js and npm to be installed. It is known to be working on - Node.js 16 and npm 7.18. +**Note:** `-d` is for running into background. You can remove `-d` if you +have issues and you want to see the logs or if you want to run the service +in foreground. - ```shell - git submodule sync --recursive - git submodule update --recursive --init - cd dockerfiles/federation-demo/federation-demo - npm install; - npm run start - ``` ### Run Apollo Router against the docker-compose or Node.js setup diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 764a74ed3e..06d575dc83 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -53,10 +53,30 @@ In addition, the `activate` lifecycle hook is now not marked as deprecated, and ## 🚀 Features +### Configurable client identification headers [PR #850](https://github.com/apollographql/router/pull/850) +The router uses the HTTP headers `apollographql-client-name` and `apollographql-client-version` to identify clients in Studio telemetry. Those headers can now be overriden in the configuration: +```yaml title="router.yaml" +telemetry: + apollo: + # Header identifying the client name. defaults to apollographql-client-name + client_name_header: + # Header identifying the client version. defaults to apollographql-client-version + client_version_header: +``` + ## 🐛 Fixes +### Configuration errors on hot-reload are output [PR #850](https://github.com/apollographql/router/pull/850) +If a configuration file had errors on reload these were silently swallowed. These are now added to the logs. ## 🛠 Maintenance +### End to end integration tests for Jaeger [PR #850](https://github.com/apollographql/router/pull/850) +Jaeger tracing end to end test including client->router->subgraphs + +### Router tracing span cleanup [PR #850](https://github.com/apollographql/router/pull/850) +Spans generated by the Router are now aligned with plugin services. +### Simplified CI for windows [PR #850](https://github.com/apollographql/router/pull/850) +All windows processes are spawned via xtask rather than a separate CircleCI stage. ## 📚 Documentation ### Enhanced rust docs ([PR #819](https://github.com/apollographql/router/pull/819)) Many more rust docs have been added. diff --git a/apollo-router-core/src/layers/instrument.rs b/apollo-router-core/src/layers/instrument.rs new file mode 100644 index 0000000000..468b6fe34d --- /dev/null +++ b/apollo-router-core/src/layers/instrument.rs @@ -0,0 +1,94 @@ +//! Instrumentation layer that allows services to be wrapped in a span. +//! +//! See [`Layer`] and [`Service`] for more details. +//! +//! Using ServiceBuilderExt: +//! ```rust +//! # use tower::ServiceBuilder; +//! # use tower_service::Service; +//! # use tracing::info_span; +//! # use apollo_router_core::ServiceBuilderExt; +//! # fn test(service: impl Service) { +//! let instrumented = ServiceBuilder::new() +//! .instrument(|_request| info_span!("query_planning")) +//! .service(service); +//! # } +//! ``` +//! Now calls to the wrapped service will be wrapped in a span. You can attach attributes to the span from the request. +//! + +use futures::future::BoxFuture; +use futures::FutureExt; +use std::marker::PhantomData; +use std::task::{Context, Poll}; +use tower::Layer; +use tower_service::Service; +use tracing::Instrument; + +/// [`Layer`] for instrumentation. +pub struct InstrumentLayer +where + F: Fn(&Request) -> tracing::Span, +{ + span_fn: F, + phantom: PhantomData, +} + +impl InstrumentLayer +where + F: Fn(&Request) -> tracing::Span, +{ + pub fn new(span_fn: F) -> InstrumentLayer { + Self { + span_fn, + phantom: Default::default(), + } + } +} + +impl Layer for InstrumentLayer +where + S: Service, + F: Fn(&Request) -> tracing::Span + Clone, +{ + type Service = InstrumentService; + + fn layer(&self, inner: S) -> Self::Service { + InstrumentService { + inner, + span_fn: self.span_fn.clone(), + phantom: Default::default(), + } + } +} + +/// [`Service`] for instrumentation. +pub struct InstrumentService +where + S: Service, + F: Fn(&Request) -> tracing::Span, +{ + inner: S, + span_fn: F, + phantom: PhantomData, +} + +impl Service for InstrumentService +where + F: Fn(&Request) -> tracing::Span, + S: Service, + >::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let span = (self.span_fn)(&req); + self.inner.call(req).instrument(span).boxed() + } +} diff --git a/apollo-router-core/src/layers/mod.rs b/apollo-router-core/src/layers/mod.rs index d504c703de..6753dc32ca 100644 --- a/apollo-router-core/src/layers/mod.rs +++ b/apollo-router-core/src/layers/mod.rs @@ -3,3 +3,4 @@ pub mod cache; pub mod deduplication; pub mod ensure_query_presence; pub mod forbid_http_get_mutations; +pub mod instrument; diff --git a/apollo-router-core/src/plugins/headers.rs b/apollo-router-core/src/plugins/headers.rs index f28b4f1dbf..5a2eda81e1 100644 --- a/apollo-router-core/src/plugins/headers.rs +++ b/apollo-router-core/src/plugins/headers.rs @@ -1,3 +1,7 @@ +use super::serde_utils::{ + deserialize_header_name, deserialize_header_value, deserialize_option_header_name, + deserialize_option_header_value, deserialize_regex, +}; use crate::plugin::Plugin; use crate::{register_plugin, SubgraphRequest, SubgraphResponse}; use http::header::{ @@ -8,11 +12,8 @@ use http::HeaderValue; use lazy_static::lazy_static; use regex::Regex; use schemars::JsonSchema; -use serde::de::{Error, Visitor}; -use serde::{de, Deserialize, Deserializer}; +use serde::Deserialize; use std::collections::HashMap; -use std::fmt::Formatter; -use std::str::FromStr; use std::task::{Context, Poll}; use tower::util::BoxService; use tower::{BoxError, Layer, ServiceBuilder, ServiceExt}; @@ -31,7 +32,7 @@ enum Operation { #[derive(Clone, JsonSchema, Deserialize)] #[serde(rename_all = "snake_case")] enum Remove { - #[schemars(schema_with = "string_schema")] + #[schemars(with = "String")] #[serde(deserialize_with = "deserialize_header_name")] Named(HeaderName), @@ -229,140 +230,6 @@ where } } -// We may want to eventually pull these serializers out -fn deserialize_option_header_name<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - struct OptionHeaderNameVisitor; - - impl<'de> Visitor<'de> for OptionHeaderNameVisitor { - type Value = Option; - - fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("struct HeaderName") - } - - fn visit_none(self) -> Result - where - E: de::Error, - { - Ok(None) - } - - fn visit_some(self, deserializer: D) -> Result - where - D: de::Deserializer<'de>, - { - Ok(Some(deserializer.deserialize_str(HeaderNameVisitor)?)) - } - } - deserializer.deserialize_option(OptionHeaderNameVisitor) -} - -fn deserialize_option_header_value<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - struct OptionHeaderValueVisitor; - - impl<'de> Visitor<'de> for OptionHeaderValueVisitor { - type Value = Option; - - fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("struct HeaderValue") - } - - fn visit_none(self) -> Result - where - E: de::Error, - { - Ok(None) - } - - fn visit_some(self, deserializer: D) -> Result - where - D: de::Deserializer<'de>, - { - Ok(Some(deserializer.deserialize_str(HeaderValueVisitor)?)) - } - } - - deserializer.deserialize_option(OptionHeaderValueVisitor) -} - -struct HeaderNameVisitor; - -impl<'de> Visitor<'de> for HeaderNameVisitor { - type Value = HeaderName; - - fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("struct HeaderName") - } - - fn visit_str(self, v: &str) -> Result - where - E: Error, - { - HeaderName::try_from(v).map_err(|e| de::Error::custom(format!("Invalid header name {}", e))) - } -} - -fn deserialize_header_name<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - deserializer.deserialize_str(HeaderNameVisitor) -} - -struct HeaderValueVisitor; - -impl<'de> Visitor<'de> for HeaderValueVisitor { - type Value = HeaderValue; - - fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("struct HeaderValue") - } - - fn visit_str(self, v: &str) -> Result - where - E: Error, - { - HeaderValue::try_from(v) - .map_err(|e| de::Error::custom(format!("Invalid header value {}", e))) - } -} - -fn deserialize_header_value<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - deserializer.deserialize_str(HeaderValueVisitor) -} - -fn deserialize_regex<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - struct RegexVisitor; - - impl<'de> Visitor<'de> for RegexVisitor { - type Value = Regex; - - fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("struct Regex") - } - - fn visit_str(self, v: &str) -> Result - where - E: Error, - { - Regex::from_str(v).map_err(|e| de::Error::custom(format!("{}", e))) - } - } - deserializer.deserialize_str(RegexVisitor) -} - #[cfg(test)] mod test { use super::*; @@ -372,6 +239,7 @@ mod test { use crate::plugins::headers::{Config, HeadersLayer}; use crate::{Context, Request, Response, SubgraphRequest, SubgraphResponse}; use std::collections::HashSet; + use std::str::FromStr; use std::sync::Arc; use tower::BoxError; diff --git a/apollo-router-core/src/plugins/mod.rs b/apollo-router-core/src/plugins/mod.rs index a06af49aa2..db64ab5bfb 100644 --- a/apollo-router-core/src/plugins/mod.rs +++ b/apollo-router-core/src/plugins/mod.rs @@ -5,4 +5,5 @@ mod forbid_mutations; mod headers; mod include_subgraph_errors; +pub mod serde_utils; mod traffic_shaping; diff --git a/apollo-router-core/src/plugins/serde_utils.rs b/apollo-router-core/src/plugins/serde_utils.rs new file mode 100644 index 0000000000..8d61247c30 --- /dev/null +++ b/apollo-router-core/src/plugins/serde_utils.rs @@ -0,0 +1,144 @@ +use http::header::HeaderName; +use http::HeaderValue; +use regex::Regex; +use serde::de::{Error, Visitor}; +use serde::{de, Deserializer}; +use std::fmt::Formatter; +use std::str::FromStr; + +pub fn deserialize_option_header_name<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + struct OptionHeaderNameVisitor; + + impl<'de> Visitor<'de> for OptionHeaderNameVisitor { + type Value = Option; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("struct HeaderName") + } + + fn visit_none(self) -> Result + where + E: de::Error, + { + Ok(None) + } + + fn visit_some(self, deserializer: D) -> Result + where + D: de::Deserializer<'de>, + { + Ok(Some(deserializer.deserialize_str(HeaderNameVisitor)?)) + } + } + deserializer.deserialize_option(OptionHeaderNameVisitor) +} + +pub fn deserialize_option_header_value<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + struct OptionHeaderValueVisitor; + + impl<'de> Visitor<'de> for OptionHeaderValueVisitor { + type Value = Option; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("struct HeaderValue") + } + + fn visit_none(self) -> Result + where + E: de::Error, + { + Ok(None) + } + + fn visit_some(self, deserializer: D) -> Result + where + D: de::Deserializer<'de>, + { + Ok(Some(deserializer.deserialize_str(HeaderValueVisitor)?)) + } + } + + deserializer.deserialize_option(OptionHeaderValueVisitor) +} + +struct HeaderNameVisitor; + +impl<'de> Visitor<'de> for HeaderNameVisitor { + type Value = HeaderName; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("struct HeaderName") + } + + fn visit_str(self, v: &str) -> Result + where + E: Error, + { + HeaderName::try_from(v).map_err(|e| de::Error::custom(format!("Invalid header name {}", e))) + } +} + +pub fn deserialize_header_name<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + deserializer.deserialize_str(HeaderNameVisitor) +} + +struct HeaderValueVisitor; + +impl<'de> Visitor<'de> for HeaderValueVisitor { + type Value = HeaderValue; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("struct HeaderValue") + } + + fn visit_str(self, v: &str) -> Result + where + E: Error, + { + HeaderValue::try_from(v) + .map_err(|e| de::Error::custom(format!("Invalid header value {}", e))) + } +} + +pub fn deserialize_header_value<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + deserializer.deserialize_str(HeaderValueVisitor) +} + +pub fn deserialize_regex<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + struct RegexVisitor; + + impl<'de> Visitor<'de> for RegexVisitor { + type Value = Regex; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("struct Regex") + } + + fn visit_str(self, v: &str) -> Result + where + E: Error, + { + Regex::from_str(v).map_err(|e| de::Error::custom(format!("{}", e))) + } + } + deserializer.deserialize_str(RegexVisitor) +} diff --git a/apollo-router-core/src/query_cache.rs b/apollo-router-core/src/query_cache.rs index 40cc36688a..87c364c251 100644 --- a/apollo-router-core/src/query_cache.rs +++ b/apollo-router-core/src/query_cache.rs @@ -1,3 +1,4 @@ +//! A cache for queries. use crate::prelude::graphql::*; use crate::CacheResolver; use std::sync::Arc; @@ -40,8 +41,7 @@ impl QueryCache { } /// Attempt to parse a string to a [`Query`] using cache if possible. - #[tracing::instrument(skip_all, level = "info" name = "get_query")] - pub async fn get_query(&self, query: impl AsRef) -> Option> { + pub async fn get(&self, query: impl AsRef) -> Option> { let key = query.as_ref().to_string(); match self.cm.get(key).await { diff --git a/apollo-router-core/src/query_planner/bridge_query_planner.rs b/apollo-router-core/src/query_planner/bridge_query_planner.rs index ef7b60acc6..d57fd773ea 100644 --- a/apollo-router-core/src/query_planner/bridge_query_planner.rs +++ b/apollo-router-core/src/query_planner/bridge_query_planner.rs @@ -66,7 +66,6 @@ impl Service for BridgeQueryPlanner { #[async_trait] impl QueryPlanner for BridgeQueryPlanner { - #[tracing::instrument(skip_all, level = "info", name = "plan")] async fn get( &self, query: String, diff --git a/apollo-router-core/src/services/execution_service.rs b/apollo-router-core/src/services/execution_service.rs index 40d5222973..46e9ac530a 100644 --- a/apollo-router-core/src/services/execution_service.rs +++ b/apollo-router-core/src/services/execution_service.rs @@ -38,7 +38,6 @@ impl Service for ExecutionService { Poll::Ready(Ok(())) } - #[tracing::instrument(skip_all, level = "info", name = "execute")] fn call(&mut self, req: ExecutionRequest) -> Self::Future { let this = self.clone(); let fut = async move { diff --git a/apollo-router-core/src/services/mod.rs b/apollo-router-core/src/services/mod.rs index 89e6b79b29..eb2aa5f619 100644 --- a/apollo-router-core/src/services/mod.rs +++ b/apollo-router-core/src/services/mod.rs @@ -22,12 +22,14 @@ use tower::buffer::BufferLayer; use tower::layer::util::Stack; use tower::{BoxError, ServiceBuilder}; use tower_service::Service; +use tracing::Span; pub mod checkpoint; mod execution_service; pub mod http_compat; mod router_service; mod tower_subgraph_service; +use crate::instrument::InstrumentLayer; pub use tower_subgraph_service::TowerSubgraphService; pub const DEFAULT_BUFFER_SIZE: usize = 20_000; @@ -686,6 +688,15 @@ pub trait ServiceBuilderExt: Sized { self.layer(AsyncCheckpointLayer::new(async_checkpoint_fn)) } fn buffered(self) -> ServiceBuilder, L>>; + fn instrument( + self, + span_fn: F, + ) -> ServiceBuilder, L>> + where + F: Fn(&Request) -> Span, + { + self.layer(InstrumentLayer::new(span_fn)) + } fn layer(self, layer: T) -> ServiceBuilder>; } diff --git a/apollo-router-core/src/services/router_service.rs b/apollo-router-core/src/services/router_service.rs index 81195e3866..017596b95e 100644 --- a/apollo-router-core/src/services/router_service.rs +++ b/apollo-router-core/src/services/router_service.rs @@ -108,7 +108,7 @@ where let body = req.originating_request.body(); let variables = body.variables.clone(); let query = query_cache - .get_query( + .get( body.query .as_ref() .expect("apollo.ensure-query-is-present has checked this already; qed") diff --git a/apollo-router-core/src/spec/query.rs b/apollo-router-core/src/spec/query.rs index 39be78c0b0..5217fc5a6e 100644 --- a/apollo-router-core/src/spec/query.rs +++ b/apollo-router-core/src/spec/query.rs @@ -82,7 +82,7 @@ impl Query { } } - #[tracing::instrument(skip_all, level = "trace")] + #[tracing::instrument(skip_all, level = "info" name = "parse_query")] pub fn parse(query: impl Into, schema: &Schema) -> Option { let string = query.into(); diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index 1cf4c21e47..cf7a7e2cd8 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -106,6 +106,7 @@ yaml-rust = "0.4.5" [dev-dependencies] insta = "1.12.0" +jsonpath_lib = "0.3.0" maplit = "1.0.2" mockall = "0.11.0" reqwest = { version = "0.11.10", default-features = false, features = [ diff --git a/apollo-router/src/axum_http_server_factory.rs b/apollo-router/src/axum_http_server_factory.rs index f36765c063..1d06249f10 100644 --- a/apollo-router/src/axum_http_server_factory.rs +++ b/apollo-router/src/axum_http_server_factory.rs @@ -1,3 +1,4 @@ +//! Axum http server factory. Axum provides routing capability on top of Hyper HTTP. use crate::configuration::{Configuration, Cors, ListenAddr}; use crate::http_server_factory::{HttpServerFactory, HttpServerHandle, Listener, NetworkStream}; use crate::FederatedServerError; @@ -34,7 +35,7 @@ use tower_http::trace::{DefaultMakeSpan, MakeSpan, TraceLayer}; use tower_service::Service; use tracing::{Level, Span}; -/// A basic http server using warp. +/// A basic http server using Axum. /// Uses streaming as primary method of response. /// Redirects to studio for GET requests. #[derive(Debug)] @@ -456,17 +457,6 @@ async fn health_check() -> impl IntoResponse { Json(json!({ "status": "pass" })) } -// graphql_request is traced at the info level so that it can be processed normally in apollo telemetry. -#[tracing::instrument(skip_all, - level = "info" - name = "graphql_request", - fields( - query = %http_request.body().query.as_deref().unwrap_or_default(), - operation_name = %http_request.body().operation_name.as_deref().unwrap_or_default(), - client_name, - client_version - ) -)] async fn run_graphql_request( service: Buffer< BoxService< @@ -478,18 +468,6 @@ async fn run_graphql_request( >, http_request: Request, ) -> impl IntoResponse { - if let Some(client_name) = http_request.headers().get("apollographql-client-name") { - // Record the client name as part of the current span - Span::current().record("client_name", &client_name.to_str().unwrap_or_default()); - } - if let Some(client_version) = http_request.headers().get("apollographql-client-version") { - // Record the client version as part of the current span - Span::current().record( - "client_version", - &client_version.to_str().unwrap_or_default(), - ); - } - match service.ready_oneshot().await { Ok(mut service) => { let (head, body) = http_request.into_parts(); diff --git a/apollo-router/src/configuration/mod.rs b/apollo-router/src/configuration/mod.rs index 4f37aa0c10..f5f2d32c3a 100644 --- a/apollo-router/src/configuration/mod.rs +++ b/apollo-router/src/configuration/mod.rs @@ -545,15 +545,6 @@ mod tests { use std::fs; use walkdir::WalkDir; - macro_rules! assert_config_snapshot { - ($file:expr) => {{ - let config = serde_yaml::from_str::(include_str!($file)).unwrap(); - insta::with_settings!({sort_maps => true}, { - insta::assert_yaml_snapshot!(config); - }); - }}; - } - #[cfg(unix)] #[test] fn schema_generation() { @@ -567,53 +558,6 @@ mod tests { assert_json_snapshot!(&schema) } - #[test] - fn test_supergraph_config_serde() { - assert_config_snapshot!("testdata/supergraph_config.yaml"); - } - - #[test] - fn ensure_configuration_api_does_not_change() { - assert_config_snapshot!("testdata/config_basic.yml"); - assert_config_snapshot!("testdata/config_full.yml"); - assert_config_snapshot!("testdata/config_opentelemetry_jaeger_basic.yml"); - assert_config_snapshot!("testdata/config_opentelemetry_jaeger_full.yml"); - } - - #[test] - fn ensure_configuration_api_does_not_change_common() { - // NOTE: don't take a snapshot here because the optional fields appear with ~ and they vary - // per implementation - - serde_yaml::from_str::(include_str!( - "testdata/config_opentelemetry_otlp_tracing_http_common.yml" - )) - .unwrap(); - - serde_yaml::from_str::(include_str!( - "testdata/config_opentelemetry_otlp_tracing_grpc_common.yml" - )) - .unwrap(); - } - - #[test] - fn ensure_configuration_api_does_not_change_grpc() { - assert_config_snapshot!("testdata/config_opentelemetry_otlp_tracing_grpc_basic.yml"); - assert_config_snapshot!("testdata/config_opentelemetry_otlp_tracing_grpc_full.yml"); - } - - #[test] - fn ensure_configuration_api_does_not_change_http() { - assert_config_snapshot!("testdata/config_opentelemetry_otlp_tracing_http_basic.yml"); - assert_config_snapshot!("testdata/config_opentelemetry_otlp_tracing_http_full.yml"); - } - - #[cfg(all(feature = "otlp-grpc"))] - #[test] - fn ensure_configuration_api_does_not_change_tls_config() { - assert_config_snapshot!("testdata/config_opentelemetry_otlp_tracing_grpc_tls.yml"); - } - #[test] fn routing_url_in_schema() { let schema: graphql::Schema = r#" diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index a088c809aa..04419adc8e 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -430,6 +430,16 @@ expression: "&schema" "type": "string", "nullable": true }, + "client_name_header": { + "default": "apollographql-client-name", + "type": "string", + "nullable": true + }, + "client_version_header": { + "default": "apollographql-client-version", + "type": "string", + "nullable": true + }, "endpoint": { "type": "string", "format": "uri", @@ -687,6 +697,9 @@ expression: "&schema" } }, "additionalProperties": false + }, + "scheduled_delay": { + "type": "string" } }, "additionalProperties": false @@ -717,11 +730,20 @@ expression: "&schema" } }, "additionalProperties": false + }, + "scheduled_delay": { + "type": "string" } }, "additionalProperties": false } ], + "properties": { + "scheduled_delay": { + "default": null, + "type": "string" + } + }, "nullable": true }, "otlp": { diff --git a/apollo-router/src/configuration/testdata/config_basic.yml b/apollo-router/src/configuration/testdata/config_basic.router.yaml similarity index 100% rename from apollo-router/src/configuration/testdata/config_basic.yml rename to apollo-router/src/configuration/testdata/config_basic.router.yaml diff --git a/apollo-router/src/configuration/testdata/config_full.yml b/apollo-router/src/configuration/testdata/config_full.router.yaml similarity index 100% rename from apollo-router/src/configuration/testdata/config_full.yml rename to apollo-router/src/configuration/testdata/config_full.router.yaml diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_jaeger_basic.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_jaeger_basic.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_jaeger_basic.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_jaeger_full.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_jaeger_full.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_jaeger_full.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_basic.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_basic.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_basic.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_common.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_common.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_common.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_full.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_full.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_full.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_tls.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_tls.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_grpc_tls.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_basic.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_basic.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_basic.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_common.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_common.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_common.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_full.yml b/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_full.yml deleted file mode 100644 index d1b76d4ced..0000000000 --- a/apollo-router/src/configuration/testdata/config_opentelemetry_otlp_tracing_http_full.yml +++ /dev/null @@ -1,2 +0,0 @@ -server: - listen: 1.2.3.4:5 diff --git a/apollo-router/src/configuration/testdata/invalid_url.yaml b/apollo-router/src/configuration/testdata/invalid_url.router.yaml similarity index 100% rename from apollo-router/src/configuration/testdata/invalid_url.yaml rename to apollo-router/src/configuration/testdata/invalid_url.router.yaml diff --git a/apollo-router/src/configuration/testdata/supergraph_config.yaml b/apollo-router/src/configuration/testdata/supergraph_config.router.yaml similarity index 100% rename from apollo-router/src/configuration/testdata/supergraph_config.yaml rename to apollo-router/src/configuration/testdata/supergraph_config.router.yaml diff --git a/apollo-router/src/configuration/testdata/supergraph_config_invalid_listen.yaml b/apollo-router/src/configuration/testdata/supergraph_config_invalid_listen.router.yaml similarity index 100% rename from apollo-router/src/configuration/testdata/supergraph_config_invalid_listen.yaml rename to apollo-router/src/configuration/testdata/supergraph_config_invalid_listen.router.yaml diff --git a/apollo-router/src/configuration/testdata/tracing_apollo.router.yaml b/apollo-router/src/configuration/testdata/tracing_apollo.router.yaml new file mode 100644 index 0000000000..db89dc2e2d --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_apollo.router.yaml @@ -0,0 +1,10 @@ +server: + listen: 1.2.3.4:5 +telemetry: + apollo: + endpoint: "http://example.com" + client_version_header: apollographql-client-version + client_name_header: apollographql-client-name + apollo_graph_ref: "" + apollo_key: "" + diff --git a/apollo-router/src/configuration/testdata/tracing_config.router.yaml b/apollo-router/src/configuration/testdata/tracing_config.router.yaml new file mode 100644 index 0000000000..6c8c77df2a --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_config.router.yaml @@ -0,0 +1,23 @@ +server: + listen: 1.2.3.4:5 +telemetry: + tracing: + trace_config: + attributes: + foo: bar + max_attributes_per_event: 2 + max_attributes_per_link: 3 + max_attributes_per_span: 3 + max_events_per_span: 3 + max_links_per_span: 3 + parent_based_sampler: false + sampler: 0.3 + service_name: "foo" + service_namespace: "bar" + propagation: + datadog: false + jaeger: false + baggage: false + trace_context: false + zipkin: false + diff --git a/apollo-router/src/configuration/testdata/tracing_datadog.router.yaml b/apollo-router/src/configuration/testdata/tracing_datadog.router.yaml new file mode 100644 index 0000000000..8c7d10becc --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_datadog.router.yaml @@ -0,0 +1,7 @@ +server: + listen: 1.2.3.4:5 +telemetry: + tracing: + datadog: + endpoint: default + diff --git a/apollo-router/src/configuration/testdata/tracing_jaeger_agent.router.yaml b/apollo-router/src/configuration/testdata/tracing_jaeger_agent.router.yaml new file mode 100644 index 0000000000..da43c9fb0f --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_jaeger_agent.router.yaml @@ -0,0 +1,7 @@ +server: + listen: 1.2.3.4:5 +telemetry: + tracing: + jaeger: + agent: + endpoint: default diff --git a/apollo-router/src/configuration/testdata/tracing_jaeger_collector.router.yaml b/apollo-router/src/configuration/testdata/tracing_jaeger_collector.router.yaml new file mode 100644 index 0000000000..161fabf623 --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_jaeger_collector.router.yaml @@ -0,0 +1,7 @@ +server: + listen: 1.2.3.4:5 +telemetry: + tracing: + jaeger: + collector: + endpoint: http://example.com diff --git a/apollo-router/src/configuration/testdata/tracing_jaeger_full.router.yaml b/apollo-router/src/configuration/testdata/tracing_jaeger_full.router.yaml new file mode 100644 index 0000000000..cac9c7f9b4 --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_jaeger_full.router.yaml @@ -0,0 +1,9 @@ +server: + listen: 1.2.3.4:5 +telemetry: + tracing: + jaeger: + collector: + endpoint: "http://foo" + password: "" + username: "" diff --git a/apollo-router/src/configuration/testdata/tracing_otlp_full.router.yaml b/apollo-router/src/configuration/testdata/tracing_otlp_full.router.yaml new file mode 100644 index 0000000000..a5c3530c9d --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_otlp_full.router.yaml @@ -0,0 +1,22 @@ +server: + listen: 1.2.3.4:5 +telemetry: + tracing: + otlp: + endpoint: default + grpc: + ca: + env: "" + cert: + env: "" + domain_name: "" + key: + env: "" + metadata: + foo: bar + http: + headers: + foo: bar + timeout: 4s + protocol: grpc + diff --git a/apollo-router/src/configuration/testdata/tracing_otlp_grpc_basic.router.yaml b/apollo-router/src/configuration/testdata/tracing_otlp_grpc_basic.router.yaml new file mode 100644 index 0000000000..cb67346643 --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_otlp_grpc_basic.router.yaml @@ -0,0 +1,7 @@ +server: + listen: 1.2.3.4:5 +telemetry: + tracing: + otlp: + endpoint: default + protocol: http diff --git a/apollo-router/src/configuration/testdata/tracing_otlp_http_basic.router.yaml b/apollo-router/src/configuration/testdata/tracing_otlp_http_basic.router.yaml new file mode 100644 index 0000000000..cb67346643 --- /dev/null +++ b/apollo-router/src/configuration/testdata/tracing_otlp_http_basic.router.yaml @@ -0,0 +1,7 @@ +server: + listen: 1.2.3.4:5 +telemetry: + tracing: + otlp: + endpoint: default + protocol: http diff --git a/apollo-router/src/lib.rs b/apollo-router/src/lib.rs index 68f1f22c0b..6885651dfa 100644 --- a/apollo-router/src/lib.rs +++ b/apollo-router/src/lib.rs @@ -1,5 +1,7 @@ //! Starts a server that will handle http graphql requests. +extern crate core; + #[cfg(feature = "axum-server")] mod axum_http_server_factory; pub mod configuration; @@ -266,7 +268,13 @@ impl ConfigurationKind { if watch { files::watch(path.to_owned(), delay) .filter_map(move |_| { - future::ready(ConfigurationKind::read_config(&path).ok()) + future::ready(match ConfigurationKind::read_config(&path) { + Ok(config) => Some(config), + Err(err) => { + tracing::error!("{}", err); + None + } + }) }) .map(|x| UpdateConfiguration(Box::new(x))) .boxed() diff --git a/apollo-router/src/plugins/telemetry/apollo.rs b/apollo-router/src/plugins/telemetry/apollo.rs new file mode 100644 index 0000000000..dd8f9d960c --- /dev/null +++ b/apollo-router/src/plugins/telemetry/apollo.rs @@ -0,0 +1,56 @@ +//! Configuration for apollo telemetry. +use crate::graphql::serde_utils::deserialize_header_name; +use http::header::HeaderName; +use schemars::JsonSchema; +use serde::Deserialize; +use url::Url; + +#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct Config { + pub endpoint: Option, + pub apollo_key: Option, + pub apollo_graph_ref: Option, + + #[schemars(with = "Option", default = "client_name_header_default_str")] + #[serde( + deserialize_with = "deserialize_header_name", + default = "client_name_header_default" + )] + pub client_name_header: HeaderName, + + #[schemars(with = "Option", default = "client_version_header_default_str")] + #[serde( + deserialize_with = "deserialize_header_name", + default = "client_version_header_default" + )] + pub client_version_header: HeaderName, +} + +fn client_name_header_default_str() -> &'static str { + "apollographql-client-name" +} + +fn client_name_header_default() -> HeaderName { + HeaderName::from_static(client_name_header_default_str()) +} + +fn client_version_header_default_str() -> &'static str { + "apollographql-client-version" +} + +fn client_version_header_default() -> HeaderName { + HeaderName::from_static(client_version_header_default_str()) +} + +impl Default for Config { + fn default() -> Self { + Self { + endpoint: None, + apollo_key: None, + apollo_graph_ref: None, + client_name_header: client_name_header_default(), + client_version_header: client_version_header_default(), + } + } +} diff --git a/apollo-router/src/plugins/telemetry/config.rs b/apollo-router/src/plugins/telemetry/config.rs index 3f646ebdb0..a79697f641 100644 --- a/apollo-router/src/plugins/telemetry/config.rs +++ b/apollo-router/src/plugins/telemetry/config.rs @@ -1,3 +1,4 @@ +//! Configuration for the telemetry plugin. use super::*; use crate::plugins::telemetry::metrics; use opentelemetry::sdk::Resource; diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 588cd62cfd..12a1e3184a 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -4,16 +4,18 @@ use crate::plugins::telemetry::metrics::{ AggregateMeterProvider, BasicMetrics, MetricsBuilder, MetricsConfigurator, MetricsExporterHandle, }; -use crate::plugins::telemetry::tracing::{apollo, TracingConfigurator}; +use crate::plugins::telemetry::tracing::TracingConfigurator; use crate::subscriber::replace_layer; +use ::tracing::{info_span, Span}; use apollo_router_core::{ - http_compat, register_plugin, Handler, Plugin, ResponseBody, RouterRequest, RouterResponse, - SubgraphRequest, SubgraphResponse, + http_compat, register_plugin, ExecutionRequest, ExecutionResponse, Handler, Plugin, + QueryPlannerRequest, QueryPlannerResponse, ResponseBody, RouterRequest, RouterResponse, + ServiceBuilderExt, SubgraphRequest, SubgraphResponse, }; use apollo_spaceport::server::ReportSpaceport; use bytes::Bytes; use futures::FutureExt; -use http::StatusCode; +use http::{HeaderValue, StatusCode}; use opentelemetry::propagation::TextMapPropagator; use opentelemetry::sdk::propagation::{ BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator, @@ -27,14 +29,17 @@ use std::fmt; use std::time::Instant; use tower::steer::Steer; use tower::util::BoxService; -use tower::{service_fn, BoxError, ServiceExt}; +use tower::{service_fn, BoxError, ServiceBuilder, ServiceExt}; use url::Url; +mod apollo; mod config; mod metrics; mod otlp; mod tracing; +pub static ROUTER_SPAN_NAME: &str = "router"; + pub struct Telemetry { config: config::Conf, tracer_provider: Option, @@ -95,17 +100,13 @@ impl Drop for Telemetry { // Tracer providers must be flushed. This may happen as part of otel if the provider was set // as the global, but may also happen in the case of an failed config reload. // If the tracer prover is present then it was not handed over so we must flush it. - // The magic incantation seems to be that the flush MUST happen within a tokio runtime, - // and then within a spawn blocking. + // The magic incantation seems to be that the flush MUST happen in a separate thread. ::tracing::debug!("flushing telemetry"); - std::thread::spawn(|| { - let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - runtime.block_on(async { - let jh = tokio::task::spawn_blocking(move || { - opentelemetry::trace::TracerProvider::force_flush(&tracer_provider); - }); - futures::executor::block_on(jh).expect("failed to flush tracer provider"); + std::thread::spawn(|| async { + let jh = tokio::task::spawn_blocking(move || { + opentelemetry::trace::TracerProvider::force_flush(&tracer_provider); }); + futures::executor::block_on(jh).expect("failed to flush tracer provider"); }); } @@ -160,6 +161,7 @@ impl Plugin for Telemetry { apollo_key: Some(_), apollo_graph_ref: Some(_), endpoint: None, + .. } => { ::tracing::debug!("starting Spaceport"); let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); @@ -226,12 +228,16 @@ impl Plugin for Telemetry { service: BoxService, ) -> BoxService { let metrics = BasicMetrics::new(&self.meter_provider); - service + ServiceBuilder::new() + .instrument(Self::router_service_span( + self.config.apollo.clone().unwrap_or_default(), + )) + .service(service) .map_future(move |f| { let metrics = metrics.clone(); // Using Instant because it is guaranteed to be monotonically increasing. let now = Instant::now(); - f.map(move |r| { + f.map(move |r: Result| { match &r { Ok(response) => { metrics.http_requests_total.add( @@ -255,6 +261,26 @@ impl Plugin for Telemetry { .boxed() } + fn query_planning_service( + &mut self, + service: BoxService, + ) -> BoxService { + ServiceBuilder::new() + .instrument(move |_| info_span!("query_planning")) + .service(service) + .boxed() + } + + fn execution_service( + &mut self, + service: BoxService, + ) -> BoxService { + ServiceBuilder::new() + .instrument(move |_| info_span!("execution")) + .service(service) + .boxed() + } + fn subgraph_service( &mut self, name: &str, @@ -262,7 +288,10 @@ impl Plugin for Telemetry { ) -> BoxService { let metrics = BasicMetrics::new(&self.meter_provider); let subgraph_attribute = KeyValue::new("subgraph", name.to_string()); - service + let name = name.to_owned(); + ServiceBuilder::new() + .instrument(move |_| info_span!("subgraph", name = name.as_str())) + .service(service) .map_future(move |f| { let metrics = metrics.clone(); let subgraph_attribute = subgraph_attribute.clone(); @@ -409,6 +438,38 @@ impl Telemetry { }); futures::executor::block_on(jh).expect("failed to replace tracer provider"); } + + fn router_service_span(config: apollo::Config) -> impl Fn(&RouterRequest) -> Span + Clone { + let client_name_header = config.client_name_header; + let client_version_header = config.client_version_header; + + move |request: &RouterRequest| { + let http_request = &request.originating_request; + let headers = http_request.headers(); + let query = http_request.body().query.clone().unwrap_or_default(); + let operation_name = http_request + .body() + .operation_name + .clone() + .unwrap_or_default(); + let client_name = headers + .get(&client_name_header) + .cloned() + .unwrap_or_else(|| HeaderValue::from_static("")); + let client_version = headers + .get(&client_version_header) + .cloned() + .unwrap_or_else(|| HeaderValue::from_static("")); + let span = info_span!( + ROUTER_SPAN_NAME, + query = query.as_str(), + operation_name = operation_name.as_str(), + client_name = client_name.to_str().unwrap_or_default(), + client_version = client_version.to_str().unwrap_or_default() + ); + span + } + } } fn handle_error>(err: T) { @@ -429,6 +490,7 @@ register_plugin!("apollo", "telemetry", Telemetry); #[cfg(test)] mod tests { + #[tokio::test] async fn plugin_registered() { apollo_router_core::plugins() diff --git a/apollo-router/src/plugins/telemetry/otlp.rs b/apollo-router/src/plugins/telemetry/otlp.rs index ccf0790761..5eaffa5562 100644 --- a/apollo-router/src/plugins/telemetry/otlp.rs +++ b/apollo-router/src/plugins/telemetry/otlp.rs @@ -1,3 +1,4 @@ +//! Shared configuration for Otlp tracing and metrics. use crate::configuration::ConfigurationError; use crate::plugins::telemetry::config::GenericWith; use opentelemetry_otlp::{HttpExporterBuilder, TonicExporterBuilder, WithExportConfig}; diff --git a/apollo-router/src/plugins/telemetry/tracing/apollo.rs b/apollo-router/src/plugins/telemetry/tracing/apollo.rs index d81474348d..6988a3a105 100644 --- a/apollo-router/src/plugins/telemetry/tracing/apollo.rs +++ b/apollo-router/src/plugins/telemetry/tracing/apollo.rs @@ -1,19 +1,10 @@ +//! Tracing configuration for apollo telemetry. +use crate::plugins::telemetry::apollo::Config; use crate::plugins::telemetry::config::Trace; use crate::plugins::telemetry::tracing::apollo_telemetry::{SpaceportConfig, StudioGraph}; use crate::plugins::telemetry::tracing::{apollo_telemetry, TracingConfigurator}; use opentelemetry::sdk::trace::Builder; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; use tower::BoxError; -use url::Url; - -#[derive(Default, Debug, Clone, Deserialize, Serialize, JsonSchema)] -#[serde(deny_unknown_fields)] -pub struct Config { - pub endpoint: Option, - pub apollo_key: Option, - pub apollo_graph_ref: Option, -} impl TracingConfigurator for Config { fn apply(&self, builder: Builder, trace_config: &Trace) -> Result { @@ -23,6 +14,7 @@ impl TracingConfigurator for Config { endpoint: Some(endpoint), apollo_key: Some(key), apollo_graph_ref: Some(reference), + .. } => { tracing::debug!("configuring exporter to Spaceport"); let exporter = apollo_telemetry::new_pipeline() diff --git a/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs b/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs index 4b8e742320..e736bac309 100644 --- a/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs +++ b/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs @@ -26,6 +26,7 @@ //! shutdown_tracer_provider(); // sending remaining spans //! } //! ``` +use crate::plugins::telemetry::ROUTER_SPAN_NAME; use apollo_parser::{ast, Parser}; use apollo_spaceport::report::{ContextualizedStats, QueryLatencyStats, StatsContext}; use apollo_spaceport::{Reporter, ReporterGraph}; @@ -333,7 +334,7 @@ impl SpanExporter for Exporter { /* * Process the batch */ - for span in batch.iter().filter(|span| span.name == "graphql_request") { + for span in batch.iter().filter(|span| span.name == ROUTER_SPAN_NAME) { // We can't process a span if we don't have a query if let Some(query) = span .attributes diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog.rs b/apollo-router/src/plugins/telemetry/tracing/datadog.rs index 60605ade24..0ec91f5dae 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog.rs @@ -1,3 +1,4 @@ +//! Configuration for datadog tracing. use crate::plugins::telemetry::config::{GenericWith, Trace}; use crate::plugins::telemetry::tracing::TracingConfigurator; use opentelemetry::sdk::trace::Builder; @@ -42,39 +43,3 @@ impl TracingConfigurator for Config { Ok(builder.with_batch_exporter(exporter, opentelemetry::runtime::Tokio)) } } - -#[cfg(test)] -mod tests { - use crate::plugins::telemetry::tracing::test::run_query; - use opentelemetry::global; - use tower::BoxError; - use tracing::instrument::WithSubscriber; - use tracing_subscriber::layer::SubscriberExt; - use tracing_subscriber::Registry; - - // This test can be run manually from your IDE to help with testing otel - // It is set to ignore by default as Datadog may not be set up - #[ignore] - #[tokio::test(flavor = "multi_thread")] - async fn test_tracing() -> Result<(), BoxError> { - tracing_subscriber::fmt().init(); - - global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::new()); - let tracer = opentelemetry_datadog::new_pipeline() - .with_service_name("my_app") - .install_batch(opentelemetry::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - - // Use the tracing subscriber `Registry`, or any other subscriber - // that impls `LookupSpan` - let subscriber = Registry::default().with(telemetry); - - // Trace executed code - run_query().with_subscriber(subscriber).await; - global::shutdown_tracer_provider(); - - Ok(()) - } -} diff --git a/apollo-router/src/plugins/telemetry/tracing/jaeger.rs b/apollo-router/src/plugins/telemetry/tracing/jaeger.rs index e4a2c8ba18..085b0cfdfc 100644 --- a/apollo-router/src/plugins/telemetry/tracing/jaeger.rs +++ b/apollo-router/src/plugins/telemetry/tracing/jaeger.rs @@ -1,17 +1,50 @@ +//! Configuration for jaeger tracing. use crate::plugins::telemetry::config::{GenericWith, Trace}; use crate::plugins::telemetry::tracing::TracingConfigurator; -use opentelemetry::sdk::trace::Builder; +use opentelemetry::sdk::trace::{BatchSpanProcessor, Builder}; +use schemars::gen::SchemaGenerator; +use schemars::schema::{Schema, SchemaObject}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; +use std::time::Duration; use tower::BoxError; use url::Url; #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] -#[serde()] pub struct Config { #[serde(flatten)] + #[schemars(schema_with = "endpoint_schema")] pub endpoint: Endpoint, + + #[serde(deserialize_with = "humantime_serde::deserialize", default)] + #[schemars(with = "String", default)] + pub scheduled_delay: Option, +} + +// This is needed because of the use of flatten. +fn endpoint_schema(gen: &mut SchemaGenerator) -> Schema { + let mut schema: SchemaObject = ::json_schema(gen).into(); + + schema + .subschemas + .as_mut() + .unwrap() + .one_of + .as_mut() + .unwrap() + .iter_mut() + .for_each(|s| { + if let Schema::Object(o) = s { + o.object + .as_mut() + .unwrap() + .properties + .insert("scheduled_delay".to_string(), String::json_schema(gen)); + } + }); + + schema.into() } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] @@ -73,42 +106,10 @@ impl TracingConfigurator for Config { .init_async_exporter(opentelemetry::runtime::Tokio)?, }; - Ok(builder.with_batch_exporter(exporter, opentelemetry::runtime::Tokio)) - } -} - -#[cfg(test)] -mod tests { - use crate::plugins::telemetry::tracing::test::run_query; - use opentelemetry::global; - use tower::BoxError; - use tracing::instrument::WithSubscriber; - use tracing_subscriber::layer::SubscriberExt; - use tracing_subscriber::Registry; - - // This test can be run manually from your IDE to help with testing otel - // It is set to ignore by default as jaeger may not be set up - #[ignore] - #[tokio::test(flavor = "multi_thread")] - async fn test_tracing() -> Result<(), BoxError> { - tracing_subscriber::fmt().init(); - - global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - let tracer = opentelemetry_jaeger::new_pipeline() - .with_service_name("my_app") - .install_batch(opentelemetry::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - - // Use the tracing subscriber `Registry`, or any other subscriber - // that impls `LookupSpan` - let subscriber = Registry::default().with(telemetry); - - // Trace executed code - run_query().with_subscriber(subscriber).await; - global::shutdown_tracer_provider(); - - Ok(()) + Ok(builder.with_span_processor( + BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio) + .with(&self.scheduled_delay, |b, d| b.with_scheduled_delay(*d)) + .build(), + )) } } diff --git a/apollo-router/src/plugins/telemetry/tracing/mod.rs b/apollo-router/src/plugins/telemetry/tracing/mod.rs index 41eb7e63d1..d402d9ea18 100644 --- a/apollo-router/src/plugins/telemetry/tracing/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/mod.rs @@ -12,31 +12,3 @@ pub mod zipkin; pub trait TracingConfigurator { fn apply(&self, builder: Builder, trace_config: &Trace) -> Result; } - -#[cfg(test)] -mod test { - use http::{Method, Request, Uri}; - use opentelemetry::global; - use opentelemetry_http::HttpClient; - use tracing::{info_span, Instrument}; - use tracing_opentelemetry::OpenTelemetrySpanExt; - pub async fn run_query() { - let span = info_span!("client_request"); - let client = reqwest::Client::new(); - - let mut request = Request::builder() - .method(Method::POST) - .uri(Uri::from_static("http://localhost:4000")) - .body(r#"{"query":"query {\n topProducts {\n name\n }\n}","variables":{}}"#.into()) - .unwrap(); - - global::get_text_map_propagator(|propagator| { - propagator.inject_context( - &span.context(), - &mut opentelemetry_http::HeaderInjector(request.headers_mut()), - ) - }); - - client.send(request).instrument(span).await.unwrap(); - } -} diff --git a/apollo-router/src/plugins/telemetry/tracing/otlp.rs b/apollo-router/src/plugins/telemetry/tracing/otlp.rs index 0fedee3869..d6c4834c52 100644 --- a/apollo-router/src/plugins/telemetry/tracing/otlp.rs +++ b/apollo-router/src/plugins/telemetry/tracing/otlp.rs @@ -1,3 +1,4 @@ +//! Configuration for Otlp tracing. use crate::plugins::telemetry::config::Trace; use crate::plugins::telemetry::tracing::TracingConfigurator; use opentelemetry::sdk::trace::Builder; @@ -15,41 +16,3 @@ impl TracingConfigurator for super::super::otlp::Config { )) } } - -#[cfg(test)] -mod tests { - use crate::plugins::telemetry::tracing::test::run_query; - use opentelemetry::global; - use opentelemetry::sdk::propagation::TraceContextPropagator; - use tower::BoxError; - use tracing::instrument::WithSubscriber; - use tracing_subscriber::layer::SubscriberExt; - use tracing_subscriber::Registry; - - // This test can be run manually from your IDE to help with testing otel - // It is set to ignore by default as otlp may not be set up - #[ignore] - #[tokio::test(flavor = "multi_thread")] - async fn test_tracing() -> Result<(), BoxError> { - tracing_subscriber::fmt().init(); - - global::set_text_map_propagator(TraceContextPropagator::new()); - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().http()) - .install_batch(opentelemetry::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - - // Use the tracing subscriber `Registry`, or any other subscriber - // that impls `LookupSpan` - let subscriber = Registry::default().with(telemetry); - - // Trace executed code - run_query().with_subscriber(subscriber).await; - global::shutdown_tracer_provider(); - - Ok(()) - } -} diff --git a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs index 9fecbced69..c4a9e73c4f 100644 --- a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs +++ b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs @@ -1,3 +1,4 @@ +//! Configuration for zipkin tracing. use crate::plugins::telemetry::config::{GenericWith, Trace}; use crate::plugins::telemetry::tracing::TracingConfigurator; use opentelemetry::sdk::trace::Builder; @@ -67,39 +68,3 @@ impl TracingConfigurator for Config { Ok(builder.with_batch_exporter(exporter, opentelemetry::runtime::Tokio)) } } - -#[cfg(test)] -mod tests { - use crate::plugins::telemetry::tracing::test::run_query; - use opentelemetry::global; - use tower::BoxError; - use tracing::instrument::WithSubscriber; - use tracing_subscriber::layer::SubscriberExt; - use tracing_subscriber::Registry; - - // This test can be run manually from your IDE to help with testing otel - // It is set to ignore by default as zipkin may not be set up - #[ignore] - #[tokio::test(flavor = "multi_thread")] - async fn test_tracing() -> Result<(), BoxError> { - tracing_subscriber::fmt().init(); - - global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); - let tracer = opentelemetry_zipkin::new_pipeline() - .with_service_name("my_app") - .install_batch(opentelemetry::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - - // Use the tracing subscriber `Registry`, or any other subscriber - // that impls `LookupSpan` - let subscriber = Registry::default().with(telemetry); - - // Trace executed code - run_query().with_subscriber(subscriber).await; - global::shutdown_tracer_provider(); - - Ok(()) - } -} diff --git a/apollo-router/src/testdata/datadog.router.yaml b/apollo-router/src/testdata/datadog.router.yaml new file mode 100644 index 0000000000..0e9e98346d --- /dev/null +++ b/apollo-router/src/testdata/datadog.router.yaml @@ -0,0 +1,6 @@ +telemetry: + tracing: + trace_config: + service_name: router + datadog: + endpoint: default diff --git a/apollo-router/src/testdata/jaeger.router.yaml b/apollo-router/src/testdata/jaeger.router.yaml new file mode 100644 index 0000000000..6173ca774c --- /dev/null +++ b/apollo-router/src/testdata/jaeger.router.yaml @@ -0,0 +1,8 @@ +telemetry: + tracing: + trace_config: + service_name: router + jaeger: + scheduled_delay: 100ms + agent: + endpoint: default diff --git a/apollo-router/src/testdata/otlp.router.yaml b/apollo-router/src/testdata/otlp.router.yaml new file mode 100644 index 0000000000..cbfa7ad982 --- /dev/null +++ b/apollo-router/src/testdata/otlp.router.yaml @@ -0,0 +1,6 @@ +telemetry: + tracing: + trace_config: + service_name: router + otlp: + endpoint: default diff --git a/apollo-router/src/testdata/zipkin.router.yaml b/apollo-router/src/testdata/zipkin.router.yaml new file mode 100644 index 0000000000..19bb68b9ae --- /dev/null +++ b/apollo-router/src/testdata/zipkin.router.yaml @@ -0,0 +1,7 @@ +telemetry: + tracing: + trace_config: + service_name: router + zipkin: + agent: + endpoint: default diff --git a/apollo-router/src/warp_http_server_factory.rs b/apollo-router/src/warp_http_server_factory.rs index 1a41188f81..b4e4c214b9 100644 --- a/apollo-router/src/warp_http_server_factory.rs +++ b/apollo-router/src/warp_http_server_factory.rs @@ -492,17 +492,6 @@ where ) } -// graphql_request is traced at the info level so that it can be processed normally in apollo telemetry. -#[tracing::instrument(skip_all, - level = "info" - name = "graphql_request", - fields( - query = %request.query.clone().unwrap_or_default(), - operation_name = %request.operation_name.clone().unwrap_or_else(|| "".to_string()), - client_name, - client_version - ) -)] fn run_graphql_request( service: RS, authority: Option, @@ -518,18 +507,6 @@ where + 'static, >>::Future: std::marker::Send, { - if let Some(client_name) = header_map.get("apollographql-client-name") { - // Record the client name as part of the current span - Span::current().record("client_name", &client_name.to_str().unwrap_or_default()); - } - if let Some(client_version) = header_map.get("apollographql-client-version") { - // Record the client version as part of the current span - Span::current().record( - "client_version", - &client_version.to_str().unwrap_or_default(), - ); - } - async move { match service.ready_oneshot().await { Ok(mut service) => { diff --git a/apollo-router/tests/common.rs b/apollo-router/tests/common.rs new file mode 100644 index 0000000000..47d6d12f3f --- /dev/null +++ b/apollo-router/tests/common.rs @@ -0,0 +1,143 @@ +use http::header::CONTENT_TYPE; +use http::{HeaderValue, Method, Request, Uri}; +use jsonpath_lib::Selector; +use opentelemetry::global; +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry::sdk::trace::Tracer; +use opentelemetry_http::HttpClient; +use serde_json::Value; +use std::fs; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::process::{Child, Command, Stdio}; +use std::time::Duration; +use tower::BoxError; +use tracing::info_span; +use tracing_core::LevelFilter; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::{EnvFilter, Layer, Registry}; +use uuid::Uuid; + +pub struct TracingTest { + router: Child, + test_config_location: PathBuf, +} + +impl TracingTest { + pub fn new( + tracer: Tracer, + propagator: P, + config_location: &Path, + ) -> Self { + let telemetry = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(LevelFilter::INFO); + let subscriber = Registry::default().with(telemetry).with( + tracing_subscriber::fmt::Layer::default() + .compact() + .with_filter(EnvFilter::from_default_env()), + ); + + let config_location = + PathBuf::from_iter(["..", "apollo-router", "src", "testdata"]).join(config_location); + let test_config_location = PathBuf::from_iter(["..", "target", "test_config.yaml"]); + fs::copy(&config_location, &test_config_location).expect("could not copy config"); + + tracing::subscriber::set_global_default(subscriber).unwrap(); + global::set_text_map_propagator(propagator); + + let router_location = if cfg!(windows) { + PathBuf::from_iter(["..", "target", "debug", "router.exe"]) + } else { + PathBuf::from_iter(["..", "target", "debug", "router"]) + }; + + Self { + test_config_location: test_config_location.clone(), + router: Command::new(router_location) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .args([ + "--hr", + "--config", + &test_config_location.to_string_lossy().to_string(), + "--supergraph", + &PathBuf::from_iter(["..", "examples", "graphql", "local.graphql"]) + .to_string_lossy() + .to_string(), + ]) + .spawn() + .expect("Router should start"), + } + } + + #[allow(dead_code)] + pub fn touch_config(&self) -> Result<(), BoxError> { + let mut f = fs::OpenOptions::new() + .append(true) + .open(&self.test_config_location)?; + f.write_all("#touched\n".as_bytes())?; + Ok(()) + } + + pub async fn run_query(&self) -> String { + let client = reqwest::Client::new(); + let id = Uuid::new_v4().to_string(); + let span = info_span!("client_request", unit_test = id.as_str()); + let _span_guard = span.enter(); + + for _i in 0..100 { + let mut request = Request::builder() + .method(Method::POST) + .header(CONTENT_TYPE, HeaderValue::from_static("application/json")) + .header("apollographql-client-name", "custom_name") + .header("apollographql-client-version", "1.0") + .uri(Uri::from_static("http://localhost:4000")) + .body(r#"{"query":"{topProducts{name}}","variables":{}}"#.into()) + .unwrap(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context( + &span.context(), + &mut opentelemetry_http::HeaderInjector(request.headers_mut()), + ) + }); + match client.send(request).await { + Ok(result) => { + tracing::debug!( + "got {}", + String::from_utf8(result.body().to_vec()).unwrap_or_default() + ); + return id; + } + Err(e) => { + tracing::debug!("query failed: {}", e); + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + panic!("unable to send successful request to router") + } +} + +impl Drop for TracingTest { + fn drop(&mut self) { + self.router.kill().expect("router could not be halted"); + global::shutdown_tracer_provider(); + } +} + +pub trait ValueExt { + fn select_path<'a>(&'a self, path: &str) -> Result, BoxError>; + fn as_string(&self) -> Option; +} + +impl ValueExt for Value { + fn select_path<'a>(&'a self, path: &str) -> Result, BoxError> { + Ok(Selector::new().str_path(path)?.value(self).select()?) + } + fn as_string(&self) -> Option { + self.as_str().map(|s| s.to_string()) + } +} diff --git a/apollo-router/tests/datadog_test.rs b/apollo-router/tests/datadog_test.rs new file mode 100644 index 0000000000..d40c17dcaa --- /dev/null +++ b/apollo-router/tests/datadog_test.rs @@ -0,0 +1,20 @@ +mod common; +use crate::common::TracingTest; +use std::path::Path; +use tower::BoxError; + +#[ignore] +#[tokio::test(flavor = "multi_thread")] +async fn test_datadog_tracing() -> Result<(), BoxError> { + let tracer = opentelemetry_datadog::new_pipeline() + .with_service_name("my_app") + .install_batch(opentelemetry::runtime::Tokio)?; + + let router = TracingTest::new( + tracer, + opentelemetry_datadog::DatadogPropagator::new(), + Path::new("datadog.router.yaml"), + ); + router.run_query().await; + Ok(()) +} diff --git a/apollo-router/tests/jaeger_test.rs b/apollo-router/tests/jaeger_test.rs new file mode 100644 index 0000000000..21deb8254c --- /dev/null +++ b/apollo-router/tests/jaeger_test.rs @@ -0,0 +1,202 @@ +mod common; + +use crate::common::TracingTest; +use crate::common::ValueExt; +use serde_json::{json, Value}; +use std::collections::HashSet; +use std::path::Path; +use std::time::Duration; +use tower::BoxError; + +#[tokio::test(flavor = "multi_thread")] +async fn test_jaeger_tracing() -> Result<(), BoxError> { + let tracer = opentelemetry_jaeger::new_pipeline() + .with_service_name("my_app") + .install_simple()?; + + let router = TracingTest::new( + tracer, + opentelemetry_jaeger::Propagator::new(), + Path::new("jaeger.router.yaml"), + ); + + for _ in 0..10 { + let id = router.run_query().await; + query_jaeger_for_trace(id).await?; + router.touch_config()?; + tokio::time::sleep(Duration::from_millis(100)).await; + } + Ok(()) +} + +async fn query_jaeger_for_trace(id: String) -> Result<(), BoxError> { + let tags = json!({ "unit_test": id }); + let params = url::form_urlencoded::Serializer::new(String::new()) + .append_pair("service", "my_app") + .append_pair("tags", &tags.to_string()) + .finish(); + + let url = format!("http://localhost:16686/api/traces?{}", params); + for _ in 0..10 { + match find_valid_trace(&url).await { + Ok(_) => { + return Ok(()); + } + Err(e) => { + tracing::warn!("{}", e); + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + panic!("did not get full otel trace"); +} + +async fn find_valid_trace(url: &str) -> Result<(), BoxError> { + // A valid trace has: + // * All three services + // * The correct spans + // * All spans are parented + // * Required attributes of 'router' span has been set + + let trace: Value = reqwest::get(url).await?.json().await?; + tracing::debug!("{}", serde_json::to_string_pretty(&trace)?); + + // Verify that we got all the participants in the trace + verify_trace_participants(&trace)?; + + // Verify that we got the expected span operation names + verify_spans_pesent(&trace)?; + + // Verify that all spans have a path to the root 'client_request' span + verify_span_parenting(&trace)?; + + // Verify that router span fields are present + verify_router_span_fields(&trace)?; + + Ok(()) +} + +fn verify_router_span_fields(trace: &Value) -> Result<(), BoxError> { + let router_span = trace.select_path("$..spans[?(@.operationName == 'router')]")?[0]; + // We can't actually assert the values on a span. Only that a field has been set. + assert_eq!( + router_span + .select_path("$.tags[?(@.key == 'query')].value")? + .get(0), + Some(&&Value::String("{topProducts{name}}".to_string())) + ); + assert_eq!( + router_span + .select_path("$.tags[?(@.key == 'operation_name')].value")? + .get(0), + Some(&&Value::String("".to_string())) + ); + assert_eq!( + router_span + .select_path("$.tags[?(@.key == 'client_name')].value")? + .get(0), + Some(&&Value::String("custom_name".to_string())) + ); + assert_eq!( + router_span + .select_path("$.tags[?(@.key == 'client_version')].value")? + .get(0), + Some(&&Value::String("1.0".to_string())) + ); + + Ok(()) +} + +fn verify_trace_participants(trace: &Value) -> Result<(), BoxError> { + let services: HashSet = trace + .select_path("$..serviceName")? + .into_iter() + .filter_map(|service| service.as_string()) + .collect(); + tracing::debug!("found services {:?}", services); + + let expected_services = HashSet::from(["my_app", "router", "products"].map(|s| s.into())); + if services != expected_services { + return Err(BoxError::from(format!( + "incomplete traces, got {:?} expected {:?}", + services, expected_services + ))); + } + Ok(()) +} + +fn verify_spans_pesent(trace: &Value) -> Result<(), BoxError> { + let operation_names: HashSet = trace + .select_path("$..operationName")? + .into_iter() + .filter_map(|span_name| span_name.as_string()) + .collect(); + let expected_operation_names: HashSet = HashSet::from( + [ + "execution", + "HTTP POST", + "request", + "router", + "fetch", + //"parse_query", Parse query will only happen once + "query_planning", + "subgraph", + "client_request", + ] + .map(|s| s.into()), + ); + tracing::debug!("found spans {:?}", operation_names); + let missing_operation_names: Vec<_> = expected_operation_names + .iter() + .filter(|o| !operation_names.contains(*o)) + .collect(); + if !missing_operation_names.is_empty() { + return Err(BoxError::from(format!( + "spans did not match, got {:?}, missing {:?}", + operation_names, missing_operation_names + ))); + } + Ok(()) +} + +fn verify_span_parenting(trace: &Value) -> Result<(), BoxError> { + let root_span = trace.select_path("$..spans[?(@.operationName == 'client_request')]")?[0]; + let spans = trace.select_path("$..spans[*]")?; + for span in spans { + let mut span_path = vec![span.select_path("$.operationName")?[0] + .as_str() + .expect("operation name not not found")]; + let mut current = span; + while let Some(parent) = parent_span(trace, current) { + span_path.push( + parent.select_path("$.operationName")?[0] + .as_str() + .expect("operation name not not found"), + ); + current = parent; + } + tracing::debug!("span path to root: '{:?}'", span_path); + if current != root_span { + return Err(BoxError::from(format!( + "span {:?} did not have a path to the root span", + span.select_path("$.operationName")?, + ))); + } + } + Ok(()) +} + +fn parent_span<'a>(trace: &'a Value, span: &'a Value) -> Option<&'a Value> { + span.select_path("$.references[?(@.refType == 'CHILD_OF')].spanID") + .ok()? + .into_iter() + .filter_map(|id| id.as_str()) + .filter_map(|id| { + trace + .select_path(&format!("$..spans[?(@.spanID == '{}')]", id)) + .ok()? + .into_iter() + .next() + }) + .next() +} diff --git a/apollo-router/tests/otlp_test.rs b/apollo-router/tests/otlp_test.rs new file mode 100644 index 0000000000..419737266d --- /dev/null +++ b/apollo-router/tests/otlp_test.rs @@ -0,0 +1,23 @@ +mod common; +use crate::common::TracingTest; +use opentelemetry::sdk::propagation::TraceContextPropagator; +use std::path::Path; +use std::result::Result; +use tower::BoxError; + +#[ignore] +#[tokio::test(flavor = "multi_thread")] +async fn test_otlp_tracing() -> Result<(), BoxError> { + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().http()) + .install_batch(opentelemetry::runtime::Tokio)?; + + let router = TracingTest::new( + tracer, + TraceContextPropagator::new(), + Path::new("otlp.router.yaml"), + ); + router.run_query().await; + Ok(()) +} diff --git a/apollo-router/tests/snapshots/integration_tests__traced_basic_composition.snap b/apollo-router/tests/snapshots/integration_tests__traced_basic_composition.snap index b86f0a9e9c..12ef40fbf1 100644 --- a/apollo-router/tests/snapshots/integration_tests__traced_basic_composition.snap +++ b/apollo-router/tests/snapshots/integration_tests__traced_basic_composition.snap @@ -1,6 +1,6 @@ --- source: apollo-router/tests/integration_tests.rs -assertion_line: 141 +assertion_line: 140 expression: get_spans() --- { @@ -18,59 +18,27 @@ expression: get_spans() } }, "children": { - "apollo_router_core::query_cache::get_query": { - "name": "apollo_router_core::query_cache::get_query", + "apollo_router_core::query_planner::sequence": { + "name": "apollo_router_core::query_planner::sequence", "record": { "entries": [], "metadata": { - "name": "get_query", - "target": "apollo_router_core::query_cache", + "name": "sequence", + "target": "apollo_router_core::query_planner", "level": "INFO", - "module_path": "apollo_router_core::query_cache", - "fields": { - "names": [] - } - } - }, - "children": {} - }, - "apollo_router_core::query_planner::bridge_query_planner::plan": { - "name": "apollo_router_core::query_planner::bridge_query_planner::plan", - "record": { - "entries": [], - "metadata": { - "name": "plan", - "target": "apollo_router_core::query_planner::bridge_query_planner", - "level": "INFO", - "module_path": "apollo_router_core::query_planner::bridge_query_planner", - "fields": { - "names": [] - } - } - }, - "children": {} - }, - "apollo_router_core::services::execution_service::execute": { - "name": "apollo_router_core::services::execution_service::execute", - "record": { - "entries": [], - "metadata": { - "name": "execute", - "target": "apollo_router_core::services::execution_service", - "level": "INFO", - "module_path": "apollo_router_core::services::execution_service", + "module_path": "apollo_router_core::query_planner", "fields": { "names": [] } } }, "children": { - "apollo_router_core::query_planner::sequence": { - "name": "apollo_router_core::query_planner::sequence", + "apollo_router_core::query_planner::fetch": { + "name": "apollo_router_core::query_planner::fetch", "record": { "entries": [], "metadata": { - "name": "sequence", + "name": "fetch", "target": "apollo_router_core::query_planner", "level": "INFO", "module_path": "apollo_router_core::query_planner", @@ -80,64 +48,31 @@ expression: get_spans() } }, "children": { - "apollo_router_core::query_planner::fetch": { - "name": "apollo_router_core::query_planner::fetch", + "apollo_router_core::query_planner::fetch::make_variables": { + "name": "apollo_router_core::query_planner::fetch::make_variables", "record": { "entries": [], "metadata": { - "name": "fetch", - "target": "apollo_router_core::query_planner", - "level": "INFO", - "module_path": "apollo_router_core::query_planner", + "name": "make_variables", + "target": "apollo_router_core::query_planner::fetch", + "level": "DEBUG", + "module_path": "apollo_router_core::query_planner::fetch", "fields": { "names": [] } } }, - "children": { - "apollo_router_core::query_planner::fetch::make_variables": { - "name": "apollo_router_core::query_planner::fetch::make_variables", - "record": { - "entries": [], - "metadata": { - "name": "make_variables", - "target": "apollo_router_core::query_planner::fetch", - "level": "DEBUG", - "module_path": "apollo_router_core::query_planner::fetch", - "fields": { - "names": [] - } - } - }, - "children": {} - }, - "apollo_router_core::query_planner::fetch::response_insert": { - "name": "apollo_router_core::query_planner::fetch::response_insert", - "record": { - "entries": [], - "metadata": { - "name": "response_insert", - "target": "apollo_router_core::query_planner::fetch", - "level": "DEBUG", - "module_path": "apollo_router_core::query_planner::fetch", - "fields": { - "names": [] - } - } - }, - "children": {} - } - } + "children": {} }, - "apollo_router_core::query_planner::parallel": { - "name": "apollo_router_core::query_planner::parallel", + "apollo_router_core::query_planner::fetch::response_insert": { + "name": "apollo_router_core::query_planner::fetch::response_insert", "record": { "entries": [], "metadata": { - "name": "parallel", - "target": "apollo_router_core::query_planner", - "level": "INFO", - "module_path": "apollo_router_core::query_planner", + "name": "response_insert", + "target": "apollo_router_core::query_planner::fetch", + "level": "DEBUG", + "module_path": "apollo_router_core::query_planner::fetch", "fields": { "names": [] } @@ -146,6 +81,22 @@ expression: get_spans() "children": {} } } + }, + "apollo_router_core::query_planner::parallel": { + "name": "apollo_router_core::query_planner::parallel", + "record": { + "entries": [], + "metadata": { + "name": "parallel", + "target": "apollo_router_core::query_planner", + "level": "INFO", + "module_path": "apollo_router_core::query_planner", + "fields": { + "names": [] + } + } + }, + "children": {} } } }, diff --git a/apollo-router/tests/snapshots/integration_tests__traced_basic_request.snap b/apollo-router/tests/snapshots/integration_tests__traced_basic_request.snap index b0d3ea1633..e33b9b39b9 100644 --- a/apollo-router/tests/snapshots/integration_tests__traced_basic_request.snap +++ b/apollo-router/tests/snapshots/integration_tests__traced_basic_request.snap @@ -1,6 +1,6 @@ --- source: apollo-router/tests/integration_tests.rs -assertion_line: 126 +assertion_line: 125 expression: get_spans() --- { @@ -18,101 +18,52 @@ expression: get_spans() } }, "children": { - "apollo_router_core::query_cache::get_query": { - "name": "apollo_router_core::query_cache::get_query", + "apollo_router_core::query_planner::fetch": { + "name": "apollo_router_core::query_planner::fetch", "record": { "entries": [], "metadata": { - "name": "get_query", - "target": "apollo_router_core::query_cache", + "name": "fetch", + "target": "apollo_router_core::query_planner", "level": "INFO", - "module_path": "apollo_router_core::query_cache", - "fields": { - "names": [] - } - } - }, - "children": {} - }, - "apollo_router_core::query_planner::bridge_query_planner::plan": { - "name": "apollo_router_core::query_planner::bridge_query_planner::plan", - "record": { - "entries": [], - "metadata": { - "name": "plan", - "target": "apollo_router_core::query_planner::bridge_query_planner", - "level": "INFO", - "module_path": "apollo_router_core::query_planner::bridge_query_planner", - "fields": { - "names": [] - } - } - }, - "children": {} - }, - "apollo_router_core::services::execution_service::execute": { - "name": "apollo_router_core::services::execution_service::execute", - "record": { - "entries": [], - "metadata": { - "name": "execute", - "target": "apollo_router_core::services::execution_service", - "level": "INFO", - "module_path": "apollo_router_core::services::execution_service", + "module_path": "apollo_router_core::query_planner", "fields": { "names": [] } } }, "children": { - "apollo_router_core::query_planner::fetch": { - "name": "apollo_router_core::query_planner::fetch", + "apollo_router_core::query_planner::fetch::make_variables": { + "name": "apollo_router_core::query_planner::fetch::make_variables", "record": { "entries": [], "metadata": { - "name": "fetch", - "target": "apollo_router_core::query_planner", - "level": "INFO", - "module_path": "apollo_router_core::query_planner", + "name": "make_variables", + "target": "apollo_router_core::query_planner::fetch", + "level": "DEBUG", + "module_path": "apollo_router_core::query_planner::fetch", "fields": { "names": [] } } }, - "children": { - "apollo_router_core::query_planner::fetch::make_variables": { - "name": "apollo_router_core::query_planner::fetch::make_variables", - "record": { - "entries": [], - "metadata": { - "name": "make_variables", - "target": "apollo_router_core::query_planner::fetch", - "level": "DEBUG", - "module_path": "apollo_router_core::query_planner::fetch", - "fields": { - "names": [] - } - } - }, - "children": {} - }, - "apollo_router_core::query_planner::fetch::response_insert": { - "name": "apollo_router_core::query_planner::fetch::response_insert", - "record": { - "entries": [], - "metadata": { - "name": "response_insert", - "target": "apollo_router_core::query_planner::fetch", - "level": "DEBUG", - "module_path": "apollo_router_core::query_planner::fetch", - "fields": { - "names": [] - } - } - }, - "children": {} + "children": {} + }, + "apollo_router_core::query_planner::fetch::response_insert": { + "name": "apollo_router_core::query_planner::fetch::response_insert", + "record": { + "entries": [], + "metadata": { + "name": "response_insert", + "target": "apollo_router_core::query_planner::fetch", + "level": "DEBUG", + "module_path": "apollo_router_core::query_planner::fetch", + "fields": { + "names": [] + } } - } + }, + "children": {} } } }, diff --git a/apollo-router/tests/zipkin_test.rs b/apollo-router/tests/zipkin_test.rs new file mode 100644 index 0000000000..310d9022db --- /dev/null +++ b/apollo-router/tests/zipkin_test.rs @@ -0,0 +1,20 @@ +mod common; +use crate::common::TracingTest; +use std::path::Path; +use tower::BoxError; +#[ignore] +#[tokio::test(flavor = "multi_thread")] +async fn test_tracing() -> Result<(), BoxError> { + let tracer = opentelemetry_zipkin::new_pipeline() + .with_service_name("my_app") + .install_batch(opentelemetry::runtime::Tokio)?; + + let router = TracingTest::new( + tracer, + opentelemetry_zipkin::Propagator::new(), + Path::new("zipkin.router.yaml"), + ); + router.run_query().await; + + Ok(()) +} diff --git a/dockerfiles/federation-demo/federation-demo b/dockerfiles/federation-demo/federation-demo index aa5299624b..56a6c41e8b 160000 --- a/dockerfiles/federation-demo/federation-demo +++ b/dockerfiles/federation-demo/federation-demo @@ -1 +1 @@ -Subproject commit aa5299624b456bccefcd76ce3082d679be0b113a +Subproject commit 56a6c41e8b5177658640df2a4c1a29eca470802a diff --git a/xtask/src/commands/test.rs b/xtask/src/commands/test.rs index 32b6986308..c2fd31a30e 100644 --- a/xtask/src/commands/test.rs +++ b/xtask/src/commands/test.rs @@ -33,16 +33,7 @@ impl Test { "--no-demo and --with-demo are mutually exclusive", ); - // NOTE: it worked nicely on GitHub Actions but it hangs on CircleCI on Windows - let _guard: Box = if !std::env::var("CIRCLECI") - .ok() - .unwrap_or_default() - .is_empty() - && cfg!(windows) - { - eprintln!("Not running federation-demo because it makes the step hang on Circle CI."); - Box::new(()) - } else if self.no_demo { + let _guard: Box = if self.no_demo { eprintln!("Flag --no-demo is the default now. Not running federation-demo."); Box::new(()) } else if !self.with_demo { @@ -64,19 +55,30 @@ impl Test { }; let demo = FederationDemoRunner::new()?; - let guard = demo.start_background()?; + let demo_guard = demo.start_background()?; + + let jaeger = JaegerRunner::new()?; + let jaeger_guard = jaeger.start_background()?; if let Some(sub_process) = maybe_pre_compile.as_mut() { eprintln!("Waiting for background process that pre-compiles the test to finish..."); sub_process.wait()?; } - Box::new((demo, guard)) + Box::new((demo, demo_guard, jaeger, jaeger_guard)) }; eprintln!("Running tests"); cargo!(TEST_DEFAULT_ARGS); + #[cfg(windows)] + { + // dirty hack. Node processes on windows will not shut down cleanly. + let _ = std::process::Command::new("taskkill") + .args(["/f", "/im", "node.exe"]) + .spawn(); + } + Ok(()) } } diff --git a/xtask/src/federation_demo.rs b/xtask/src/federation_demo.rs index 1585024f12..62866b1ca1 100644 --- a/xtask/src/federation_demo.rs +++ b/xtask/src/federation_demo.rs @@ -27,11 +27,14 @@ impl FederationDemoRunner { eprintln!("Running federation-demo in background..."); let mut command = Command::new(which::which("npm")?); + + // Pipe to NULL is required for Windows to not hang + // https://github.com/rust-lang/rust/issues/45572 command .current_dir(&self.path) .args(["run", "start"]) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); + .stdout(Stdio::null()) + .stderr(Stdio::null()); let task = BackgroundTask::new(command)?; eprintln!("Waiting for federation-demo services and gateway to be ready..."); diff --git a/xtask/src/jaeger.rs b/xtask/src/jaeger.rs new file mode 100644 index 0000000000..65b9b49132 --- /dev/null +++ b/xtask/src/jaeger.rs @@ -0,0 +1,53 @@ +use crate::*; +use anyhow::Result; +use camino::Utf8PathBuf; +use std::path::PathBuf; +use std::{ + process::{Command, Stdio}, + thread::sleep, + time::Duration, +}; + +pub struct JaegerRunner { + path: Utf8PathBuf, +} + +impl JaegerRunner { + pub fn new() -> Result { + let path = PKG_PROJECT_ROOT.to_path_buf(); + + Ok(Self { path }) + } + + pub fn start_background(&self) -> Result { + eprintln!("Running jaeger in background..."); + let jaeger_path = if cfg!(windows) { + PathBuf::from_iter(["jaeger", "jaeger-all-in-one.exe"]) + } else { + PathBuf::from_iter(["jaeger", "jaeger-all-in-one"]) + }; + + let mut command = Command::new(jaeger_path); + + // Pipe to NULL is required for Windows to not hang + // https://github.com/rust-lang/rust/issues/45572 + command + .current_dir(&self.path) + .stdout(Stdio::null()) + .stderr(Stdio::null()); + + if cfg!(windows) {} + + let task = BackgroundTask::new(command)?; + + loop { + match reqwest::blocking::get("http://localhost:16686") { + Ok(_) => break, + Err(err) => eprintln!("{}", err), + } + sleep(Duration::from_secs(2)); + } + + Ok(task) + } +} diff --git a/xtask/src/lib.rs b/xtask/src/lib.rs index 8b1e3d1ecc..3d41fa9bbb 100644 --- a/xtask/src/lib.rs +++ b/xtask/src/lib.rs @@ -1,10 +1,12 @@ mod federation_demo; +mod jaeger; pub use anyhow; use anyhow::{Context, Result}; use camino::Utf8PathBuf; use cargo_metadata::MetadataCommand; pub use federation_demo::*; +pub use jaeger::*; use once_cell::sync::Lazy; use std::process::{Child, Command}; use std::{convert::TryFrom, env, str};