From 74dfaebee98be7b2ded390cc4b81f6e0bc9477f1 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 27 Jun 2024 11:22:43 +0300 Subject: [PATCH 01/13] traffic_shaping: report timeout as graphql error --- .../axum_factory/axum_http_server_factory.rs | 7 -- apollo-router/src/axum_factory/tests.rs | 11 ++- .../layers/map_future_with_request_data.rs | 1 + .../src/plugins/traffic_shaping/mod.rs | 87 ++++++++++++------- .../plugins/traffic_shaping/timeout/error.rs | 32 ------- .../plugins/traffic_shaping/timeout/future.rs | 57 ------------ .../plugins/traffic_shaping/timeout/layer.rs | 26 ------ .../plugins/traffic_shaping/timeout/mod.rs | 60 ------------- 8 files changed, 68 insertions(+), 213 deletions(-) delete mode 100644 apollo-router/src/plugins/traffic_shaping/timeout/error.rs delete mode 100644 apollo-router/src/plugins/traffic_shaping/timeout/future.rs delete mode 100644 apollo-router/src/plugins/traffic_shaping/timeout/layer.rs delete mode 100644 apollo-router/src/plugins/traffic_shaping/timeout/mod.rs diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index 8bd0d3e0e8..4f592709f5 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -59,7 +59,6 @@ use crate::http_server_factory::HttpServerFactory; use crate::http_server_factory::HttpServerHandle; use crate::http_server_factory::Listener; use crate::plugins::telemetry::SpanMode; -use crate::plugins::traffic_shaping::Elapsed; use crate::plugins::traffic_shaping::RateLimited; use crate::router::ApolloRouterError; use crate::router_factory::Endpoint; @@ -668,16 +667,10 @@ async fn handle_graphql( if source_err.is::() { return RateLimited::new().into_response(); } - if source_err.is::() { - return Elapsed::new().into_response(); - } } if err.is::() { return RateLimited::new().into_response(); } - if err.is::() { - return Elapsed::new().into_response(); - } internal_server_error(err) } diff --git a/apollo-router/src/axum_factory/tests.rs b/apollo-router/src/axum_factory/tests.rs index 4d668c6dd4..94318eff81 100644 --- a/apollo-router/src/axum_factory/tests.rs +++ b/apollo-router/src/axum_factory/tests.rs @@ -2384,6 +2384,15 @@ async fn test_supergraph_timeout() { .unwrap(); assert_eq!(response.status(), StatusCode::GATEWAY_TIMEOUT); + let body = response.bytes().await.unwrap(); - assert_eq!(std::str::from_utf8(&body).unwrap(), "request timed out"); + let body: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(body, json!({ + "errors": [{ + "message": "Request timed out", + "extensions": { + "code": "REQUEST_TIMED_OUT" + } + }] + })); } diff --git a/apollo-router/src/layers/map_future_with_request_data.rs b/apollo-router/src/layers/map_future_with_request_data.rs index 53faf6a299..0eeff03363 100644 --- a/apollo-router/src/layers/map_future_with_request_data.rs +++ b/apollo-router/src/layers/map_future_with_request_data.rs @@ -36,6 +36,7 @@ where } /// [`Service`] for mapping futures with request data. See [`ServiceBuilderExt::map_future_with_request_data()`](crate::layers::ServiceBuilderExt::map_future_with_request_data()). +#[derive(Clone)] pub struct MapFutureWithRequestDataService { inner: S, req_fn: RF, diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index 6c4dd43a0b..c9f74f582a 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -9,7 +9,6 @@ mod deduplication; pub(crate) mod rate; mod retry; -pub(crate) mod timeout; use std::collections::HashMap; use std::num::NonZeroU64; @@ -17,13 +16,15 @@ use std::sync::Mutex; use std::time::Duration; use futures::future::BoxFuture; +use futures::Future; +use futures::FutureExt; +use futures::TryFutureExt; use http::header::CONTENT_ENCODING; use http::HeaderValue; +use http::StatusCode; use schemars::JsonSchema; use serde::Deserialize; -use tower::retry::Retry; use tower::util::Either; -use tower::util::Oneshot; use tower::BoxError; use tower::Service; use tower::ServiceBuilder; @@ -33,9 +34,9 @@ use self::deduplication::QueryDeduplicationLayer; use self::rate::RateLimitLayer; pub(crate) use self::rate::RateLimited; pub(crate) use self::retry::RetryPolicy; -pub(crate) use self::timeout::Elapsed; -use self::timeout::TimeoutLayer; use crate::error::ConfigurationError; +use crate::graphql; +use crate::layers::ServiceBuilderExt; use crate::plugin::Plugin; use crate::plugin::PluginInit; use crate::register_plugin; @@ -266,15 +267,7 @@ impl Plugin for TrafficShaping { pub(crate) type TrafficShapingSubgraphFuture = Either< Either< BoxFuture<'static, Result>, - timeout::future::ResponseFuture< - Oneshot< - Either< - Retry, S>>, - Either, S>, - >, - subgraph::Request, - >, - >, + BoxFuture<'static, Result>, >, >::Future, >; @@ -295,9 +288,7 @@ impl TrafficShaping { supergraph::Request, Response = supergraph::Response, Error = BoxError, - Future = timeout::future::ResponseFuture< - Oneshot, S>, supergraph::Request>, - >, + Future = BoxFuture<'static, Result>, > + Clone + Send + Sync @@ -310,14 +301,27 @@ impl TrafficShaping { + 'static, >::Future: std::marker::Send, { + let timeout = self + .config + .router + .as_ref() + .and_then(|r| r.timeout) + .unwrap_or(DEFAULT_TIMEOUT); ServiceBuilder::new() - .layer(TimeoutLayer::new( - self.config - .router - .as_ref() - .and_then(|r| r.timeout) - .unwrap_or(DEFAULT_TIMEOUT), - )) + .map_future_with_request_data( + |req: &supergraph::Request| req.context.clone(), + move |ctx, response| { + request_timeout(timeout, response) + .unwrap_or_else(|error| { + supergraph::Response::error_builder() + .status_code(StatusCode::GATEWAY_TIMEOUT) + .error(error) + .context(ctx) + .build() + }) + .boxed() + }, + ) .option_layer(self.rate_limit_router.clone()) .service(service) } @@ -375,16 +379,23 @@ impl TrafficShaping { tower::retry::RetryLayer::new(retry_policy) }); + let timeout = config.shaping.timeout.unwrap_or(DEFAULT_TIMEOUT); Either::A(ServiceBuilder::new() - .option_layer(config.shaping.deduplicate_query.unwrap_or_default().then( QueryDeduplicationLayer::default )) - .layer(TimeoutLayer::new( - config.shaping - .timeout - .unwrap_or(DEFAULT_TIMEOUT), - )) + .map_future_with_request_data( + |req: &subgraph::Request| req.context.clone(), + move |ctx, response| { + request_timeout(timeout, response).unwrap_or_else(|error| { + subgraph::Response::error_builder() + .status_code(StatusCode::GATEWAY_TIMEOUT) + .error(error) + .context(ctx) + .build() + }).boxed() + }, + ) .option_layer(retry) .option_layer(rate_limit) .service(service) @@ -413,6 +424,22 @@ impl TrafficShaping { register_plugin!("apollo", "traffic_shaping", TrafficShaping); +fn request_timeout( + duration: Duration, + future: F, +) -> impl Future> +where + F: Future + std::marker::Send, +{ + tokio::time::timeout(duration, future).map_err(|_| { + tracing::info!(monotonic_counter.apollo_router_timeout = 1u64,); + graphql::Error::builder() + .message(String::from("Request timed out")) + .extension_code("REQUEST_TIMED_OUT") + .build() + }) +} + #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/error.rs b/apollo-router/src/plugins/traffic_shaping/timeout/error.rs deleted file mode 100644 index 66ac450b4a..0000000000 --- a/apollo-router/src/plugins/traffic_shaping/timeout/error.rs +++ /dev/null @@ -1,32 +0,0 @@ -//! Error types - -use std::error; -use std::fmt; - -use axum::response::IntoResponse; -use http::StatusCode; - -/// The timeout elapsed. -#[derive(Debug, Default)] -pub(crate) struct Elapsed; - -impl Elapsed { - /// Construct a new elapsed error - pub(crate) fn new() -> Self { - Elapsed {} - } -} - -impl fmt::Display for Elapsed { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("request timed out") - } -} - -impl IntoResponse for Elapsed { - fn into_response(self) -> axum::response::Response { - (StatusCode::GATEWAY_TIMEOUT, self.to_string()).into_response() - } -} - -impl error::Error for Elapsed {} diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/future.rs b/apollo-router/src/plugins/traffic_shaping/timeout/future.rs deleted file mode 100644 index 8a390b393e..0000000000 --- a/apollo-router/src/plugins/traffic_shaping/timeout/future.rs +++ /dev/null @@ -1,57 +0,0 @@ -//! Future types - -use std::future::Future; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -use pin_project_lite::pin_project; -use tokio::time::Sleep; - -use super::error::Elapsed; - -pin_project! { - /// [`Timeout`] response future - /// - /// [`Timeout`]: crate::timeout::Timeout - #[derive(Debug)] - pub(crate) struct ResponseFuture { - #[pin] - response: T, - #[pin] - sleep: Pin>, - } -} - -impl ResponseFuture { - pub(crate) fn new(response: T, sleep: Pin>) -> Self { - ResponseFuture { response, sleep } - } -} - -impl Future for ResponseFuture -where - F: Future>, - E: Into, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - - // First, try polling the future - match this.response.poll(cx) { - Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)), - Poll::Pending => {} - } - - // Now check the sleep - match Pin::new(&mut this.sleep).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => { - tracing::info!(monotonic_counter.apollo_router_timeout = 1u64,); - Poll::Ready(Err(Elapsed::new().into())) - } - } - } -} diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/layer.rs b/apollo-router/src/plugins/traffic_shaping/timeout/layer.rs deleted file mode 100644 index fd1b5ea59e..0000000000 --- a/apollo-router/src/plugins/traffic_shaping/timeout/layer.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::time::Duration; - -use tower::Layer; - -use super::Timeout; - -/// Applies a timeout to requests via the supplied inner service. -#[derive(Debug, Clone)] -pub(crate) struct TimeoutLayer { - timeout: Duration, -} - -impl TimeoutLayer { - /// Create a timeout from a duration - pub(crate) fn new(timeout: Duration) -> Self { - TimeoutLayer { timeout } - } -} - -impl Layer for TimeoutLayer { - type Service = Timeout; - - fn layer(&self, service: S) -> Self::Service { - Timeout::new(service, self.timeout) - } -} diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs b/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs deleted file mode 100644 index 6b7cb9abce..0000000000 --- a/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs +++ /dev/null @@ -1,60 +0,0 @@ -//! This is a modified Timeout service copy/pasted from the tower codebase. -//! This Timeout is also checking if we do not timeout on the `poll_ready` and not only on the `call` part -//! Middleware that applies a timeout to requests. -//! -//! If the response does not complete within the specified timeout, the response -//! will be aborted. - -pub(crate) mod error; -pub(crate) mod future; -mod layer; - -use std::task::Context; -use std::task::Poll; -use std::time::Duration; - -use tower::util::Oneshot; -use tower::Service; -use tower::ServiceExt; - -use self::future::ResponseFuture; -pub(crate) use self::layer::TimeoutLayer; -pub(crate) use crate::plugins::traffic_shaping::timeout::error::Elapsed; - -/// Applies a timeout to requests. -#[derive(Debug, Clone)] -pub(crate) struct Timeout { - inner: T, - timeout: Duration, -} - -// ===== impl Timeout ===== - -impl Timeout { - /// Creates a new [`Timeout`] - pub(crate) fn new(inner: T, timeout: Duration) -> Self { - Timeout { inner, timeout } - } -} - -impl Service for Timeout -where - S: Service + Clone, - S::Error: Into, -{ - type Response = S::Response; - type Error = tower::BoxError; - type Future = ResponseFuture>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, request: Request) -> Self::Future { - let service = self.inner.clone(); - - let response = service.oneshot(request); - - ResponseFuture::new(response, Box::pin(tokio::time::sleep(self.timeout))) - } -} From ba68ca2d1fcdc5697a7b1650cc04de57c6162527 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 03:19:36 +0300 Subject: [PATCH 02/13] fix test --- apollo-router/src/axum_factory/tests.rs | 2 +- .../src/plugins/traffic_shaping/mod.rs | 17 ++++++----------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/apollo-router/src/axum_factory/tests.rs b/apollo-router/src/axum_factory/tests.rs index 94318eff81..b331a71c7a 100644 --- a/apollo-router/src/axum_factory/tests.rs +++ b/apollo-router/src/axum_factory/tests.rs @@ -2391,7 +2391,7 @@ async fn test_supergraph_timeout() { "errors": [{ "message": "Request timed out", "extensions": { - "code": "REQUEST_TIMED_OUT" + "code": "REQUEST_TIMEOUT" } }] })); diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index c9f74f582a..c33f2eefec 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -435,7 +435,7 @@ where tracing::info!(monotonic_counter.apollo_router_timeout = 1u64,); graphql::Error::builder() .message(String::from("Request timed out")) - .extension_code("REQUEST_TIMED_OUT") + .extension_code("REQUEST_TIMEOUT") .build() }) } @@ -825,16 +825,11 @@ mod test { let mut mock_service = MockSupergraphService::new(); mock_service.expect_clone().returning(|| { let mut mock_service = MockSupergraphService::new(); - - mock_service.expect_clone().returning(|| { - let mut mock_service = MockSupergraphService::new(); - mock_service.expect_call().times(0..2).returning(move |_| { - Ok(SupergraphResponse::fake_builder() - .data(json!({ "test": 1234_u32 })) - .build() - .unwrap()) - }); - mock_service + mock_service.expect_call().times(0..2).returning(move |_| { + Ok(SupergraphResponse::fake_builder() + .data(json!({ "test": 1234_u32 })) + .build() + .unwrap()) }); mock_service }); From e57d12894e5abc7144742da1176e693a822f9d44 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 03:37:33 +0300 Subject: [PATCH 03/13] add integration tests --- apollo-router/tests/integration/mod.rs | 1 + ...on__traffic_shaping__subgraph_timeout.snap | 5 ++ ...__traffic_shaping__supergraph_timeout.snap | 5 ++ .../tests/integration/traffic_shaping.rs | 57 +++++++++++++++++++ 4 files changed, 68 insertions(+) create mode 100644 apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_timeout.snap create mode 100644 apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__supergraph_timeout.snap create mode 100644 apollo-router/tests/integration/traffic_shaping.rs diff --git a/apollo-router/tests/integration/mod.rs b/apollo-router/tests/integration/mod.rs index f3432b12c4..2cbb1029d3 100644 --- a/apollo-router/tests/integration/mod.rs +++ b/apollo-router/tests/integration/mod.rs @@ -8,6 +8,7 @@ mod docs; mod file_upload; mod lifecycle; mod operation_limits; +mod traffic_shaping; #[cfg(any(not(feature = "ci"), all(target_arch = "x86_64", target_os = "linux")))] mod redis; diff --git a/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_timeout.snap b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_timeout.snap new file mode 100644 index 0000000000..671e207784 --- /dev/null +++ b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_timeout.snap @@ -0,0 +1,5 @@ +--- +source: apollo-router/tests/integration/traffic_shaping.rs +expression: response.text().await? +--- +"{\"data\":null,\"errors\":[{\"message\":\"Request timed out\",\"extensions\":{\"code\":\"REQUEST_TIMEOUT\"}}]}" diff --git a/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__supergraph_timeout.snap b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__supergraph_timeout.snap new file mode 100644 index 0000000000..d09e20a31d --- /dev/null +++ b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__supergraph_timeout.snap @@ -0,0 +1,5 @@ +--- +source: apollo-router/tests/integration/traffic_shaping.rs +expression: response.text().await? +--- +"{\"errors\":[{\"message\":\"Request timed out\",\"extensions\":{\"code\":\"REQUEST_TIMEOUT\"}}]}" diff --git a/apollo-router/tests/integration/traffic_shaping.rs b/apollo-router/tests/integration/traffic_shaping.rs new file mode 100644 index 0000000000..ae5d7b2041 --- /dev/null +++ b/apollo-router/tests/integration/traffic_shaping.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use insta::assert_yaml_snapshot; +use tower::BoxError; +use wiremock::ResponseTemplate; + +use crate::integration::IntegrationTest; + +#[tokio::test(flavor = "multi_thread")] +async fn test_supergraph_timeout() -> Result<(), BoxError> { + let mut router = IntegrationTest::builder() + .config( + r#" + traffic_shaping: + router: + timeout: 10ms + "#, + ) + .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 504); + assert_yaml_snapshot!(response.text().await?); + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_subgraph_timeout() -> Result<(), BoxError> { + let mut router = IntegrationTest::builder() + .config( + r#" + include_subgraph_errors: + all: true + traffic_shaping: + all: + timeout: 10ms + "#, + ) + .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + assert_yaml_snapshot!(response.text().await?); + router.graceful_shutdown().await; + Ok(()) +} From 58e1c2d2488ad9fc73a5f520f99f9301a9f1eec2 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 05:16:28 +0300 Subject: [PATCH 04/13] bring back timeout layer --- .../src/plugins/traffic_shaping/mod.rs | 89 ++++++++++--------- .../plugins/traffic_shaping/timeout/error.rs | 35 ++++++++ .../plugins/traffic_shaping/timeout/future.rs | 57 ++++++++++++ .../plugins/traffic_shaping/timeout/layer.rs | 26 ++++++ .../plugins/traffic_shaping/timeout/mod.rs | 60 +++++++++++++ 5 files changed, 223 insertions(+), 44 deletions(-) create mode 100644 apollo-router/src/plugins/traffic_shaping/timeout/error.rs create mode 100644 apollo-router/src/plugins/traffic_shaping/timeout/future.rs create mode 100644 apollo-router/src/plugins/traffic_shaping/timeout/layer.rs create mode 100644 apollo-router/src/plugins/traffic_shaping/timeout/mod.rs diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index c33f2eefec..32f83e95eb 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -9,16 +9,15 @@ mod deduplication; pub(crate) mod rate; mod retry; +pub(crate) mod timeout; use std::collections::HashMap; use std::num::NonZeroU64; use std::sync::Mutex; use std::time::Duration; -use futures::future::BoxFuture; -use futures::Future; use futures::FutureExt; -use futures::TryFutureExt; +use futures::future::BoxFuture; use http::header::CONTENT_ENCODING; use http::HeaderValue; use http::StatusCode; @@ -34,6 +33,8 @@ use self::deduplication::QueryDeduplicationLayer; use self::rate::RateLimitLayer; pub(crate) use self::rate::RateLimited; pub(crate) use self::retry::RetryPolicy; +pub(crate) use self::timeout::Elapsed; +use self::timeout::TimeoutLayer; use crate::error::ConfigurationError; use crate::graphql; use crate::layers::ServiceBuilderExt; @@ -301,27 +302,32 @@ impl TrafficShaping { + 'static, >::Future: std::marker::Send, { - let timeout = self - .config - .router - .as_ref() - .and_then(|r| r.timeout) - .unwrap_or(DEFAULT_TIMEOUT); ServiceBuilder::new() .map_future_with_request_data( |req: &supergraph::Request| req.context.clone(), - move |ctx, response| { - request_timeout(timeout, response) - .unwrap_or_else(|error| { - supergraph::Response::error_builder() - .status_code(StatusCode::GATEWAY_TIMEOUT) - .error(error) - .context(ctx) - .build() - }) - .boxed() + move |ctx, future| { + async { + let response: Result = future.await; + match response { + Err(error) if error.is::() => { + supergraph::Response::error_builder() + .status_code(StatusCode::GATEWAY_TIMEOUT) + .error::(Elapsed::new().into()) + .context(ctx) + .build() + } + _ => response, + } + }.boxed() }, ) + .layer(TimeoutLayer::new( + self.config + .router + .as_ref() + .and_then(|r| r.timeout) + .unwrap_or(DEFAULT_TIMEOUT), + )) .option_layer(self.rate_limit_router.clone()) .service(service) } @@ -379,23 +385,34 @@ impl TrafficShaping { tower::retry::RetryLayer::new(retry_policy) }); - let timeout = config.shaping.timeout.unwrap_or(DEFAULT_TIMEOUT); Either::A(ServiceBuilder::new() + .option_layer(config.shaping.deduplicate_query.unwrap_or_default().then( QueryDeduplicationLayer::default )) .map_future_with_request_data( |req: &subgraph::Request| req.context.clone(), - move |ctx, response| { - request_timeout(timeout, response).unwrap_or_else(|error| { - subgraph::Response::error_builder() - .status_code(StatusCode::GATEWAY_TIMEOUT) - .error(error) - .context(ctx) - .build() - }).boxed() + move |ctx, future| { + async { + let response: Result = future.await; + match response { + Err(error) if error.is::() => { + subgraph::Response::error_builder() + .status_code(StatusCode::GATEWAY_TIMEOUT) + .error::(Elapsed::new().into()) + .context(ctx) + .build() + } + _ => response, + } + }.boxed() }, ) + .layer(TimeoutLayer::new( + config.shaping + .timeout + .unwrap_or(DEFAULT_TIMEOUT), + )) .option_layer(retry) .option_layer(rate_limit) .service(service) @@ -424,22 +441,6 @@ impl TrafficShaping { register_plugin!("apollo", "traffic_shaping", TrafficShaping); -fn request_timeout( - duration: Duration, - future: F, -) -> impl Future> -where - F: Future + std::marker::Send, -{ - tokio::time::timeout(duration, future).map_err(|_| { - tracing::info!(monotonic_counter.apollo_router_timeout = 1u64,); - graphql::Error::builder() - .message(String::from("Request timed out")) - .extension_code("REQUEST_TIMEOUT") - .build() - }) -} - #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/error.rs b/apollo-router/src/plugins/traffic_shaping/timeout/error.rs new file mode 100644 index 0000000000..6fdb86d28b --- /dev/null +++ b/apollo-router/src/plugins/traffic_shaping/timeout/error.rs @@ -0,0 +1,35 @@ +//! Error types + +use std::error; +use std::fmt; + +use http::StatusCode; +use crate::graphql; + +/// The timeout elapsed. +#[derive(Debug, Default)] +pub(crate) struct Elapsed; + +impl Elapsed { + /// Construct a new elapsed error + pub(crate) fn new() -> Self { + Elapsed {} + } +} + +impl fmt::Display for Elapsed { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("request timed out") + } +} + +impl Into for Elapsed { + fn into(self) -> graphql::Error { + graphql::Error::builder() + .message(String::from("Request timed out")) + .extension_code("REQUEST_TIMEOUT") + .build() + } +} + +impl error::Error for Elapsed {} diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/future.rs b/apollo-router/src/plugins/traffic_shaping/timeout/future.rs new file mode 100644 index 0000000000..8a390b393e --- /dev/null +++ b/apollo-router/src/plugins/traffic_shaping/timeout/future.rs @@ -0,0 +1,57 @@ +//! Future types + +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use pin_project_lite::pin_project; +use tokio::time::Sleep; + +use super::error::Elapsed; + +pin_project! { + /// [`Timeout`] response future + /// + /// [`Timeout`]: crate::timeout::Timeout + #[derive(Debug)] + pub(crate) struct ResponseFuture { + #[pin] + response: T, + #[pin] + sleep: Pin>, + } +} + +impl ResponseFuture { + pub(crate) fn new(response: T, sleep: Pin>) -> Self { + ResponseFuture { response, sleep } + } +} + +impl Future for ResponseFuture +where + F: Future>, + E: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + // First, try polling the future + match this.response.poll(cx) { + Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)), + Poll::Pending => {} + } + + // Now check the sleep + match Pin::new(&mut this.sleep).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => { + tracing::info!(monotonic_counter.apollo_router_timeout = 1u64,); + Poll::Ready(Err(Elapsed::new().into())) + } + } + } +} diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/layer.rs b/apollo-router/src/plugins/traffic_shaping/timeout/layer.rs new file mode 100644 index 0000000000..fd1b5ea59e --- /dev/null +++ b/apollo-router/src/plugins/traffic_shaping/timeout/layer.rs @@ -0,0 +1,26 @@ +use std::time::Duration; + +use tower::Layer; + +use super::Timeout; + +/// Applies a timeout to requests via the supplied inner service. +#[derive(Debug, Clone)] +pub(crate) struct TimeoutLayer { + timeout: Duration, +} + +impl TimeoutLayer { + /// Create a timeout from a duration + pub(crate) fn new(timeout: Duration) -> Self { + TimeoutLayer { timeout } + } +} + +impl Layer for TimeoutLayer { + type Service = Timeout; + + fn layer(&self, service: S) -> Self::Service { + Timeout::new(service, self.timeout) + } +} diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs b/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs new file mode 100644 index 0000000000..6b7cb9abce --- /dev/null +++ b/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs @@ -0,0 +1,60 @@ +//! This is a modified Timeout service copy/pasted from the tower codebase. +//! This Timeout is also checking if we do not timeout on the `poll_ready` and not only on the `call` part +//! Middleware that applies a timeout to requests. +//! +//! If the response does not complete within the specified timeout, the response +//! will be aborted. + +pub(crate) mod error; +pub(crate) mod future; +mod layer; + +use std::task::Context; +use std::task::Poll; +use std::time::Duration; + +use tower::util::Oneshot; +use tower::Service; +use tower::ServiceExt; + +use self::future::ResponseFuture; +pub(crate) use self::layer::TimeoutLayer; +pub(crate) use crate::plugins::traffic_shaping::timeout::error::Elapsed; + +/// Applies a timeout to requests. +#[derive(Debug, Clone)] +pub(crate) struct Timeout { + inner: T, + timeout: Duration, +} + +// ===== impl Timeout ===== + +impl Timeout { + /// Creates a new [`Timeout`] + pub(crate) fn new(inner: T, timeout: Duration) -> Self { + Timeout { inner, timeout } + } +} + +impl Service for Timeout +where + S: Service + Clone, + S::Error: Into, +{ + type Response = S::Response; + type Error = tower::BoxError; + type Future = ResponseFuture>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + let service = self.inner.clone(); + + let response = service.oneshot(request); + + ResponseFuture::new(response, Box::pin(tokio::time::sleep(self.timeout))) + } +} From 3b9ba13c4ea4be27595d3b83e5a44d19cd5a6daf Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 07:58:16 +0300 Subject: [PATCH 05/13] Also fix rate limit error --- .../axum_factory/axum_http_server_factory.rs | 14 +- .../src/plugins/traffic_shaping/mod.rs | 128 +++++++++++++----- .../src/plugins/traffic_shaping/rate/error.rs | 12 +- .../plugins/traffic_shaping/timeout/error.rs | 1 - ..._traffic_shaping__router_rate_limit-2.snap | 5 + ...n__traffic_shaping__router_rate_limit.snap | 5 + ...ion__traffic_shaping__router_timeout.snap} | 0 ...raffic_shaping__subgraph_rate_limit-2.snap | 5 + ..._traffic_shaping__subgraph_rate_limit.snap | 5 + .../tests/integration/traffic_shaping.rs | 98 ++++++++++++-- 10 files changed, 208 insertions(+), 65 deletions(-) create mode 100644 apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_rate_limit-2.snap create mode 100644 apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_rate_limit.snap rename apollo-router/tests/integration/snapshots/{integration_tests__integration__traffic_shaping__supergraph_timeout.snap => integration_tests__integration__traffic_shaping__router_timeout.snap} (100%) create mode 100644 apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_rate_limit-2.snap create mode 100644 apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_rate_limit.snap diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index 4f592709f5..f687440f4c 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -59,7 +59,6 @@ use crate::http_server_factory::HttpServerFactory; use crate::http_server_factory::HttpServerHandle; use crate::http_server_factory::Listener; use crate::plugins::telemetry::SpanMode; -use crate::plugins::traffic_shaping::RateLimited; use crate::router::ApolloRouterError; use crate::router_factory::Endpoint; use crate::router_factory::RouterFactory; @@ -662,18 +661,7 @@ async fn handle_graphql( ); match res { - Err(err) => { - if let Some(source_err) = err.source() { - if source_err.is::() { - return RateLimited::new().into_response(); - } - } - if err.is::() { - return RateLimited::new().into_response(); - } - - internal_server_error(err) - } + Err(err) => internal_server_error(err), Ok(response) => { let (mut parts, body) = response.response.into_parts(); diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index 32f83e95eb..433f3f0d09 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -16,8 +16,8 @@ use std::num::NonZeroU64; use std::sync::Mutex; use std::time::Duration; -use futures::FutureExt; use futures::future::BoxFuture; +use futures::FutureExt; use http::header::CONTENT_ENCODING; use http::HeaderValue; use http::StatusCode; @@ -31,9 +31,9 @@ use tower::ServiceExt; use self::deduplication::QueryDeduplicationLayer; use self::rate::RateLimitLayer; -pub(crate) use self::rate::RateLimited; +use self::rate::RateLimited; pub(crate) use self::retry::RetryPolicy; -pub(crate) use self::timeout::Elapsed; +use self::timeout::Elapsed; use self::timeout::TimeoutLayer; use crate::error::ConfigurationError; use crate::graphql; @@ -316,9 +316,17 @@ impl TrafficShaping { .context(ctx) .build() } + Err(error) if error.is::() => { + supergraph::Response::error_builder() + .status_code(StatusCode::TOO_MANY_REQUESTS) + .error::(RateLimited::new().into()) + .context(ctx) + .build() + } _ => response, } - }.boxed() + } + .boxed() }, ) .layer(TimeoutLayer::new( @@ -403,6 +411,13 @@ impl TrafficShaping { .context(ctx) .build() } + Err(error) if error.is::() => { + subgraph::Response::error_builder() + .status_code(StatusCode::TOO_MANY_REQUESTS) + .error::(RateLimited::new().into()) + .context(ctx) + .build() + } _ => response, } }.boxed() @@ -452,6 +467,7 @@ mod test { use serde_json_bytes::ByteString; use serde_json_bytes::Value; use tower::Service; + use maplit::hashmap; use super::*; use crate::json_ext::Object; @@ -772,41 +788,64 @@ mod test { let plugin = get_traffic_shaping_plugin(&config).await; - let test_service = MockSubgraph::new(HashMap::new()); + let test_service = MockSubgraph::new(hashmap! { + graphql::Request::default() => graphql::Response::default() + }); - let _response = plugin + assert!(&plugin .as_any() .downcast_ref::() .unwrap() .subgraph_service_internal("test", test_service.clone()) .oneshot(SubgraphRequest::fake_builder().build()) .await - .unwrap(); - let _response = plugin - .as_any() - .downcast_ref::() .unwrap() - .subgraph_service_internal("test", test_service.clone()) - .oneshot(SubgraphRequest::fake_builder().build()) - .await - .expect_err("should be in error due to a timeout and rate limit"); - let _response = plugin + .response + .body() + .errors + .is_empty()); + assert_eq!( + plugin + .as_any() + .downcast_ref::() + .unwrap() + .subgraph_service_internal("test", test_service.clone()) + .oneshot(SubgraphRequest::fake_builder().build()) + .await + .unwrap() + .response + .body() + .errors[0] + .extensions + .get("code") + .unwrap(), + "REQUEST_RATE_LIMITED" + ); + assert!(plugin .as_any() .downcast_ref::() .unwrap() .subgraph_service_internal("another", test_service.clone()) .oneshot(SubgraphRequest::fake_builder().build()) .await - .unwrap(); + .unwrap() + .response + .body() + .errors + .is_empty()); tokio::time::sleep(Duration::from_millis(300)).await; - let _response = plugin + assert!(plugin .as_any() .downcast_ref::() .unwrap() .subgraph_service_internal("test", test_service.clone()) .oneshot(SubgraphRequest::fake_builder().build()) .await - .unwrap(); + .unwrap() + .response + .body() + .errors + .is_empty()); } #[tokio::test(flavor = "multi_thread")] @@ -826,16 +865,21 @@ mod test { let mut mock_service = MockSupergraphService::new(); mock_service.expect_clone().returning(|| { let mut mock_service = MockSupergraphService::new(); - mock_service.expect_call().times(0..2).returning(move |_| { - Ok(SupergraphResponse::fake_builder() - .data(json!({ "test": 1234_u32 })) - .build() - .unwrap()) + + mock_service.expect_clone().returning(|| { + let mut mock_service = MockSupergraphService::new(); + mock_service.expect_call().times(0..2).returning(move |_| { + Ok(SupergraphResponse::fake_builder() + .data(json!({ "test": 1234_u32 })) + .build() + .unwrap()) + }); + mock_service }); mock_service }); - let _response = plugin + assert!(plugin .as_any() .downcast_ref::() .unwrap() @@ -845,18 +889,30 @@ mod test { .unwrap() .next_response() .await - .unwrap(); - - assert!(plugin - .as_any() - .downcast_ref::() .unwrap() - .supergraph_service_internal(mock_service.clone()) - .oneshot(SupergraphRequest::fake_builder().build().unwrap()) - .await - .is_err()); + .errors + .is_empty()); + + assert_eq!( + plugin + .as_any() + .downcast_ref::() + .unwrap() + .supergraph_service_internal(mock_service.clone()) + .oneshot(SupergraphRequest::fake_builder().build().unwrap()) + .await + .unwrap() + .next_response() + .await + .unwrap() + .errors[0] + .extensions + .get("code") + .unwrap(), + "REQUEST_RATE_LIMITED" + ); tokio::time::sleep(Duration::from_millis(300)).await; - let _response = plugin + assert!(plugin .as_any() .downcast_ref::() .unwrap() @@ -866,6 +922,8 @@ mod test { .unwrap() .next_response() .await - .unwrap(); + .unwrap() + .errors + .is_empty()); } } diff --git a/apollo-router/src/plugins/traffic_shaping/rate/error.rs b/apollo-router/src/plugins/traffic_shaping/rate/error.rs index 6e06c5823a..f413c6bd16 100644 --- a/apollo-router/src/plugins/traffic_shaping/rate/error.rs +++ b/apollo-router/src/plugins/traffic_shaping/rate/error.rs @@ -3,8 +3,7 @@ use std::error; use std::fmt; -use axum::response::IntoResponse; -use http::StatusCode; +use crate::graphql; /// The rate limit error. #[derive(Debug, Default)] @@ -23,9 +22,12 @@ impl fmt::Display for RateLimited { } } -impl IntoResponse for RateLimited { - fn into_response(self) -> axum::response::Response { - (StatusCode::TOO_MANY_REQUESTS, self.to_string()).into_response() +impl Into for RateLimited { + fn into(self) -> graphql::Error { + graphql::Error::builder() + .message(String::from("Your request has been rate limited")) + .extension_code("REQUEST_RATE_LIMITED") + .build() } } diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/error.rs b/apollo-router/src/plugins/traffic_shaping/timeout/error.rs index 6fdb86d28b..90ccc78c25 100644 --- a/apollo-router/src/plugins/traffic_shaping/timeout/error.rs +++ b/apollo-router/src/plugins/traffic_shaping/timeout/error.rs @@ -3,7 +3,6 @@ use std::error; use std::fmt; -use http::StatusCode; use crate::graphql; /// The timeout elapsed. diff --git a/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_rate_limit-2.snap b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_rate_limit-2.snap new file mode 100644 index 0000000000..4f2f8fe321 --- /dev/null +++ b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_rate_limit-2.snap @@ -0,0 +1,5 @@ +--- +source: apollo-router/tests/integration/traffic_shaping.rs +expression: response +--- +"{\"errors\":[{\"message\":\"Your request has been rate limited\",\"extensions\":{\"code\":\"REQUEST_RATE_LIMITED\"}}]}" diff --git a/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_rate_limit.snap b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_rate_limit.snap new file mode 100644 index 0000000000..ba9702ddeb --- /dev/null +++ b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_rate_limit.snap @@ -0,0 +1,5 @@ +--- +source: apollo-router/tests/integration/traffic_shaping.rs +expression: response +--- +"{\"data\":{\"topProducts\":[{\"name\":\"Table\"},{\"name\":\"Couch\"},{\"name\":\"Chair\"}]}}" diff --git a/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__supergraph_timeout.snap b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_timeout.snap similarity index 100% rename from apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__supergraph_timeout.snap rename to apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__router_timeout.snap diff --git a/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_rate_limit-2.snap b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_rate_limit-2.snap new file mode 100644 index 0000000000..584b125252 --- /dev/null +++ b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_rate_limit-2.snap @@ -0,0 +1,5 @@ +--- +source: apollo-router/tests/integration/traffic_shaping.rs +expression: response +--- +"{\"data\":null,\"errors\":[{\"message\":\"Your request has been rate limited\",\"extensions\":{\"code\":\"REQUEST_RATE_LIMITED\"}}]}" diff --git a/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_rate_limit.snap b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_rate_limit.snap new file mode 100644 index 0000000000..72c6aac169 --- /dev/null +++ b/apollo-router/tests/integration/snapshots/integration_tests__integration__traffic_shaping__subgraph_rate_limit.snap @@ -0,0 +1,5 @@ +--- +source: apollo-router/tests/integration/traffic_shaping.rs +expression: response.text().await? +--- +"{\"data\":{\"topProducts\":[{\"name\":\"Table\"},{\"name\":\"Couch\"},{\"name\":\"Chair\"}]}}" diff --git a/apollo-router/tests/integration/traffic_shaping.rs b/apollo-router/tests/integration/traffic_shaping.rs index ae5d7b2041..ced919a047 100644 --- a/apollo-router/tests/integration/traffic_shaping.rs +++ b/apollo-router/tests/integration/traffic_shaping.rs @@ -7,13 +7,13 @@ use wiremock::ResponseTemplate; use crate::integration::IntegrationTest; #[tokio::test(flavor = "multi_thread")] -async fn test_supergraph_timeout() -> Result<(), BoxError> { +async fn test_router_timeout() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() .config( r#" - traffic_shaping: - router: - timeout: 10ms + traffic_shaping: + router: + timeout: 10ms "#, ) .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) @@ -25,7 +25,10 @@ async fn test_supergraph_timeout() -> Result<(), BoxError> { let (_trace_id, response) = router.execute_default_query().await; assert_eq!(response.status(), 504); - assert_yaml_snapshot!(response.text().await?); + let response = response.text().await?; + assert!(response.contains("REQUEST_TIMEOUT")); + assert_yaml_snapshot!(response); + router.graceful_shutdown().await; Ok(()) } @@ -35,11 +38,11 @@ async fn test_subgraph_timeout() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() .config( r#" - include_subgraph_errors: - all: true - traffic_shaping: - all: - timeout: 10ms + include_subgraph_errors: + all: true + traffic_shaping: + all: + timeout: 10ms "#, ) .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) @@ -51,7 +54,80 @@ async fn test_subgraph_timeout() -> Result<(), BoxError> { let (_trace_id, response) = router.execute_default_query().await; assert_eq!(response.status(), 200); - assert_yaml_snapshot!(response.text().await?); + let response = response.text().await?; + assert!(response.contains("REQUEST_TIMEOUT")); + assert_yaml_snapshot!(response); + + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_router_rate_limit() -> Result<(), BoxError> { + let mut router = IntegrationTest::builder() + .config( + r#" + traffic_shaping: + router: + global_rate_limit: + capacity: 1 + interval: 100ms + "#, + ) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + let response = response.text().await?; + assert!(!response.contains("REQUEST_RATE_LIMITED")); + assert_yaml_snapshot!(response); + + let (_, response) = router.execute_default_query().await; + assert_eq!(response.status(), 429); + let response = response.text().await?; + assert!(response.contains("REQUEST_RATE_LIMITED")); + assert_yaml_snapshot!(response); + + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_subgraph_rate_limit() -> Result<(), BoxError> { + let mut router = IntegrationTest::builder() + .config( + r#" + include_subgraph_errors: + all: true + traffic_shaping: + all: + global_rate_limit: + capacity: 1 + interval: 100ms + "#, + ) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + let response = response.text().await?; + assert!(!response.contains("REQUEST_RATE_LIMITED")); + assert_yaml_snapshot!(response); + + let (_, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + let response = response.text().await?; + assert!(response.contains("REQUEST_RATE_LIMITED")); + assert_yaml_snapshot!(response); + router.graceful_shutdown().await; Ok(()) } From 59501274a0d1372df86862e9244553d035f5c561 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 09:02:56 +0300 Subject: [PATCH 06/13] add metrics assertion to tests --- .../tests/integration/traffic_shaping.rs | 44 ++++++++++++++----- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/apollo-router/tests/integration/traffic_shaping.rs b/apollo-router/tests/integration/traffic_shaping.rs index ced919a047..170bbeeb11 100644 --- a/apollo-router/tests/integration/traffic_shaping.rs +++ b/apollo-router/tests/integration/traffic_shaping.rs @@ -6,16 +6,27 @@ use wiremock::ResponseTemplate; use crate::integration::IntegrationTest; +const PROMETHEUS_CONFIG: &str = r#" + telemetry: + exporters: + metrics: + prometheus: + listen: 127.0.0.1:4000 + enabled: true + path: /metrics +"#; + #[tokio::test(flavor = "multi_thread")] async fn test_router_timeout() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() - .config( + .config(format!( r#" + {PROMETHEUS_CONFIG} traffic_shaping: router: timeout: 10ms - "#, - ) + "# + )) .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) .build() .await; @@ -29,6 +40,8 @@ async fn test_router_timeout() -> Result<(), BoxError> { assert!(response.contains("REQUEST_TIMEOUT")); assert_yaml_snapshot!(response); + router.assert_metrics_contains(r#"apollo_router_graphql_error_total{code="REQUEST_TIMEOUT",otel_scope_name="apollo/router"} 1"#, None).await; + router.graceful_shutdown().await; Ok(()) } @@ -36,15 +49,16 @@ async fn test_router_timeout() -> Result<(), BoxError> { #[tokio::test(flavor = "multi_thread")] async fn test_subgraph_timeout() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() - .config( + .config(format!( r#" + {PROMETHEUS_CONFIG} include_subgraph_errors: all: true traffic_shaping: all: timeout: 10ms - "#, - ) + "# + )) .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) .build() .await; @@ -58,6 +72,8 @@ async fn test_subgraph_timeout() -> Result<(), BoxError> { assert!(response.contains("REQUEST_TIMEOUT")); assert_yaml_snapshot!(response); + router.assert_metrics_contains(r#"apollo_router_graphql_error_total{code="REQUEST_TIMEOUT",otel_scope_name="apollo/router"} 1"#, None).await; + router.graceful_shutdown().await; Ok(()) } @@ -65,15 +81,16 @@ async fn test_subgraph_timeout() -> Result<(), BoxError> { #[tokio::test(flavor = "multi_thread")] async fn test_router_rate_limit() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() - .config( + .config(format!( r#" + {PROMETHEUS_CONFIG} traffic_shaping: router: global_rate_limit: capacity: 1 interval: 100ms - "#, - ) + "# + )) .build() .await; @@ -92,6 +109,8 @@ async fn test_router_rate_limit() -> Result<(), BoxError> { assert!(response.contains("REQUEST_RATE_LIMITED")); assert_yaml_snapshot!(response); + router.assert_metrics_contains(r#"apollo_router_graphql_error_total{code="REQUEST_RATE_LIMITED",otel_scope_name="apollo/router"} 1"#, None).await; + router.graceful_shutdown().await; Ok(()) } @@ -99,8 +118,9 @@ async fn test_router_rate_limit() -> Result<(), BoxError> { #[tokio::test(flavor = "multi_thread")] async fn test_subgraph_rate_limit() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() - .config( + .config(format!( r#" + {PROMETHEUS_CONFIG} include_subgraph_errors: all: true traffic_shaping: @@ -109,7 +129,7 @@ async fn test_subgraph_rate_limit() -> Result<(), BoxError> { capacity: 1 interval: 100ms "#, - ) + )) .build() .await; @@ -128,6 +148,8 @@ async fn test_subgraph_rate_limit() -> Result<(), BoxError> { assert!(response.contains("REQUEST_RATE_LIMITED")); assert_yaml_snapshot!(response); + router.assert_metrics_contains(r#"apollo_router_graphql_error_total{code="REQUEST_RATE_LIMITED",otel_scope_name="apollo/router"} 1"#, None).await; + router.graceful_shutdown().await; Ok(()) } From 7eb41d11d44790564a335b8febb671f7fe978348 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 09:17:54 +0300 Subject: [PATCH 07/13] xtask dev fixes --- apollo-router/src/axum_factory/tests.rs | 19 +++++++++++-------- .../src/plugins/traffic_shaping/mod.rs | 2 +- .../src/plugins/traffic_shaping/rate/error.rs | 4 ++-- .../plugins/traffic_shaping/timeout/error.rs | 4 ++-- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/apollo-router/src/axum_factory/tests.rs b/apollo-router/src/axum_factory/tests.rs index b331a71c7a..b5946284ac 100644 --- a/apollo-router/src/axum_factory/tests.rs +++ b/apollo-router/src/axum_factory/tests.rs @@ -2387,12 +2387,15 @@ async fn test_supergraph_timeout() { let body = response.bytes().await.unwrap(); let body: serde_json::Value = serde_json::from_slice(&body).unwrap(); - assert_eq!(body, json!({ - "errors": [{ - "message": "Request timed out", - "extensions": { - "code": "REQUEST_TIMEOUT" - } - }] - })); + assert_eq!( + body, + json!({ + "errors": [{ + "message": "Request timed out", + "extensions": { + "code": "REQUEST_TIMEOUT" + } + }] + }) + ); } diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index 433f3f0d09..a3ddda0e6d 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -462,12 +462,12 @@ mod test { use std::sync::Arc; use bytes::Bytes; + use maplit::hashmap; use once_cell::sync::Lazy; use serde_json_bytes::json; use serde_json_bytes::ByteString; use serde_json_bytes::Value; use tower::Service; - use maplit::hashmap; use super::*; use crate::json_ext::Object; diff --git a/apollo-router/src/plugins/traffic_shaping/rate/error.rs b/apollo-router/src/plugins/traffic_shaping/rate/error.rs index f413c6bd16..d1c7ef09e3 100644 --- a/apollo-router/src/plugins/traffic_shaping/rate/error.rs +++ b/apollo-router/src/plugins/traffic_shaping/rate/error.rs @@ -22,8 +22,8 @@ impl fmt::Display for RateLimited { } } -impl Into for RateLimited { - fn into(self) -> graphql::Error { +impl From for graphql::Error { + fn from(_: RateLimited) -> Self { graphql::Error::builder() .message(String::from("Your request has been rate limited")) .extension_code("REQUEST_RATE_LIMITED") diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/error.rs b/apollo-router/src/plugins/traffic_shaping/timeout/error.rs index 90ccc78c25..38e36dc8ad 100644 --- a/apollo-router/src/plugins/traffic_shaping/timeout/error.rs +++ b/apollo-router/src/plugins/traffic_shaping/timeout/error.rs @@ -22,8 +22,8 @@ impl fmt::Display for Elapsed { } } -impl Into for Elapsed { - fn into(self) -> graphql::Error { +impl From for graphql::Error { + fn from(_: Elapsed) -> Self { graphql::Error::builder() .message(String::from("Request timed out")) .extension_code("REQUEST_TIMEOUT") From 22d1792e1feff6f7891ddf68b72e78f73cdee106 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 09:30:27 +0300 Subject: [PATCH 08/13] add changeset --- .changesets/fix_timeout_and_rate_limit_error.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changesets/fix_timeout_and_rate_limit_error.md diff --git a/.changesets/fix_timeout_and_rate_limit_error.md b/.changesets/fix_timeout_and_rate_limit_error.md new file mode 100644 index 0000000000..e96954f593 --- /dev/null +++ b/.changesets/fix_timeout_and_rate_limit_error.md @@ -0,0 +1,7 @@ +### Request timeout and rate limited error responses are structured errors ([PR #5578](https://github.com/apollographql/router/pull/5578)) + +Request timeout (`408 Request Timeout`) and request rate limited (`429 Too Many Requests`)errors will now result in a structured GraphQL error (i.e., `{"errors": [...]}`) being returned to the client rather than a plain-text error as was the case previously. + +Also both errors are now properly tracked in telemetry, including `apollo_router_graphql_error_total` metric. + +By [@IvanGoncharov](https://github.com/IvanGoncharov) in https://github.com/apollographql/router/pull/5578 \ No newline at end of file From 40250132b80ff1ca9bc3621a3c19ad98f195411b Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 10:44:55 +0300 Subject: [PATCH 09/13] fix batching snapshots --- apollo-router/tests/integration/batching.rs | 93 +++++++++------------ 1 file changed, 39 insertions(+), 54 deletions(-) diff --git a/apollo-router/tests/integration/batching.rs b/apollo-router/tests/integration/batching.rs index a9e8e3234d..09281ea677 100644 --- a/apollo-router/tests/integration/batching.rs +++ b/apollo-router/tests/integration/batching.rs @@ -244,28 +244,22 @@ async fn it_handles_short_timeouts() -> Result<(), BoxError> { if test_is_enabled() { assert_yaml_snapshot!(responses, @r###" - --- - - data: - entryA: - index: 0 - - errors: - - message: "HTTP fetch failed from 'b': request timed out" - path: [] - extensions: - code: SUBREQUEST_HTTP_ERROR - service: b - reason: request timed out - - data: - entryA: - index: 1 - - errors: - - message: "HTTP fetch failed from 'b': request timed out" - path: [] - extensions: - code: SUBREQUEST_HTTP_ERROR - service: b - reason: request timed out - "###); + --- + - data: + entryA: + index: 0 + - errors: + - message: Request timed out + extensions: + code: REQUEST_TIMEOUT + - data: + entryA: + index: 1 + - errors: + - message: Request timed out + extensions: + code: REQUEST_TIMEOUT + "###); } Ok(()) @@ -317,38 +311,29 @@ async fn it_handles_indefinite_timeouts() -> Result<(), BoxError> { let responses = [results_a, results_b].concat(); if test_is_enabled() { assert_yaml_snapshot!(responses, @r###" - --- - - data: - entryA: - index: 0 - - data: - entryA: - index: 1 - - data: - entryA: - index: 2 - - errors: - - message: "HTTP fetch failed from 'b': request timed out" - path: [] - extensions: - code: SUBREQUEST_HTTP_ERROR - service: b - reason: request timed out - - errors: - - message: "HTTP fetch failed from 'b': request timed out" - path: [] - extensions: - code: SUBREQUEST_HTTP_ERROR - service: b - reason: request timed out - - errors: - - message: "HTTP fetch failed from 'b': request timed out" - path: [] - extensions: - code: SUBREQUEST_HTTP_ERROR - service: b - reason: request timed out - "###); + --- + - data: + entryA: + index: 0 + - data: + entryA: + index: 1 + - data: + entryA: + index: 2 + - errors: + - message: Request timed out + extensions: + code: REQUEST_TIMEOUT + - errors: + - message: Request timed out + extensions: + code: REQUEST_TIMEOUT + - errors: + - message: Request timed out + extensions: + code: REQUEST_TIMEOUT + "###); } Ok(()) From e3088c99276a8525d34b513c9060937677454c6f Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 10:57:31 +0300 Subject: [PATCH 10/13] increase rate limit interval in tests --- apollo-router/tests/integration/traffic_shaping.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apollo-router/tests/integration/traffic_shaping.rs b/apollo-router/tests/integration/traffic_shaping.rs index 170bbeeb11..398127a1c0 100644 --- a/apollo-router/tests/integration/traffic_shaping.rs +++ b/apollo-router/tests/integration/traffic_shaping.rs @@ -127,7 +127,7 @@ async fn test_subgraph_rate_limit() -> Result<(), BoxError> { all: global_rate_limit: capacity: 1 - interval: 100ms + interval: 1min "#, )) .build() From 5f5e3a7a2b09add78a6fe10eb465adfaecaa8149 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 11:01:06 +0300 Subject: [PATCH 11/13] make it even longer just in case --- apollo-router/tests/integration/traffic_shaping.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apollo-router/tests/integration/traffic_shaping.rs b/apollo-router/tests/integration/traffic_shaping.rs index 398127a1c0..d4f3cee1b4 100644 --- a/apollo-router/tests/integration/traffic_shaping.rs +++ b/apollo-router/tests/integration/traffic_shaping.rs @@ -88,7 +88,7 @@ async fn test_router_rate_limit() -> Result<(), BoxError> { router: global_rate_limit: capacity: 1 - interval: 100ms + interval: 10min "# )) .build() @@ -127,7 +127,7 @@ async fn test_subgraph_rate_limit() -> Result<(), BoxError> { all: global_rate_limit: capacity: 1 - interval: 1min + interval: 10min "#, )) .build() From c34e6024458fd4af72890c637273a1da52d0c978 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 18:10:24 +0300 Subject: [PATCH 12/13] add more tests --- .../tests/integration/traffic_shaping.rs | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/apollo-router/tests/integration/traffic_shaping.rs b/apollo-router/tests/integration/traffic_shaping.rs index d4f3cee1b4..c538c64ff5 100644 --- a/apollo-router/tests/integration/traffic_shaping.rs +++ b/apollo-router/tests/integration/traffic_shaping.rs @@ -1,9 +1,12 @@ use std::time::Duration; use insta::assert_yaml_snapshot; +use serde_json::json; use tower::BoxError; use wiremock::ResponseTemplate; +use crate::integration::common::graph_os_enabled; +use crate::integration::common::Telemetry; use crate::integration::IntegrationTest; const PROMETHEUS_CONFIG: &str = r#" @@ -78,6 +81,83 @@ async fn test_subgraph_timeout() -> Result<(), BoxError> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_router_timeout_operation_name_in_tracing() -> Result<(), BoxError> { + let mut router = IntegrationTest::builder() + .config(format!( + r#" + traffic_shaping: + router: + timeout: 10ms + "# + )) + .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_trace_id, response) = router + .execute_query(&json!({ + "query": "query UniqueName { topProducts { name } }" + })) + .await; + assert_eq!(response.status(), 504); + let response = response.text().await?; + assert!(response.contains("REQUEST_TIMEOUT")); + + router + .assert_log_contains(r#""otel.name":"query UniqueName""#) + .await; + + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_router_timeout_custom_metric() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Jaeger) + .config(format!( + r#" + {PROMETHEUS_CONFIG} + instrumentation: + instruments: + router: + http.server.request.duration: + attributes: + # Standard attributes + http.response.status_code: true + graphql.error: + on_graphql_error: true + traffic_shaping: + router: + timeout: 10ms + "# + )) + .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 504); + let response = response.text().await?; + assert!(response.contains("REQUEST_TIMEOUT")); + + router.assert_metrics_contains(r#"http_server_request_duration_seconds_count{error_type="Gateway Timeout",graphql_error="true",http_request_method="POST",http_response_status_code="504""#, None).await; + + router.graceful_shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_router_rate_limit() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() From 426c26f5d7f549520d97938be718d0903a6f7ee8 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 4 Jul 2024 20:55:33 +0300 Subject: [PATCH 13/13] fixes --- apollo-router/tests/integration/traffic_shaping.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/apollo-router/tests/integration/traffic_shaping.rs b/apollo-router/tests/integration/traffic_shaping.rs index c538c64ff5..feb9a7e725 100644 --- a/apollo-router/tests/integration/traffic_shaping.rs +++ b/apollo-router/tests/integration/traffic_shaping.rs @@ -27,7 +27,7 @@ async fn test_router_timeout() -> Result<(), BoxError> { {PROMETHEUS_CONFIG} traffic_shaping: router: - timeout: 10ms + timeout: 1ns "# )) .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) @@ -59,7 +59,7 @@ async fn test_subgraph_timeout() -> Result<(), BoxError> { all: true traffic_shaping: all: - timeout: 10ms + timeout: 1ns "# )) .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) @@ -84,13 +84,13 @@ async fn test_subgraph_timeout() -> Result<(), BoxError> { #[tokio::test(flavor = "multi_thread")] async fn test_router_timeout_operation_name_in_tracing() -> Result<(), BoxError> { let mut router = IntegrationTest::builder() - .config(format!( + .config( r#" traffic_shaping: router: - timeout: 10ms - "# - )) + timeout: 1ns + "#, + ) .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20))) .build() .await; @@ -137,7 +137,7 @@ async fn test_router_timeout_custom_metric() -> Result<(), BoxError> { on_graphql_error: true traffic_shaping: router: - timeout: 10ms + timeout: 1ns "# )) .responder(ResponseTemplate::new(500).set_delay(Duration::from_millis(20)))