From 18c20c5f88a94e01f6e133f42ff07ddcb874622f Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 11 Feb 2022 11:44:16 +0100 Subject: [PATCH 1/6] query deduplication not compiling for now, will be reimplemented as a layer --- Cargo.lock | 14 +- apollo-router-core/Cargo.toml | 2 +- apollo-router-core/src/error.rs | 2 +- apollo-router-core/src/query_planner/mod.rs | 3 + apollo-router-core/src/request.rs | 2 +- .../src/services/http_compat.rs | 52 +++++- apollo-router/src/reqwest_subgraph_service.rs | 157 ++++++++++++++++-- 7 files changed, 211 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4b49afe7e..0a4801f4fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -985,9 +985,9 @@ dependencies = [ [[package]] name = "deno_core" -version = "0.112.0" +version = "0.118.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b0a532994e7d5ddfc029a33a67709c91e6c8926b97e4825e4d3056ee11abfc" +checksum = "68be6990e070141d1413797b84d8a05d3eb426fa18a9962ecf21d2f045014db4" dependencies = [ "anyhow", "futures", @@ -3110,7 +3110,7 @@ dependencies = [ [[package]] name = "router-bridge" version = "0.1.0" -source = "git+https://github.com/apollographql/federation.git?rev=950eb931e38746bb7cfed05382d6970a22e43cc4#950eb931e38746bb7cfed05382d6970a22e43cc4" +source = "git+https://github.com/apollographql/federation.git?rev=17b9ca519b54bad773b89c44a9ef033949788edb#17b9ca519b54bad773b89c44a9ef033949788edb" dependencies = [ "anyhow", "deno_core", @@ -3363,9 +3363,9 @@ dependencies = [ [[package]] name = "serde_v8" -version = "0.23.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487456399b8722492bdc4c1834397f91c935247590dae30f8e779b7f48e3181a" +checksum = "27266c014ef9b11fcf7f1a248f25603529432aab4d0ce777d6c6f6aea2b367bb" dependencies = [ "serde", "serde_bytes", @@ -4318,9 +4318,9 @@ dependencies = [ [[package]] name = "v8" -version = "0.36.0" +version = "0.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "506523e86ccc15982be412bdde87a142771c139e94a8ecedda1da051a079b81d" +checksum = "684e95fe02e0acfeaf630df3a1623f6ad02145f9a92c54ceae8a1923319d3273" dependencies = [ "bitflags", "fslock", diff --git a/apollo-router-core/Cargo.toml b/apollo-router-core/Cargo.toml index dfd75cd28e..1a05d30c3b 100644 --- a/apollo-router-core/Cargo.toml +++ b/apollo-router-core/Cargo.toml @@ -30,7 +30,7 @@ paste = "1.0.6" reqwest = { version = "0.11.9", features = ["json", "stream"] } reqwest-middleware = "0.1.3" reqwest-tracing = { version = "0.2", features = ["opentelemetry_0_16"] } -router-bridge = { git = "https://github.com/apollographql/federation.git", rev = "950eb931e38746bb7cfed05382d6970a22e43cc4" } +router-bridge = { git = "https://github.com/apollographql/federation.git", rev = "17b9ca519b54bad773b89c44a9ef033949788edb" } serde = { version = "1.0.136", features = ["derive", "rc"] } serde_json = { version = "1.0.78", features = ["preserve_order"] } serde_json_bytes = { version = "0.2.0", features = ["preserve_order"] } diff --git a/apollo-router-core/src/error.rs b/apollo-router-core/src/error.rs index 227c997e50..21a1964444 100644 --- a/apollo-router-core/src/error.rs +++ b/apollo-router-core/src/error.rs @@ -12,7 +12,7 @@ use tracing::level_filters::LevelFilter; /// /// Note that these are not actually returned to the client, but are instead converted to JSON for /// [`struct@Error`]. -#[derive(Error, Display, Debug, Serialize, Deserialize)] +#[derive(Error, Display, Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] #[ignore_extra_doc_attributes] pub enum FetchError { diff --git a/apollo-router-core/src/query_planner/mod.rs b/apollo-router-core/src/query_planner/mod.rs index d7b58774fe..1aac741934 100644 --- a/apollo-router-core/src/query_planner/mod.rs +++ b/apollo-router-core/src/query_planner/mod.rs @@ -234,6 +234,9 @@ mod fetch { /// The GraphQL subquery that is used for the fetch. operation: String, + + /// The kind of operation (query, mutation or subscription) + operationKind: String, } struct Variables { diff --git a/apollo-router-core/src/request.rs b/apollo-router-core/src/request.rs index 9b71552474..fa630db966 100644 --- a/apollo-router-core/src/request.rs +++ b/apollo-router-core/src/request.rs @@ -10,7 +10,7 @@ use typed_builder::TypedBuilder; #[derive(Clone, Derivative, Serialize, Deserialize, TypedBuilder)] #[serde(rename_all = "camelCase")] #[builder(field_defaults(setter(into)))] -#[derivative(Debug, PartialEq)] +#[derivative(Debug, PartialEq, Eq, Hash)] pub struct Request { /// The graphql query. pub query: String, diff --git a/apollo-router-core/src/services/http_compat.rs b/apollo-router-core/src/services/http_compat.rs index 348a557169..c237f75c82 100644 --- a/apollo-router-core/src/services/http_compat.rs +++ b/apollo-router-core/src/services/http_compat.rs @@ -1,6 +1,10 @@ //! wrapper typpes for Request and Response from the http crate to improve their usability -use std::ops::{Deref, DerefMut}; +use std::{ + cmp::PartialEq, + hash::Hash, + ops::{Deref, DerefMut}, +}; #[derive(Debug, Default)] pub struct Request { @@ -61,6 +65,52 @@ impl DerefMut for Request { } } +impl Hash for Request { + fn hash(&self, state: &mut H) { + self.inner.method().hash(state); + self.inner.version().hash(state); + self.inner.uri().hash(state); + // this assumes headers are in the same order + for (name, value) in self.inner.headers() { + name.hash(state); + value.hash(state); + } + self.inner.body().hash(state); + } +} + +impl PartialEq for Request { + fn eq(&self, other: &Self) -> bool { + let mut res = self.inner.method().eq(other.inner.method()) + && self.inner.version().eq(&other.inner.version()) + && self.inner.uri().eq(other.inner.uri()); + + if !res { + return false; + } + if self.inner.headers().len() != other.inner.headers().len() { + return false; + } + + // this assumes headers are in the same order + for ((name, value), (other_name, other_value)) in self + .inner + .headers() + .iter() + .zip(other.inner.headers().iter()) + { + res = name.eq(other_name) && value.eq(other_value); + if !res { + return false; + } + } + + self.inner.body().eq(other.inner.body()) + } +} + +impl Eq for Request {} + impl From> for Request { fn from(inner: http::Request) -> Self { Request { inner } diff --git a/apollo-router/src/reqwest_subgraph_service.rs b/apollo-router/src/reqwest_subgraph_service.rs index d905ecd5a2..ef10a1abdb 100644 --- a/apollo-router/src/reqwest_subgraph_service.rs +++ b/apollo-router/src/reqwest_subgraph_service.rs @@ -1,7 +1,10 @@ use apollo_router_core::prelude::*; -use futures::future::BoxFuture; +use futures::lock::Mutex; +use futures::{future::BoxFuture, TryFutureExt}; +use std::collections::HashMap; use std::sync::Arc; use std::task::Poll; +use tokio::sync::broadcast::{self, Sender}; use tracing::Instrument; use typed_builder::TypedBuilder; @@ -15,6 +18,20 @@ pub struct ReqwestSubgraphService { // a url::Url instead of using the http crate // for now, to make things work, if the URL in the request is /, we use this URL url: reqwest::Url, + + wait_map: Arc< + Mutex< + HashMap< + apollo_router_core::http_compat::Request, + Sender< + Result< + apollo_router_core::http_compat::Response, + graphql::FetchError, + >, + >, + >, + >, + >, } impl ReqwestSubgraphService { @@ -34,8 +51,133 @@ impl ReqwestSubgraphService { .build(), service: Arc::new(service), url, + wait_map: Arc::new(Mutex::new(HashMap::new())), + } + } + + async fn dedup( + &self, + graphql::SubgraphRequest { + http_request, + context, + }: graphql::SubgraphRequest, + ) -> Result { + loop { + let mut locked_wait_map = self.wait_map.lock().await; + match locked_wait_map.get_mut(&http_request) { + Some(waiter) => { + // Register interest in key + let mut receiver = waiter.subscribe(); + drop(locked_wait_map); + + match receiver.recv().await { + Ok(value) => { + return value + .map(|response| graphql::SubgraphResponse { response, context }) + } + // there was an issue with the broadcast channel, retry fetching + Err(_) => continue, + } + } + None => { + let (tx, _rx) = broadcast::channel(1); + locked_wait_map.insert(http_request.clone(), tx.clone()); + drop(locked_wait_map); + + let res = self.fetch(http_request.clone()).await; + + { + let mut locked_wait_map = self.wait_map.lock().await; + locked_wait_map.remove(&http_request); + } + + // Let our waiters know + let broadcast_value = res.clone(); + // Our use case is very specific, so we are sure that + // we won't get any errors here. + tokio::task::spawn_blocking(move || { + tx.send(broadcast_value) + .expect("there is always at least one receiver alive, the _rx guard; qed") + }).await + .expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed"); + return res.map(|response| graphql::SubgraphResponse { response, context }); + } + } } } + + async fn fetch( + &self, + http_request: apollo_router_core::http_compat::Request, + ) -> Result< + apollo_router_core::http_compat::Response, + graphql::FetchError, + > { + let http_client = self.http_client.clone(); + let target_url = if http_request.uri() == "/" { + self.url.clone() + } else { + reqwest::Url::parse(&http_request.uri().to_string()).expect("todo") + }; + let service_name = (*self.service).to_owned(); + + let ( + http::request::Parts { + method, + version, + headers, + extensions: _, + .. + }, + body, + ) = http_request.into_parts(); + + let mut request = http_client + .request(method, target_url) + .json(&body) + .build() + .map_err(|err| { + tracing::error!(fetch_error = err.to_string().as_str()); + graphql::FetchError::SubrequestHttpError { + service: (*self.service).to_owned(), + reason: err.to_string(), + } + })?; + request.headers_mut().extend(headers.into_iter()); + *request.version_mut() = version; + + let response = http_client.execute(request).await.map_err(|err| { + tracing::error!(fetch_error = err.to_string().as_str()); + graphql::FetchError::SubrequestHttpError { + service: (*self.service).to_owned(), + reason: err.to_string(), + } + })?; + let body = response + .bytes() + .instrument(tracing::debug_span!("aggregate_response_data")) + .await + .map_err(|err| { + tracing::error!(fetch_error = format!("{:?}", err).as_str()); + + graphql::FetchError::SubrequestHttpError { + service: service_name.clone(), + reason: err.to_string(), + } + })?; + + let graphql: graphql::Response = + tracing::debug_span!("parse_subgraph_response").in_scope(|| { + graphql::Response::from_bytes(&service_name, body).map_err(|error| { + graphql::FetchError::SubrequestMalformedResponse { + service: service_name.clone(), + reason: error.to_string(), + } + }) + })?; + + Ok(http::Response::builder().body(graphql).expect("no argument can fail to parse or converted to the internal representation here; qed").into()) + } } impl tower::Service for ReqwestSubgraphService { @@ -48,14 +190,9 @@ impl tower::Service for ReqwestSubgraphService { Poll::Ready(Ok(())) } - fn call( - &mut self, - graphql::SubgraphRequest { - http_request, - context, - }: graphql::SubgraphRequest, - ) -> Self::Future { - let http_client = self.http_client.clone(); + fn call(&mut self, request: graphql::SubgraphRequest) -> Self::Future { + Box::pin(self.dedup(request).map_err(|e| e.into())) + /*let http_client = self.http_client.clone(); let target_url = if http_request.uri() == "/" { self.url.clone() } else { @@ -110,7 +247,7 @@ impl tower::Service for ReqwestSubgraphService { response: http::Response::builder().body(graphql).expect("no argument can fail to parse or converted to the internal representation here; qed").into(), context, }) - }) + })*/ } } From b98fe16838570d32f1c0f7a4de314fbdc42875be Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 11 Feb 2022 15:16:22 +0100 Subject: [PATCH 2/6] implement query deduplication as a layer --- apollo-router-core/src/context.rs | 2 +- .../src/layers/deduplication.rs | 126 +++++++++++++ apollo-router-core/src/layers/mod.rs | 1 + apollo-router-core/src/services/mod.rs | 1 + apollo-router/src/reqwest_subgraph_service.rs | 166 ++---------------- apollo-router/src/router_factory.rs | 7 +- 6 files changed, 151 insertions(+), 152 deletions(-) create mode 100644 apollo-router-core/src/layers/deduplication.rs diff --git a/apollo-router-core/src/context.rs b/apollo-router-core/src/context.rs index 20d8085640..eb28d55a52 100644 --- a/apollo-router-core/src/context.rs +++ b/apollo-router-core/src/context.rs @@ -4,7 +4,7 @@ use futures::Future; use std::sync::Arc; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Context>> { /// Original request to the Router. pub request: T, diff --git a/apollo-router-core/src/layers/deduplication.rs b/apollo-router-core/src/layers/deduplication.rs new file mode 100644 index 0000000000..314f5c4b16 --- /dev/null +++ b/apollo-router-core/src/layers/deduplication.rs @@ -0,0 +1,126 @@ +use crate::{http_compat, Request, SubgraphRequest, SubgraphResponse}; +use futures::{future::BoxFuture, lock::Mutex}; +use std::{collections::HashMap, sync::Arc, task::Poll}; +use tokio::sync::broadcast::{self, Sender}; +use tower::{BoxError, Layer, ServiceExt}; + +pub struct QueryDeduplicationLayer; + +impl Layer for QueryDeduplicationLayer +where + S: tower::Service + Clone, +{ + type Service = QueryDeduplicationService; + + fn layer(&self, service: S) -> Self::Service { + QueryDeduplicationService::new(service) + } +} + +pub struct QueryDeduplicationService { + service: S, + wait_map: Arc< + Mutex, Sender>>>, + >, +} + +impl QueryDeduplicationService +where + S: tower::Service + Clone, +{ + fn new(service: S) -> Self { + QueryDeduplicationService { + service, + wait_map: Arc::new(Mutex::new(HashMap::new())), + } + } + + async fn dedup( + service: S, + wait_map: Arc< + Mutex, Sender>>>, + >, + request: SubgraphRequest, + ) -> Result { + loop { + let mut locked_wait_map = wait_map.lock().await; + match locked_wait_map.get_mut(&request.http_request) { + Some(waiter) => { + // Register interest in key + let mut receiver = waiter.subscribe(); + drop(locked_wait_map); + + match receiver.recv().await { + Ok(value) => { + return value + .map(|response| SubgraphResponse { + response: response.response, + context: request.context, + }) + .map_err(|e| e.into()) + } + // there was an issue with the broadcast channel, retry fetching + Err(_) => continue, + } + } + None => { + let (tx, _rx) = broadcast::channel(1); + locked_wait_map.insert(request.http_request.clone(), tx.clone()); + drop(locked_wait_map); + + let context = request.context.clone(); + let http_request = request.http_request.clone(); + let res = service.ready_oneshot().await?.call(request).await; + + { + let mut locked_wait_map = wait_map.lock().await; + locked_wait_map.remove(&http_request); + } + + // Let our waiters know + let broadcast_value = res + .as_ref() + .map(|response| response.clone()) + .map_err(|e| e.to_string()); + + // Our use case is very specific, so we are sure that + // we won't get any errors here. + tokio::task::spawn_blocking(move || { + tx.send(broadcast_value) + .expect("there is always at least one receiver alive, the _rx guard; qed") + }).await + .expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed"); + + return res.map(|response| SubgraphResponse { + response: response.response, + context, + }); + } + } + } + } +} + +impl tower::Service for QueryDeduplicationService +where + S: tower::Service + + Clone + + Send + + 'static, + >::Future: Send + 'static, +{ + type Response = SubgraphResponse; + type Error = BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, request: SubgraphRequest) -> Self::Future { + let service = self.service.clone(); + let wait_map = self.wait_map.clone(); + + Box::pin(async move { Self::dedup(service, wait_map, request).await }) + } +} diff --git a/apollo-router-core/src/layers/mod.rs b/apollo-router-core/src/layers/mod.rs index a4e60a3f8a..62a8ce8f98 100644 --- a/apollo-router-core/src/layers/mod.rs +++ b/apollo-router-core/src/layers/mod.rs @@ -1,2 +1,3 @@ pub mod cache; +pub mod deduplication; pub mod header_manipulation; diff --git a/apollo-router-core/src/services/mod.rs b/apollo-router-core/src/services/mod.rs index 0e9e31af85..f1ef424d61 100644 --- a/apollo-router-core/src/services/mod.rs +++ b/apollo-router-core/src/services/mod.rs @@ -95,6 +95,7 @@ pub struct SubgraphRequest { } assert_impl_all!(SubgraphResponse: Send); +#[derive(Clone, Debug)] pub struct SubgraphResponse { pub response: http_compat::Response, diff --git a/apollo-router/src/reqwest_subgraph_service.rs b/apollo-router/src/reqwest_subgraph_service.rs index ef10a1abdb..42faf21199 100644 --- a/apollo-router/src/reqwest_subgraph_service.rs +++ b/apollo-router/src/reqwest_subgraph_service.rs @@ -1,10 +1,8 @@ use apollo_router_core::prelude::*; -use futures::lock::Mutex; -use futures::{future::BoxFuture, TryFutureExt}; -use std::collections::HashMap; +use futures::future::BoxFuture; use std::sync::Arc; use std::task::Poll; -use tokio::sync::broadcast::{self, Sender}; +use tower::BoxError; use tracing::Instrument; use typed_builder::TypedBuilder; @@ -18,20 +16,6 @@ pub struct ReqwestSubgraphService { // a url::Url instead of using the http crate // for now, to make things work, if the URL in the request is /, we use this URL url: reqwest::Url, - - wait_map: Arc< - Mutex< - HashMap< - apollo_router_core::http_compat::Request, - Sender< - Result< - apollo_router_core::http_compat::Response, - graphql::FetchError, - >, - >, - >, - >, - >, } impl ReqwestSubgraphService { @@ -51,138 +35,13 @@ impl ReqwestSubgraphService { .build(), service: Arc::new(service), url, - wait_map: Arc::new(Mutex::new(HashMap::new())), - } - } - - async fn dedup( - &self, - graphql::SubgraphRequest { - http_request, - context, - }: graphql::SubgraphRequest, - ) -> Result { - loop { - let mut locked_wait_map = self.wait_map.lock().await; - match locked_wait_map.get_mut(&http_request) { - Some(waiter) => { - // Register interest in key - let mut receiver = waiter.subscribe(); - drop(locked_wait_map); - - match receiver.recv().await { - Ok(value) => { - return value - .map(|response| graphql::SubgraphResponse { response, context }) - } - // there was an issue with the broadcast channel, retry fetching - Err(_) => continue, - } - } - None => { - let (tx, _rx) = broadcast::channel(1); - locked_wait_map.insert(http_request.clone(), tx.clone()); - drop(locked_wait_map); - - let res = self.fetch(http_request.clone()).await; - - { - let mut locked_wait_map = self.wait_map.lock().await; - locked_wait_map.remove(&http_request); - } - - // Let our waiters know - let broadcast_value = res.clone(); - // Our use case is very specific, so we are sure that - // we won't get any errors here. - tokio::task::spawn_blocking(move || { - tx.send(broadcast_value) - .expect("there is always at least one receiver alive, the _rx guard; qed") - }).await - .expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed"); - return res.map(|response| graphql::SubgraphResponse { response, context }); - } - } } } - - async fn fetch( - &self, - http_request: apollo_router_core::http_compat::Request, - ) -> Result< - apollo_router_core::http_compat::Response, - graphql::FetchError, - > { - let http_client = self.http_client.clone(); - let target_url = if http_request.uri() == "/" { - self.url.clone() - } else { - reqwest::Url::parse(&http_request.uri().to_string()).expect("todo") - }; - let service_name = (*self.service).to_owned(); - - let ( - http::request::Parts { - method, - version, - headers, - extensions: _, - .. - }, - body, - ) = http_request.into_parts(); - - let mut request = http_client - .request(method, target_url) - .json(&body) - .build() - .map_err(|err| { - tracing::error!(fetch_error = err.to_string().as_str()); - graphql::FetchError::SubrequestHttpError { - service: (*self.service).to_owned(), - reason: err.to_string(), - } - })?; - request.headers_mut().extend(headers.into_iter()); - *request.version_mut() = version; - - let response = http_client.execute(request).await.map_err(|err| { - tracing::error!(fetch_error = err.to_string().as_str()); - graphql::FetchError::SubrequestHttpError { - service: (*self.service).to_owned(), - reason: err.to_string(), - } - })?; - let body = response - .bytes() - .instrument(tracing::debug_span!("aggregate_response_data")) - .await - .map_err(|err| { - tracing::error!(fetch_error = format!("{:?}", err).as_str()); - - graphql::FetchError::SubrequestHttpError { - service: service_name.clone(), - reason: err.to_string(), - } - })?; - - let graphql: graphql::Response = - tracing::debug_span!("parse_subgraph_response").in_scope(|| { - graphql::Response::from_bytes(&service_name, body).map_err(|error| { - graphql::FetchError::SubrequestMalformedResponse { - service: service_name.clone(), - reason: error.to_string(), - } - }) - })?; - - Ok(http::Response::builder().body(graphql).expect("no argument can fail to parse or converted to the internal representation here; qed").into()) - } } impl tower::Service for ReqwestSubgraphService { type Response = graphql::SubgraphResponse; - type Error = tower::BoxError; + type Error = BoxError; type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { @@ -191,8 +50,12 @@ impl tower::Service for ReqwestSubgraphService { } fn call(&mut self, request: graphql::SubgraphRequest) -> Self::Future { - Box::pin(self.dedup(request).map_err(|e| e.into())) - /*let http_client = self.http_client.clone(); + let graphql::SubgraphRequest { + http_request, + context, + } = request; + + let http_client = self.http_client.clone(); let target_url = if http_request.uri() == "/" { self.url.clone() } else { @@ -219,7 +82,14 @@ impl tower::Service for ReqwestSubgraphService { request.headers_mut().extend(headers.into_iter()); *request.version_mut() = version; - let response = http_client.execute(request).await?; + let response = http_client.execute(request).await.map_err(|err| { + tracing::error!(fetch_error = format!("{:?}", err).as_str()); + + graphql::FetchError::SubrequestHttpError { + service: service_name.clone(), + reason: err.to_string(), + } + })?; let body = response .bytes() .instrument(tracing::debug_span!("aggregate_response_data")) @@ -247,7 +117,7 @@ impl tower::Service for ReqwestSubgraphService { response: http::Response::builder().body(graphql).expect("no argument can fail to parse or converted to the internal representation here; qed").into(), context, }) - })*/ + }) } } diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index 0248ce7ced..a636fd4b0a 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -1,5 +1,6 @@ use crate::configuration::{Configuration, ConfigurationError}; use crate::reqwest_subgraph_service::ReqwestSubgraphService; +use apollo_router_core::deduplication::QueryDeduplicationLayer; use apollo_router_core::header_manipulation::HeaderManipulationLayer; use apollo_router_core::prelude::*; use apollo_router_core::{ @@ -74,9 +75,9 @@ impl RouterServiceFactory for YamlRouterServiceFactory { let mut builder = PluggableRouterServiceBuilder::new(schema, buffer, dispatcher.clone()); for (name, subgraph) in &configuration.subgraphs { - let mut subgraph_service = BoxService::new(ReqwestSubgraphService::new( - name.to_string(), - subgraph.routing_url.clone(), + let dedup_layer = QueryDeduplicationLayer; + let mut subgraph_service = BoxService::new(dedup_layer.layer( + ReqwestSubgraphService::new(name.to_string(), subgraph.routing_url.clone()), )); for layer in &subgraph.layers { From ab5ffb41f8e6435129a447dc8ad00c175c85fbfb Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 11 Feb 2022 15:25:07 +0100 Subject: [PATCH 3/6] check the operation kind to deduplicate only for queries mutations and subscriptions should not be deduplicated --- apollo-router-core/src/layers/deduplication.rs | 13 +++++++++---- apollo-router-core/src/query_planner/mod.rs | 10 +++++++++- apollo-router-core/src/services/mod.rs | 9 +++++++++ apollo-router/src/reqwest_subgraph_service.rs | 1 + 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/apollo-router-core/src/layers/deduplication.rs b/apollo-router-core/src/layers/deduplication.rs index 314f5c4b16..8417c520b8 100644 --- a/apollo-router-core/src/layers/deduplication.rs +++ b/apollo-router-core/src/layers/deduplication.rs @@ -1,4 +1,4 @@ -use crate::{http_compat, Request, SubgraphRequest, SubgraphResponse}; +use crate::{http_compat, OperationKind, Request, SubgraphRequest, SubgraphResponse}; use futures::{future::BoxFuture, lock::Mutex}; use std::{collections::HashMap, sync::Arc, task::Poll}; use tokio::sync::broadcast::{self, Sender}; @@ -118,9 +118,14 @@ where } fn call(&mut self, request: SubgraphRequest) -> Self::Future { - let service = self.service.clone(); - let wait_map = self.wait_map.clone(); + let mut service = self.service.clone(); - Box::pin(async move { Self::dedup(service, wait_map, request).await }) + if request.operation_kind == OperationKind::Query { + let wait_map = self.wait_map.clone(); + + Box::pin(async move { Self::dedup(service, wait_map, request).await }) + } else { + Box::pin(async move { service.call(request).await }) + } } } diff --git a/apollo-router-core/src/query_planner/mod.rs b/apollo-router-core/src/query_planner/mod.rs index 1aac741934..3ea651b218 100644 --- a/apollo-router-core/src/query_planner/mod.rs +++ b/apollo-router-core/src/query_planner/mod.rs @@ -236,7 +236,8 @@ mod fetch { operation: String, /// The kind of operation (query, mutation or subscription) - operationKind: String, + #[serde(rename(deserialize = "operationKind"))] + operation_kind: String, } struct Variables { @@ -314,6 +315,7 @@ mod fetch { ) -> Result { let FetchNode { operation, + operation_kind, service_name, .. } = self; @@ -343,6 +345,12 @@ mod fetch { .unwrap() .into(), context: context.clone(), + operation_kind: match operation_kind.as_str() { + "query" => OperationKind::Query, + "mutation" => OperationKind::Mutation, + "subscription" => OperationKind::Subscription, + _ => unreachable!(), + }, }; let service = service_registry diff --git a/apollo-router-core/src/services/mod.rs b/apollo-router-core/src/services/mod.rs index f1ef424d61..0e6d6f5839 100644 --- a/apollo-router-core/src/services/mod.rs +++ b/apollo-router-core/src/services/mod.rs @@ -92,6 +92,15 @@ pub struct SubgraphRequest { pub http_request: http_compat::Request, pub context: Context, + + pub operation_kind: OperationKind, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum OperationKind { + Query, + Mutation, + Subscription, } assert_impl_all!(SubgraphResponse: Send); diff --git a/apollo-router/src/reqwest_subgraph_service.rs b/apollo-router/src/reqwest_subgraph_service.rs index 42faf21199..78931ba6e3 100644 --- a/apollo-router/src/reqwest_subgraph_service.rs +++ b/apollo-router/src/reqwest_subgraph_service.rs @@ -53,6 +53,7 @@ impl tower::Service for ReqwestSubgraphService { let graphql::SubgraphRequest { http_request, context, + .. } = request; let http_client = self.http_client.clone(); From 3e69ce8da317cbb4d6c1a255dc88653a7dbd769f Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 11 Feb 2022 15:37:57 +0100 Subject: [PATCH 4/6] fix tests --- apollo-router-core/src/layers/deduplication.rs | 11 +++++------ ...ner__router_bridge_query_planner__tests__plan.snap | 3 ++- ...e__query_planner__tests__query_plan_from_json.snap | 6 ++++++ .../src/query_planner/testdata/query_plan.json | 5 +++++ 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/apollo-router-core/src/layers/deduplication.rs b/apollo-router-core/src/layers/deduplication.rs index 8417c520b8..257cfe5fce 100644 --- a/apollo-router-core/src/layers/deduplication.rs +++ b/apollo-router-core/src/layers/deduplication.rs @@ -17,11 +17,12 @@ where } } +type WaitMap = + Arc, Sender>>>>; + pub struct QueryDeduplicationService { service: S, - wait_map: Arc< - Mutex, Sender>>>, - >, + wait_map: WaitMap, } impl QueryDeduplicationService @@ -37,9 +38,7 @@ where async fn dedup( service: S, - wait_map: Arc< - Mutex, Sender>>>, - >, + wait_map: WaitMap, request: SubgraphRequest, ) -> Result { loop { diff --git a/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__router_bridge_query_planner__tests__plan.snap b/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__router_bridge_query_planner__tests__plan.snap index 01e1e8b0f4..96f25c51fb 100644 --- a/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__router_bridge_query_planner__tests__plan.snap +++ b/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__router_bridge_query_planner__tests__plan.snap @@ -1,6 +1,6 @@ --- source: apollo-router-core/src/query_planner/router_bridge_query_planner.rs -assertion_line: 97 +assertion_line: 133 expression: result --- @@ -11,6 +11,7 @@ QueryPlan { requires: [], variable_usages: [], operation: "{me{name{first last}}}", + operation_kind: "query", }, ), } diff --git a/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__tests__query_plan_from_json.snap b/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__tests__query_plan_from_json.snap index 784b6a943d..977ab1f2b0 100644 --- a/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__tests__query_plan_from_json.snap +++ b/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__tests__query_plan_from_json.snap @@ -1,5 +1,6 @@ --- source: apollo-router-core/src/query_planner/mod.rs +assertion_line: 452 expression: query_plan --- @@ -11,6 +12,7 @@ Sequence { requires: [], variable_usages: [], operation: "{topProducts{__typename ...on Book{__typename isbn}...on Furniture{name}}product(upc:\"1\"){__typename ...on Book{__typename isbn}...on Furniture{name}}}", + operation_kind: "query", }, ), Parallel { @@ -59,6 +61,7 @@ Sequence { "test_variable", ], operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}", + operation_kind: "query", }, ), }, @@ -117,6 +120,7 @@ Sequence { ], variable_usages: [], operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}", + operation_kind: "query", }, ), }, @@ -164,6 +168,7 @@ Sequence { ], variable_usages: [], operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}", + operation_kind: "query", }, ), }, @@ -221,6 +226,7 @@ Sequence { ], variable_usages: [], operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}", + operation_kind: "query", }, ), }, diff --git a/apollo-router-core/src/query_planner/testdata/query_plan.json b/apollo-router-core/src/query_planner/testdata/query_plan.json index de7f86d414..42eea43ef3 100644 --- a/apollo-router-core/src/query_planner/testdata/query_plan.json +++ b/apollo-router-core/src/query_planner/testdata/query_plan.json @@ -5,6 +5,7 @@ "kind": "Fetch", "serviceName": "product", "variableUsages": [], + "operationKind": "query", "operation": "{topProducts{__typename ...on Book{__typename isbn}...on Furniture{name}}product(upc:\"1\"){__typename ...on Book{__typename isbn}...on Furniture{name}}}" }, { @@ -30,6 +31,7 @@ } ], "variableUsages": ["test_variable"], + "operationKind": "query", "operation": "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}" } }, @@ -52,6 +54,7 @@ } ], "variableUsages": [], + "operationKind": "query", "operation": "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}" } } @@ -77,6 +80,7 @@ } ], "variableUsages": [], + "operationKind": "query", "operation": "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}" } }, @@ -99,6 +103,7 @@ } ], "variableUsages": [], + "operationKind": "query", "operation": "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}" } } From 941c2aad8f8bc384b77e14704712799269c4642f Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 14 Feb 2022 18:19:03 +0100 Subject: [PATCH 5/6] ideserialize directly the OperationKind and store an enum --- apollo-router-core/src/query_planner/mod.rs | 33 ++++++++++++++----- ...ter_bridge_query_planner__tests__plan.snap | 2 +- ..._planner__tests__query_plan_from_json.snap | 12 +++---- apollo-router-core/src/services/mod.rs | 2 +- 4 files changed, 32 insertions(+), 17 deletions(-) diff --git a/apollo-router-core/src/query_planner/mod.rs b/apollo-router-core/src/query_planner/mod.rs index 3ea651b218..832e2dd668 100644 --- a/apollo-router-core/src/query_planner/mod.rs +++ b/apollo-router-core/src/query_planner/mod.rs @@ -212,7 +212,7 @@ impl PlanNode { mod fetch { use super::selection::{select_object, Selection}; use crate::prelude::graphql::*; - use serde::Deserialize; + use serde::{Deserialize, Deserializer}; use std::sync::Arc; use tower::ServiceExt; use tracing::{instrument, Instrument}; @@ -236,8 +236,28 @@ mod fetch { operation: String, /// The kind of operation (query, mutation or subscription) - #[serde(rename(deserialize = "operationKind"))] - operation_kind: String, + #[serde( + rename(deserialize = "operationKind"), + deserialize_with = "parse_operation" + )] + operation_kind: OperationKind, + } + + fn parse_operation<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let buf = String::deserialize(deserializer)?; + + match buf.as_str() { + "query" => Ok(OperationKind::Query), + "mutation" => Ok(OperationKind::Mutation), + "subscription" => Ok(OperationKind::Subscription), + s => Err(serde::de::Error::custom(format!( + "unknown operation kind: {}", + s + ))), + } } struct Variables { @@ -345,12 +365,7 @@ mod fetch { .unwrap() .into(), context: context.clone(), - operation_kind: match operation_kind.as_str() { - "query" => OperationKind::Query, - "mutation" => OperationKind::Mutation, - "subscription" => OperationKind::Subscription, - _ => unreachable!(), - }, + operation_kind: operation_kind.clone(), }; let service = service_registry diff --git a/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__router_bridge_query_planner__tests__plan.snap b/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__router_bridge_query_planner__tests__plan.snap index 96f25c51fb..87e223b0ec 100644 --- a/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__router_bridge_query_planner__tests__plan.snap +++ b/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__router_bridge_query_planner__tests__plan.snap @@ -11,7 +11,7 @@ QueryPlan { requires: [], variable_usages: [], operation: "{me{name{first last}}}", - operation_kind: "query", + operation_kind: Query, }, ), } diff --git a/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__tests__query_plan_from_json.snap b/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__tests__query_plan_from_json.snap index 977ab1f2b0..45f25c4487 100644 --- a/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__tests__query_plan_from_json.snap +++ b/apollo-router-core/src/query_planner/snapshots/apollo_router_core__query_planner__tests__query_plan_from_json.snap @@ -1,6 +1,6 @@ --- source: apollo-router-core/src/query_planner/mod.rs -assertion_line: 452 +assertion_line: 467 expression: query_plan --- @@ -12,7 +12,7 @@ Sequence { requires: [], variable_usages: [], operation: "{topProducts{__typename ...on Book{__typename isbn}...on Furniture{name}}product(upc:\"1\"){__typename ...on Book{__typename isbn}...on Furniture{name}}}", - operation_kind: "query", + operation_kind: Query, }, ), Parallel { @@ -61,7 +61,7 @@ Sequence { "test_variable", ], operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}", - operation_kind: "query", + operation_kind: Query, }, ), }, @@ -120,7 +120,7 @@ Sequence { ], variable_usages: [], operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}", - operation_kind: "query", + operation_kind: Query, }, ), }, @@ -168,7 +168,7 @@ Sequence { ], variable_usages: [], operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}", - operation_kind: "query", + operation_kind: Query, }, ), }, @@ -226,7 +226,7 @@ Sequence { ], variable_usages: [], operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}", - operation_kind: "query", + operation_kind: Query, }, ), }, diff --git a/apollo-router-core/src/services/mod.rs b/apollo-router-core/src/services/mod.rs index 3147a8b932..b6d1fc5bb9 100644 --- a/apollo-router-core/src/services/mod.rs +++ b/apollo-router-core/src/services/mod.rs @@ -95,7 +95,7 @@ pub struct SubgraphRequest { pub operation_kind: OperationKind, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Copy, Clone, Debug, PartialEq)] pub enum OperationKind { Query, Mutation, From a228e1bbc79f60e877d891eea3458b98dca0884c Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 14 Feb 2022 18:35:29 +0100 Subject: [PATCH 6/6] OperationKind is Copy now --- apollo-router-core/src/query_planner/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apollo-router-core/src/query_planner/mod.rs b/apollo-router-core/src/query_planner/mod.rs index 832e2dd668..16d6aa5af4 100644 --- a/apollo-router-core/src/query_planner/mod.rs +++ b/apollo-router-core/src/query_planner/mod.rs @@ -365,7 +365,7 @@ mod fetch { .unwrap() .into(), context: context.clone(), - operation_kind: operation_kind.clone(), + operation_kind: *operation_kind, }; let service = service_registry