diff --git a/.changesets/feat_geal_query_planner_plugins.md b/.changesets/feat_geal_query_planner_plugins.md new file mode 100644 index 0000000000..00d491b811 --- /dev/null +++ b/.changesets/feat_geal_query_planner_plugins.md @@ -0,0 +1,6 @@ +### Query planner plugins ([Issue #3150](https://github.com/apollographql/router/issues/3150)) + +We may need to modify a query between query plan caching and the query planner. This requires a query planner plugin capability. This capability is private to the router for now. +The plugins need an `ApolloCompiler` instance to perform useful work on the query, so on a cache miss the caching layer will generate a compiler instance and transmit it as part of the request going through the query planner plugins. At the end of the chain, the query planner extracts the modified query from the compiler, uses it to generate a query plan, and generates the selections of both the original and filtered queries for response formatting. This is done to ensure that the response does not leak data removed in the filtered query, but still keeps the shape expected by the original query, using null propagation. 
+ +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3177 \ No newline at end of file diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index 1449c3de2b..9d8d3f21f3 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -170,7 +170,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [ "json", "stream", ] } -router-bridge = "0.2.6+v2.4.7" +router-bridge = "0.2.7+v2.4.7" rust-embed="6.7.0" rustls = "0.20.8" rustls-pemfile = "1.0.2" diff --git a/apollo-router/src/query_planner/bridge_query_planner.rs b/apollo-router/src/query_planner/bridge_query_planner.rs index 603bd2123f..22f9cc5426 100644 --- a/apollo-router/src/query_planner/bridge_query_planner.rs +++ b/apollo-router/src/query_planner/bridge_query_planner.rs @@ -170,17 +170,32 @@ impl BridgeQueryPlanner { async fn plan( &self, - mut selections: Query, + original_query: String, + filtered_query: String, operation: Option, + + mut selections: Query, ) -> Result { let planner_result = self .planner - .plan(selections.string.clone(), operation) + .plan(filtered_query.clone(), operation.clone()) .await .map_err(QueryPlannerError::RouterBridgeError)? 
.into_result() .map_err(QueryPlannerError::from)?; + // the `statsReportKey` field should match the original query instead of the filtered query, to index them all under the same query + let operation_signature = if original_query != filtered_query { + Some( + self.planner + .operation_signature(original_query, operation) + .await + .map_err(QueryPlannerError::RouterBridgeError)?, + ) + } else { + None + }; + match planner_result { PlanSuccess { data: @@ -188,10 +203,15 @@ impl BridgeQueryPlanner { query_plan: QueryPlan { node: Some(node) }, formatted_query_plan, }, - usage_reporting, + mut usage_reporting, } => { let subselections = node.parse_subselections(&self.schema)?; selections.subselections = subselections; + + if let Some(sig) = operation_signature { + usage_reporting.stats_report_key = sig; + } + Ok(QueryPlannerContent::Plan { plan: Arc::new(super::QueryPlan { usage_reporting, @@ -208,9 +228,13 @@ impl BridgeQueryPlanner { query_plan: QueryPlan { node: None }, .. }, - usage_reporting, + mut usage_reporting, } => { failfast_debug!("empty query plan"); + if let Some(sig) = operation_signature { + usage_reporting.stats_report_key = sig; + } + Err(QueryPlannerError::EmptyPlan(usage_reporting)) } } @@ -232,11 +256,34 @@ impl Service for BridgeQueryPlanner { } fn call(&mut self, req: QueryPlannerRequest) -> Self::Future { + let QueryPlannerRequest { + query: original_query, + operation_name, + context, + compiler, + } = req; + let this = self.clone(); let fut = async move { let start = Instant::now(); + + let compiler_guard = compiler.lock().await; + let file_id = compiler_guard + .db + .source_file(QUERY_EXECUTABLE.into()) + .ok_or(QueryPlannerError::SpecError(SpecError::ParsingError( + "missing input file for query".to_string(), + )))?; + let filtered_query = compiler_guard.db.source_code(file_id); + drop(compiler_guard); + let res = this - .get(req.query, req.operation_name.to_owned(), req.compiler) + .get( + original_query, + filtered_query.to_string(), + 
operation_name.to_owned(), + compiler, + ) .await; let duration = start.elapsed().as_secs_f64(); tracing::info!(histogram.apollo_router_query_planning_time = duration,); @@ -244,18 +291,18 @@ impl Service for BridgeQueryPlanner { match res { Ok(query_planner_content) => Ok(QueryPlannerResponse::builder() .content(query_planner_content) - .context(req.context) + .context(context) .build()), Err(e) => { match &e { QueryPlannerError::PlanningErrors(pe) => { - req.context + context .private_entries .lock() .insert(pe.usage_reporting.clone()); } QueryPlannerError::SpecError(e) => { - req.context.private_entries.lock().insert(UsageReporting { + context.private_entries.lock().insert(UsageReporting { stats_report_key: e.get_error_key().to_string(), referenced_fields_by_type: HashMap::new(), }); @@ -275,12 +322,16 @@ impl Service for BridgeQueryPlanner { impl BridgeQueryPlanner { async fn get( &self, - query: String, + original_query: String, + filtered_query: String, operation_name: Option, compiler: Arc>, ) -> Result { - let selections = self - .parse_selections((query.clone(), operation_name.clone()), compiler) + let mut selections = self + .parse_selections( + (original_query.clone(), operation_name.clone()), + compiler.clone(), + ) .await?; if selections.contains_introspection() { @@ -302,11 +353,19 @@ impl BridgeQueryPlanner { response: Box::new(graphql::Response::builder().data(data).build()), }); } else { - return self.introspection(query.clone()).await; + return self.introspection(original_query).await; } } - self.plan(selections, operation_name).await + if filtered_query != original_query { + selections.filtered_query = Some(Arc::new( + self.parse_selections((filtered_query.clone(), operation_name.clone()), compiler) + .await?, + )); + } + + self.plan(original_query, filtered_query, operation_name, selections) + .await } } @@ -337,9 +396,14 @@ mod tests { #[test(tokio::test)] async fn test_plan() { - let result = plan(EXAMPLE_SCHEMA, 
include_str!("testdata/query.graphql"), None) - .await - .unwrap(); + let result = plan( + EXAMPLE_SCHEMA, + include_str!("testdata/query.graphql"), + include_str!("testdata/query.graphql"), + None, + ) + .await + .unwrap(); if let QueryPlannerContent::Plan { plan, .. } = result { insta::with_settings!({sort_maps => true}, { @@ -356,6 +420,7 @@ mod tests { let err = plan( EXAMPLE_SCHEMA, "fragment UnusedTestFragment on User { id } query { me { id } }", + "fragment UnusedTestFragment on User { id } query { me { id } }", None, ) .await @@ -397,7 +462,12 @@ mod tests { // it should be caught by the introspection part, but just in case, we check // that the query planner would return an empty plan error if it received an // introspection query - .plan(query, None) + .plan( + include_str!("testdata/unknown_introspection_query.graphql").to_string(), + include_str!("testdata/unknown_introspection_query.graphql").to_string(), + None, + query, + ) .await .unwrap_err(); @@ -415,7 +485,7 @@ mod tests { #[test(tokio::test)] async fn test_plan_error() { - let result = plan(EXAMPLE_SCHEMA, "", None).await; + let result = plan(EXAMPLE_SCHEMA, "", "", None).await; assert_eq!( "couldn't plan query: query validation errors: Syntax Error: Unexpected .", @@ -425,9 +495,14 @@ mod tests { #[test(tokio::test)] async fn test_single_aliased_root_typename() { - let result = plan(EXAMPLE_SCHEMA, "{ x: __typename }", None) - .await - .unwrap(); + let result = plan( + EXAMPLE_SCHEMA, + "{ x: __typename }", + "{ x: __typename }", + None, + ) + .await + .unwrap(); if let QueryPlannerContent::Introspection { response } = result { assert_eq!( r#"{"data":{"x":"Query"}}"#, @@ -440,9 +515,14 @@ mod tests { #[test(tokio::test)] async fn test_two_root_typenames() { - let result = plan(EXAMPLE_SCHEMA, "{ x: __typename __typename }", None) - .await - .unwrap(); + let result = plan( + EXAMPLE_SCHEMA, + "{ x: __typename __typename }", + "{ x: __typename __typename }", + None, + ) + .await + .unwrap(); if 
let QueryPlannerContent::Introspection { response } = result { assert_eq!( r#"{"data":{"x":"Query","__typename":"Query"}}"#, @@ -455,7 +535,8 @@ mod tests { async fn plan( schema: &str, - query: &str, + original_query: &str, + filtered_query: &str, operation_name: Option, ) -> Result { let mut configuration: Configuration = Default::default(); @@ -466,11 +547,12 @@ mod tests { .await .unwrap(); - let (compiler, _) = Query::make_compiler(query, &planner.schema(), &configuration); + let (compiler, _) = Query::make_compiler(original_query, &planner.schema(), &configuration); planner .get( - query.to_string(), + original_query.to_string(), + filtered_query.to_string(), operation_name, Arc::new(Mutex::new(compiler)), ) diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 06056882b2..5b764b94bb 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -4,12 +4,16 @@ use std::sync::Arc; use std::task; use futures::future::BoxFuture; +use indexmap::IndexMap; +use query_planner::QueryPlannerPlugin; use router_bridge::planner::Planner; use router_bridge::planner::UsageReporting; use sha2::Digest; use sha2::Sha256; use tokio::sync::Mutex; +use tower::ServiceBuilder; use tower::ServiceExt; +use tower_service::Service; use tracing::Instrument; use crate::cache::DeduplicatingCache; @@ -18,11 +22,17 @@ use crate::error::QueryPlannerError; use crate::query_planner::BridgeQueryPlanner; use crate::query_planner::QueryPlanResult; use crate::services::layers::query_analysis::QueryAnalysisLayer; +use crate::services::query_planner; use crate::services::QueryPlannerContent; use crate::services::QueryPlannerRequest; use crate::services::QueryPlannerResponse; +use crate::spec::Schema; +use crate::Configuration; use crate::Context; +/// An [`IndexMap`] of available plugins. 
+pub(crate) type Plugins = IndexMap>; + /// A query planner wrapper that caches results. /// /// The query planner performs LRU caching. @@ -32,31 +42,38 @@ pub(crate) struct CachingQueryPlanner { DeduplicatingCache>>, >, delegate: T, - schema_id: Option, + schema: Arc, + plugins: Arc, } impl CachingQueryPlanner where T: tower::Service< - QueryPlannerRequest, - Response = QueryPlannerResponse, - Error = QueryPlannerError, - >, + QueryPlannerRequest, + Response = QueryPlannerResponse, + Error = QueryPlannerError, + > + Send, + >::Future: Send, { /// Creates a new query planner that caches the results of another [`QueryPlanner`]. pub(crate) async fn new( delegate: T, - schema_id: Option, - config: &crate::configuration::QueryPlanning, + schema: Arc, + configuration: &Configuration, + plugins: Plugins, ) -> CachingQueryPlanner { let cache = Arc::new( - DeduplicatingCache::from_configuration(&config.experimental_cache, "query planner") - .await, + DeduplicatingCache::from_configuration( + &configuration.supergraph.query_planning.experimental_cache, + "query planner", + ) + .await, ); Self { cache, delegate, - schema_id, + schema, + plugins: Arc::new(plugins), } } @@ -73,7 +90,16 @@ where query_analysis: &QueryAnalysisLayer, cache_keys: Vec<(String, Option)>, ) { - let schema_id = self.schema_id.clone(); + let schema_id = self.schema.schema_id.clone(); + + let mut service = ServiceBuilder::new().service( + self.plugins + .iter() + .rev() + .fold(self.delegate.clone().boxed(), |acc, (_, e)| { + e.query_planner_service(acc) + }), + ); let mut count = 0usize; for (query, operation) in cache_keys { @@ -94,7 +120,7 @@ where context: context.clone(), }; - let res = match self.delegate.ready().await { + let res = match service.ready().await { Ok(service) => service.call(request).await, Err(_) => break, }; @@ -125,7 +151,8 @@ impl CachingQueryPlanner { } } -impl tower::Service for CachingQueryPlanner +impl tower::Service + for CachingQueryPlanner where T: tower::Service< 
QueryPlannerRequest, @@ -142,9 +169,9 @@ where task::Poll::Ready(Ok(())) } - fn call(&mut self, request: QueryPlannerRequest) -> Self::Future { + fn call(&mut self, request: query_planner::CachingRequest) -> Self::Future { let mut qp = self.clone(); - let schema_id = self.schema_id.clone(); + let schema_id = self.schema.schema_id.clone(); Box::pin(async move { let caching_key = CachingQueryKey { schema_id, @@ -155,6 +182,20 @@ where let context = request.context.clone(); let entry = qp.cache.get(&caching_key).await; if entry.is_first() { + let query_planner::CachingRequest { + query, + operation_name, + context, + compiler, + } = request; + + let request = QueryPlannerRequest::builder() + .query(query) + .and_operation_name(operation_name) + .context(context) + .compiler(compiler) + .build(); + // some clients might timeout and cancel the request before query planning is finished, // so we execute it in a task that can continue even after the request was canceled and // the join handle was dropped. 
That way, the next similar query will use the cache instead @@ -343,12 +384,18 @@ mod tests { planner }); - let mut planner = CachingQueryPlanner::new( - delegate, - None, - &crate::configuration::QueryPlanning::default(), - ) - .await; + let configuration = Arc::new(crate::Configuration::default()); + let schema = Arc::new( + Schema::parse( + include_str!("testdata/schema.graphql"), + &configuration, + None, + ) + .unwrap(), + ); + + let mut planner = + CachingQueryPlanner::new(delegate, schema, &configuration, IndexMap::new()).await; let configuration = Configuration::default(); @@ -365,7 +412,7 @@ mod tests { for _ in 0..5 { assert!(planner - .call(QueryPlannerRequest::new( + .call(query_planner::CachingRequest::new( "query Me { me { username } }".to_string(), Some("".into()), compiler1.clone(), @@ -384,7 +431,7 @@ mod tests { )); assert!(planner - .call(QueryPlannerRequest::new( + .call(query_planner::CachingRequest::new( "query Me { me { name { first } } }".to_string(), Some("".into()), compiler2, @@ -440,16 +487,13 @@ mod tests { Query::make_compiler("query Me { me { username } }", &schema, &configuration).0, )); - let mut planner = CachingQueryPlanner::new( - delegate, - None, - &crate::configuration::QueryPlanning::default(), - ) - .await; + let mut planner = + CachingQueryPlanner::new(delegate, Arc::new(schema), &configuration, IndexMap::new()) + .await; for _ in 0..5 { assert!(planner - .call(QueryPlannerRequest::new( + .call(query_planner::CachingRequest::new( "query Me { me { username } }".to_string(), Some("".into()), compiler.clone(), diff --git a/apollo-router/src/services/execution_service.rs b/apollo-router/src/services/execution_service.rs index 01cb4e4430..2c5d28392c 100644 --- a/apollo-router/src/services/execution_service.rs +++ b/apollo-router/src/services/execution_service.rs @@ -109,14 +109,24 @@ impl Service for ExecutionService { let has_next = response.has_next.unwrap_or(true); tracing::debug_span!("format_response").in_scope(|| { - let 
paths = query.format_response( + let mut paths = Vec::new(); + if let Some(filtered_query) = query.filtered_query.as_ref() { + paths = filtered_query.format_response( + &mut response, + operation_name.as_deref(), + is_deferred, + variables.clone(), + schema.api_schema(), + ); + } + + paths.extend(query.format_response( &mut response, operation_name.as_deref(), is_deferred, variables.clone(), schema.api_schema(), - ); - + ).into_iter()); nullified_paths.extend(paths.into_iter()); }); diff --git a/apollo-router/src/services/query_planner.rs b/apollo-router/src/services/query_planner.rs index 68199b5382..5f0c57568f 100644 --- a/apollo-router/src/services/query_planner.rs +++ b/apollo-router/src/services/query_planner.rs @@ -3,12 +3,14 @@ use std::sync::Arc; use apollo_compiler::ApolloCompiler; +use async_trait::async_trait; use derivative::Derivative; use serde::Deserialize; use serde::Serialize; use static_assertions::assert_impl_all; use tokio::sync::Mutex; +use crate::error::QueryPlannerError; use crate::graphql; use crate::query_planner::QueryPlan; use crate::Context; @@ -34,9 +36,41 @@ impl Request { pub(crate) fn new( query: String, operation_name: Option, - compiler: Arc>, context: Context, + compiler: Arc>, ) -> Request { + Self { + query, + operation_name, + context, + compiler, + } + } +} + +/// Request accepted by the caching query planner; it is turned into a `QueryPlannerRequest` when the plan is not already cached. +#[derive(Clone, Derivative)] +#[derivative(Debug)] +pub(crate) struct CachingRequest { + pub(crate) query: String, + pub(crate) operation_name: Option, + pub(crate) context: Context, + #[derivative(Debug = "ignore")] + pub(crate) compiler: Arc>, +} + +#[buildstructor::buildstructor] +impl CachingRequest { + /// This is the constructor (or builder) to use when constructing a real CachingRequest. + /// + /// Required parameters are required in non-testing code to create a CachingRequest. 
+ #[builder] + pub(crate) fn new( + query: String, + operation_name: Option, + compiler: Arc>, + context: Context, + ) -> CachingRequest { Self { query, operation_name, @@ -81,3 +115,21 @@ impl Response { } } } + +pub(crate) type BoxService = tower::util::BoxService; +#[allow(dead_code)] +pub(crate) type BoxCloneService = + tower::util::BoxCloneService; +#[allow(dead_code)] +pub(crate) type ServiceResult = Result; +#[allow(dead_code)] +pub(crate) type Body = hyper::Body; +#[allow(dead_code)] +pub(crate) type Error = hyper::Error; + +#[async_trait] +pub(crate) trait QueryPlannerPlugin: Send + Sync + 'static { + /// This service runs right after the query planner cache, which means that it will be called once per unique + /// query, unless the cache entry was evicted + fn query_planner_service(&self, service: BoxService) -> BoxService; +} diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs index 6b2cc91057..bcba01f531 100644 --- a/apollo-router/src/services/supergraph_service.rs +++ b/apollo-router/src/services/supergraph_service.rs @@ -39,10 +39,10 @@ use crate::plugins::traffic_shaping::APOLLO_TRAFFIC_SHAPING; use crate::query_planner::BridgeQueryPlanner; use crate::query_planner::CachingQueryPlanner; use crate::query_planner::QueryPlanResult; +use crate::services::query_planner; use crate::services::supergraph; use crate::services::ExecutionRequest; use crate::services::ExecutionResponse; -use crate::services::QueryPlannerRequest; use crate::services::QueryPlannerResponse; use crate::services::SupergraphRequest; use crate::services::SupergraphResponse; @@ -277,7 +277,7 @@ async fn plan_query( planning .call( - QueryPlannerRequest::builder() + query_planner::CachingRequest::builder() .query(query_str) .and_operation_name(operation_name) .compiler(compiler) @@ -349,8 +349,9 @@ impl PluggableSupergraphServiceBuilder { let schema = self.planner.schema(); let query_planner_service = CachingQueryPlanner::new( 
self.planner, - schema.schema_id.clone(), - &configuration.supergraph.query_planning, + schema.clone(), + &configuration, + IndexMap::new(), ) .await; diff --git a/apollo-router/src/spec/query.rs b/apollo-router/src/spec/query.rs index 067f7634ec..2d5b64c65d 100644 --- a/apollo-router/src/spec/query.rs +++ b/apollo-router/src/spec/query.rs @@ -55,6 +55,8 @@ pub(crate) struct Query { pub(crate) operations: Vec, #[derivative(PartialEq = "ignore", Hash = "ignore")] pub(crate) subselections: HashMap, + #[derivative(PartialEq = "ignore", Hash = "ignore")] + pub(crate) filtered_query: Option>, } #[derive(Debug, Derivative, Default)] @@ -292,6 +294,7 @@ impl Query { fragments, operations, subselections: HashMap::new(), + filtered_query: None, }) } @@ -311,6 +314,7 @@ impl Query { fragments, operations, subselections: HashMap::new(), + filtered_query: None, }) }