diff --git a/apollo-router-core/src/cache.rs b/apollo-router-core/src/cache.rs
new file mode 100644
index 0000000000..030addb545
--- /dev/null
+++ b/apollo-router-core/src/cache.rs
@@ -0,0 +1,210 @@
+use crate::{CacheResolver, CacheResolverError};
+use derivative::Derivative;
+use futures::lock::Mutex;
+use lru::LruCache;
+use std::cmp::Eq;
+use std::collections::HashMap;
+use std::fmt;
+use std::hash::Hash;
+use tokio::sync::broadcast::{self, Sender};
+
+/// A caching map optimised for slow value resolution.
+///
+/// The CachingMap holds values in an LruCache. Values are loaded into the cache on a cache miss and
+/// the cache relies on the resolver to provide values. There is no way to manually remove, update
+/// or otherwise invalidate a cache value at this time. Values will be evicted from the cache once
+/// the cache_limit is reached.
+#[derive(Derivative)]
+#[derivative(Debug)]
+pub struct CachingMap<K, V> {
+    #[derivative(Debug = "ignore")]
+    cached: Mutex<LruCache<K, Result<V, CacheResolverError>>>,
+    #[allow(clippy::type_complexity)]
+    #[derivative(Debug = "ignore")]
+    wait_map: Mutex<HashMap<K, Sender<(K, Result<V, CacheResolverError>)>>>,
+    cache_limit: usize,
+    #[derivative(Debug = "ignore")]
+    resolver: Box<dyn CacheResolver<K, V> + Send + Sync>,
+}
+
+impl<K, V> CachingMap<K, V>
+where
+    K: Clone + fmt::Debug + Eq + Hash + Send + Sync + 'static,
+    V: fmt::Debug + Send + Sync + 'static,
+    Result<V, CacheResolverError>: Clone,
+{
+    /// Create a new CachingMap.
+    ///
+    /// resolver is used to resolve cache misses.
+    /// cache_limit specifies the size (number of items) of the cache
+    pub fn new(resolver: Box<(dyn CacheResolver<K, V> + Send + Sync)>, cache_limit: usize) -> Self {
+        Self {
+            cached: Mutex::new(LruCache::new(cache_limit)),
+            wait_map: Mutex::new(HashMap::new()),
+            cache_limit,
+            resolver,
+        }
+    }
+
+    /// Get a value from the cache.
+    pub async fn get(&self, key: K) -> Result<V, CacheResolverError> {
+        let mut locked_cache = self.cached.lock().await;
+        if let Some(value) = locked_cache.get(&key).cloned() {
+            return value;
+        }
+
+        // Holding a lock across the delegated get is a bad idea because
+        // the delegate get() could take a long time during which all
+        // other get() requests are blocked.
+        // Alternatively, if we don't hold the lock, there is a risk
+        // that we will do the work multiple times. This is also
+        // sub-optimal.
+
+        // To work around this, we keep a list of keys we are currently
+        // processing in the delegate. If we try to get a key on this
+        // list, we block and wait for it to complete and then retry.
+        //
+        // This is more complex than either of the two simple
+        // alternatives but succeeds in providing a mechanism where each
+        // client only waits for uncached values that they are going to
+        // use AND avoids generating the value multiple times.
+
+        let mut locked_wait_map = self.wait_map.lock().await;
+
+        // We must only drop the locked cache after we have locked the
+        // wait map. Otherwise, we might get a race that causes us to
+        // miss a broadcast.
+        drop(locked_cache);
+
+        match locked_wait_map.get_mut(&key) {
+            Some(waiter) => {
+                // Register interest in key
+                let mut receiver = waiter.subscribe();
+                drop(locked_wait_map);
+                // Our use case is very specific, so we are sure
+                // that we won't get any errors here.
+                let (recv_key, recv_value) = receiver.recv().await.expect(
+                    "the sender won't ever be dropped before all the receivers finish; qed",
+                );
+                debug_assert_eq!(recv_key, key);
+                recv_value
+            }
+            None => {
+                let (tx, _rx) = broadcast::channel(1);
+                locked_wait_map.insert(key.clone(), tx.clone());
+                drop(locked_wait_map);
+                // This is the potentially high duration operation where we ask our resolver to
+                // resolve the key (retrieve a value) for us
+                // No cache locks are held here
+                let value = self.resolver.retrieve(key.clone()).await;
+                // Update our cache
+                let mut locked_cache = self.cached.lock().await;
+                locked_cache.put(key.clone(), value.clone());
+                // Update our wait list
+                let mut locked_wait_map = self.wait_map.lock().await;
+                locked_wait_map.remove(&key);
+                // Let our waiters know
+                let broadcast_value = value.clone();
+                // Our use case is very specific, so we are sure that
+                // we won't get any errors here.
+                tokio::task::spawn_blocking(move || {
+                    tx.send((key, broadcast_value))
+                        .expect("there is always at least one receiver alive, the _rx guard; qed")
+                })
+                .await
+                .expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed");
+                value
+            }
+        }
+    }
+
+    /// Get the top 20% of most recently (LRU) used keys
+    pub async fn get_hot_keys(&self) -> Vec<K> {
+        let locked_cache = self.cached.lock().await;
+        locked_cache
+            .iter()
+            .take(self.cache_limit / 5)
+            .map(|(key, _value)| key.clone())
+            .collect()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::CacheResolverError;
+    use async_trait::async_trait;
+    use futures::stream::{FuturesUnordered, StreamExt};
+    use mockall::mock;
+    use test_log::test;
+
+    struct HasACache {
+        cm: CachingMap<usize, usize>,
+    }
+
+    struct HasACacheResolver {}
+
+    impl HasACache {
+        // fn new(resolver: limit: usize) -> Self {
+        fn new(
+            resolver: Box<(dyn CacheResolver<usize, usize> + Send + Sync)>,
+            cache_limit: usize,
+        ) -> Self {
+            // let resolver = Box::new(HasACacheResolver {});
+            let cm = CachingMap::new(resolver, cache_limit);
+            Self { cm }
+        }
+
+        async fn get(&self, key: usize) -> Result<usize, CacheResolverError> {
+            self.cm.get(key).await
+        }
+    }
+
+    #[async_trait]
+    impl CacheResolver<usize, usize> for HasACacheResolver {
+        async fn retrieve(&self, key: usize) -> Result<usize, CacheResolverError> {
+            Ok(key)
+        }
+    }
+
+    mock! {
+        HasACacheResolver {}
+
+        #[async_trait]
+        impl CacheResolver<usize, usize> for HasACacheResolver {
+            async fn retrieve(&self, key: usize) -> Result<usize, CacheResolverError>;
+        }
+    }
+
+    #[test(tokio::test)]
+    async fn it_should_enforce_cache_limits() {
+        let cache = HasACache::new(Box::new(HasACacheResolver {}), 13);
+
+        for i in 0..14 {
+            cache.get(i).await.expect("gets the value");
+        }
+        let guard = cache.cm.cached.lock().await;
+        assert_eq!(guard.len(), 13);
+    }
+
+    #[test(tokio::test)]
+    async fn it_should_only_delegate_once_per_key() {
+        let mut mock = MockHasACacheResolver::new();
+
+        mock.expect_retrieve().times(1).return_const(Ok(1));
+
+        let cache = HasACache::new(Box::new(mock), 10);
+
+        // Let's trigger 100 concurrent gets of the same value and ensure only
+        // one delegated retrieve is made
+        let mut computations: FuturesUnordered<_> = (0..100).map(|_| cache.get(1)).collect();
+
+        while let Some(result) = computations.next().await {
+            result.expect("result retrieved");
+        }
+
+        // To be really sure, check there is only one value in the cache
+        let guard = cache.cm.cached.lock().await;
+        assert_eq!(guard.len(), 1);
+    }
+}
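For orientation while reviewing, here is a minimal usage sketch of the new `CachingMap`/`CacheResolver` pair, mirroring the tests above. It is not part of the diff: `SquareResolver`, `main`, and the limit of 100 are illustrative, and the imports assume the crate re-exports added in the lib.rs hunk further down.

```rust
use apollo_router_core::{CacheResolver, CacheResolverError, CachingMap};
use async_trait::async_trait;

// Hypothetical resolver: "resolves" a key by squaring it. A real resolver
// would do the slow work (planning, parsing, I/O) on a cache miss.
struct SquareResolver;

#[async_trait]
impl CacheResolver<usize, usize> for SquareResolver {
    async fn retrieve(&self, key: usize) -> Result<usize, CacheResolverError> {
        Ok(key * key)
    }
}

#[tokio::main]
async fn main() -> Result<(), CacheResolverError> {
    // The limit of 100 entries is arbitrary for this sketch.
    let map: CachingMap<usize, usize> = CachingMap::new(Box::new(SquareResolver), 100);

    // The first get() is a miss and calls SquareResolver::retrieve;
    // the second get() for the same key is served from the LRU cache.
    assert_eq!(map.get(4).await?, 16);
    assert_eq!(map.get(4).await?, 16);

    // The most recently used fifth of the keys can be inspected, e.g. to warm a new cache.
    println!("hot keys: {:?}", map.get_hot_keys().await);
    Ok(())
}
```

Concurrent `get()` calls for the same missing key share a single `retrieve()` through the wait map, which is what the `it_should_only_delegate_once_per_key` test asserts.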
diff --git a/apollo-router-core/src/error.rs b/apollo-router-core/src/error.rs
index a5e0a12a07..0d964964c3 100644
--- a/apollo-router-core/src/error.rs
+++ b/apollo-router-core/src/error.rs
@@ -150,6 +150,19 @@ impl From<QueryPlannerError> for FetchError {
     }
 }
 
+/// Error types for CacheResolver
+#[derive(Error, Debug, Display, Clone)]
+pub enum CacheResolverError {
+    /// Value retrieval failed: {0}
+    RetrievalError(Arc<QueryPlannerError>),
+}
+
+impl From<QueryPlannerError> for CacheResolverError {
+    fn from(err: QueryPlannerError) -> Self {
+        CacheResolverError::RetrievalError(Arc::new(err))
+    }
+}
+
 /// An error while processing JSON data.
 #[derive(Debug, Error, Display)]
 pub enum JsonExtError {
@@ -168,6 +181,9 @@ pub enum QueryPlannerError {
     /// Query planning panicked: {0}
     JoinError(Arc<JoinError>),
 
+    /// Cache resolution failed: {0}
+    CacheResolverError(Arc<CacheResolverError>),
+
     /// Unhandled planner result.
     UnhandledPlannerResult,
 }
@@ -184,6 +200,12 @@ impl From<JoinError> for QueryPlannerError {
     }
 }
 
+impl From<CacheResolverError> for QueryPlannerError {
+    fn from(err: CacheResolverError) -> Self {
+        QueryPlannerError::CacheResolverError(Arc::new(err))
+    }
+}
+
 impl From<QueryPlannerError> for ResponseStream {
     fn from(err: QueryPlannerError) -> Self {
         stream::once(future::ready(FetchError::from(err).to_response(true))).boxed()
diff --git a/apollo-router-core/src/lib.rs b/apollo-router-core/src/lib.rs
index d497a8894d..5b7ec760d8 100644
--- a/apollo-router-core/src/lib.rs
+++ b/apollo-router-core/src/lib.rs
@@ -22,6 +22,7 @@ macro_rules! failfast_error {
     }};
 }
 
+mod cache;
 mod error;
 mod json_ext;
 mod naive_introspection;
@@ -33,6 +34,7 @@ mod response;
 mod schema;
 mod traits;
 
+pub use cache::*;
 pub use error::*;
 pub use json_ext::*;
 pub use naive_introspection::*;
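The two `From` implementations above tie the new error type into the existing planner errors in both directions, so `?` and `.into()` work across the cache boundary. A tiny illustrative sketch (not part of the diff), using the existing `UnhandledPlannerResult` variant:

```rust
use apollo_router_core::{CacheResolverError, QueryPlannerError};

fn main() {
    // A planner failure can be wrapped for the cache layer...
    let planner_err = QueryPlannerError::UnhandledPlannerResult;
    let cache_err: CacheResolverError = planner_err.into();

    // ...and a cache failure is surfaced again as a planner error, which is
    // what the caching query planner further down does with map_err.
    let back: QueryPlannerError = cache_err.into();
    println!("{}", back);
}
```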
diff --git a/apollo-router-core/src/query_cache.rs b/apollo-router-core/src/query_cache.rs
index 679d8b57af..4248553cdf 100644
--- a/apollo-router-core/src/query_cache.rs
+++ b/apollo-router-core/src/query_cache.rs
@@ -1,109 +1,55 @@
 use crate::prelude::graphql::*;
-use futures::lock::Mutex;
-use lru::LruCache;
-use std::collections::HashMap;
+use crate::CacheResolver;
 use std::sync::Arc;
-use tokio::sync::broadcast;
 
 /// A cache for parsed GraphQL queries.
 #[derive(Debug)]
 pub struct QueryCache {
-    cached: Mutex<LruCache<String, Option<Arc<Query>>>>,
-    #[allow(clippy::type_complexity)]
-    wait_map: Mutex<HashMap<String, broadcast::Sender<(String, Option<Arc<Query>>)>>>,
+    cm: CachingMap<String, Option<Arc<Query>>>,
+}
+
+/// A resolver for cache misses
+struct QueryCacheResolver {
     schema: Arc<Schema>,
 }
 
+#[async_trait::async_trait]
+impl CacheResolver<String, Option<Arc<Query>>> for QueryCacheResolver {
+    async fn retrieve(&self, key: String) -> Result<Option<Arc<Query>>, CacheResolverError> {
+        let query_parsing_future = {
+            let schema = Arc::clone(&self.schema);
+            tokio::task::spawn_blocking(move || Query::parse(key, &schema))
+        };
+        let parsed_query = match query_parsing_future.await {
+            Ok(res) => res.map(Arc::new),
+            // Silently ignore cancelled tasks (never happen for blocking tasks).
+            Err(err) if err.is_cancelled() => None,
+            Err(err) => {
+                failfast_debug!("Parsing query task failed: {}", err);
+                None
+            }
+        };
+        Ok(parsed_query)
+    }
+}
+
 impl QueryCache {
     /// Instantiate a new cache for parsed GraphQL queries.
     pub fn new(cache_limit: usize, schema: Arc<Schema>) -> Self {
-        Self {
-            cached: Mutex::new(LruCache::new(cache_limit)),
-            wait_map: Mutex::new(HashMap::new()),
-            schema,
-        }
+        let resolver = QueryCacheResolver { schema };
+        let cm = CachingMap::new(Box::new(resolver), cache_limit);
+        Self { cm }
     }
 
     /// Attempt to parse a string to a [`Query`] using cache if possible.
     pub async fn get_query(&self, query: impl AsRef<str>) -> Option<Arc<Query>> {
-        let mut locked_cache = self.cached.lock().await;
         let key = query.as_ref().to_string();
-        if let Some(value) = locked_cache.get(&key).cloned() {
-            return value;
-        }
-
-        // Holding a lock across the query parsing tasks is a bad idea because this would block all
-        // other get() requests for a potentially long time.
-        //
-        // Alternatively, if we don't hold the lock, there is a risk that we will do the work
-        // multiple times. This is also sub-optimal.
-
-        // To work around this, we keep a list of keys we are currently processing. If we try to
-        // get a key on this list, we block and wait for it to complete and then retry.
-        //
-        // This is more complex than either of the two simple alternatives but succeeds in
-        // providing a mechanism where each client only waits for uncached Query they are going to
-        // use AND avoids generating the query multiple times.
-        let mut locked_wait_map = self.wait_map.lock().await;
-
-        // We must only drop the locked cache after we have locked the
-        // wait map. Otherwise,we might get a race that causes us to
-        // miss a broadcast.
-        drop(locked_cache);
-
-        match locked_wait_map.get_mut(&key) {
-            Some(waiter) => {
-                // Register interest in key
-                let mut receiver = waiter.subscribe();
-                drop(locked_wait_map);
-                let (recv_key, recv_plan) = receiver.recv().await.expect(
-                    "the sender won't ever be dropped before all the receivers finish; qed",
-                );
-                debug_assert_eq!(recv_key, key);
-                recv_plan
-            }
-            None => {
-                let (tx, _rx) = broadcast::channel(1);
-                locked_wait_map.insert(key.clone(), tx.clone());
-                drop(locked_wait_map);
-                // This is the potentially high duration operation
-                // No locks are held here
-                let query_parsing_future = {
-                    let query = query.as_ref().to_string();
-                    let schema = Arc::clone(&self.schema);
-                    tokio::task::spawn_blocking(move || Query::parse(query, &schema))
-                };
-                let parsed_query = match query_parsing_future.await {
-                    Ok(res) => res.map(Arc::new),
-                    // Silently ignore cancelled tasks (never happen for blocking tasks).
-                    Err(err) if err.is_cancelled() => None,
-                    Err(err) => {
-                        failfast_debug!("Parsing query task failed: {}", err);
-                        None
-                    }
-                };
-                // Update our cache
-                let mut locked_cache = self.cached.lock().await;
-                locked_cache.put(key.clone(), parsed_query.clone());
-                // Update our wait list
-                let mut locked_wait_map = self.wait_map.lock().await;
-                locked_wait_map.remove(&key);
-                // Let our waiters know
-                let broadcast_value = parsed_query.clone();
-                match tokio::task::spawn_blocking(move || {
-                    let _ = tx
-                        .send((key, broadcast_value))
-                        .expect("there is always at least one receiver alive, the _rx guard; qed");
-                })
-                .await
-                {
-                    Ok(()) => parsed_query,
-                    Err(err) => {
-                        failfast_debug!("Parsing query task failed: {}", err);
-                        None
-                    }
-                }
-            }
-        }
+        match self.cm.get(key).await {
+            Ok(v) => v,
+            Err(err) => {
+                failfast_debug!("Parsing query task failed: {}", err);
+                None
+            }
+        }
     }
 }
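As a usage sketch for the reworked `QueryCache` (not part of the diff; the helper function, the query string, and the limit of 512 are illustrative, and constructing a `Schema` is left to the caller):

```rust
use apollo_router_core::{QueryCache, Schema};
use std::sync::Arc;

// Hypothetical helper: the caller supplies an already-parsed schema.
async fn lookup_twice(schema: Arc<Schema>) {
    let cache = QueryCache::new(512, schema);

    // The first call parses on a blocking task via QueryCacheResolver;
    // the second call for the same string is answered by the CachingMap.
    let query = "{ me { name } }";
    let first = cache.get_query(query).await;
    let second = cache.get_query(query).await;

    // Parse failures are cached as None rather than surfaced as errors.
    assert_eq!(first.is_some(), second.is_some());
}
```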
diff --git a/apollo-router-core/src/query_planner/caching_query_planner.rs b/apollo-router-core/src/query_planner/caching_query_planner.rs
index c617482e6c..cb841312a6 100644
--- a/apollo-router-core/src/query_planner/caching_query_planner.rs
+++ b/apollo-router-core/src/query_planner/caching_query_planner.rs
@@ -1,10 +1,8 @@
 use crate::prelude::graphql::*;
+use crate::CacheResolver;
 use async_trait::async_trait;
-use futures::lock::Mutex;
-use lru::LruCache;
-use std::collections::HashMap;
+use std::marker::PhantomData;
 use std::sync::Arc;
-use tokio::sync::broadcast::{self, Sender};
 
 type PlanResult = Result<Arc<QueryPlan>, QueryPlannerError>;
 
@@ -13,22 +11,39 @@ type PlanResult = Result<Arc<QueryPlan>, QueryPlannerError>;
 /// The query planner performs LRU caching.
 #[derive(Debug)]
 pub struct CachingQueryPlanner<T: QueryPlanner> {
+    cm: CachingMap<QueryKey, Arc<QueryPlan>>,
+    phantom: PhantomData<T>,
+}
+
+/// A resolver for cache misses
+struct CachingQueryPlannerResolver<T: QueryPlanner> {
     delegate: T,
-    cached: Mutex<LruCache<QueryKey, PlanResult>>,
-    wait_map: Mutex<HashMap<QueryKey, Sender<(QueryKey, PlanResult)>>>,
-    plan_cache_limit: usize,
 }
 
-impl<T: QueryPlanner> CachingQueryPlanner<T> {
-    /// Creates a new query planner that cache the results of another [`QueryPlanner`].
+impl<T: QueryPlanner + 'static> CachingQueryPlanner<T> {
+    /// Creates a new query planner that caches the results of another [`QueryPlanner`].
     pub fn new(delegate: T, plan_cache_limit: usize) -> CachingQueryPlanner<T> {
+        let resolver = CachingQueryPlannerResolver { delegate };
+        let cm = CachingMap::new(Box::new(resolver), plan_cache_limit);
         Self {
-            delegate,
-            cached: Mutex::new(LruCache::new(plan_cache_limit)),
-            wait_map: Mutex::new(HashMap::new()),
-            plan_cache_limit,
+            cm,
+            phantom: PhantomData,
         }
     }
+
+    pub async fn get_hot_keys(&self) -> Vec<QueryKey> {
+        self.cm.get_hot_keys().await
+    }
+}
+
+#[async_trait]
+impl<T: QueryPlanner> CacheResolver<QueryKey, Arc<QueryPlan>> for CachingQueryPlannerResolver<T> {
+    async fn retrieve(&self, key: QueryKey) -> Result<Arc<QueryPlan>, CacheResolverError> {
+        self.delegate
+            .get(key.0, key.1, key.2)
+            .await
+            .map_err(|err| err.into())
+    }
 }
 
 #[async_trait]
@@ -39,88 +54,8 @@ impl<T: QueryPlanner> QueryPlanner for CachingQueryPlanner<T> {
     async fn get(
         &self,
         query: String,
         operation: Option<String>,
         options: QueryPlanOptions,
     ) -> PlanResult {
-        let mut locked_cache = self.cached.lock().await;
         let key = (query, operation, options);
-        if let Some(value) = locked_cache.get(&key).cloned() {
-            return value;
-        }
-
-        // Holding a lock across the delegated get is a bad idea because
-        // the delegate get() calls into v8 for processing of the plan.
-        // This would block all other get() requests for a potentially
-        // long time.
-        // Alternatively, if we don't hold the lock, there is a risk
-        // that we will do the work multiple times. This is also
-        // sub-optimal.
-
-        // To work around this, we keep a list of keys we are currently
-        // processing in the delegate. If we try to get a key on this
-        // list, we block and wait for it to complete and then retry.
-        //
-        // This is more complex than either of the two simple
-        // alternatives but succeeds in providing a mechanism where each
-        // client only waits for uncached QueryPlans they are going to
-        // use AND avoids generating the plan multiple times.
-
-        let mut locked_wait_map = self.wait_map.lock().await;
-
-        // We must only drop the locked cache after we have locked the
-        // wait map. Otherwise,we might get a race that causes us to
-        // miss a broadcast.
-        drop(locked_cache);
-
-        match locked_wait_map.get_mut(&key) {
-            Some(waiter) => {
-                // Register interest in key
-                let mut receiver = waiter.subscribe();
-                drop(locked_wait_map);
-                // Our use case is very specific, so we are sure
-                // that we won't get any errors here.
-                let (recv_key, recv_plan) = receiver.recv().await.expect(
-                    "the sender won't ever be dropped before all the receivers finish; qed",
-                );
-                debug_assert_eq!(recv_key, key);
-                recv_plan
-            }
-            None => {
-                let (tx, _rx) = broadcast::channel(1);
-                locked_wait_map.insert(key.clone(), tx.clone());
-                drop(locked_wait_map);
-                // This is the potentially high duration operation
-                // No locks are held here
-                let value = self
-                    .delegate
-                    .get(key.0.clone(), key.1.clone(), key.2.clone())
-                    .await;
-                // Update our cache
-                let mut locked_cache = self.cached.lock().await;
-                locked_cache.put(key.clone(), value.clone());
-                // Update our wait list
-                let mut locked_wait_map = self.wait_map.lock().await;
-                locked_wait_map.remove(&key);
-                // Let our waiters know
-                let broadcast_value = value.clone();
-                // Our use case is very specific, so we are sure that
-                // we won't get any errors here.
-                tokio::task::spawn_blocking(move || {
-                    tx.send((key, broadcast_value))
-                        .expect("there is always at least one receiver alive, the _rx guard; qed")
-                })
-                .await?;
-                value
-            }
-        }
-    }
-}
-
-impl<T: QueryPlanner> CachingQueryPlanner<T> {
-    pub async fn get_hot_keys(&self) -> Vec<QueryKey> {
-        let locked_cache = self.cached.lock().await;
-        locked_cache
-            .iter()
-            .take(self.plan_cache_limit / 5)
-            .map(|(key, _value)| key.clone())
-            .collect()
+        self.cm.get(key).await.map_err(|err| err.into())
     }
 }
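To show how the slimmed-down planner is meant to be used, here is a sketch (not part of the diff; `wrap_and_plan`, the query string, and the limit of 100 are illustrative, and the imports assume the crate's existing re-exports):

```rust
use apollo_router_core::{CachingQueryPlanner, QueryPlanOptions, QueryPlanner};

// `delegate` stands for any concrete QueryPlanner implementation.
async fn wrap_and_plan<T: QueryPlanner + 'static>(delegate: T, options: QueryPlanOptions) {
    let planner = CachingQueryPlanner::new(delegate, 100);

    // The same (query, operation, options) key is only planned once;
    // later calls for it are answered from the CachingMap.
    let plan = planner
        .get("{ me { name } }".to_string(), None, options)
        .await;

    // The hottest 20% of keys can be pulled out, e.g. to pre-warm a new
    // planner after a schema reload.
    let hot_keys = planner.get_hot_keys().await;
    println!("planned ok: {}, hot keys: {}", plan.is_ok(), hot_keys.len());
}
```

The `WithCaching` trait added in traits.rs below provides `delegate.with_caching(limit)` as sugar for the `CachingQueryPlanner::new` call shown here.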
diff --git a/apollo-router-core/src/traits.rs b/apollo-router-core/src/traits.rs
index 4d88deeae5..f1ff3a13f5 100644
--- a/apollo-router-core/src/traits.rs
+++ b/apollo-router-core/src/traits.rs
@@ -4,6 +4,15 @@ use futures::prelude::*;
 use std::sync::Arc;
 use std::{fmt::Debug, pin::Pin};
 
+/// A cache resolution trait.
+///
+/// Clients of CachingMap are required to provide a resolver during Map creation. The resolver
+/// will be used to find values for cache misses. A Result is expected, because retrieval may fail.
+#[async_trait]
+pub trait CacheResolver<K, V> {
+    async fn retrieve(&self, key: K) -> Result<V, CacheResolverError>;
+}
+
 /// A planner key.
 ///
 /// This type consists of a query string, an optional operation string and the
@@ -45,6 +54,22 @@ pub trait QueryPlanner: Send + Sync + Debug {
     ) -> Result<Arc<QueryPlan>, QueryPlannerError>;
 }
 
+/// With caching trait.
+///
+/// Adds with_caching to any query planner.
+pub trait WithCaching: QueryPlanner
+where
+    Self: Sized + QueryPlanner + 'static,
+{
+    /// Wrap this query planner in a caching decorator.
+    /// The original query planner is consumed.
+    fn with_caching(self, plan_cache_limit: usize) -> CachingQueryPlanner<Self> {
+        CachingQueryPlanner::new(self, plan_cache_limit)
+    }
+}
+
+impl<T> WithCaching for T where T: QueryPlanner + Sized + 'static {}
+
 /// An object that accepts a [`Request`] and allow creating [`PreparedQuery`]'s.
 ///
 /// The call to the function will either succeeds and return a [`PreparedQuery`] or it will fail and return