Query deduplication #285
Conversation
In the perf tests (https://github.com/apollographql/router/pull/285/checks?check_run_id=4562777771):

This should get faster once #284 is merged.
A few local benchmarks. The router has two cores; 300k requests were issued by 1000 concurrent clients with "hey". (Response time histograms and latency distributions not reproduced here; only the totals survive.)

- main, 0146576 (using serde_json_bytes): Total data: 3576900000 bytes
- bc14a79, query deduplication without serde_json_bytes: Total data: 3576900000 bytes
- 1102ccf, query deduplication with serde_json_bytes: Total data: 3576900000 bytes
- 90cd1de, query deduplication with serde_json_bytes and using the Request as key in the wait map (couldn't do that before because serde_json::Value does not implement Hash): Total data: 3576900000 bytes
This looks similar to the caching mechanism Gary made... 🤔 The difference is that we don't store anything in a cache here, but other than that we queue identical requests the same way. Am I wrong? Maybe with a bit of refactoring it would be possible to share more of the code.
you're right, I copied that code and removed the storage part 😁
Ah!! But I didn't mean you should do caching! I meant we could generalize our implementation more so it supports this scenario (0-length cache?) too
OK, so as I understand it, this is a temporary boost until we also get caching on top of it. Looks good!
Those performance improvement figures are impressive.
apollo-router/src/http_subgraph.rs
```rust
async fn dedup(
    &self,
    request: graphql::Request,
) -> Result<graphql::Response, graphql::FetchError> {
    loop {
        let mut locked_wait_map = self.wait_map.lock().await;
        match locked_wait_map.get_mut(&request) {
            Some(waiter) => {
                // Register interest in the key
                let mut receiver = waiter.subscribe();
                drop(locked_wait_map);

                match receiver.recv().await {
                    Ok(value) => return value,
                    // there was an issue with the broadcast channel, retry fetching
                    Err(_) => continue,
                }
            }
            None => {
                let (tx, _rx) = broadcast::channel(1);
                locked_wait_map.insert(request.clone(), tx.clone());
                drop(locked_wait_map);

                let res = self.fetch(request.clone()).await;

                {
                    let mut locked_wait_map = self.wait_map.lock().await;
                    locked_wait_map.remove(&request);
                }

                // Let our waiters know
                let broadcast_value = res.clone();
                // Our use case is very specific, so we are sure that
                // we won't get any errors here.
                tokio::task::spawn_blocking(move || {
                    tx.send(broadcast_value)
                        .expect("there is always at least one receiver alive, the _rx guard; qed")
                })
                .await
                .expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed");
                return res;
            }
        }
    }
}
```
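For context, a minimal sketch of the wait map shape the method above relies on, kept generic here so it stands alone (in the PR the key and value would be graphql::Request and Result<graphql::Response, graphql::FetchError>):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};

// One broadcast sender per in-flight request, shared behind an async mutex:
// the first caller inserts a sender and fetches, later callers subscribe and wait.
type WaitMap<K, V> = Arc<Mutex<HashMap<K, broadcast::Sender<V>>>>;
```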
I'm definitely beginning to feel there is some kind of generic functionality which we can extract for common use. I might try and prototype something up...
yup, that could be generalized
I'm waiting on apollographql/federation#1423 before I continue on this
not compiling for now, will be reimplemented as a layer
Force-pushed from 0ff36a3 to 18c20c5
@Geal can you please ping me when I can review this? 🙏
it's far from ready 😅
Ah, I'm going to mute notifications; I dunno why but GitHub keeps asking me to review it 😂
mutations and subscriptions should not be deduplicated
```rust
let broadcast_value = res
    .as_ref()
    .map(|response| response.clone())
    .map_err(|e| e.to_string());
```
We use `BoxError` all over the code (mainly because `Buffer` enforces `BoxError`), and `BoxError` is not `Clone`. But here we want to copy the response to all of the waiting queries, even if we got an error. Since we cannot access the underlying type, the best we can do is convert it to a String and pass that around.
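A minimal sketch of that constraint, assuming tower's `BoxError` alias (the helper name is illustrative, not from the PR):

```rust
// tower's alias: a boxed trait object is not Clone, so a
// Result<T, BoxError> cannot be cloned once per waiting query.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Downgrade the error to its Display output to get a cloneable result.
fn cloneable<T: Clone>(res: &Result<T, BoxError>) -> Result<T, String> {
    res.as_ref()
        .map(|value| value.clone())
        .map_err(|e| e.to_string())
}
```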
Is it OK as a String? Would it be better as a JSON Value?
We could serialize a JSON value to a string and pass it as the error. Unfortunately, the only way to interact with `std::error::Error` is through strings: https://doc.rust-lang.org/nightly/std/error/trait.Error.html
Could we convert to String and preserve the error as source()? If we did, would it deliver any value?
(I'm just trying to avoid the lowest common denominator of error types, but I guess we don't have many options.)
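One shape that idea could take, sketched under assumptions (the `SharedError` type is hypothetical, not in the PR): putting the original error behind an `Arc` makes the wrapper cloneable while still exposing the underlying error through `source()`:

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

// Hypothetical wrapper: Clone works because the original error is shared
// behind an Arc, and source() still hands back the underlying error.
#[derive(Clone, Debug)]
struct SharedError {
    message: String,
    source: Arc<dyn Error + Send + Sync>,
}

impl fmt::Display for SharedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.message)
    }
}

impl Error for SharedError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // &(dyn Error + Send + Sync) coerces to &(dyn Error + 'static)
        Some(self.source.as_ref())
    }
}
```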
IIRC we discussed (with @BrynCooke I think?) using `FetchError` as the error type for all subgraph services and layers, converting to `BoxError` just before passing it back to the `Buffer` layer, but we deferred that because it would be a big change to introduce in this PR.
```rust
pub struct SubgraphRequest {
    pub http_request: http_compat::Request<Request>,

    pub context: Context,

    // added in this PR
    pub operation_kind: OperationKind,
}
```
I'm adding the operation kind here for now, as I am not sure yet whether it should be in the `Context` object: this is data that's only needed for this request and does not need to be shared with other requests, and there's no place in the HTTP request to put it.
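A hedged sketch of the idea (the enum's exact definition in the router may differ): carrying the kind on the request lets the dedup layer skip mutations and subscriptions, which must not be deduplicated:

```rust
// Illustrative definition; derives and variants in the PR may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationKind {
    Query,
    Mutation,
    Subscription,
}

// Only queries are safe to deduplicate: mutations have side effects,
// and subscriptions have long-lived semantics of their own.
fn should_deduplicate(kind: OperationKind) -> bool {
    matches!(kind, OperationKind::Query)
}
```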
Oh, that's interesting. We have it deep in the query planner; I use a rough variant of this in the HTTP GET related PR, but that doesn't fit your use case.
It has to be in the query planner, and from there it's used in subgraph queries. So both use cases are linked.
```rust
let dedup_layer = QueryDeduplicationLayer;
let mut subgraph_service = BoxService::new(dedup_layer.layer(
```
Adding the dedup layer here has less impact on the types than making the subgraph service a `BoxCloneService`, then adding the layer, then boxing it again.
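For readers unfamiliar with tower, a minimal sketch of what such a layer looks like (the names mirror the PR, but the bodies here are illustrative and the service internals are elided):

```rust
use tower::Layer;

// The wrapping service; its wait-map plumbing is elided in this sketch.
struct QueryDeduplicationService<S> {
    inner: S,
}

impl<S> QueryDeduplicationService<S> {
    fn new(inner: S) -> Self {
        QueryDeduplicationService { inner }
    }
}

struct QueryDeduplicationLayer;

// A tower Layer just wraps the inner subgraph service in the deduplicating
// service, which is what `dedup_layer.layer(...)` does above.
impl<S> Layer<S> for QueryDeduplicationLayer {
    type Service = QueryDeduplicationService<S>;

    fn layer(&self, service: S) -> Self::Service {
        QueryDeduplicationService::new(service)
    }
}
```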
Looks good generally. I have some questions; when you have time to look at them, let me know how you want to proceed and I can approve the PR from there.
```rust
// this assumes headers are in the same order
for (name, value) in self.inner.headers() {
    name.hash(state);
    value.hash(state);
}
```
I really dislike HeaderMap. We can't even sort it...
I suppose the downside of not spotting duplicates is that our query can't be de-duplicated. Not a bug, but it defeats the purpose of de-duplicating.
We can do something a bit crafty here. `HeaderValue` does implement `Ord`, and `HeaderName` always converts to `&str`, so:
```rust
// Map header names into &str so we can sort
let mut tmp: Vec<(&str, &HeaderValue)> = self
    .inner
    .headers()
    .iter()
    .map(|(k, v)| (k.as_str(), v))
    .collect();
tmp.sort();
for (name, value) in tmp {
    name.hash(state);
    value.hash(state);
}
```
would give us a consistent ordering for hashing purposes. I think we could do the same for `Eq` as well.
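The `Eq` counterpart could look like this (a sketch under the same assumptions; `headers_eq` is an illustrative name, not code from the PR):

```rust
use http::header::{HeaderMap, HeaderValue};

// Compare two header maps order-insensitively by sorting copies of the
// (name, value) pairs before comparing them.
fn headers_eq(a: &HeaderMap, b: &HeaderMap) -> bool {
    let mut left: Vec<(&str, &HeaderValue)> =
        a.iter().map(|(k, v)| (k.as_str(), v)).collect();
    let mut right: Vec<(&str, &HeaderValue)> =
        b.iter().map(|(k, v)| (k.as_str(), v)).collect();
    left.sort();
    right.sort();
    left == right
}
```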
We should refrain from sorting the headers, since their order can have an impact. Example: the `Accept` header can have multiple values, which can come either comma-separated in one header value or as multiple separate `Accept` headers. In that second case, if we sort the headers, that might reorder the values and change the behaviour.
The assumption for the cache key here is that similar queries coming from the same client will have the same shape (same user agent, same list of headers in the same order...).
What we could do, though, is decide which headers we consider for the cache key, as sketched below. Once we do that, we would get stronger guarantees, and we could expose the ordering issues in the docs. Could we explore that in another PR though?
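A hypothetical version of that allowlist idea (the header names and the helper are illustrative only):

```rust
use std::hash::{Hash, Hasher};

use http::header::HeaderMap;

// Hypothetical allowlist: only these headers feed the cache key, visited in
// a fixed order, so incidental headers can't defeat deduplication.
const HASHED_HEADERS: &[&str] = &["accept", "content-type", "authorization"];

fn hash_selected_headers<H: Hasher>(headers: &HeaderMap, state: &mut H) {
    for name in HASHED_HEADERS {
        // get_all preserves the insertion order of repeated headers
        for value in headers.get_all(*name) {
            name.hash(state);
            value.hash(state);
        }
    }
}
```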
We are only sorting our copy; the original headers are unaffected. I agree that we can't touch the original headers, which we couldn't sort anyway since `HeaderName` doesn't implement `Ord`.
Does the original order of headers matter for hashing purposes? I.e., don't we want the ordering to be consistent to help improve our de-duplication chances?
I'm fine with moving this discussion to a follow-up. I think it's important to make the comparison less fragile, but it doesn't need to be decided before merging to the tower branch. If you don't want to put my suggestion in, perhaps preserve it in the follow-up issue?
yes, that's definitely a good idea to revisit it and find a robust solution
we will land this after #429
LGTM, I'd favor serde camelCase directives over "rename" but it's just a nit
alright, now that #429 is merged, I could remove the manual deserialization for …
related:
This is an experiment around query deduplication. It is done crudely by having a mutex and a wait map in the subgraph fetcher (as is done in the cache), and hashing over a JSON serialization of the request (since `Hash` is not implemented on `serde_json::Value`).
In my local tests, using the products and reviews subgraphs from the perf test project, assigning 2 cores to the router, with 100 concurrent clients, I see:
So even a naïve implementation gets a 1.8x performance boost, with a 10x reduction of CPU time in subgraphs.
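A minimal sketch of that crude key, under the stated assumptions (`request_key` is an illustrative name; the point is hashing the serialized request, since `serde_json::Value` has no `Hash` impl):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Serialize the request to a JSON string and hash that string,
// working around the missing Hash impl on serde_json::Value.
fn request_key(request: &serde_json::Value) -> u64 {
    let serialized =
        serde_json::to_string(request).expect("a Value always serializes to JSON");
    let mut hasher = DefaultHasher::new();
    serialized.hash(&mut hasher);
    hasher.finish()
}
```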
Current issues:
- both cores are stuck at 80%, so I suspect some contention over the mutex (resolved: artefact of the benchmarks I was doing, I can saturate the cores easily now)
- serializing the JSON value instead of a Hash implementation is a hack (fixed with Store JSON strings in a Bytes instance #284)