Query deduplication #285

Merged
merged 13 commits into from
Feb 16, 2022
2 changes: 1 addition & 1 deletion apollo-router-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::Future;
use std::sync::Arc;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct Context<T = Arc<http_compat::Request<Request>>> {
/// Original request to the Router.
pub request: T,
Expand Down
2 changes: 1 addition & 1 deletion apollo-router-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::level_filters::LevelFilter;
///
/// Note that these are not actually returned to the client, but are instead converted to JSON for
/// [`struct@Error`].
-#[derive(Error, Display, Debug, Serialize, Deserialize)]
+#[derive(Error, Display, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[ignore_extra_doc_attributes]
pub enum FetchError {
Expand Down
130 changes: 130 additions & 0 deletions apollo-router-core/src/layers/deduplication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use crate::{http_compat, OperationKind, Request, SubgraphRequest, SubgraphResponse};
use futures::{future::BoxFuture, lock::Mutex};
use std::{collections::HashMap, sync::Arc, task::Poll};
use tokio::sync::broadcast::{self, Sender};
use tower::{BoxError, Layer, ServiceExt};

pub struct QueryDeduplicationLayer;

impl<S> Layer<S> for QueryDeduplicationLayer
where
S: tower::Service<SubgraphRequest, Response = SubgraphResponse, Error = BoxError> + Clone,
{
type Service = QueryDeduplicationService<S>;

fn layer(&self, service: S) -> Self::Service {
QueryDeduplicationService::new(service)
}
}

type WaitMap =
Arc<Mutex<HashMap<http_compat::Request<Request>, Sender<Result<SubgraphResponse, String>>>>>;

pub struct QueryDeduplicationService<S> {
service: S,
wait_map: WaitMap,
}

impl<S> QueryDeduplicationService<S>
where
S: tower::Service<SubgraphRequest, Response = SubgraphResponse, Error = BoxError> + Clone,
{
fn new(service: S) -> Self {
QueryDeduplicationService {
service,
wait_map: Arc::new(Mutex::new(HashMap::new())),
}
}

async fn dedup(
service: S,
wait_map: WaitMap,
request: SubgraphRequest,
) -> Result<SubgraphResponse, BoxError> {
loop {
let mut locked_wait_map = wait_map.lock().await;
match locked_wait_map.get_mut(&request.http_request) {
Some(waiter) => {
// Register interest in key
let mut receiver = waiter.subscribe();
drop(locked_wait_map);

match receiver.recv().await {
Ok(value) => {
return value
.map(|response| SubgraphResponse {
response: response.response,
context: request.context,
})
.map_err(|e| e.into())
}
// there was an issue with the broadcast channel, retry fetching
Err(_) => continue,
}
}
None => {
let (tx, _rx) = broadcast::channel(1);
locked_wait_map.insert(request.http_request.clone(), tx.clone());
drop(locked_wait_map);

let context = request.context.clone();
let http_request = request.http_request.clone();
let res = service.ready_oneshot().await?.call(request).await;

{
let mut locked_wait_map = wait_map.lock().await;
locked_wait_map.remove(&http_request);
}

// Let our waiters know
let broadcast_value = res
.as_ref()
.map(|response| response.clone())
.map_err(|e| e.to_string());
Contributor Author

We use BoxError all over the code (mainly because Buffer enforces BoxError), and BoxError is not Clone. But here we want to copy the response to all of the waiting queries, even if we got an error. Since we cannot access the underlying type, the best we can do is convert to a String and pass it around

Contributor

Is it ok as a String? Would it be better as a json Value?

Contributor Author

We could serialize a JSON value to a string and pass that as the error. Unfortunately, the only way to interact with std::error::Error is through strings: https://doc.rust-lang.org/nightly/std/error/trait.Error.html

Contributor

Could we convert to String and preserve the error as source()? If we did, would it deliver any value?
(I'm just trying to avoid lowest common denominator of error type, but I guess we don't have many options.)

Contributor Author

IIRC we discussed (with @BrynCooke I think?) using FetchError as the error type for all subgraph services and layers, and converting to BoxError just before passing it back to the Buffer layer, but deferred that to later, as it would be a big change to introduce in this PR.
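
One way to keep the original error reachable through `source()` while still handing a clonable value to every waiter is to move the boxed error into an `Arc`. The following is only a sketch of that idea, not what this PR does: `SharedError` is a hypothetical name, and `BoxError` is redefined locally with the same shape as tower's alias so the example depends on nothing outside `std`.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

// Local stand-in with the same shape as tower's `BoxError` alias.
pub type BoxError = Box<dyn Error + Send + Sync>;

/// Hypothetical clonable wrapper (not part of the PR): the boxed error is
/// moved into an `Arc`, so each waiter can receive its own cheap clone.
#[derive(Clone, Debug)]
pub struct SharedError(Arc<dyn Error + Send + Sync>);

impl SharedError {
    pub fn new(err: BoxError) -> Self {
        // `Arc<T>` implements `From<Box<T>>`, including for unsized `T`.
        SharedError(Arc::from(err))
    }
}

impl fmt::Display for SharedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward to the wrapped error's message.
        fmt::Display::fmt(&self.0, f)
    }
}

impl Error for SharedError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // Expose the original error so callers can still inspect or downcast it.
        let inner: &(dyn Error + 'static) = &*self.0;
        Some(inner)
    }
}
```

Because `SharedError` is itself `Error + Send + Sync`, a clone converts back with `let boxed: BoxError = shared.clone().into();`. Whether that is worth the extra type compared to a plain `String` is the open question in this thread.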


// Our use case is very specific, so we are sure that
// we won't get any errors here.
tokio::task::spawn_blocking(move || {
tx.send(broadcast_value)
.expect("there is always at least one receiver alive, the _rx guard; qed")
}).await
.expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed");

return res.map(|response| SubgraphResponse {
response: response.response,
context,
});
}
}
}
}
}

impl<S> tower::Service<SubgraphRequest> for QueryDeduplicationService<S>
where
S: tower::Service<SubgraphRequest, Response = SubgraphResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
<S as tower::Service<SubgraphRequest>>::Future: Send + 'static,
{
type Response = SubgraphResponse;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, request: SubgraphRequest) -> Self::Future {
let mut service = self.service.clone();

if request.operation_kind == OperationKind::Query {
let wait_map = self.wait_map.clone();

Box::pin(async move { Self::dedup(service, wait_map, request).await })
} else {
Box::pin(async move { service.call(request).await })
}
}
}
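
For readers who want to see the wait-map idea in isolation, here is a thread-based analogue that uses only `std`. It is a sketch under stated assumptions, not the code from this PR: the PR is async and uses a `futures` mutex plus a `tokio` broadcast channel, whereas this version swaps in `Mutex` and `Condvar`. The names `SingleFlight`, `Slot` and `run` are made up for the illustration.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Condvar, Mutex};

// One slot per in-flight key: the eventual value plus a condvar to wake waiters.
type Slot<V> = Arc<(Mutex<Option<V>>, Condvar)>;

/// Hypothetical std-only counterpart of the PR's wait map.
pub struct SingleFlight<K, V> {
    in_flight: Mutex<HashMap<K, Slot<V>>>,
}

impl<K: Hash + Eq + Clone, V: Clone> SingleFlight<K, V> {
    pub fn new() -> Self {
        SingleFlight { in_flight: Mutex::new(HashMap::new()) }
    }

    /// Runs `fetch` once per key among concurrent callers; callers that find
    /// the key already registered wait for the leader and clone its value.
    pub fn run<F: FnOnce() -> V>(&self, key: K, fetch: F) -> V {
        let (slot, leader) = {
            let mut map = self.in_flight.lock().unwrap();
            let existing = map.get(&key).cloned();
            match existing {
                // Someone is already fetching this key: register interest.
                Some(slot) => (slot, false),
                // First caller for this key: become the leader.
                None => {
                    let slot: Slot<V> = Arc::new((Mutex::new(None), Condvar::new()));
                    map.insert(key.clone(), Arc::clone(&slot));
                    (slot, true)
                }
            }
        };
        let (result, ready) = &*slot;
        if leader {
            let value = fetch();
            // Remove the key first so later callers start a fresh fetch,
            // then publish the value to everyone already waiting.
            self.in_flight.lock().unwrap().remove(&key);
            *result.lock().unwrap() = Some(value.clone());
            ready.notify_all();
            value
        } else {
            let mut guard = result.lock().unwrap();
            while guard.is_none() {
                guard = ready.wait(guard).unwrap();
            }
            (*guard).clone().expect("loop exits only once a value is stored")
        }
    }
}
```

As in the PR, only callers that arrive while a fetch is in flight share its result; nothing is cached afterwards. One limitation of this simplified version: if the leader's `fetch` panics, its waiters are never woken.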
1 change: 1 addition & 0 deletions apollo-router-core/src/layers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod cache;
pub mod deduplication;
pub mod header_manipulation;
28 changes: 27 additions & 1 deletion apollo-router-core/src/query_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl PlanNode {
mod fetch {
use super::selection::{select_object, Selection};
use crate::prelude::graphql::*;
-use serde::Deserialize;
+use serde::{Deserialize, Deserializer};
use std::sync::Arc;
use tower::ServiceExt;
use tracing::{instrument, Instrument};
Expand All @@ -234,6 +234,30 @@ mod fetch {

/// The GraphQL subquery that is used for the fetch.
operation: String,

/// The kind of operation (query, mutation or subscription)
#[serde(
rename(deserialize = "operationKind"),
deserialize_with = "parse_operation"
)]
operation_kind: OperationKind,
}

fn parse_operation<'de, D>(deserializer: D) -> Result<OperationKind, D::Error>
where
D: Deserializer<'de>,
{
let buf = String::deserialize(deserializer)?;

match buf.as_str() {
"query" => Ok(OperationKind::Query),
"mutation" => Ok(OperationKind::Mutation),
"subscription" => Ok(OperationKind::Subscription),
s => Err(serde::de::Error::custom(format!(
"unknown operation kind: {}",
s
))),
}
}

struct Variables {
Expand Down Expand Up @@ -311,6 +335,7 @@ mod fetch {
) -> Result<Value, FetchError> {
let FetchNode {
operation,
operation_kind,
service_name,
..
} = self;
Expand Down Expand Up @@ -340,6 +365,7 @@ mod fetch {
.unwrap()
.into(),
context: context.clone(),
operation_kind: *operation_kind,
};

let service = service_registry
Expand Down
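
The string matching inside `parse_operation` above can also be expressed as a `FromStr` implementation, which makes it testable without a deserializer. This is a sketch only: the `OperationKind` definition below is an assumption reconstructed from how the diff uses the type (three variants, copied with `*operation_kind`, compared with `==`), and a `String` error type is chosen purely for brevity.

```rust
use std::str::FromStr;

// Assumed shape of `OperationKind`; the real definition is not shown in this diff.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OperationKind {
    Query,
    Mutation,
    Subscription,
}

impl FromStr for OperationKind {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Same lowercase spellings and error message as `parse_operation`.
        match s {
            "query" => Ok(OperationKind::Query),
            "mutation" => Ok(OperationKind::Mutation),
            "subscription" => Ok(OperationKind::Subscription),
            other => Err(format!("unknown operation kind: {}", other)),
        }
    }
}
```

With this in place, `"mutation".parse::<OperationKind>()` yields `Ok(OperationKind::Mutation)`, and matching stays case-sensitive, so `"Query"` is rejected.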
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: apollo-router-core/src/query_planner/router_bridge_query_planner.rs
-assertion_line: 97
+assertion_line: 133
expression: result

---
Expand All @@ -11,6 +11,7 @@ QueryPlan {
requires: [],
variable_usages: [],
operation: "{me{name{first last}}}",
operation_kind: Query,
},
),
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
---
source: apollo-router-core/src/query_planner/mod.rs
assertion_line: 467
expression: query_plan

---
Expand All @@ -11,6 +12,7 @@ Sequence {
requires: [],
variable_usages: [],
operation: "{topProducts{__typename ...on Book{__typename isbn}...on Furniture{name}}product(upc:\"1\"){__typename ...on Book{__typename isbn}...on Furniture{name}}}",
operation_kind: Query,
},
),
Parallel {
Expand Down Expand Up @@ -59,6 +61,7 @@ Sequence {
"test_variable",
],
operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}",
operation_kind: Query,
},
),
},
Expand Down Expand Up @@ -117,6 +120,7 @@ Sequence {
],
variable_usages: [],
operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}",
operation_kind: Query,
},
),
},
Expand Down Expand Up @@ -164,6 +168,7 @@ Sequence {
],
variable_usages: [],
operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}",
operation_kind: Query,
},
),
},
Expand Down Expand Up @@ -221,6 +226,7 @@ Sequence {
],
variable_usages: [],
operation: "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}",
operation_kind: Query,
},
),
},
Expand Down
5 changes: 5 additions & 0 deletions apollo-router-core/src/query_planner/testdata/query_plan.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"kind": "Fetch",
"serviceName": "product",
"variableUsages": [],
"operationKind": "query",
"operation": "{topProducts{__typename ...on Book{__typename isbn}...on Furniture{name}}product(upc:\"1\"){__typename ...on Book{__typename isbn}...on Furniture{name}}}"
},
{
Expand All @@ -30,6 +31,7 @@
}
],
"variableUsages": ["test_variable"],
"operationKind": "query",
"operation": "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}"
}
},
Expand All @@ -52,6 +54,7 @@
}
],
"variableUsages": [],
"operationKind": "query",
"operation": "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}"
}
}
Expand All @@ -77,6 +80,7 @@
}
],
"variableUsages": [],
"operationKind": "query",
"operation": "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{__typename isbn title year}}}"
}
},
Expand All @@ -99,6 +103,7 @@
}
],
"variableUsages": [],
"operationKind": "query",
"operation": "query($representations:[_Any!]!){_entities(representations:$representations){...on Book{name}}}"
}
}
Expand Down
2 changes: 1 addition & 1 deletion apollo-router-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use typed_builder::TypedBuilder;
#[derive(Clone, Derivative, Serialize, Deserialize, TypedBuilder)]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(setter(into)))]
-#[derivative(Debug, PartialEq)]
+#[derivative(Debug, PartialEq, Eq, Hash)]
pub struct Request {
/// The graphql query.
pub query: String,
Expand Down
52 changes: 51 additions & 1 deletion apollo-router-core/src/services/http_compat.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
//! wrapper types for Request and Response from the http crate to improve their usability

-use std::ops::{Deref, DerefMut};
+use std::{
+cmp::PartialEq,
+hash::Hash,
+ops::{Deref, DerefMut},
+};

#[derive(Debug, Default)]
pub struct Request<T> {
Expand Down Expand Up @@ -61,6 +65,52 @@ impl<T> DerefMut for Request<T> {
}
}

impl<T: Hash> Hash for Request<T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.inner.method().hash(state);
self.inner.version().hash(state);
self.inner.uri().hash(state);
// this assumes headers are in the same order
for (name, value) in self.inner.headers() {
name.hash(state);
value.hash(state);
}
Comment on lines +73 to +77
Contributor

I really dislike HeaderMap. We can't even sort it...

I suppose the downside of not spotting duplicates is that our query can't be de-duplicated. Not a bug, but defeats the purpose of de-duplicating.

We can do something a bit crafty here. HeaderValue does implement Ord and HeaderName always convert to &str, so:

        // Map Header names into &str so we can sort
        let mut tmp: Vec<(&str, &HeaderValue)> = self
            .inner
            .headers()
            .iter()
            .map(|(k, v)| (k.as_str(), v))
            .collect();
        tmp.sort();
        for (name, value) in tmp {
            name.hash(state);
            value.hash(state);
        }

would give us a consistent ordering for hashing purposes. I think we could do the same for Eq as well.

Contributor Author

We should refrain from sorting the headers, since their order can have an impact. Example: the Accept header can have multiple values, which can arrive either comma-separated in a single header value or as multiple separate Accept headers. In the second case, sorting the headers might reorder the values and change the behaviour.

The assumption for the cache key here is that similar queries coming from the same client will have the same shape (same user agent, same list of headers in the same order...).

What we could do though is decide on which headers we consider for the cache. Once we do that, we would get stronger guarantees, and could expose in the docs the issues around ordering. Could we explore that in another PR though?

Contributor

We are only sorting our copy, the original headers are un-impacted. I agree that we can't touch the original headers, which we couldn't sort anyway since HeaderName doesn't implement Ord.

Does the original order of headers matter for hashing purposes? i.e.: don't we want the ordering to be consistent to help improve our de-duplication chances?

I'm fine with moving this discussion to a follow-up. I think it's important to make the comparison less fragile, but it doesn't need to be decided before merging to the tower branch. If you don't want to put my suggestion in, perhaps preserve it in the follow-up issue?

Contributor Author

yes, that's definitely a good idea to revisit it and find a robust solution
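
One possible middle ground between the two positions in this thread, offered as a sketch for the follow-up rather than a decision: sort a copy of the headers by name only, using a stable sort, so that reordering distinct headers does not change the hash while the relative order of repeated headers (such as several `Accept` lines) still does. The example uses plain `(&str, &str)` pairs instead of `HeaderMap` so it stays within `std`; `hash_headers` and `header_fingerprint` are hypothetical names, and header names are assumed to be lowercased already.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Feeds headers into `state` in name order. The sort is stable, so values
/// that share a name keep the order in which they appeared in the request.
pub fn hash_headers<H: Hasher>(headers: &[(&str, &str)], state: &mut H) {
    // Work on a copy; the caller's header list is left untouched.
    let mut sorted: Vec<(&str, &str)> = headers.to_vec();
    sorted.sort_by_key(|&(name, _)| name);
    for (name, value) in sorted {
        name.hash(state);
        value.hash(state);
    }
}

/// Convenience wrapper for experimenting with the function above.
pub fn header_fingerprint(headers: &[(&str, &str)]) -> u64 {
    let mut hasher = DefaultHasher::new();
    hash_headers(headers, &mut hasher);
    hasher.finish()
}
```

Any `PartialEq` implementation would need to compare headers in the same normalized order, otherwise two requests could hash equally but compare unequal and never be deduplicated.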

self.inner.body().hash(state);
}
}

impl<T: PartialEq> PartialEq for Request<T> {
fn eq(&self, other: &Self) -> bool {
let mut res = self.inner.method().eq(other.inner.method())
&& self.inner.version().eq(&other.inner.version())
&& self.inner.uri().eq(other.inner.uri());

if !res {
return false;
}
if self.inner.headers().len() != other.inner.headers().len() {
return false;
}

// this assumes headers are in the same order
for ((name, value), (other_name, other_value)) in self
.inner
.headers()
.iter()
.zip(other.inner.headers().iter())
{
res = name.eq(other_name) && value.eq(other_value);
if !res {
return false;
}
}

self.inner.body().eq(other.inner.body())
}
}

impl<T: PartialEq> Eq for Request<T> {}

impl<T> From<http::Request<T>> for Request<T> {
fn from(inner: http::Request<T>) -> Self {
Request { inner }
Expand Down