From bf68930ce849c719dca6fc50843ef393a62f6532 Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Thu, 27 Jun 2024 14:00:02 +0200 Subject: [PATCH 1/5] feat(coprocessor): add conditions on coprocessor stages Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- apollo-router/src/plugins/coprocessor/mod.rs | 51 +++++++- .../src/plugins/coprocessor/supergraph.rs | 38 +++++- apollo-router/src/plugins/coprocessor/test.rs | 14 +++ .../telemetry/config_new/conditions.rs | 6 +- .../plugins/telemetry/config_new/selectors.rs | 118 +++++++++++++++--- .../src/services/supergraph/service.rs | 15 +++ 6 files changed, 215 insertions(+), 27 deletions(-) diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index a21d4107e4..cba03ba5c6 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -36,6 +36,9 @@ use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer; use crate::layers::ServiceBuilderExt; use crate::plugin::Plugin; use crate::plugin::PluginInit; +use crate::plugins::telemetry::config_new::conditions::Condition; +use crate::plugins::telemetry::config_new::selectors::RouterSelector; +use crate::plugins::telemetry::config_new::selectors::SubgraphSelector; use crate::plugins::traffic_shaping::Http2Config; use crate::register_plugin; use crate::services; @@ -234,6 +237,9 @@ where #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] #[serde(default, deny_unknown_fields)] pub(super) struct RouterRequestConf { + /// Condition to trigger this stage + #[serde(skip_serializing)] + pub(super) condition: Option>, /// Send the headers pub(super) headers: bool, /// Send the context @@ -252,6 +258,9 @@ pub(super) struct RouterRequestConf { #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] #[serde(default, deny_unknown_fields)] pub(super) struct RouterResponseConf { + /// Condition to trigger this stage + #[serde(skip_serializing)] + pub(super) condition: Option>, /// Send the headers pub(super) headers: bool, /// Send the context @@ -267,6 +276,9 @@ pub(super) struct RouterResponseConf { #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] #[serde(default, deny_unknown_fields)] pub(super) struct SubgraphRequestConf { + /// Condition to trigger this stage + #[serde(skip_serializing)] + pub(super) condition: Option>, /// Send the headers pub(super) headers: bool, /// Send the context @@ -285,6 +297,9 @@ pub(super) struct SubgraphRequestConf { #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] #[serde(default, deny_unknown_fields)] pub(super) struct SubgraphResponseConf { + /// Condition to trigger this stage + #[serde(skip_serializing)] + pub(super) condition: Option>, /// Send the headers pub(super) headers: bool, /// Send the context @@ -598,7 +613,7 @@ async fn process_router_request_stage( coprocessor_url: String, sdl: Arc, mut request: router::Request, - request_config: RouterRequestConf, + mut request_config: RouterRequestConf, ) -> Result, BoxError> where C: Service, Response = http::Response, Error = BoxError> @@ -608,6 +623,14 @@ where + 'static, >>::Future: Send + 'static, { + let should_be_executed = request_config + .condition + .as_mut() + .map(|c| c.evaluate_request(&request) == Some(true)) + .unwrap_or(true); + if !should_be_executed { + return Ok(ControlFlow::Continue(request)); + } // Call into our out of process processor with a body of our body // First, extract the data we need from our request and prepare our // external call. Use our configuration to figure out which data to send. @@ -761,6 +784,14 @@ where + 'static, >>::Future: Send + 'static, { + let should_be_executed = response_config + .condition + .as_ref() + .map(|c| c.evaluate_response(&response)) + .unwrap_or(true); + if !should_be_executed { + return Ok(response); + } // split the response into parts + body let (parts, body) = response.response.into_parts(); @@ -945,7 +976,7 @@ async fn process_subgraph_request_stage( coprocessor_url: String, service_name: String, mut request: subgraph::Request, - request_config: SubgraphRequestConf, + mut request_config: SubgraphRequestConf, ) -> Result, BoxError> where C: Service, Response = http::Response, Error = BoxError> @@ -955,6 +986,14 @@ where + 'static, >>::Future: Send + 'static, { + let should_be_executed = request_config + .condition + .as_mut() + .map(|c| c.evaluate_request(&request) == Some(true)) + .unwrap_or(true); + if !should_be_executed { + return Ok(ControlFlow::Continue(request)); + } // Call into our out of process processor with a body of our body // First, extract the data we need from our request and prepare our // external call. Use our configuration to figure out which data to send. @@ -1100,6 +1139,14 @@ where + 'static, >>::Future: Send + 'static, { + let should_be_executed = response_config + .condition + .as_ref() + .map(|c| c.evaluate_response(&response)) + .unwrap_or(true); + if !should_be_executed { + return Ok(response); + } // Call into our out of process processor with a body of our body // First, extract the data we need from our response and prepare our // external call. Use our configuration to figure out which data to send. diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 31175882f7..970bcf95c4 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -16,12 +16,17 @@ use crate::graphql; use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer; use crate::layers::ServiceBuilderExt; use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; +use crate::plugins::telemetry::config_new::conditions::Condition; +use crate::plugins::telemetry::config_new::selectors::SupergraphSelector; use crate::services::supergraph; /// What information is passed to a router request/response stage #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] #[serde(default, deny_unknown_fields)] pub(super) struct SupergraphRequestConf { + /// Condition to trigger this stage + #[serde(skip_serializing)] + pub(super) condition: Option>, /// Send the headers pub(super) headers: bool, /// Send the context @@ -38,6 +43,9 @@ pub(super) struct SupergraphRequestConf { #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] #[serde(default, deny_unknown_fields)] pub(super) struct SupergraphResponseConf { + /// Condition to trigger this stage + #[serde(skip_serializing)] + pub(super) condition: Option>, /// Send the headers pub(super) headers: bool, /// Send the context @@ -183,7 +191,7 @@ async fn process_supergraph_request_stage( coprocessor_url: String, sdl: Arc, mut request: supergraph::Request, - request_config: SupergraphRequestConf, + mut request_config: SupergraphRequestConf, ) -> Result, BoxError> where C: Service, Response = http::Response, Error = BoxError> @@ -193,6 +201,14 @@ where + 'static, >>::Future: Send + 'static, { + let should_be_executed = request_config + .condition + .as_mut() + .map(|c| c.evaluate_request(&request) == Some(true)) + .unwrap_or(true); + if !should_be_executed { + return Ok(ControlFlow::Continue(request)); + } // Call into our out of process processor with a body of our body // First, extract the data we need from our request and prepare our // external call. Use our configuration to figure out which data to send. @@ -331,6 +347,14 @@ where + 'static, >>::Future: Send + 'static, { + let should_be_executed = response_config + .condition + .as_ref() + .map(|c| c.evaluate_response(&response)) + .unwrap_or(true); + if !should_be_executed { + return Ok(response); + } // split the response into parts + body let (mut parts, body) = response.response.into_parts(); @@ -419,8 +443,16 @@ where let generator_map_context = map_context.clone(); let generator_sdl_to_send = sdl_to_send.clone(); let generator_id = map_context.id.clone(); + let should_be_executed = response_config + .condition + .as_ref() + .map(|c| c.evaluate_event_response(&deferred_response, &map_context)) + .unwrap_or(true); async move { + if !should_be_executed { + return Ok(deferred_response); + } let body_to_send = response_config.body.then(|| { serde_json::to_value(&deferred_response).expect("serialization will not fail") }); @@ -564,6 +596,7 @@ mod tests { async fn external_plugin_supergraph_request() { let supergraph_stage = SupergraphStage { request: SupergraphRequestConf { + condition: Default::default(), headers: false, context: false, body: true, @@ -697,6 +730,7 @@ mod tests { async fn external_plugin_supergraph_request_controlflow_break() { let supergraph_stage = SupergraphStage { request: SupergraphRequestConf { + condition: Default::default(), headers: false, context: false, body: true, @@ -768,6 +802,7 @@ mod tests { async fn external_plugin_supergraph_response() { let supergraph_stage = SupergraphStage { response: SupergraphResponseConf { + condition: Default::default(), headers: true, context: true, body: true, @@ -899,6 +934,7 @@ mod tests { async fn multi_part() { let supergraph_stage = SupergraphStage { response: SupergraphResponseConf { + condition: Default::default(), headers: true, context: true, body: true, diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index 27fd309382..ed50cd8505 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -98,6 +98,7 @@ mod tests { async fn coprocessor_returning_the_wrong_version_should_fail() { let router_stage = RouterStage { request: RouterRequestConf { + condition: Default::default(), headers: true, context: true, body: true, @@ -157,6 +158,7 @@ mod tests { async fn coprocessor_returning_the_wrong_stage_should_fail() { let router_stage = RouterStage { request: RouterRequestConf { + condition: Default::default(), headers: true, context: true, body: true, @@ -216,6 +218,7 @@ mod tests { async fn coprocessor_missing_request_control_should_fail() { let router_stage = RouterStage { request: RouterRequestConf { + condition: Default::default(), headers: true, context: true, body: true, @@ -274,6 +277,7 @@ mod tests { async fn coprocessor_subgraph_with_invalid_response_body_should_fail() { let subgraph_stage = SubgraphStage { request: SubgraphRequestConf { + condition: Default::default(), headers: false, context: false, body: true, @@ -336,6 +340,7 @@ mod tests { async fn external_plugin_subgraph_request() { let subgraph_stage = SubgraphStage { request: SubgraphRequestConf { + condition: Default::default(), headers: false, context: false, body: true, @@ -465,6 +470,7 @@ mod tests { async fn external_plugin_subgraph_request_controlflow_break() { let subgraph_stage = SubgraphStage { request: SubgraphRequestConf { + condition: Default::default(), headers: false, context: false, body: true, @@ -533,6 +539,7 @@ mod tests { async fn external_plugin_subgraph_request_controlflow_break_with_message_string() { let subgraph_stage = SubgraphStage { request: SubgraphRequestConf { + condition: Default::default(), headers: false, context: false, body: true, @@ -597,6 +604,7 @@ mod tests { let subgraph_stage = SubgraphStage { request: Default::default(), response: SubgraphResponseConf { + condition: Default::default(), headers: false, context: false, body: true, @@ -708,6 +716,7 @@ mod tests { let supergraph_stage = SupergraphStage { request: Default::default(), response: SupergraphResponseConf { + condition: Default::default(), headers: false, context: false, body: true, @@ -768,6 +777,7 @@ mod tests { async fn external_plugin_router_request() { let router_stage = RouterStage { request: RouterRequestConf { + condition: Default::default(), headers: true, context: true, body: true, @@ -886,6 +896,7 @@ mod tests { async fn external_plugin_router_request_http_get() { let router_stage = RouterStage { request: RouterRequestConf { + condition: Default::default(), headers: true, context: true, body: true, @@ -1014,6 +1025,7 @@ mod tests { async fn external_plugin_router_request_controlflow_break() { let router_stage = RouterStage { request: RouterRequestConf { + condition: Default::default(), headers: true, context: true, body: true, @@ -1102,6 +1114,7 @@ mod tests { async fn external_plugin_router_request_controlflow_break_with_message_string() { let router_stage = RouterStage { request: RouterRequestConf { + condition: Default::default(), headers: true, context: true, body: true, @@ -1181,6 +1194,7 @@ mod tests { async fn external_plugin_router_response() { let router_stage = RouterStage { response: RouterResponseConf { + condition: Default::default(), headers: true, context: true, body: true, diff --git a/apollo-router/src/plugins/telemetry/config_new/conditions.rs b/apollo-router/src/plugins/telemetry/config_new/conditions.rs index 34662e8f97..64c7db71d8 100644 --- a/apollo-router/src/plugins/telemetry/config_new/conditions.rs +++ b/apollo-router/src/plugins/telemetry/config_new/conditions.rs @@ -7,8 +7,7 @@ use crate::plugins::telemetry::config::AttributeValue; use crate::plugins::telemetry::config_new::Selector; use crate::Context; -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum Condition { /// A condition to check a selection against a value. @@ -43,8 +42,7 @@ impl Condition<()> { } } -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case", untagged)] pub(crate) enum SelectorOrValue { /// A constant value. diff --git a/apollo-router/src/plugins/telemetry/config_new/selectors.rs b/apollo-router/src/plugins/telemetry/config_new/selectors.rs index 05503134b0..391f983974 100644 --- a/apollo-router/src/plugins/telemetry/config_new/selectors.rs +++ b/apollo-router/src/plugins/telemetry/config_new/selectors.rs @@ -27,11 +27,11 @@ use crate::query_planner::APOLLO_OPERATION_ID; use crate::services::router; use crate::services::subgraph; use crate::services::supergraph; +use crate::services::FIRST_EVENT_CONTEXT_KEY; use crate::spec::operation_limits::OperationLimits; use crate::Context; -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum TraceIdFormat { /// Open Telemetry trace ID, a hex string. @@ -40,8 +40,7 @@ pub(crate) enum TraceIdFormat { Datadog, } -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum OperationName { /// The raw operation name. @@ -51,8 +50,7 @@ pub(crate) enum OperationName { } #[allow(dead_code)] -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum ErrorRepr { // /// The error code if available @@ -61,8 +59,7 @@ pub(crate) enum ErrorRepr { Reason, } -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum Query { /// The raw query kind. @@ -77,16 +74,14 @@ pub(crate) enum Query { RootFields, } -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum SubgraphQuery { /// The raw query kind. String, } -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum ResponseStatus { /// The http status code. @@ -95,8 +90,7 @@ pub(crate) enum ResponseStatus { Reason, } -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum OperationKind { /// The raw operation kind. @@ -119,8 +113,7 @@ impl From<&RouterValue> for InstrumentValue { } } -#[derive(Deserialize, JsonSchema, Clone, Debug)] -#[cfg_attr(test, derive(PartialEq))] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, untagged)] pub(crate) enum RouterSelector { /// A header from the request @@ -251,9 +244,8 @@ impl From<&SupergraphValue> for InstrumentValue { } #[derive(Deserialize, JsonSchema, Clone, Derivative)] -#[cfg_attr(test, derivative(PartialEq))] #[serde(deny_unknown_fields, untagged)] -#[derivative(Debug)] +#[derivative(Debug, PartialEq)] pub(crate) enum SupergraphSelector { OperationName { /// The operation name from the query. @@ -402,6 +394,11 @@ pub(crate) enum SupergraphSelector { /// The cost value to select, one of: estimated, actual, delta. cost: CostValue, }, + /// Boolean returning true if it's the primary response and not events like subscription events or deferred responses + IsPrimaryResponse { + /// Boolean returning true if it's the primary response and not events like subscription events or deferred responses + is_primary_response: bool, + }, } #[derive(Deserialize, JsonSchema, Clone, Debug)] @@ -421,9 +418,8 @@ impl From<&SubgraphValue> for InstrumentValue { } #[derive(Deserialize, JsonSchema, Clone, Derivative)] -#[cfg_attr(test, derivative(PartialEq))] #[serde(deny_unknown_fields, rename_all = "snake_case", untagged)] -#[derivative(Debug)] +#[derivative(Debug, PartialEq)] pub(crate) enum SubgraphSelector { SubgraphOperationName { /// The operation name from the subgraph query. @@ -934,6 +930,32 @@ impl Selector for SupergraphSelector { None } } + SupergraphSelector::OperationName { + operation_name, + default, + .. + } => { + let op_name = response.context.get(OPERATION_NAME).ok().flatten(); + match operation_name { + OperationName::String => op_name.or_else(|| default.clone()), + OperationName::Hash => op_name.or_else(|| default.clone()).map(|op_name| { + let mut hasher = sha2::Sha256::new(); + hasher.update(op_name.as_bytes()); + let result = hasher.finalize(); + hex::encode(result) + }), + } + .map(opentelemetry::Value::from) + } + SupergraphSelector::OperationKind { .. } => response + .context + .get::<_, String>(OPERATION_KIND) + .ok() + .flatten() + .map(opentelemetry::Value::from), + SupergraphSelector::IsPrimaryResponse { + is_primary_response: is_primary, + } if *is_primary => Some(true.into()), SupergraphSelector::Static(val) => Some(val.clone().into()), SupergraphSelector::StaticField { r#static } => Some(r#static.clone().into()), // For request @@ -987,6 +1009,34 @@ impl Selector for SupergraphSelector { None } } + SupergraphSelector::OperationName { + operation_name, + default, + .. + } => { + let op_name = ctx.get(OPERATION_NAME).ok().flatten(); + match operation_name { + OperationName::String => op_name.or_else(|| default.clone()), + OperationName::Hash => op_name.or_else(|| default.clone()).map(|op_name| { + let mut hasher = sha2::Sha256::new(); + hasher.update(op_name.as_bytes()); + let result = hasher.finalize(); + hex::encode(result) + }), + } + .map(opentelemetry::Value::from) + } + SupergraphSelector::OperationKind { .. } => ctx + .get::<_, String>(OPERATION_KIND) + .ok() + .flatten() + .map(opentelemetry::Value::from), + SupergraphSelector::IsPrimaryResponse { + is_primary_response: is_primary, + } if *is_primary => Some(opentelemetry::Value::Bool( + ctx.get_json_value(FIRST_EVENT_CONTEXT_KEY) + == Some(serde_json_bytes::Value::Bool(true)), + )), SupergraphSelector::Static(val) => Some(val.clone().into()), SupergraphSelector::StaticField { r#static } => Some(r#static.clone().into()), _ => None, @@ -995,6 +1045,28 @@ impl Selector for SupergraphSelector { fn on_error(&self, error: &tower::BoxError, ctx: &Context) -> Option { match self { + SupergraphSelector::OperationName { + operation_name, + default, + .. + } => { + let op_name = ctx.get(OPERATION_NAME).ok().flatten(); + match operation_name { + OperationName::String => op_name.or_else(|| default.clone()), + OperationName::Hash => op_name.or_else(|| default.clone()).map(|op_name| { + let mut hasher = sha2::Sha256::new(); + hasher.update(op_name.as_bytes()); + let result = hasher.finalize(); + hex::encode(result) + }), + } + .map(opentelemetry::Value::from) + } + SupergraphSelector::OperationKind { .. } => ctx + .get::<_, String>(OPERATION_KIND) + .ok() + .flatten() + .map(opentelemetry::Value::from), SupergraphSelector::Query { query, .. } => { let limits_opt = ctx .extensions() @@ -1026,6 +1098,12 @@ impl Selector for SupergraphSelector { .as_ref() .and_then(|v| v.maybe_to_otel_value()) .or_else(|| default.maybe_to_otel_value()), + SupergraphSelector::IsPrimaryResponse { + is_primary_response: is_primary, + } if *is_primary => Some(opentelemetry::Value::Bool( + ctx.get_json_value(FIRST_EVENT_CONTEXT_KEY) + == Some(serde_json_bytes::Value::Bool(true)), + )), _ => None, } } diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index 51f7592e1a..9a1a45294b 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -77,6 +77,7 @@ use crate::Context; use crate::Notify; pub(crate) const QUERY_PLANNING_SPAN_NAME: &str = "query_planning"; +pub(crate) const FIRST_EVENT_CONTEXT_KEY: &str = "apollo_router::supergraph::first_event"; /// An [`IndexMap`] of available plugins. pub(crate) type Plugins = IndexMap>; @@ -349,6 +350,20 @@ async fn service_call( let supergraph_response_event = context .extensions() .with_lock(|lock| lock.get::().cloned()); + let mut first_event = true; + let mut inserted = false; + let ctx = context.clone(); + let response_stream = response_stream.inspect(move |_| { + if first_event { + first_event = false; + } else if !inserted { + ctx.insert_json_value( + FIRST_EVENT_CONTEXT_KEY, + serde_json_bytes::Value::Bool(false), + ); + inserted = true; + } + }); match supergraph_response_event { Some(supergraph_response_event) => { let mut attrs = Vec::with_capacity(4); From 7d89584899c08bfbe6386968c40d33a1daa076ae Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Wed, 3 Jul 2024 16:38:11 +0200 Subject: [PATCH 2/5] add tests for coprocessor and selector Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- ...nfiguration__tests__schema_generation.snap | 78 ++++- .../src/plugins/coprocessor/supergraph.rs | 198 ++++++++++- apollo-router/src/plugins/coprocessor/test.rs | 316 ++++++++++++++++++ .../plugins/telemetry/config_new/selectors.rs | 27 ++ 4 files changed, 598 insertions(+), 21 deletions(-) diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index a63d63b80b..8d4f9d3a24 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -2466,8 +2466,8 @@ expression: "&schema" "type": "boolean" }, "format": { - "$ref": "#/definitions/TraceIdFormat", - "description": "#/definitions/TraceIdFormat" + "$ref": "#/definitions/TraceIdFormat2", + "description": "#/definitions/TraceIdFormat2" }, "header_name": { "description": "Choose the header name to expose trace_id (default: apollo-trace-id)", @@ -4759,6 +4759,11 @@ expression: "&schema" "description": "Send the body", "type": "boolean" }, + "condition": { + "$ref": "#/definitions/Condition_for_RouterSelector", + "description": "#/definitions/Condition_for_RouterSelector", + "nullable": true + }, "context": { "default": false, "description": "Send the context", @@ -4796,6 +4801,11 @@ expression: "&schema" "description": "Send the body", "type": "boolean" }, + "condition": { + "$ref": "#/definitions/Condition_for_RouterSelector", + "description": "#/definitions/Condition_for_RouterSelector", + "nullable": true + }, "context": { "default": false, "description": "Send the context", @@ -4892,8 +4902,8 @@ expression: "&schema" "description": "The trace ID of the request.", "properties": { "trace_id": { - "$ref": "#/definitions/TraceIdFormat2", - "description": "#/definitions/TraceIdFormat2" + "$ref": "#/definitions/TraceIdFormat", + "description": "#/definitions/TraceIdFormat" } }, "required": [ @@ -5703,6 +5713,11 @@ expression: "&schema" "description": "Send the body", "type": "boolean" }, + "condition": { + "$ref": "#/definitions/Condition_for_SubgraphSelector", + "description": "#/definitions/Condition_for_SubgraphSelector", + "nullable": true + }, "context": { "default": false, "description": "Send the context", @@ -5740,6 +5755,11 @@ expression: "&schema" "description": "Send the body", "type": "boolean" }, + "condition": { + "$ref": "#/definitions/Condition_for_SubgraphSelector", + "description": "#/definitions/Condition_for_SubgraphSelector", + "nullable": true + }, "context": { "default": false, "description": "Send the context", @@ -6408,6 +6428,11 @@ expression: "&schema" "description": "Send the body", "type": "boolean" }, + "condition": { + "$ref": "#/definitions/Condition_for_SupergraphSelector", + "description": "#/definitions/Condition_for_SupergraphSelector", + "nullable": true + }, "context": { "default": false, "description": "Send the context", @@ -6440,6 +6465,11 @@ expression: "&schema" "description": "Send the body", "type": "boolean" }, + "condition": { + "$ref": "#/definitions/Condition_for_SupergraphSelector", + "description": "#/definitions/Condition_for_SupergraphSelector", + "nullable": true + }, "context": { "default": false, "description": "Send the context", @@ -6746,6 +6776,20 @@ expression: "&schema" "cost" ], "type": "object" + }, + { + "additionalProperties": false, + "description": "Boolean returning true if it's the primary response and not events like subscription events or deferred responses", + "properties": { + "is_primary_response": { + "description": "Boolean returning true if it's the primary response and not events like subscription events or deferred responses", + "type": "boolean" + } + }, + "required": [ + "is_primary_response" + ], + "type": "object" } ] }, @@ -6907,21 +6951,14 @@ expression: "&schema" "TraceIdFormat": { "oneOf": [ { - "description": "Format the Trace ID as a hexadecimal number\n\n(e.g. Trace ID 16 -> 00000000000000000000000000000010)", - "enum": [ - "hexadecimal" - ], - "type": "string" - }, - { - "description": "Format the Trace ID as a decimal number\n\n(e.g. Trace ID 16 -> 16)", + "description": "Open Telemetry trace ID, a hex string.", "enum": [ - "decimal" + "open_telemetry" ], "type": "string" }, { - "description": "Datadog", + "description": "Datadog trace ID, a u64.", "enum": [ "datadog" ], @@ -6932,14 +6969,21 @@ expression: "&schema" "TraceIdFormat2": { "oneOf": [ { - "description": "Open Telemetry trace ID, a hex string.", + "description": "Format the Trace ID as a hexadecimal number\n\n(e.g. Trace ID 16 -> 00000000000000000000000000000010)", "enum": [ - "open_telemetry" + "hexadecimal" ], "type": "string" }, { - "description": "Datadog trace ID, a u64.", + "description": "Format the Trace ID as a decimal number\n\n(e.g. Trace ID 16 -> 16)", + "enum": [ + "decimal" + ], + "type": "string" + }, + { + "description": "Datadog", "enum": [ "datadog" ], diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 970bcf95c4..d5aff80b7e 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -544,6 +544,7 @@ mod tests { use super::*; use crate::plugin::test::MockInternalHttpClientService; use crate::plugin::test::MockSupergraphService; + use crate::plugins::telemetry::config_new::conditions::SelectorOrValue; use crate::services::router::body::get_body_bytes; use crate::services::supergraph; @@ -730,7 +731,15 @@ mod tests { async fn external_plugin_supergraph_request_controlflow_break() { let supergraph_stage = SupergraphStage { request: SupergraphRequestConf { - condition: Default::default(), + condition: Condition::Eq([ + SelectorOrValue::Selector(SupergraphSelector::RequestHeader { + request_header: String::from("another_header"), + redact: None, + default: None, + }), + SelectorOrValue::Value("value".to_string().into()), + ]) + .into(), headers: false, context: false, body: true, @@ -762,6 +771,7 @@ mod tests { } }, "headers": { + "another_header": ["another value"], "aheader": ["a value"] } }"#, @@ -770,14 +780,17 @@ mod tests { }) }); - let service = supergraph_stage.as_service( + let service = supergraph_stage.clone().as_service( mock_http_client, mock_supergraph_service.boxed(), "http://test".to_string(), Arc::new("".to_string()), ); - let request = supergraph::Request::fake_builder().build().unwrap(); + let request = supergraph::Request::fake_builder() + .header("another_header", "value") + .build() + .unwrap(); let crate::services::supergraph::Response { mut response, @@ -787,15 +800,74 @@ mod tests { assert!(context.get::<_, bool>("testKey").unwrap().unwrap()); let value = response.headers().get("aheader").unwrap(); - assert_eq!(value, "a value"); + let value = response.headers().get("another_header").unwrap(); + assert_eq!(value, "another value"); + assert_eq!( response.body_mut().next().await.unwrap().errors[0] .message .as_str(), "my error message" ); + + let mut mock_supergraph_service = MockSupergraphService::new(); + mock_supergraph_service + .expect_call() + .returning(|req: supergraph::Request| { + Ok(supergraph::Response::builder() + .data(json!({ "test": 1234_u32 })) + .errors(Vec::new()) + .extensions(crate::json_ext::Object::new()) + .context(req.context) + .build() + .unwrap()) + }); + + // This should not trigger the supergraph response stage because of the condition + let request = supergraph::Request::fake_builder().build().unwrap(); + // let mut mock_http_client = MockInternalHttpClientService::new(); + // mock_http_client.expect_clone().; + let mock_http_client = mock_with_callback(move |_: http::Request| { + Box::pin(async { + Ok(http::Response::builder() + .body(RouterBody::from( + r#"{ + "version": 1, + "stage": "SupergraphRequest", + "control": { + "break": 200 + }, + "body": { + "errors": [{ "message": "my error message" }] + }, + "context": { + "entries": { + "testKey": true + } + }, + "headers": { + "another_header": ["another value"], + "aheader": ["a value"] + } + }"#, + )) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let crate::services::supergraph::Response { context, .. } = + service.oneshot(request).await.unwrap(); + + assert!(context.get::<_, bool>("testKey").ok().flatten().is_none()); } #[tokio::test] @@ -1041,4 +1113,122 @@ mod tests { json!({ "data": { "test": 3, "has_next": false }, "hasNext": false }), ); } + + #[tokio::test] + async fn multi_part_only_primary() { + let supergraph_stage = SupergraphStage { + response: SupergraphResponseConf { + condition: Condition::Eq([ + SelectorOrValue::Selector(SupergraphSelector::IsPrimaryResponse { + is_primary_response: true, + }), + SelectorOrValue::Value(true.into()), + ]) + .into(), + headers: true, + context: true, + body: true, + sdl: true, + status_code: false, + }, + request: Default::default(), + }; + + let mut mock_supergraph_service = MockSupergraphService::new(); + + mock_supergraph_service + .expect_call() + .returning(|req: supergraph::Request| { + Ok(supergraph::Response::fake_stream_builder() + .response( + graphql::Response::builder() + .data(json!({ "test": 1 })) + .has_next(true) + .build(), + ) + .response( + graphql::Response::builder() + .data(json!({ "test": 2 })) + .has_next(true) + .build(), + ) + .response( + graphql::Response::builder() + .data(json!({ "test": 3 })) + .has_next(false) + .build(), + ) + .context(req.context) + .build() + .unwrap()) + }); + + let mock_http_client = + mock_with_deferred_callback(move |res: http::Request| { + Box::pin(async { + let mut deserialized_response: Externalizable = + serde_json::from_slice(&get_body_bytes(res.into_body()).await.unwrap()) + .unwrap(); + assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); + assert_eq!( + PipelineStep::SupergraphResponse.to_string(), + deserialized_response.stage + ); + + // Copy the has_next from the body into the data for checking later + deserialized_response + .body + .as_mut() + .unwrap() + .as_object_mut() + .unwrap() + .get_mut("data") + .unwrap() + .as_object_mut() + .unwrap() + .insert( + "has_next".to_string(), + serde_json::Value::from( + deserialized_response.has_next.unwrap_or_default(), + ), + ); + + Ok(http::Response::builder() + .body(RouterBody::from( + serde_json::to_string(&deserialized_response).unwrap_or_default(), + )) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::canned_builder() + .query("foo") + .build() + .unwrap(); + + let mut res = service.oneshot(request).await.unwrap(); + + let body = res.response.body_mut().next().await.unwrap(); + assert_eq!( + serde_json::to_value(&body).unwrap(), + json!({ "data": { "test": 1, "has_next": true }, "hasNext": true }), + ); + let body = res.response.body_mut().next().await.unwrap(); + assert_eq!( + serde_json::to_value(&body).unwrap(), + json!({ "data": { "test": 2 }, "hasNext": true }), + ); + let body = res.response.body_mut().next().await.unwrap(); + assert_eq!( + serde_json::to_value(&body).unwrap(), + json!({ "data": { "test": 3 }, "hasNext": false }), + ); + } } diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index ed50cd8505..c5d99e7abd 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -25,6 +25,7 @@ mod tests { use crate::plugin::test::MockSupergraphService; use crate::plugins::coprocessor::supergraph::SupergraphResponseConf; use crate::plugins::coprocessor::supergraph::SupergraphStage; + use crate::plugins::telemetry::config_new::conditions::SelectorOrValue; use crate::services::external::Externalizable; use crate::services::external::PipelineStep; use crate::services::external::EXTERNALIZABLE_VERSION; @@ -466,6 +467,88 @@ mod tests { ); } + #[tokio::test] + async fn external_plugin_subgraph_request_with_condition() { + let subgraph_stage = SubgraphStage { + request: SubgraphRequestConf { + condition: Condition::Eq([ + SelectorOrValue::Selector(SubgraphSelector::SubgraphRequestHeader { + subgraph_request_header: String::from("another_header"), + redact: None, + default: None, + }), + SelectorOrValue::Value("value".to_string().into()), + ]) + .into(), + headers: false, + context: false, + body: true, + uri: false, + method: false, + service_name: false, + }, + response: Default::default(), + }; + + // This will never be called because we will fail at the coprocessor. + let mut mock_subgraph_service = MockSubgraphService::new(); + + mock_subgraph_service + .expect_call() + .returning(|req: subgraph::Request| { + assert_eq!("/", req.subgraph_request.uri().to_string()); + + Ok(subgraph::Response::builder() + .data(json!({ "test": 1234_u32 })) + .errors(Vec::new()) + .extensions(crate::json_ext::Object::new()) + .context(req.context) + .build()) + }); + + let mock_http_client = mock_with_callback(move |_: http::Request| { + Box::pin(async { + Ok(http::Response::builder() + .body(RouterBody::from( + r#"{ + "version": 1, + "stage": "SubgraphRequest", + "control": "continue", + "body": { + "query": "query Long {\n me {\n name\n}\n}" + }, + "context": { + }, + "serviceName": "service name shouldn't change", + "uri": "http://thisurihaschanged" + }"#, + )) + .unwrap()) + }) + }); + + let service = subgraph_stage.as_service( + mock_http_client, + mock_subgraph_service.boxed(), + "http://test".to_string(), + "my_subgraph_service_name".to_string(), + ); + + let request = subgraph::Request::fake_builder().build(); + + assert_eq!( + serde_json_bytes::json!({ "test": 1234_u32 }), + service + .oneshot(request) + .await + .unwrap() + .response + .into_body() + .data + .unwrap() + ); + } + #[tokio::test] async fn external_plugin_subgraph_request_controlflow_break() { let subgraph_stage = SubgraphStage { @@ -711,6 +794,127 @@ mod tests { ); } + #[tokio::test] + async fn external_plugin_subgraph_response_with_condition() { + let subgraph_stage = SubgraphStage { + request: Default::default(), + response: SubgraphResponseConf { + // Will be satisfied + condition: Condition::Exists(SubgraphSelector::ResponseContext { + response_context: String::from("context_value"), + redact: None, + default: None, + }) + .into(), + headers: false, + context: false, + body: true, + service_name: false, + status_code: false, + }, + }; + + // This will never be called because we will fail at the coprocessor. + let mut mock_subgraph_service = MockSubgraphService::new(); + + mock_subgraph_service + .expect_call() + .returning(|req: subgraph::Request| { + req.context + .insert("context_value", "content".to_string()) + .unwrap(); + Ok(subgraph::Response::builder() + .data(json!({ "test": 1234_u32 })) + .errors(Vec::new()) + .extensions(crate::json_ext::Object::new()) + .context(req.context) + .build()) + }); + + let mock_http_client = mock_with_callback(move |_: http::Request| { + Box::pin(async { + Ok(http::Response::builder() + .body(RouterBody::from( + r#"{ + "version": 1, + "stage": "SubgraphResponse", + "headers": { + "cookie": [ + "tasty_cookie=strawberry" + ], + "content-type": [ + "application/json" + ], + "host": [ + "127.0.0.1:4000" + ], + "apollo-federation-include-trace": [ + "ftv1" + ], + "apollographql-client-name": [ + "manual" + ], + "accept": [ + "*/*" + ], + "user-agent": [ + "curl/7.79.1" + ], + "content-length": [ + "46" + ] + }, + "body": { + "data": { + "test": 5678 + } + }, + "context": { + "entries": { + "accepts-json": false, + "accepts-wildcard": true, + "accepts-multipart": false, + "this-is-a-test-context": 42 + } + } + }"#, + )) + .unwrap()) + }) + }); + + let service = subgraph_stage.as_service( + mock_http_client, + mock_subgraph_service.boxed(), + "http://test".to_string(), + "my_subgraph_service_name".to_string(), + ); + + let request = subgraph::Request::fake_builder().build(); + + let response = service.oneshot(request).await.unwrap(); + + // Let's assert that the subgraph response has been transformed as it should have. + assert_eq!( + response.response.headers().get("cookie").unwrap(), + "tasty_cookie=strawberry" + ); + + assert_eq!( + response + .context + .get::<&str, u8>("this-is-a-test-context") + .unwrap() + .unwrap(), + 42 + ); + + assert_eq!( + serde_json_bytes::json!({ "test": 5678_u32 }), + response.response.into_body().data.unwrap() + ); + } + #[tokio::test] async fn external_plugin_supergraph_response() { let supergraph_stage = SupergraphStage { @@ -892,6 +1096,118 @@ mod tests { service.oneshot(request.try_into().unwrap()).await.unwrap(); } + #[tokio::test] + async fn external_plugin_router_request_with_condition() { + let router_stage = RouterStage { + request: RouterRequestConf { + // Won't be satisfied + condition: Condition::Eq([ + SelectorOrValue::Selector(RouterSelector::RequestMethod { + request_method: true, + }), + SelectorOrValue::Value("GET".to_string().into()), + ]) + .into(), + headers: true, + context: true, + body: true, + sdl: true, + path: true, + method: true, + }, + response: Default::default(), + }; + + let mock_router_service = router::service::from_supergraph_mock_callback(move |req| { + assert!(req + .context + .get::<&str, u8>("this-is-a-test-context") + .ok() + .flatten() + .is_none()); + Ok(supergraph::Response::builder() + .data(json!({ "test": 1234_u32 })) + .context(req.context) + .build() + .unwrap()) + }) + .await; + + let mock_http_client = mock_with_callback(move |req: http::Request| { + Box::pin(async { + let deserialized_request: Externalizable = + serde_json::from_slice(&hyper::body::to_bytes(req.into_body()).await.unwrap()) + .unwrap(); + + assert_eq!(EXTERNALIZABLE_VERSION, deserialized_request.version); + assert_eq!( + PipelineStep::RouterRequest.to_string(), + deserialized_request.stage + ); + + let input = json!( + { + "version": 1, + "stage": "RouterRequest", + "control": "continue", + "id": "1b19c05fdafc521016df33148ad63c1b", + "headers": { + "cookie": [ + "tasty_cookie=strawberry" + ], + "content-type": [ + "application/json" + ], + "host": [ + "127.0.0.1:4000" + ], + "apollo-federation-include-trace": [ + "ftv1" + ], + "apollographql-client-name": [ + "manual" + ], + "accept": [ + "*/*" + ], + "user-agent": [ + "curl/7.79.1" + ], + "content-length": [ + "46" + ] + }, + "body": "{ + \"query\": \"query Long {\n me {\n name\n}\n}\" + }", + "context": { + "entries": { + "accepts-json": false, + "accepts-wildcard": true, + "accepts-multipart": false, + "this-is-a-test-context": 42 + } + }, + "sdl": "the sdl shouldnt change" + }); + Ok(http::Response::builder() + .body(RouterBody::from(serde_json::to_string(&input).unwrap())) + .unwrap()) + }) + }); + + let service = router_stage.as_service( + mock_http_client, + mock_router_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::canned_builder().build().unwrap(); + + service.oneshot(request.try_into().unwrap()).await.unwrap(); + } + #[tokio::test] async fn external_plugin_router_request_http_get() { let router_stage = RouterStage { diff --git a/apollo-router/src/plugins/telemetry/config_new/selectors.rs b/apollo-router/src/plugins/telemetry/config_new/selectors.rs index e931772e87..2e78658639 100644 --- a/apollo-router/src/plugins/telemetry/config_new/selectors.rs +++ b/apollo-router/src/plugins/telemetry/config_new/selectors.rs @@ -1436,6 +1436,7 @@ mod test { use crate::plugins::telemetry::config_new::Selector; use crate::plugins::telemetry::otel; use crate::query_planner::APOLLO_OPERATION_ID; + use crate::services::FIRST_EVENT_CONTEXT_KEY; use crate::spec::operation_limits::OperationLimits; #[test] @@ -1931,6 +1932,32 @@ mod test { ); } + #[test] + fn supergraph_is_primary() { + let selector = SupergraphSelector::IsPrimaryResponse { + is_primary_response: true, + }; + let context = crate::context::Context::new(); + let _ = context.insert(FIRST_EVENT_CONTEXT_KEY, true); + assert_eq!( + selector + .on_response( + &crate::services::SupergraphResponse::fake_builder() + .context(context.clone()) + .build() + .unwrap() + ) + .unwrap(), + true.into() + ); + assert_eq!( + selector + .on_response_event(&crate::graphql::Response::builder().build(), &context) + .unwrap(), + true.into() + ); + } + #[test] fn supergraph_response_context() { let selector = SupergraphSelector::ResponseContext { From 0ffdfd6d54c48d9cde738788e67905fda1446963 Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Wed, 3 Jul 2024 17:12:54 +0200 Subject: [PATCH 3/5] add docs Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- docs/source/customizations/coprocessor.mdx | 35 ++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/docs/source/customizations/coprocessor.mdx b/docs/source/customizations/coprocessor.mdx index 94af25cdad..fce69a7cc6 100644 --- a/docs/source/customizations/coprocessor.mdx +++ b/docs/source/customizations/coprocessor.mdx @@ -92,6 +92,41 @@ coprocessor: In this case, the `RouterService` only sends a coprocessor request whenever it receives a client request. The coprocessor request body includes _no_ data related to the client request (only "control" data, which is [covered below](#coprocessor-request-format)). +### Conditions + +You can add [conditions](../configuration/telemetry/instrumentation/conditions) on coprocessor stages (except for `Execution` service) to conditionnally execute these stages in a coprocessor based on headers or context entries using [selectors](../configuration//telemetry//instrumentation/selectors). + +Here is the configuration example if you don't want to execute a coprocessor on `SupergraphResponse` for every subscription events when a subscription is opened : + +```yaml title="router.yaml" +coprocessor: + url: http://127.0.0.1:3000 + supergraph: + response: + condition: + eq: + - true + - is_primary_response: true # Will be true only for the first event received on a supergraph response (like classical queries and mutations for example) + body: true + headers: true +``` + +Another example if you don't want to execute the router request coprocessor if it contains a request header: + + +```yaml title="router.yaml" +coprocessor: + url: http://127.0.0.1:3000 + router: + request: + condition: + eq: + - request_header: should-execute-copro # Header name + - "enabled" # Header value + body: true + headers: true +``` + ### Client configuration From 2f40e1742f43ef2563c2b770e3278d8ae97c6875 Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Wed, 3 Jul 2024 17:21:42 +0200 Subject: [PATCH 4/5] changelog Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- .../feat_bnjjj_feat_condition_copro.md | 36 +++++++++++++++++++ .../telemetry/instrumentation/selectors.mdx | 1 + 2 files changed, 37 insertions(+) create mode 100644 .changesets/feat_bnjjj_feat_condition_copro.md diff --git a/.changesets/feat_bnjjj_feat_condition_copro.md b/.changesets/feat_bnjjj_feat_condition_copro.md new file mode 100644 index 0000000000..d405dc0b3d --- /dev/null +++ b/.changesets/feat_bnjjj_feat_condition_copro.md @@ -0,0 +1,36 @@ +### feat(coprocessor): add conditions on coprocessor stages ([PR #5557](https://github.com/apollographql/router/pull/5557)) + +Adding support of conditions on coprocessor stages (except for `Execution` service) based on existing [conditions](https://www.apollographql.com/docs/router/configuration/telemetry/instrumentation/conditions/) we already have in the router for custom telemetry. Including usage of existing [selectors](https://www.apollographql.com/docs/router/configuration/telemetry/instrumentation/selectors/) + +Example if you would like to execute a coprocessor on supergraph response only on primary response and not subscription events for example: + +```yaml title=router.yaml +coprocessor: + url: http://127.0.0.1:3000 # mandatory URL which is the address of the coprocessor + timeout: 2s # optional timeout (2 seconds in this example). If not set, defaults to 1 second + supergraph: + response: + condition: + eq: + - true + - is_primary_response: true + body: true +``` + +or another example to not execute the coprocessor at all if it's a subscription: + +```yaml title=router.yaml +coprocessor: + url: http://127.0.0.1:3000 # mandatory URL which is the address of the coprocessor + timeout: 2s # optional timeout (2 seconds in this example). If not set, defaults to 1 second + supergraph: + response: + condition: + not: + eq: + - subscription + - operation_kind: string + body: true +``` + +By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5557 \ No newline at end of file diff --git a/docs/source/configuration/telemetry/instrumentation/selectors.mdx b/docs/source/configuration/telemetry/instrumentation/selectors.mdx index 3512cada15..53bb8c2568 100644 --- a/docs/source/configuration/telemetry/instrumentation/selectors.mdx +++ b/docs/source/configuration/telemetry/instrumentation/selectors.mdx @@ -56,6 +56,7 @@ The supergraph service is executed after query parsing but before query executio | `query_variable` | Yes | | The name of a graphql query variable | | `request_header` | Yes | | The name of a request header | | `response_header` | Yes | | The name of a response header | +| `is_primary_response` | No | `true`\|`false` | Boolean returning true if it's the primary response and not events like subscription events or deferred responses | | `response_data` | Yes | | Json Path into the supergraph response body data (it might impact performances) | | `response_errors` | Yes | | Json Path into the supergraph response body errors (it might impact performances) | | `request_context` | Yes | | The name of a request context key | From 23830fd07e25987d87d13f626596ced3720fb71f Mon Sep 17 00:00:00 2001 From: Coenen Benjamin Date: Thu, 4 Jul 2024 10:16:53 +0200 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: Edward Huang --- .../feat_bnjjj_feat_condition_copro.md | 23 +++++-------------- docs/source/customizations/coprocessor.mdx | 16 ++++++++++--- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/.changesets/feat_bnjjj_feat_condition_copro.md b/.changesets/feat_bnjjj_feat_condition_copro.md index d405dc0b3d..e2ac7b1887 100644 --- a/.changesets/feat_bnjjj_feat_condition_copro.md +++ b/.changesets/feat_bnjjj_feat_condition_copro.md @@ -1,23 +1,10 @@ -### feat(coprocessor): add conditions on coprocessor stages ([PR #5557](https://github.com/apollographql/router/pull/5557)) +### feat(coprocessor): Support conditional coprocessor execution per stage of request lifecycle ([PR #5557](https://github.com/apollographql/router/pull/5557)) -Adding support of conditions on coprocessor stages (except for `Execution` service) based on existing [conditions](https://www.apollographql.com/docs/router/configuration/telemetry/instrumentation/conditions/) we already have in the router for custom telemetry. Including usage of existing [selectors](https://www.apollographql.com/docs/router/configuration/telemetry/instrumentation/selectors/) +The router now supports conditional execution of the coprocessor for each stage of the request lifecycle (except for the `Execution` stage). -Example if you would like to execute a coprocessor on supergraph response only on primary response and not subscription events for example: +To configure, define conditions for a specific stage by using selectors based on headers or context entries. For example, based on a supergraph response you can configure the coprocessor not to execute for any subscription: + -```yaml title=router.yaml -coprocessor: - url: http://127.0.0.1:3000 # mandatory URL which is the address of the coprocessor - timeout: 2s # optional timeout (2 seconds in this example). If not set, defaults to 1 second - supergraph: - response: - condition: - eq: - - true - - is_primary_response: true - body: true -``` - -or another example to not execute the coprocessor at all if it's a subscription: ```yaml title=router.yaml coprocessor: @@ -33,4 +20,6 @@ coprocessor: body: true ``` +To learn more, see the documentation about [coprocessor conditions](https://www.apollographql.com/docs/router/customizations/coprocessor/#conditions). + By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5557 \ No newline at end of file diff --git a/docs/source/customizations/coprocessor.mdx b/docs/source/customizations/coprocessor.mdx index fce69a7cc6..96303f2486 100644 --- a/docs/source/customizations/coprocessor.mdx +++ b/docs/source/customizations/coprocessor.mdx @@ -94,9 +94,19 @@ In this case, the `RouterService` only sends a coprocessor request whenever it r ### Conditions -You can add [conditions](../configuration/telemetry/instrumentation/conditions) on coprocessor stages (except for `Execution` service) to conditionnally execute these stages in a coprocessor based on headers or context entries using [selectors](../configuration//telemetry//instrumentation/selectors). +You can define [conditions](../configuration/telemetry/instrumentation/conditions) for a stage of the request lifecycle that you want to run the coprocessor. You can set coprocessor conditions with [selectors](../configuration//telemetry//instrumentation/selectors) based on headers or context entries. -Here is the configuration example if you don't want to execute a coprocessor on `SupergraphResponse` for every subscription events when a subscription is opened : + + +The `Execution` stage doesn't support coprocessor conditions. + + + + +Example configurations: + + - Run during the `SupergraphResponse` stage only for the first event of a supergraph response. Useful for handling only the first subscription event when a subscription is opened: + ```yaml title="router.yaml" coprocessor: @@ -111,7 +121,7 @@ coprocessor: headers: true ``` -Another example if you don't want to execute the router request coprocessor if it contains a request header: +- Run during the `Request` stage only if the request contains a request header: ```yaml title="router.yaml"