-
Notifications
You must be signed in to change notification settings - Fork 272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cache unification into CachingMap #235
Conversation
I don't really love either approach to unifying: - delegate callback - function parameter Ideally we could store a callback in the CachingMap struct, but lifetimes and async restrictions mean I can't make that work at the moment. This draft PR is to discuss the pros/cons of the two approaches and see if anyone has suggestions for ways to improve.
Before raising Draft PR...
Unify the two cache implementations into a single implementation. After spending a bit of time exploring different ideas, I've settled on the idea proposed by Geoffrey to provide a value resolver for use by the cache map. This approach is cleanest from the point of view of client interactions, although it does require clients to implement the CacheResolver trait. Although CacheResolver is using dynamic dispatch, it's only called for once per cache miss (which is expected to be slow by definition), so I think that's an acceptable performance tradeoff.
#[allow(clippy::type_complexity)] | ||
wait_map: Mutex<HashMap<K, Sender<(K, CacheResult<V>)>>>, | ||
cache_limit: usize, | ||
resolver: Box<dyn CacheResolver<K, V> + Send + Sync>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This dynamic dispatching is unnecessary. We should probably use a generic parameter instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put a comment in the PR about how the resolver would be used. It made the code a lot cleaner and I really don't think the performance matters in this use case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry I didn't read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After spending a bit of time exploring different ideas, I've settled
on the idea proposed by Geoffroy to provide a value resolver for use
by the cache map.This approach is cleanest from the point of view of client interactions,
although it does require clients to implement the CacheResolver trait.
You don't really explain why this approach was better. In my opinion it is much better if the user just provide a function-like.
Ideally we could store a callback in the CachingMap struct, but
lifetimes and async restrictions mean I can't make that work at the
moment.
Is this comment related?
What about this?
use std::marker::PhantomData;
pub struct CachingStuff<K, V, F> {
callback: F,
phantom_data: PhantomData<(K, V)>,
}
impl<K, V, F> CachingStuff<K, V, F>
where
F: Fn(K) -> V,
{
pub fn new(callback: F) -> Self {
Self {
callback,
phantom_data: PhantomData,
}
}
pub fn get(&self, key: K) -> V {
(self.callback)(key)
}
}
#[derive(Debug)]
pub struct MyError;
fn main() {
let cache = CachingStuff::new(|_key: String| Ok::<u32, MyError>(42));
let value_res: Result<u32, MyError> = cache.get("boo".to_string());
println!("Value: {:?}", value_res);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry if the detail isn't apparent. The issue was caused by trying to maintain an async function callback. The various things we are calling are async and I wanted to provide an async interface to prevent blocking of threads when retrieving values (I guess I could have used spawn_blocking with a sync interface...).
I couldn't figure out a nice way to store such a "thing" (async fn callback effectively) in a struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the cache could be generic over the CacheResolver
without going all the way to using a function though. That would simplify the code a bit. The performance difference is not meaningful
pub struct CachingMap<K, V, R> {
#[derivative(Debug = "ignore")]
cached: Mutex<LruCache<K, CacheResult<V>>>,
#[allow(clippy::type_complexity)]
#[derivative(Debug = "ignore")]
wait_map: Mutex<HashMap<K, Sender<(K, CacheResult<V>)>>>,
cache_limit: usize,
#[derivative(Debug = "ignore")]
resolver: R,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use std::future::Future;
use std::marker::PhantomData;
pub struct CachingStuff<K, V, F, FUT> {
callback: F,
phantom_data: PhantomData<(K, V, FUT)>,
}
impl<K, V, F, FUT> CachingStuff<K, V, F, FUT>
where
F: Fn(K) -> FUT,
FUT: Future<Output = V>,
{
pub fn new(callback: F) -> Self {
Self {
callback,
phantom_data: PhantomData,
}
}
pub async fn get(&self, key: K) -> V {
(self.callback)(key).await
}
}
#[derive(Debug)]
pub struct MyError;
#[tokio::main]
async fn main() {
let cache = CachingStuff::new(|_key: String| async { Ok::<u32, MyError>(42) });
let value_res: Result<u32, MyError> = cache.get("boo".to_string()).await;
println!("Value: {:?}", value_res);
let cache = CachingStuff::new(|_key: String| async {
tokio::task::spawn_blocking(move || Ok::<u32, MyError>(42))
.await
.unwrap()
});
let value_res: Result<u32, MyError> = cache.get("boo".to_string()).await;
println!("Value: {:?}", value_res);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather we have the trait and a generic member, implement the trait over Fn, and let the calling side decide how they will use it. This will change nothing over the functionality, be more flexible and you'll have the function if you want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's make the code more complicated for no good reason.
This cache can only call one function per caching thing anyway. If we have new use cases and it feels relevant to add a trait, then we can add a trait. Until then please keep the code stupid simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This discussion has been extracted and deferred into issue: #244
Make sure code compiles.
As per code review comment.
#[allow(clippy::type_complexity)] | ||
wait_map: Mutex<HashMap<K, Sender<(K, CacheResult<V>)>>>, | ||
cache_limit: usize, | ||
resolver: Box<dyn CacheResolver<K, V> + Send + Sync>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the cache could be generic over the CacheResolver
without going all the way to using a function though. That would simplify the code a bit. The performance difference is not meaningful
pub struct CachingMap<K, V, R> {
#[derivative(Debug = "ignore")]
cached: Mutex<LruCache<K, CacheResult<V>>>,
#[allow(clippy::type_complexity)]
#[derivative(Debug = "ignore")]
wait_map: Mutex<HashMap<K, Sender<(K, CacheResult<V>)>>>,
cache_limit: usize,
#[derivative(Debug = "ignore")]
resolver: R,
/// the cache relies on the resolver to provide values. There is no way to manually remove, update | ||
/// or otherwise invalidate a cache value at this time. Values will be evicted from the cache once | ||
/// the cache_limit is reached. | ||
#[derive(Derivative)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need Derivative here? Does the cache need to implement Debug? the only field that will be displayed is the cache limit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No we don't need but it's a nice to have and it costs 5 lines. I'm fine with it and implementing Debug as much as possible makes the API nicer to use.
So I'm not against removing it. Slight preference for keeping it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's required because:
apollo-router-core/src/query_planner/caching_query_planner.rs
#[derive(Debug)]
pub struct CachingQueryPlanner<T: QueryPlanner> {
cm: CachingMap<QueryKey, Arc<QueryPlan>>,
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not required. We can use derivative there if we want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why would having the cache implement Debug make the API nice to use?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we've missed something here though. We have to have Debug on CachingQueryPlanner right now, because it is collected in a caching span. (#[instrument]). So, it might be the right thing to not have a Debug on the cache, but to avoid it we have to exclude it in all Debug consumers, so right now, it's the right answer (I think). But, if we remove Debug requirements upstream, we can remove this one at the same time.
Maybe putting the above comment on the source would be helpful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh!! That's because it is on the trait QueryPlanner
. I don't mind removing this trait boundary. Then on ApolloRouter
you can use derivative to exclude the field from the debug output.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to have Debug on CachingQueryPlanner right now, because it is collected in a caching span. (#[instrument]).
that sounds like something we do not want to have (cf the other perf issues we had with #[instrument])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the use of "skip_all" as recommended in the ADR on telemetry will sort out the telemetry perf problem when we implement it.
I'm trying to minimise the "blast radius" of this PR and so I'd like to avoid making changes outside of the required by the PR changes. So, I want to avoid modifying QueryPlanner/ApolloRouter/... and contain the changes to the minimum set. Keeping this change for now and fixing later and documenting the ability to remove later in a comment seems the cleanest approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This discussion has been extracted and deferred into issue: #244
// resolve the key (retrieve a value) for us | ||
// No cache locks are held here | ||
let value = self.resolver.retrieve(key.clone()).await; | ||
// Update our cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this be done in a different scope? Like this:
{
let mut locked_cache = self.cached.lock().await;
locked_cache.put(key.clone(), value.clone());
// Update our wait list
let mut locked_wait_map = self.wait_map.lock().await;
locked_wait_map.remove(&key);
}
Because with the current code, the cache and map locks are still held when we're await'ing on the blocking task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm concerned that we would introduce a race if we didn't keep both the locks until the end of the notification. There may be opportunities for optimisation here, but I don't want to take the risk since it would only be a micro-optimisation and require careful analysis to make sure there were no mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there will be a race here:
- the sender was removed from the map after the value was put in the cache, so any new queries for the same key will get it from the cache directly
- all the receivers have already dropped the locks when they're waiting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so either, but I'm not sure. So, rather than take the risk of being wrong, I don't think the benefit of a small improvement in scalability is justified for the risk.
In particular, I'm concerned that a cache expiration event would cause a race. Very unlikely, but requires some careful thinking that I haven't done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes the concerns are real. Actually, could we work on a proof that it works, even without the modification I want? Not necessarily something formal, just exploring the various cases and check that they will be safe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the change I propose would affect the last sequence for the first query: ClCiWlWrTxWuCu would become ClCiWlWrWuCuTx
In all the cases I tried, once 1 reaches Wr, 2 is already in a safe state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another note: the steps 2ClCm and 1WmWiWuG are commutative.
and actually the 3 queries case resolves to less cases, since it's very constrained:
1ClCmWlCu 2ClCm 1WmWiWuG 2WlCu 3ClCm 1Cl: can't happen 3 has Cl
1ClCmWlCu 2ClCm 1WmWiWuG 2WlCu 3ClCm 3Wl: can't happen 2 has Wl
1ClCmWlCu 2ClCm 1WmWiWuG 2WlCu 3ClCm 2WhSWuRx 3WlCu 1ClCi 1Wl: can't happen 3 has Wl
1ClCmWlCu 2ClCm 1WmWiWuG 2WlCu 3ClCm 2WhSWuRx 3WlCu 1ClCi 3WhSWuRx 1WlWrTxWuCu OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm having a hard time to follow the discussion, is there a chance we can land the "conservative approach" and chase the optimisation as a followup?
The code as is looks like a great improvement already IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is not just an optimization, but a way to avoid high contention on the cache lock: the broadcast channel works by cloning the value before sending it to each receiver, so if the value is large and there is a lot of receivers, the lock would be held while executing a serie of clones, which is potentially expensive. And so no other queries would be able to use the cache in that time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This discussion has been extracted and deferred into issue: #244
Mock the retrieve trait and perform a concurrent retrieve test.
By using FuturesUnordered. This should be the most concurrent thing you can do in rust - (https://fasterthanli.me/articles/understanding-rust-futures-by-going-way-too-deep)
In particular: - Remove the type definition for CacheResult - Remove the error propagation for spawn task when sending broadcast
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All concerns have been moved to #244
PR Description
After spending a bit of time exploring different ideas, I've settled
on the idea proposed by Geoffroy to provide a value resolver for use
by the cache map.
This approach is cleanest from the point of view of client interactions,
although it does require clients to implement the CacheResolver trait.
Although CacheResolver is using dynamic dispatch, it's only called for
once per cache miss (which is expected to be slow by definition), so
I think that's an acceptable performance tradeoff.
Draft Description
(preserved in case you are interested in ideas I explored and discarded)
Draft of proposed changes for cache unification
I don't really love either approach to unifying:
Ideally we could store a callback in the CachingMap struct, but
lifetimes and async restrictions mean I can't make that work at the
moment.
This draft PR is to discuss the pros/cons of the two approaches and see
if anyone has suggestions for ways to improve.