Request timeout and rate limited error responses are structured errors #5578

Merged 16 commits on Jul 5, 2024
7 changes: 7 additions & 0 deletions .changesets/fix_timeout_and_rate_limit_error.md
@@ -0,0 +1,7 @@
### Request timeout and rate limited error responses are structured errors ([PR #5578](https://github.com/apollographql/router/pull/5578))

Request timeout (`408 Request Timeout`) and request rate limited (`429 Too Many Requests`) errors will now result in a structured GraphQL error (i.e., `{"errors": [...]}`) being returned to the client, rather than the plain-text error that was returned previously.

Both errors are now also properly tracked in telemetry, including the `apollo_router_graphql_error_total` metric.

By [@IvanGoncharov](https://github.com/IvanGoncharov) in https://github.com/apollographql/router/pull/5578
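For illustration only, here is a minimal sketch of the structured bodies this change introduces, written with `serde_json::json!` as in the router's test suite. The messages and `extensions.code` values mirror the ones added in this PR; the function name and surrounding scaffolding are hypothetical and not part of the change.

```rust
use serde_json::json;

// Hypothetical helper (not part of the PR) showing the GraphQL error bodies a
// client now receives instead of the previous plain-text responses.
fn example_structured_errors() -> (serde_json::Value, serde_json::Value) {
    // Body returned when the overall request times out.
    let timed_out = json!({
        "errors": [{
            "message": "Request timed out",
            "extensions": { "code": "REQUEST_TIMEOUT" }
        }]
    });

    // Body returned when the request is rate limited.
    let rate_limited = json!({
        "errors": [{
            "message": "Your request has been rate limited",
            "extensions": { "code": "REQUEST_RATE_LIMITED" }
        }]
    });

    (timed_out, rate_limited)
}
```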
21 changes: 1 addition & 20 deletions apollo-router/src/axum_factory/axum_http_server_factory.rs
@@ -59,8 +59,6 @@ use crate::http_server_factory::HttpServerFactory;
use crate::http_server_factory::HttpServerHandle;
use crate::http_server_factory::Listener;
use crate::plugins::telemetry::SpanMode;
use crate::plugins::traffic_shaping::Elapsed;
use crate::plugins::traffic_shaping::RateLimited;
use crate::router::ApolloRouterError;
use crate::router_factory::Endpoint;
use crate::router_factory::RouterFactory;
@@ -663,24 +661,7 @@ async fn handle_graphql(
);

match res {
Err(err) => {
if let Some(source_err) = err.source() {
if source_err.is::<RateLimited>() {
return RateLimited::new().into_response();
}
if source_err.is::<Elapsed>() {
return Elapsed::new().into_response();
}
}
if err.is::<RateLimited>() {
return RateLimited::new().into_response();
}
if err.is::<Elapsed>() {
return Elapsed::new().into_response();
}

internal_server_error(err)
}
Err(err) => internal_server_error(err),
Ok(response) => {
let (mut parts, body) = response.response.into_parts();

14 changes: 13 additions & 1 deletion apollo-router/src/axum_factory/tests.rs
@@ -2384,6 +2384,18 @@ async fn test_supergraph_timeout() {
.unwrap();

assert_eq!(response.status(), StatusCode::GATEWAY_TIMEOUT);

let body = response.bytes().await.unwrap();
assert_eq!(std::str::from_utf8(&body).unwrap(), "request timed out");
let body: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(
body,
json!({
"errors": [{
"message": "Request timed out",
"extensions": {
"code": "REQUEST_TIMEOUT"
}
}]
})
);
}
1 change: 1 addition & 0 deletions apollo-router/src/layers/map_future_with_request_data.rs
@@ -36,6 +36,7 @@
}

/// [`Service`] for mapping futures with request data. See [`ServiceBuilderExt::map_future_with_request_data()`](crate::layers::ServiceBuilderExt::map_future_with_request_data()).
#[derive(Clone)]
pub struct MapFutureWithRequestDataService<S, RF, MF> {
inner: S,
req_fn: RF,
165 changes: 123 additions & 42 deletions apollo-router/src/plugins/traffic_shaping/mod.rs
@@ -17,25 +17,27 @@ use std::sync::Mutex;
use std::time::Duration;

use futures::future::BoxFuture;
use futures::FutureExt;
use http::header::CONTENT_ENCODING;
use http::HeaderValue;
use http::StatusCode;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::retry::Retry;
use tower::util::Either;
use tower::util::Oneshot;
use tower::BoxError;
use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;

use self::deduplication::QueryDeduplicationLayer;
use self::rate::RateLimitLayer;
pub(crate) use self::rate::RateLimited;
use self::rate::RateLimited;
pub(crate) use self::retry::RetryPolicy;
pub(crate) use self::timeout::Elapsed;
use self::timeout::Elapsed;
use self::timeout::TimeoutLayer;
use crate::error::ConfigurationError;
use crate::graphql;
use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::register_plugin;
@@ -266,15 +268,7 @@ impl Plugin for TrafficShaping {
pub(crate) type TrafficShapingSubgraphFuture<S> = Either<
Either<
BoxFuture<'static, Result<subgraph::Response, BoxError>>,
timeout::future::ResponseFuture<
Oneshot<
Either<
Retry<RetryPolicy, Either<rate::service::RateLimit<S>, S>>,
Either<rate::service::RateLimit<S>, S>,
>,
subgraph::Request,
>,
>,
BoxFuture<'static, Result<subgraph::Response, BoxError>>,
>,
<S as Service<subgraph::Request>>::Future,
>;
@@ -295,9 +289,7 @@ impl TrafficShaping {
supergraph::Request,
Response = supergraph::Response,
Error = BoxError,
Future = timeout::future::ResponseFuture<
Oneshot<tower::util::Either<rate::service::RateLimit<S>, S>, supergraph::Request>,
>,
Future = BoxFuture<'static, Result<supergraph::Response, BoxError>>,
> + Clone
+ Send
+ Sync
@@ -311,6 +303,32 @@ impl TrafficShaping {
<S as Service<supergraph::Request>>::Future: std::marker::Send,
{
ServiceBuilder::new()
.map_future_with_request_data(
|req: &supergraph::Request| req.context.clone(),
move |ctx, future| {
async {
let response: Result<supergraph::Response, BoxError> = future.await;
match response {
Err(error) if error.is::<Elapsed>() => {
supergraph::Response::error_builder()
.status_code(StatusCode::GATEWAY_TIMEOUT)
.error::<graphql::Error>(Elapsed::new().into())
.context(ctx)
.build()
}
Err(error) if error.is::<RateLimited>() => {
supergraph::Response::error_builder()
.status_code(StatusCode::TOO_MANY_REQUESTS)
.error::<graphql::Error>(RateLimited::new().into())
.context(ctx)
.build()
}
_ => response,
}
}
.boxed()
},
)
.layer(TimeoutLayer::new(
self.config
.router
@@ -380,6 +398,31 @@ impl TrafficShaping {
.option_layer(config.shaping.deduplicate_query.unwrap_or_default().then(
QueryDeduplicationLayer::default
))
.map_future_with_request_data(
|req: &subgraph::Request| req.context.clone(),
move |ctx, future| {
async {
let response: Result<subgraph::Response, BoxError> = future.await;
match response {
Err(error) if error.is::<Elapsed>() => {
subgraph::Response::error_builder()
.status_code(StatusCode::GATEWAY_TIMEOUT)
.error::<graphql::Error>(Elapsed::new().into())
.context(ctx)
.build()
}
Err(error) if error.is::<RateLimited>() => {
subgraph::Response::error_builder()
.status_code(StatusCode::TOO_MANY_REQUESTS)
.error::<graphql::Error>(RateLimited::new().into())
.context(ctx)
.build()
}
_ => response,
}
}.boxed()
},
)
.layer(TimeoutLayer::new(
config.shaping
.timeout
@@ -419,6 +462,7 @@ mod test {
use std::sync::Arc;

use bytes::Bytes;
use maplit::hashmap;
use once_cell::sync::Lazy;
use serde_json_bytes::json;
use serde_json_bytes::ByteString;
@@ -744,41 +788,64 @@

let plugin = get_traffic_shaping_plugin(&config).await;

let test_service = MockSubgraph::new(HashMap::new());
let test_service = MockSubgraph::new(hashmap! {
graphql::Request::default() => graphql::Response::default()
});

let _response = plugin
assert!(&plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
.subgraph_service_internal("test", test_service.clone())
.oneshot(SubgraphRequest::fake_builder().build())
.await
.unwrap();
let _response = plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
.subgraph_service_internal("test", test_service.clone())
.oneshot(SubgraphRequest::fake_builder().build())
.await
.expect_err("should be in error due to a timeout and rate limit");
let _response = plugin
.response
.body()
.errors
.is_empty());
assert_eq!(
plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
.subgraph_service_internal("test", test_service.clone())
.oneshot(SubgraphRequest::fake_builder().build())
.await
.unwrap()
.response
.body()
.errors[0]
.extensions
.get("code")
.unwrap(),
"REQUEST_RATE_LIMITED"
);
assert!(plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
.subgraph_service_internal("another", test_service.clone())
.oneshot(SubgraphRequest::fake_builder().build())
.await
.unwrap();
.unwrap()
.response
.body()
.errors
.is_empty());
tokio::time::sleep(Duration::from_millis(300)).await;
let _response = plugin
assert!(plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
.subgraph_service_internal("test", test_service.clone())
.oneshot(SubgraphRequest::fake_builder().build())
.await
.unwrap();
.unwrap()
.response
.body()
.errors
.is_empty());
}

#[tokio::test(flavor = "multi_thread")]
@@ -812,7 +879,7 @@
mock_service
});

let _response = plugin
assert!(plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
@@ -822,18 +889,30 @@
.unwrap()
.next_response()
.await
.unwrap();

assert!(plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
.supergraph_service_internal(mock_service.clone())
.oneshot(SupergraphRequest::fake_builder().build().unwrap())
.await
.is_err());
.errors
.is_empty());

assert_eq!(
plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
.supergraph_service_internal(mock_service.clone())
.oneshot(SupergraphRequest::fake_builder().build().unwrap())
.await
.unwrap()
.next_response()
.await
.unwrap()
.errors[0]
.extensions
.get("code")
.unwrap(),
"REQUEST_RATE_LIMITED"
);
tokio::time::sleep(Duration::from_millis(300)).await;
let _response = plugin
assert!(plugin
.as_any()
.downcast_ref::<TrafficShaping>()
.unwrap()
Expand All @@ -843,6 +922,8 @@ mod test {
.unwrap()
.next_response()
.await
.unwrap();
.unwrap()
.errors
.is_empty());
}
}
12 changes: 7 additions & 5 deletions apollo-router/src/plugins/traffic_shaping/rate/error.rs
@@ -3,8 +3,7 @@
use std::error;
use std::fmt;

use axum::response::IntoResponse;
use http::StatusCode;
use crate::graphql;

/// The rate limit error.
#[derive(Debug, Default)]
@@ -23,9 +22,12 @@ impl fmt::Display for RateLimited {
}
}

impl IntoResponse for RateLimited {
fn into_response(self) -> axum::response::Response {
(StatusCode::TOO_MANY_REQUESTS, self.to_string()).into_response()
impl From<RateLimited> for graphql::Error {
fn from(_: RateLimited) -> Self {
graphql::Error::builder()
.message(String::from("Your request has been rate limited"))
.extension_code("REQUEST_RATE_LIMITED")
.build()
}
}

12 changes: 7 additions & 5 deletions apollo-router/src/plugins/traffic_shaping/timeout/error.rs
@@ -3,8 +3,7 @@
use std::error;
use std::fmt;

use axum::response::IntoResponse;
use http::StatusCode;
use crate::graphql;

/// The timeout elapsed.
#[derive(Debug, Default)]
@@ -23,9 +22,12 @@ impl fmt::Display for Elapsed {
}
}

impl IntoResponse for Elapsed {
fn into_response(self) -> axum::response::Response {
(StatusCode::GATEWAY_TIMEOUT, self.to_string()).into_response()
impl From<Elapsed> for graphql::Error {
fn from(_: Elapsed) -> Self {
graphql::Error::builder()
.message(String::from("Request timed out"))
.extension_code("REQUEST_TIMEOUT")
.build()
}
}
