Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(coprocessor): add conditions on coprocessor stages #5557

Merged
merged 7 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .changesets/feat_bnjjj_feat_condition_copro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
### feat(coprocessor): Support conditional coprocessor execution per stage of request lifecycle ([PR #5557](https://github.com/apollographql/router/pull/5557))

The router now supports conditional execution of the coprocessor for each stage of the request lifecycle (except for the `Execution` stage).

To configure, define conditions for a specific stage by using selectors based on headers or context entries. For example, based on a supergraph response you can configure the coprocessor not to execute for any subscription:



```yaml title=router.yaml
coprocessor:
url: http://127.0.0.1:3000 # mandatory URL which is the address of the coprocessor
timeout: 2s # optional timeout (2 seconds in this example). If not set, defaults to 1 second
supergraph:
response:
condition:
not:
eq:
- subscription
- operation_kind: string
body: true
```

bnjjj marked this conversation as resolved.
Show resolved Hide resolved
To learn more, see the documentation about [coprocessor conditions](https://www.apollographql.com/docs/router/customizations/coprocessor/#conditions).

By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5557
Original file line number Diff line number Diff line change
Expand Up @@ -2466,8 +2466,8 @@ expression: "&schema"
"type": "boolean"
},
"format": {
"$ref": "#/definitions/TraceIdFormat",
"description": "#/definitions/TraceIdFormat"
"$ref": "#/definitions/TraceIdFormat2",
"description": "#/definitions/TraceIdFormat2"
},
"header_name": {
"description": "Choose the header name to expose trace_id (default: apollo-trace-id)",
Expand Down Expand Up @@ -4759,6 +4759,11 @@ expression: "&schema"
"description": "Send the body",
"type": "boolean"
},
"condition": {
"$ref": "#/definitions/Condition_for_RouterSelector",
"description": "#/definitions/Condition_for_RouterSelector",
"nullable": true
},
"context": {
"default": false,
"description": "Send the context",
Expand Down Expand Up @@ -4796,6 +4801,11 @@ expression: "&schema"
"description": "Send the body",
"type": "boolean"
},
"condition": {
"$ref": "#/definitions/Condition_for_RouterSelector",
"description": "#/definitions/Condition_for_RouterSelector",
"nullable": true
},
"context": {
"default": false,
"description": "Send the context",
Expand Down Expand Up @@ -4892,8 +4902,8 @@ expression: "&schema"
"description": "The trace ID of the request.",
"properties": {
"trace_id": {
"$ref": "#/definitions/TraceIdFormat2",
Copy link
Contributor

@BrynCooke BrynCooke Jul 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is non-determinism here. I did a check and there are two TraceIdFormat enums. We need to unify them. This PR didn't cause this issue, I think it was my last one.

"description": "#/definitions/TraceIdFormat2"
"$ref": "#/definitions/TraceIdFormat",
"description": "#/definitions/TraceIdFormat"
}
},
"required": [
Expand Down Expand Up @@ -5703,6 +5713,11 @@ expression: "&schema"
"description": "Send the body",
"type": "boolean"
},
"condition": {
"$ref": "#/definitions/Condition_for_SubgraphSelector",
"description": "#/definitions/Condition_for_SubgraphSelector",
"nullable": true
},
"context": {
"default": false,
"description": "Send the context",
Expand Down Expand Up @@ -5740,6 +5755,11 @@ expression: "&schema"
"description": "Send the body",
"type": "boolean"
},
"condition": {
"$ref": "#/definitions/Condition_for_SubgraphSelector",
"description": "#/definitions/Condition_for_SubgraphSelector",
"nullable": true
},
"context": {
"default": false,
"description": "Send the context",
Expand Down Expand Up @@ -6408,6 +6428,11 @@ expression: "&schema"
"description": "Send the body",
"type": "boolean"
},
"condition": {
"$ref": "#/definitions/Condition_for_SupergraphSelector",
"description": "#/definitions/Condition_for_SupergraphSelector",
"nullable": true
},
"context": {
"default": false,
"description": "Send the context",
Expand Down Expand Up @@ -6440,6 +6465,11 @@ expression: "&schema"
"description": "Send the body",
"type": "boolean"
},
"condition": {
"$ref": "#/definitions/Condition_for_SupergraphSelector",
"description": "#/definitions/Condition_for_SupergraphSelector",
"nullable": true
},
"context": {
"default": false,
"description": "Send the context",
Expand Down Expand Up @@ -6746,6 +6776,20 @@ expression: "&schema"
"cost"
],
"type": "object"
},
{
"additionalProperties": false,
"description": "Boolean returning true if it's the primary response and not events like subscription events or deferred responses",
"properties": {
"is_primary_response": {
"description": "Boolean returning true if it's the primary response and not events like subscription events or deferred responses",
"type": "boolean"
}
},
"required": [
"is_primary_response"
],
"type": "object"
}
]
},
Expand Down Expand Up @@ -6907,21 +6951,14 @@ expression: "&schema"
"TraceIdFormat": {
"oneOf": [
{
"description": "Format the Trace ID as a hexadecimal number\n\n(e.g. Trace ID 16 -> 00000000000000000000000000000010)",
"enum": [
"hexadecimal"
],
"type": "string"
},
{
"description": "Format the Trace ID as a decimal number\n\n(e.g. Trace ID 16 -> 16)",
"description": "Open Telemetry trace ID, a hex string.",
"enum": [
"decimal"
"open_telemetry"
],
"type": "string"
},
{
"description": "Datadog",
"description": "Datadog trace ID, a u64.",
"enum": [
"datadog"
],
Expand All @@ -6932,14 +6969,21 @@ expression: "&schema"
"TraceIdFormat2": {
"oneOf": [
{
"description": "Open Telemetry trace ID, a hex string.",
"description": "Format the Trace ID as a hexadecimal number\n\n(e.g. Trace ID 16 -> 00000000000000000000000000000010)",
"enum": [
"open_telemetry"
"hexadecimal"
],
"type": "string"
},
{
"description": "Datadog trace ID, a u64.",
"description": "Format the Trace ID as a decimal number\n\n(e.g. Trace ID 16 -> 16)",
"enum": [
"decimal"
],
"type": "string"
},
{
"description": "Datadog",
"enum": [
"datadog"
],
Expand Down
51 changes: 49 additions & 2 deletions apollo-router/src/plugins/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugins::telemetry::config_new::conditions::Condition;
use crate::plugins::telemetry::config_new::selectors::RouterSelector;
use crate::plugins::telemetry::config_new::selectors::SubgraphSelector;
use crate::plugins::traffic_shaping::Http2Config;
use crate::register_plugin;
use crate::services;
Expand Down Expand Up @@ -234,6 +237,9 @@ where
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)]
#[serde(default, deny_unknown_fields)]
pub(super) struct RouterRequestConf {
/// Condition to trigger this stage
#[serde(skip_serializing)]
pub(super) condition: Option<Condition<RouterSelector>>,
/// Send the headers
pub(super) headers: bool,
/// Send the context
Expand All @@ -252,6 +258,9 @@ pub(super) struct RouterRequestConf {
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)]
#[serde(default, deny_unknown_fields)]
pub(super) struct RouterResponseConf {
/// Condition to trigger this stage
#[serde(skip_serializing)]
pub(super) condition: Option<Condition<RouterSelector>>,
/// Send the headers
pub(super) headers: bool,
/// Send the context
Expand All @@ -267,6 +276,9 @@ pub(super) struct RouterResponseConf {
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)]
#[serde(default, deny_unknown_fields)]
pub(super) struct SubgraphRequestConf {
/// Condition to trigger this stage
#[serde(skip_serializing)]
pub(super) condition: Option<Condition<SubgraphSelector>>,
/// Send the headers
pub(super) headers: bool,
/// Send the context
Expand All @@ -285,6 +297,9 @@ pub(super) struct SubgraphRequestConf {
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)]
#[serde(default, deny_unknown_fields)]
pub(super) struct SubgraphResponseConf {
/// Condition to trigger this stage
#[serde(skip_serializing)]
pub(super) condition: Option<Condition<SubgraphSelector>>,
/// Send the headers
pub(super) headers: bool,
/// Send the context
Expand Down Expand Up @@ -598,7 +613,7 @@ async fn process_router_request_stage<C>(
coprocessor_url: String,
sdl: Arc<String>,
mut request: router::Request,
request_config: RouterRequestConf,
mut request_config: RouterRequestConf,
) -> Result<ControlFlow<router::Response, router::Request>, BoxError>
where
C: Service<http::Request<RouterBody>, Response = http::Response<RouterBody>, Error = BoxError>
Expand All @@ -608,6 +623,14 @@ where
+ 'static,
<C as tower::Service<http::Request<RouterBody>>>::Future: Send + 'static,
{
let should_be_executed = request_config
.condition
.as_mut()
.map(|c| c.evaluate_request(&request) == Some(true))
.unwrap_or(true);
if !should_be_executed {
return Ok(ControlFlow::Continue(request));
}
// Call into our out of process processor with a body of our body
// First, extract the data we need from our request and prepare our
// external call. Use our configuration to figure out which data to send.
Expand Down Expand Up @@ -761,6 +784,14 @@ where
+ 'static,
<C as tower::Service<http::Request<RouterBody>>>::Future: Send + 'static,
{
let should_be_executed = response_config
.condition
.as_ref()
.map(|c| c.evaluate_response(&response))
.unwrap_or(true);
if !should_be_executed {
return Ok(response);
}
// split the response into parts + body
let (parts, body) = response.response.into_parts();

Expand Down Expand Up @@ -945,7 +976,7 @@ async fn process_subgraph_request_stage<C>(
coprocessor_url: String,
service_name: String,
mut request: subgraph::Request,
request_config: SubgraphRequestConf,
mut request_config: SubgraphRequestConf,
) -> Result<ControlFlow<subgraph::Response, subgraph::Request>, BoxError>
where
C: Service<http::Request<RouterBody>, Response = http::Response<RouterBody>, Error = BoxError>
Expand All @@ -955,6 +986,14 @@ where
+ 'static,
<C as tower::Service<http::Request<RouterBody>>>::Future: Send + 'static,
{
let should_be_executed = request_config
.condition
.as_mut()
.map(|c| c.evaluate_request(&request) == Some(true))
.unwrap_or(true);
if !should_be_executed {
return Ok(ControlFlow::Continue(request));
}
// Call into our out of process processor with a body of our body
// First, extract the data we need from our request and prepare our
// external call. Use our configuration to figure out which data to send.
Expand Down Expand Up @@ -1100,6 +1139,14 @@ where
+ 'static,
<C as tower::Service<http::Request<RouterBody>>>::Future: Send + 'static,
{
let should_be_executed = response_config
.condition
.as_ref()
.map(|c| c.evaluate_response(&response))
.unwrap_or(true);
if !should_be_executed {
return Ok(response);
}
// Call into our out of process processor with a body of our body
// First, extract the data we need from our response and prepare our
// external call. Use our configuration to figure out which data to send.
Expand Down
Loading
Loading