Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache unification into CachingMap #235

Merged
merged 13 commits into from
Dec 7, 2021
210 changes: 210 additions & 0 deletions apollo-router-core/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use crate::{CacheResolver, CacheResolverError};
use derivative::Derivative;
use futures::lock::Mutex;
use lru::LruCache;
use std::cmp::Eq;
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use tokio::sync::broadcast::{self, Sender};

/// A caching map optimised for slow value resolution.
///
/// The CachingMap hold values in an LruCache. Values are loaded into the cache on a cache miss and
/// the cache relies on the resolver to provide values. There is no way to manually remove, update
/// or otherwise invalidate a cache value at this time. Values will be evicted from the cache once
/// the cache_limit is reached.
#[derive(Derivative)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need Derivative here? Does the cache need to implement Debug? the only field that will be displayed is the cache limit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we don't need but it's a nice to have and it costs 5 lines. I'm fine with it and implementing Debug as much as possible makes the API nicer to use.

So I'm not against removing it. Slight preference for keeping it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's required because:

apollo-router-core/src/query_planner/caching_query_planner.rs

#[derive(Debug)]
pub struct CachingQueryPlanner<T: QueryPlanner> {
    cm: CachingMap<QueryKey, Arc<QueryPlan>>,
...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not required. We can use derivative there if we want

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would having the cache implement Debug make the API nice to use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we've missed something here though. We have to have Debug on CachingQueryPlanner right now, because it is collected in a caching span. (#[instrument]). So, it might be the right thing to not have a Debug on the cache, but to avoid it we have to exclude it in all Debug consumers, so right now, it's the right answer (I think). But, if we remove Debug requirements upstream, we can remove this one at the same time.

Maybe putting the above comment on the source would be helpful?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh!! That's because it is on the trait QueryPlanner. I don't mind removing this trait boundary. Then on ApolloRouter you can use derivative to exclude the field from the debug output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to have Debug on CachingQueryPlanner right now, because it is collected in a caching span. (#[instrument]).

that sounds like something we do not want to have (cf the other perf issues we had with #[instrument])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the use of "skip_all" as recommended in the ADR on telemetry will sort out the telemetry perf problem when we implement it.

I'm trying to minimise the "blast radius" of this PR and so I'd like to avoid making changes outside of the required by the PR changes. So, I want to avoid modifying QueryPlanner/ApolloRouter/... and contain the changes to the minimum set. Keeping this change for now and fixing later and documenting the ability to remove later in a comment seems the cleanest approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This discussion has been extracted and deferred into issue: #244

#[derivative(Debug)]
pub struct CachingMap<K, V> {
#[derivative(Debug = "ignore")]
cached: Mutex<LruCache<K, Result<V, CacheResolverError>>>,
#[allow(clippy::type_complexity)]
#[derivative(Debug = "ignore")]
wait_map: Mutex<HashMap<K, Sender<(K, Result<V, CacheResolverError>)>>>,
cache_limit: usize,
#[derivative(Debug = "ignore")]
resolver: Box<dyn CacheResolver<K, V> + Send + Sync>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dynamic dispatching is unnecessary. We should probably use a generic parameter instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put a comment in the PR about how the resolver would be used. It made the code a lot cleaner and I really don't think the performance matters in this use case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry I didn't read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After spending a bit of time exploring different ideas, I've settled
on the idea proposed by Geoffroy to provide a value resolver for use
by the cache map.

This approach is cleanest from the point of view of client interactions,
although it does require clients to implement the CacheResolver trait.

You don't really explain why this approach was better. In my opinion it is much better if the user just provide a function-like.

Ideally we could store a callback in the CachingMap struct, but
lifetimes and async restrictions mean I can't make that work at the
moment.

Is this comment related?

What about this?

use std::marker::PhantomData;

pub struct CachingStuff<K, V, F> {
    callback: F,
    phantom_data: PhantomData<(K, V)>,
}

impl<K, V, F> CachingStuff<K, V, F>
where
    F: Fn(K) -> V,
{
    pub fn new(callback: F) -> Self {
        Self {
            callback,
            phantom_data: PhantomData,
        }
    }

    pub fn get(&self, key: K) -> V {
        (self.callback)(key)
    }
}

#[derive(Debug)]
pub struct MyError;

fn main() {
    let cache = CachingStuff::new(|_key: String| Ok::<u32, MyError>(42));
    let value_res: Result<u32, MyError> = cache.get("boo".to_string());
    println!("Value: {:?}", value_res);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if the detail isn't apparent. The issue was caused by trying to maintain an async function callback. The various things we are calling are async and I wanted to provide an async interface to prevent blocking of threads when retrieving values (I guess I could have used spawn_blocking with a sync interface...).

I couldn't figure out a nice way to store such a "thing" (async fn callback effectively) in a struct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cache could be generic over the CacheResolver without going all the way to using a function though. That would simplify the code a bit. The performance difference is not meaningful

pub struct CachingMap<K, V, R> {
    #[derivative(Debug = "ignore")]
    cached: Mutex<LruCache<K, CacheResult<V>>>,
    #[allow(clippy::type_complexity)]
    #[derivative(Debug = "ignore")]
    wait_map: Mutex<HashMap<K, Sender<(K, CacheResult<V>)>>>,
    cache_limit: usize,
    #[derivative(Debug = "ignore")]
    resolver: R,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use std::future::Future;
use std::marker::PhantomData;

pub struct CachingStuff<K, V, F, FUT> {
    callback: F,
    phantom_data: PhantomData<(K, V, FUT)>,
}

impl<K, V, F, FUT> CachingStuff<K, V, F, FUT>
where
    F: Fn(K) -> FUT,
    FUT: Future<Output = V>,
{
    pub fn new(callback: F) -> Self {
        Self {
            callback,
            phantom_data: PhantomData,
        }
    }

    pub async fn get(&self, key: K) -> V {
        (self.callback)(key).await
    }
}

#[derive(Debug)]
pub struct MyError;

#[tokio::main]
async fn main() {
    let cache = CachingStuff::new(|_key: String| async { Ok::<u32, MyError>(42) });
    let value_res: Result<u32, MyError> = cache.get("boo".to_string()).await;
    println!("Value: {:?}", value_res);
    let cache = CachingStuff::new(|_key: String| async {
        tokio::task::spawn_blocking(move || Ok::<u32, MyError>(42))
            .await
            .unwrap()
    });
    let value_res: Result<u32, MyError> = cache.get("boo".to_string()).await;
    println!("Value: {:?}", value_res);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather we have the trait and a generic member, implement the trait over Fn, and let the calling side decide how they will use it. This will change nothing over the functionality, be more flexible and you'll have the function if you want

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's make the code more complicated for no good reason.

This cache can only call one function per caching thing anyway. If we have new use cases and it feels relevant to add a trait, then we can add a trait. Until then please keep the code stupid simple.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This discussion has been extracted and deferred into issue: #244

}

impl<K, V> CachingMap<K, V>
where
K: Clone + fmt::Debug + Eq + Hash + Send + Sync + 'static,
V: fmt::Debug + Send + Sync + 'static,
Result<V, CacheResolverError>: Clone,
{
/// Create a new CachingMap.
///
/// resolver is used to resolve cache misses.
/// cache_limit specifies the size (number of items) of the cache
pub fn new(resolver: Box<(dyn CacheResolver<K, V> + Send + Sync)>, cache_limit: usize) -> Self {
Self {
cached: Mutex::new(LruCache::new(cache_limit)),
wait_map: Mutex::new(HashMap::new()),
cache_limit,
resolver,
}
}

/// Get a value from the cache.
pub async fn get(&self, key: K) -> Result<V, CacheResolverError> {
let mut locked_cache = self.cached.lock().await;
if let Some(value) = locked_cache.get(&key).cloned() {
return value;
}

// Holding a lock across the delegated get is a bad idea because
// the delegate get() could take a long time during which all
// other get() requests are blocked.
// Alternatively, if we don't hold the lock, there is a risk
// that we will do the work multiple times. This is also
// sub-optimal.

// To work around this, we keep a list of keys we are currently
// processing in the delegate. If we try to get a key on this
// list, we block and wait for it to complete and then retry.
//
// This is more complex than either of the two simple
// alternatives but succeeds in providing a mechanism where each
// client only waits for uncached values that they are going to
// use AND avoids generating the value multiple times.

let mut locked_wait_map = self.wait_map.lock().await;

// We must only drop the locked cache after we have locked the
// wait map. Otherwise,we might get a race that causes us to
// miss a broadcast.
drop(locked_cache);

match locked_wait_map.get_mut(&key) {
Some(waiter) => {
// Register interest in key
let mut receiver = waiter.subscribe();
drop(locked_wait_map);
// Our use case is very specific, so we are sure
// that we won't get any errors here.
let (recv_key, recv_value) = receiver.recv().await.expect(
"the sender won't ever be dropped before all the receivers finish; qed",
);
debug_assert_eq!(recv_key, key);
recv_value
}
None => {
let (tx, _rx) = broadcast::channel(1);
locked_wait_map.insert(key.clone(), tx.clone());
drop(locked_wait_map);
// This is the potentially high duration operation where we ask our resolver to
// resolve the key (retrieve a value) for us
// No cache locks are held here
let value = self.resolver.retrieve(key.clone()).await;
// Update our cache
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be done in a different scope? Like this:

{
                let mut locked_cache = self.cached.lock().await;
                locked_cache.put(key.clone(), value.clone());
                // Update our wait list
                let mut locked_wait_map = self.wait_map.lock().await;
                locked_wait_map.remove(&key);
}

Because with the current code, the cache and map locks are still held when we're await'ing on the blocking task

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that we would introduce a race if we didn't keep both the locks until the end of the notification. There may be opportunities for optimisation here, but I don't want to take the risk since it would only be a micro-optimisation and require careful analysis to make sure there were no mistakes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there will be a race here:

  • the sender was removed from the map after the value was put in the cache, so any new queries for the same key will get it from the cache directly
  • all the receivers have already dropped the locks when they're waiting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so either, but I'm not sure. So, rather than take the risk of being wrong, I don't think the benefit of a small improvement in scalability is justified for the risk.

In particular, I'm concerned that a cache expiration event would cause a race. Very unlikely, but requires some careful thinking that I haven't done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes the concerns are real. Actually, could we work on a proof that it works, even without the modification I want? Not necessarily something formal, just exploring the various cases and check that they will be safe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the change I propose would affect the last sequence for the first query: ClCiWlWrTxWuCu would become ClCiWlWrWuCuTx

In all the cases I tried, once 1 reaches Wr, 2 is already in a safe state

Copy link
Contributor

@Geal Geal Dec 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another note: the steps 2ClCm and 1WmWiWuG are commutative.

and actually the 3 queries case resolves to less cases, since it's very constrained:
1ClCmWlCu 2ClCm 1WmWiWuG 2WlCu 3ClCm 1Cl: can't happen 3 has Cl
1ClCmWlCu 2ClCm 1WmWiWuG 2WlCu 3ClCm 3Wl: can't happen 2 has Wl
1ClCmWlCu 2ClCm 1WmWiWuG 2WlCu 3ClCm 2WhSWuRx 3WlCu 1ClCi 1Wl: can't happen 3 has Wl
1ClCmWlCu 2ClCm 1WmWiWuG 2WlCu 3ClCm 2WhSWuRx 3WlCu 1ClCi 3WhSWuRx 1WlWrTxWuCu OK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a hard time to follow the discussion, is there a chance we can land the "conservative approach" and chase the optimisation as a followup?

The code as is looks like a great improvement already IMO

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not just an optimization, but a way to avoid high contention on the cache lock: the broadcast channel works by cloning the value before sending it to each receiver, so if the value is large and there is a lot of receivers, the lock would be held while executing a serie of clones, which is potentially expensive. And so no other queries would be able to use the cache in that time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This discussion has been extracted and deferred into issue: #244

let mut locked_cache = self.cached.lock().await;
locked_cache.put(key.clone(), value.clone());
// Update our wait list
let mut locked_wait_map = self.wait_map.lock().await;
locked_wait_map.remove(&key);
// Let our waiters know
let broadcast_value = value.clone();
// Our use case is very specific, so we are sure that
// we won't get any errors here.
tokio::task::spawn_blocking(move || {
tx.send((key, broadcast_value))
.expect("there is always at least one receiver alive, the _rx guard; qed")
})
.await
.expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed");
value
}
}
}

/// Get the top 20% of most recently (LRU) used keys
pub async fn get_hot_keys(&self) -> Vec<K> {
let locked_cache = self.cached.lock().await;
locked_cache
.iter()
.take(self.cache_limit / 5)
.map(|(key, _value)| key.clone())
.collect()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::CacheResolverError;
use async_trait::async_trait;
use futures::stream::{FuturesUnordered, StreamExt};
use mockall::mock;
use test_log::test;

struct HasACache {
cm: CachingMap<usize, usize>,
}

struct HasACacheResolver {}

impl HasACache {
// fn new(resolver: limit: usize) -> Self {
fn new(
resolver: Box<(dyn CacheResolver<usize, usize> + Send + Sync)>,
cache_limit: usize,
) -> Self {
// let resolver = Box::new(HasACacheResolver {});
let cm = CachingMap::new(resolver, cache_limit);
Self { cm }
}

async fn get(&self, key: usize) -> Result<usize, CacheResolverError> {
self.cm.get(key).await
}
}

#[async_trait]
impl CacheResolver<usize, usize> for HasACacheResolver {
async fn retrieve(&self, key: usize) -> Result<usize, CacheResolverError> {
Ok(key)
}
}

mock! {
HasACacheResolver {}

#[async_trait]
impl CacheResolver<usize, usize> for HasACacheResolver {
async fn retrieve(&self, key: usize) -> Result<usize, CacheResolverError>;
}
}

#[test(tokio::test)]
async fn it_should_enforce_cache_limits() {
let cache = HasACache::new(Box::new(HasACacheResolver {}), 13);

for i in 0..14 {
cache.get(i).await.expect("gets the value");
}
let guard = cache.cm.cached.lock().await;
assert_eq!(guard.len(), 13);
}

#[test(tokio::test)]
async fn it_should_only_delegate_once_per_key() {
let mut mock = MockHasACacheResolver::new();

mock.expect_retrieve().times(1).return_const(Ok(1));

let cache = HasACache::new(Box::new(mock), 10);

// Let's trigger 100 concurrent gets of the same value and ensure only
// one delegated retrieve is made
let mut computations: FuturesUnordered<_> = (0..100).map(|_| cache.get(1)).collect();

while let Some(result) = computations.next().await {
result.expect("result retrieved");
}

// To be really sure, check there is only one value in the cache
let guard = cache.cm.cached.lock().await;
assert_eq!(guard.len(), 1);
}
}
22 changes: 22 additions & 0 deletions apollo-router-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,19 @@ impl From<QueryPlannerError> for FetchError {
}
}

/// Error types for CacheResolver
#[derive(Error, Debug, Display, Clone)]
pub enum CacheResolverError {
/// Value retrieval failed: {0}
RetrievalError(Arc<QueryPlannerError>),
}

impl From<QueryPlannerError> for CacheResolverError {
fn from(err: QueryPlannerError) -> Self {
CacheResolverError::RetrievalError(Arc::new(err))
}
}

/// An error while processing JSON data.
#[derive(Debug, Error, Display)]
pub enum JsonExtError {
Expand All @@ -168,6 +181,9 @@ pub enum QueryPlannerError {
/// Query planning panicked: {0}
JoinError(Arc<JoinError>),

/// Cache resolution failed: {0}
CacheResolverError(Arc<CacheResolverError>),

/// Unhandled planner result.
UnhandledPlannerResult,
}
Expand All @@ -184,6 +200,12 @@ impl From<JoinError> for QueryPlannerError {
}
}

impl From<CacheResolverError> for QueryPlannerError {
fn from(err: CacheResolverError) -> Self {
QueryPlannerError::CacheResolverError(Arc::new(err))
}
}

impl From<QueryPlannerError> for ResponseStream {
fn from(err: QueryPlannerError) -> Self {
stream::once(future::ready(FetchError::from(err).to_response(true))).boxed()
Expand Down
2 changes: 2 additions & 0 deletions apollo-router-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ macro_rules! failfast_error {
}};
}

mod cache;
mod error;
mod json_ext;
mod naive_introspection;
Expand All @@ -33,6 +34,7 @@ mod response;
mod schema;
mod traits;

pub use cache::*;
pub use error::*;
pub use json_ext::*;
pub use naive_introspection::*;
Expand Down
Loading