From b31a614957c7fc21dc316ae8dd4857f3f82a62c7 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Thu, 2 Dec 2021 13:23:51 +0000 Subject: [PATCH 1/8] Draft of proposed changes for cache unification I don't really love either approach to unifying: - delegate callback - function parameter Ideally we could store a callback in the CachingMap struct, but lifetimes and async restrictions mean I can't make that work at the moment. This draft PR is to discuss the pros/cons of the two approaches and see if anyone has suggestions for ways to improve. --- apollo-router-core/src/cache.rs | 232 ++++++++++++++++++ apollo-router-core/src/lib.rs | 2 + apollo-router-core/src/query_cache.rs | 137 ++++------- .../query_planner/caching_query_planner.rs | 114 ++------- apollo-router-core/src/traits.rs | 5 + 5 files changed, 309 insertions(+), 181 deletions(-) create mode 100644 apollo-router-core/src/cache.rs diff --git a/apollo-router-core/src/cache.rs b/apollo-router-core/src/cache.rs new file mode 100644 index 0000000000..9d0f924641 --- /dev/null +++ b/apollo-router-core/src/cache.rs @@ -0,0 +1,232 @@ +use crate::CacheCallback; +use futures::lock::Mutex; +use lru::{KeyRef, LruCache}; +use std::borrow::Borrow; +use std::cmp::Eq; +use std::collections::HashMap; +use std::error::Error; +use std::fmt; +use std::future::Future; +use std::hash::Hash; +use tokio::sync::broadcast::{self, Sender}; +use tokio::task::JoinError; + +/// A query planner wrapper that caches results. +/// +/// The query planner performs LRU caching. +pub struct CachingMap { + // delegate: Option + Send + Sync + 'static>>, + cached: Mutex>>, + #[allow(clippy::type_complexity)] + wait_map: Mutex)>>>, + cache_limit: usize, +} + +impl fmt::Debug for CachingMap { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CachingMap") + .field("cache_limit", &self.cache_limit) + .finish() + } +} + +impl CachingMap +where + E: Error + From + Send + Sync + 'static, + K: Clone + fmt::Debug + Eq + Hash + Send + Sync + 'static, + V: fmt::Debug + Send + Sync + 'static, + KeyRef: Borrow, + Result: Clone, +{ + /// Creates a new CachingMap + // pub fn new(delegate: Option>>, cache_limit: usize) -> Self { + pub fn new(cache_limit: usize) -> Self { + Self { + // delegate, + cached: Mutex::new(LruCache::new(cache_limit)), + wait_map: Mutex::new(HashMap::new()), + cache_limit, + } + } + + pub async fn get( + &self, + callback: &(dyn CacheCallback + Send + Sync + 'static), + key: K, + ) -> Result { + let mut locked_cache = self.cached.lock().await; + if let Some(value) = locked_cache.get(&key).cloned() { + tracing::info!("FOUND in cache: {:?}", &value); + return value; + } + + // Holding a lock across the delegated get is a bad idea because + // the delegate get() calls into v8 for processing of the plan. + // This would block all other get() requests for a potentially + // long time. + // Alternatively, if we don't hold the lock, there is a risk + // that we will do the work multiple times. This is also + // sub-optimal. + + // To work around this, we keep a list of keys we are currently + // processing in the delegate. If we try to get a key on this + // list, we block and wait for it to complete and then retry. + // + // This is more complex than either of the two simple + // alternatives but succeeds in providing a mechanism where each + // client only waits for uncached QueryPlans they are going to + // use AND avoids generating the plan multiple times. 
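+        //
+        // In outline, the miss path below proceeds as follows (a summary
+        // of the code that follows, not additional behaviour):
+        //
+        //   1. check the LRU cache under its lock; return on a hit;
+        //   2. lock the wait map *before* releasing the cache lock;
+        //   3. if another task is already computing this key, subscribe
+        //      to its broadcast::Sender and await the (key, value) pair;
+        //   4. otherwise register a Sender for the key, drop all locks,
+        //      call the delegate, re-lock to store the result, remove
+        //      the Sender, and broadcast the result to any subscribers.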
+ + let mut locked_wait_map = self.wait_map.lock().await; + + // We must only drop the locked cache after we have locked the + // wait map. Otherwise,we might get a race that causes us to + // miss a broadcast. + drop(locked_cache); + + match locked_wait_map.get_mut(&key) { + Some(waiter) => { + // Register interest in key + let mut receiver = waiter.subscribe(); + drop(locked_wait_map); + // Our use case is very specific, so we are sure + // that we won't get any errors here. + let (recv_key, recv_plan) = receiver.recv().await.expect( + "the sender won't ever be dropped before all the receivers finish; qed", + ); + debug_assert_eq!(recv_key, key); + recv_plan + } + None => { + let (tx, _rx) = broadcast::channel(1); + locked_wait_map.insert(key.clone(), tx.clone()); + drop(locked_wait_map); + // This is the potentially high duration operation + // No cache locks are held here + let value = callback.delegated_get(key.clone()).await; + // Update our cache + let mut locked_cache = self.cached.lock().await; + locked_cache.put(key.clone(), value.clone()); + // Update our wait list + let mut locked_wait_map = self.wait_map.lock().await; + locked_wait_map.remove(&key); + // Let our waiters know + let broadcast_value = value.clone(); + // Our use case is very specific, so we are sure that + // we won't get any errors here. + tokio::task::spawn_blocking(move || { + tx.send((key, broadcast_value)) + .expect("there is always at least one receiver alive, the _rx guard; qed") + }) + .await?; + tracing::info!("NOT FOUND in cache: {:?}", &value); + value + } + } + } + + pub async fn get_with>>( + &self, + callback: impl FnOnce(K) -> Fut, + key: K, + ) -> Result { + let mut locked_cache = self.cached.lock().await; + if let Some(value) = locked_cache.get(&key).cloned() { + tracing::info!("FOUND in cache: {:?}", &value); + return value; + } + + // Holding a lock across the delegated get is a bad idea because + // the delegate get() calls into v8 for processing of the plan. + // This would block all other get() requests for a potentially + // long time. + // Alternatively, if we don't hold the lock, there is a risk + // that we will do the work multiple times. This is also + // sub-optimal. + + // To work around this, we keep a list of keys we are currently + // processing in the delegate. If we try to get a key on this + // list, we block and wait for it to complete and then retry. + // + // This is more complex than either of the two simple + // alternatives but succeeds in providing a mechanism where each + // client only waits for uncached QueryPlans they are going to + // use AND avoids generating the plan multiple times. + + let mut locked_wait_map = self.wait_map.lock().await; + + // We must only drop the locked cache after we have locked the + // wait map. Otherwise,we might get a race that causes us to + // miss a broadcast. + drop(locked_cache); + + match locked_wait_map.get_mut(&key) { + Some(waiter) => { + // Register interest in key + let mut receiver = waiter.subscribe(); + drop(locked_wait_map); + // Our use case is very specific, so we are sure + // that we won't get any errors here. 
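+                // (recv() cannot observe a closed channel here: the sending
+                // half is kept alive until after the value has been
+                // broadcast, so every subscriber sees the value.)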
+ let (recv_key, recv_plan) = receiver.recv().await.expect( + "the sender won't ever be dropped before all the receivers finish; qed", + ); + debug_assert_eq!(recv_key, key); + recv_plan + } + None => { + let (tx, _rx) = broadcast::channel(1); + locked_wait_map.insert(key.clone(), tx.clone()); + drop(locked_wait_map); + // This is the potentially high duration operation + // No cache locks are held here + let value = (callback)(key.clone()).await; + // Update our cache + let mut locked_cache = self.cached.lock().await; + locked_cache.put(key.clone(), value.clone()); + // Update our wait list + let mut locked_wait_map = self.wait_map.lock().await; + locked_wait_map.remove(&key); + // Let our waiters know + let broadcast_value = value.clone(); + // Our use case is very specific, so we are sure that + // we won't get any errors here. + tokio::task::spawn_blocking(move || { + tx.send((key, broadcast_value)) + .expect("there is always at least one receiver alive, the _rx guard; qed") + }) + .await?; + tracing::info!("NOT FOUND in cache: {:?}", &value); + value + } + } + } + + pub async fn get_hot_keys(&self) -> Vec { + let locked_cache = self.cached.lock().await; + locked_cache + .iter() + .take(self.cache_limit / 5) + .map(|(key, _value)| key.clone()) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::QueryPlannerError; + use test_log::test; + + #[test(tokio::test)] + async fn it_should_enforce_cache_limits() { + let cm: CachingMap = CachingMap::new(13); + + let q = |key: usize| async move { Ok(key) }; + for i in 0..14 { + cm.get_with(q, i).await; + } + let guard = cm.cached.lock().await; + println!("{:?}", guard); + assert_eq!(guard.len(), 13); + } +} diff --git a/apollo-router-core/src/lib.rs b/apollo-router-core/src/lib.rs index d497a8894d..5b7ec760d8 100644 --- a/apollo-router-core/src/lib.rs +++ b/apollo-router-core/src/lib.rs @@ -22,6 +22,7 @@ macro_rules! failfast_error { }}; } +mod cache; mod error; mod json_ext; mod naive_introspection; @@ -33,6 +34,7 @@ mod response; mod schema; mod traits; +pub use cache::*; pub use error::*; pub use json_ext::*; pub use naive_introspection::*; diff --git a/apollo-router-core/src/query_cache.rs b/apollo-router-core/src/query_cache.rs index f0ffd14a0f..3b38ad7019 100644 --- a/apollo-router-core/src/query_cache.rs +++ b/apollo-router-core/src/query_cache.rs @@ -1,111 +1,68 @@ use crate::prelude::graphql::*; -use futures::lock::Mutex; -use lru::LruCache; -use std::collections::HashMap; +use crate::CacheCallback; use std::sync::Arc; -use tokio::sync::broadcast; /// A cache for parsed GraphQL queries. #[derive(Debug)] pub struct QueryCache { - cached: Mutex>>>, - #[allow(clippy::type_complexity)] - wait_map: Mutex>)>>>, - cache_limit: usize, + cm: CachingMap>>, schema: Arc, } +#[async_trait::async_trait] +impl CacheCallback>> for QueryCache { + async fn delegated_get(&self, key: String) -> Result>, QueryPlannerError> { + let query_parsing_future = { + let schema = Arc::clone(&self.schema); + tokio::task::spawn_blocking(move || Query::parse(key, &schema)) + }; + let parsed_query = match query_parsing_future.await { + Ok(res) => res.map(Arc::new), + // Silently ignore cancelled tasks (never happen for blocking tasks). + Err(err) if err.is_cancelled() => None, + Err(err) => { + failfast_debug!("Parsing query task failed: {}", err); + None + } + }; + Ok(parsed_query) + } +} + impl QueryCache { /// Instantiate a new cache for parsed GraphQL queries. 
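+    ///
+    /// A usage sketch (assumes an existing `Arc<Schema>` named `schema`;
+    /// shown for illustration only):
+    ///
+    /// ```ignore
+    /// let cache = QueryCache::new(100, Arc::clone(&schema));
+    /// if let Some(parsed) = cache.get_query("{ me { name } }").await {
+    ///     // `parsed` is an Arc<Query> shared with every later caller.
+    /// }
+    /// ```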
pub fn new(cache_limit: usize, schema: Arc) -> Self { - Self { - cached: Mutex::new(LruCache::new(cache_limit)), - wait_map: Mutex::new(HashMap::new()), - cache_limit, - schema, - } + let cm = CachingMap::new(cache_limit); + Self { cm, schema } } /// Attempt to parse a string to a [`Query`] using cache if possible. pub async fn get_query(&self, query: impl AsRef) -> Option> { - let mut locked_cache = self.cached.lock().await; let key = query.as_ref().to_string(); - if let Some(value) = locked_cache.get(&key).cloned() { - return value; - } - - // Holding a lock across the query parsing tasks is a bad idea because this would block all - // other get() requests for a potentially long time. - // - // Alternatively, if we don't hold the lock, there is a risk that we will do the work - // multiple times. This is also sub-optimal. - - // To work around this, we keep a list of keys we are currently processing. If we try to - // get a key on this list, we block and wait for it to complete and then retry. - // - // This is more complex than either of the two simple alternatives but succeeds in - // providing a mechanism where each client only waits for uncached Query they are going to - // use AND avoids generating the query multiple times. - - let mut locked_wait_map = self.wait_map.lock().await; - - // We must only drop the locked cache after we have locked the - // wait map. Otherwise,we might get a race that causes us to - // miss a broadcast. - drop(locked_cache); - - match locked_wait_map.get_mut(&key) { - Some(waiter) => { - // Register interest in key - let mut receiver = waiter.subscribe(); - drop(locked_wait_map); - let (recv_key, recv_plan) = receiver.recv().await.expect( - "the sender won't ever be dropped before all the receivers finish; qed", - ); - debug_assert_eq!(recv_key, key); - recv_plan - } - None => { - let (tx, _rx) = broadcast::channel(1); - locked_wait_map.insert(key.clone(), tx.clone()); - drop(locked_wait_map); - // This is the potentially high duration operation - // No locks are held here - let query_parsing_future = { - let query = query.as_ref().to_string(); - let schema = Arc::clone(&self.schema); - tokio::task::spawn_blocking(move || Query::parse(query, &schema)) - }; - let parsed_query = match query_parsing_future.await { - Ok(res) => res.map(Arc::new), - // Silently ignore cancelled tasks (never happen for blocking tasks). - Err(err) if err.is_cancelled() => None, - Err(err) => { - failfast_debug!("Parsing query task failed: {}", err); - None - } - }; - // Update our cache - let mut locked_cache = self.cached.lock().await; - locked_cache.put(key.clone(), parsed_query.clone()); - // Update our wait list - let mut locked_wait_map = self.wait_map.lock().await; - locked_wait_map.remove(&key); - // Let our waiters know - let broadcast_value = parsed_query.clone(); - match tokio::task::spawn_blocking(move || { - let _ = tx - .send((key, broadcast_value)) - .expect("there is always at least one receiver alive, the _rx guard; qed"); - }) - .await - { - Ok(()) => parsed_query, - Err(err) => { - failfast_debug!("Parsing query task failed: {}", err); - None - } + /* + let q = |key: String| async move { + let query_parsing_future = { + let schema = Arc::clone(&self.schema); + tokio::task::spawn_blocking(move || Query::parse(key, &schema)) + }; + let parsed_query = match query_parsing_future.await { + Ok(res) => res.map(Arc::new), + // Silently ignore cancelled tasks (never happen for blocking tasks). 
+ Err(err) if err.is_cancelled() => None, + Err(err) => { + failfast_debug!("Parsing query task failed: {}", err); + None } + }; + Ok(parsed_query) + }; + */ + + match self.cm.get(self, key).await { + Ok(v) => v, + Err(err) => { + failfast_debug!("Parsing query task failed: {}", err); + None } } } diff --git a/apollo-router-core/src/query_planner/caching_query_planner.rs b/apollo-router-core/src/query_planner/caching_query_planner.rs index 2aa0dd2bde..971e78c61f 100644 --- a/apollo-router-core/src/query_planner/caching_query_planner.rs +++ b/apollo-router-core/src/query_planner/caching_query_planner.rs @@ -1,124 +1,56 @@ use crate::prelude::graphql::*; +use crate::CacheCallback; use async_trait::async_trait; -use futures::lock::Mutex; -use lru::LruCache; -use std::collections::HashMap; +use std::fmt; use std::sync::Arc; -use tokio::sync::broadcast::{self, Sender}; type PlanResult = Result, QueryPlannerError>; /// A query planner wrapper that caches results. /// /// The query planner performs LRU caching. -#[derive(Debug)] pub struct CachingQueryPlanner { delegate: T, - cached: Mutex>, - wait_map: Mutex>>, - plan_cache_limit: usize, + cm: CachingMap>, +} + +impl fmt::Debug for CachingQueryPlanner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CachingQueryPlanner").finish() + } } impl CachingQueryPlanner { /// Creates a new query planner that cache the results of another [`QueryPlanner`]. pub fn new(delegate: T, plan_cache_limit: usize) -> CachingQueryPlanner { - Self { - delegate, - cached: Mutex::new(LruCache::new(plan_cache_limit)), - wait_map: Mutex::new(HashMap::new()), - plan_cache_limit, - } + let cm = CachingMap::new(plan_cache_limit); + Self { delegate, cm } + } +} + +#[async_trait] +impl CacheCallback> + for CachingQueryPlanner +{ + async fn delegated_get(&self, key: QueryKey) -> Result, QueryPlannerError> { + self.delegate.get(key.0, key.1, key.2).await } } #[async_trait] -impl QueryPlanner for CachingQueryPlanner { +impl QueryPlanner for CachingQueryPlanner { async fn get( &self, query: String, operation: Option, options: QueryPlanOptions, ) -> PlanResult { - let mut locked_cache = self.cached.lock().await; let key = (query, operation, options); - if let Some(value) = locked_cache.get(&key).cloned() { - return value; - } - - // Holding a lock across the delegated get is a bad idea because - // the delegate get() calls into v8 for processing of the plan. - // This would block all other get() requests for a potentially - // long time. - // Alternatively, if we don't hold the lock, there is a risk - // that we will do the work multiple times. This is also - // sub-optimal. - - // To work around this, we keep a list of keys we are currently - // processing in the delegate. If we try to get a key on this - // list, we block and wait for it to complete and then retry. - // - // This is more complex than either of the two simple - // alternatives but succeeds in providing a mechanism where each - // client only waits for uncached QueryPlans they are going to - // use AND avoids generating the plan multiple times. - - let mut locked_wait_map = self.wait_map.lock().await; - - // We must only drop the locked cache after we have locked the - // wait map. Otherwise,we might get a race that causes us to - // miss a broadcast. 
- drop(locked_cache); - - match locked_wait_map.get_mut(&key) { - Some(waiter) => { - // Register interest in key - let mut receiver = waiter.subscribe(); - drop(locked_wait_map); - // Our use case is very specific, so we are sure - // that we won't get any errors here. - let (recv_key, recv_plan) = receiver.recv().await.expect( - "the sender won't ever be dropped before all the receivers finish; qed", - ); - debug_assert_eq!(recv_key, key); - recv_plan - } - None => { - let (tx, _rx) = broadcast::channel(1); - locked_wait_map.insert(key.clone(), tx.clone()); - drop(locked_wait_map); - // This is the potentially high duration operation - // No locks are held here - let value = self - .delegate - .get(key.0.clone(), key.1.clone(), key.2.clone()) - .await; - // Update our cache - let mut locked_cache = self.cached.lock().await; - locked_cache.put(key.clone(), value.clone()); - // Update our wait list - let mut locked_wait_map = self.wait_map.lock().await; - locked_wait_map.remove(&key); - // Let our waiters know - let broadcast_value = value.clone(); - // Our use case is very specific, so we are sure that - // we won't get any errors here. - tokio::task::spawn_blocking(move || { - tx.send((key, broadcast_value)) - .expect("there is always at least one receiver alive, the _rx guard; qed") - }) - .await?; - value - } - } + self.cm.get(self, key).await } async fn get_hot_keys(&self) -> Vec { - let locked_cache = self.cached.lock().await; - locked_cache - .iter() - .take(self.plan_cache_limit / 5) - .map(|(key, _value)| key.clone()) - .collect() + self.cm.get_hot_keys().await } } diff --git a/apollo-router-core/src/traits.rs b/apollo-router-core/src/traits.rs index 17c0102eff..58f970165d 100644 --- a/apollo-router-core/src/traits.rs +++ b/apollo-router-core/src/traits.rs @@ -4,6 +4,11 @@ use futures::prelude::*; use std::sync::Arc; use std::{fmt::Debug, pin::Pin}; +#[async_trait::async_trait] +pub trait CacheCallback { + async fn delegated_get(&self, key: K) -> Result; +} + /// A planner key. /// /// This type consists of a query string, an optional operation string and the From 94459897c492a176bb8c691e6e3984fa6261a766 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Thu, 2 Dec 2021 13:30:14 +0000 Subject: [PATCH 2/8] Clean up some nits Before raising Draft PR... --- apollo-router-core/src/cache.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apollo-router-core/src/cache.rs b/apollo-router-core/src/cache.rs index 9d0f924641..a4880ba6fc 100644 --- a/apollo-router-core/src/cache.rs +++ b/apollo-router-core/src/cache.rs @@ -15,7 +15,6 @@ use tokio::task::JoinError; /// /// The query planner performs LRU caching. 
pub struct CachingMap { - // delegate: Option + Send + Sync + 'static>>, cached: Mutex>>, #[allow(clippy::type_complexity)] wait_map: Mutex)>>>, @@ -39,10 +38,8 @@ where Result: Clone, { /// Creates a new CachingMap - // pub fn new(delegate: Option>>, cache_limit: usize) -> Self { pub fn new(cache_limit: usize) -> Self { Self { - // delegate, cached: Mutex::new(LruCache::new(cache_limit)), wait_map: Mutex::new(HashMap::new()), cache_limit, @@ -223,7 +220,7 @@ mod tests { let q = |key: usize| async move { Ok(key) }; for i in 0..14 { - cm.get_with(q, i).await; + cm.get_with(q, i).await.expect("gets the value"); } let guard = cm.cached.lock().await; println!("{:?}", guard); From d69aa5812962c7065953b4283d19d8a6aac4b398 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Thu, 2 Dec 2021 16:45:26 +0000 Subject: [PATCH 3/8] Cache unification Unify the two cache implementations into a single implementation. After spending a bit of time exploring different ideas, I've settled on the idea proposed by Geoffrey to provide a value resolver for use by the cache map. This approach is cleanest from the point of view of client interactions, although it does require clients to implement the CacheResolver trait. Although CacheResolver is using dynamic dispatch, it's only called for once per cache miss (which is expected to be slow by definition), so I think that's an acceptable performance tradeoff. --- apollo-router-core/src/cache.rs | 163 ++++++------------ apollo-router-core/src/error.rs | 31 ++++ apollo-router-core/src/query_cache.rs | 37 ++-- .../query_planner/caching_query_planner.rs | 43 +++-- apollo-router-core/src/traits.rs | 14 +- 5 files changed, 131 insertions(+), 157 deletions(-) diff --git a/apollo-router-core/src/cache.rs b/apollo-router-core/src/cache.rs index a4880ba6fc..fabee6e91f 100644 --- a/apollo-router-core/src/cache.rs +++ b/apollo-router-core/src/cache.rs @@ -1,27 +1,30 @@ -use crate::CacheCallback; +use crate::{CacheResolver, CacheResolverError}; use futures::lock::Mutex; -use lru::{KeyRef, LruCache}; -use std::borrow::Borrow; +use lru::LruCache; use std::cmp::Eq; use std::collections::HashMap; -use std::error::Error; use std::fmt; -use std::future::Future; use std::hash::Hash; use tokio::sync::broadcast::{self, Sender}; -use tokio::task::JoinError; -/// A query planner wrapper that caches results. +/// A result from a cache get(). +pub type CacheResult = Result; + +/// A caching map optimised for slow value resolution. /// -/// The query planner performs LRU caching. -pub struct CachingMap { - cached: Mutex>>, +/// The CachingMap hold values in an LruCache. Values are loaded into the cache on a cache miss and +/// the cache relies on the resolver to provide values. There is no way to manually remove, update +/// or otherwise invalidate a cache value at this time. Values will be evicted from the cache once +/// the cache_limit is reached. 
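+///
+/// A minimal usage sketch. `Echo` is a hypothetical resolver written for
+/// illustration; the unit tests below use the same shape:
+///
+/// ```ignore
+/// struct Echo;
+///
+/// #[async_trait::async_trait]
+/// impl CacheResolver<usize, usize> for Echo {
+///     async fn retrieve(&self, key: usize) -> Result<usize, CacheResolverError> {
+///         Ok(key)
+///     }
+/// }
+///
+/// let map: CachingMap<usize, usize> = CachingMap::new(Box::new(Echo), 512);
+/// assert_eq!(map.get(1).await.unwrap(), 1);
+/// ```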
+pub struct CachingMap { + cached: Mutex>>, #[allow(clippy::type_complexity)] - wait_map: Mutex)>>>, + wait_map: Mutex)>>>, cache_limit: usize, + resolver: Box + Send + Sync>, } -impl fmt::Debug for CachingMap { +impl fmt::Debug for CachingMap { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("CachingMap") .field("cache_limit", &self.cache_limit) @@ -29,31 +32,29 @@ impl fmt::Debug for CachingMap { } } -impl CachingMap +impl CachingMap where - E: Error + From + Send + Sync + 'static, K: Clone + fmt::Debug + Eq + Hash + Send + Sync + 'static, V: fmt::Debug + Send + Sync + 'static, - KeyRef: Borrow, - Result: Clone, + CacheResult: Clone, { - /// Creates a new CachingMap - pub fn new(cache_limit: usize) -> Self { + /// Create a new CachingMap. + /// + /// resolver is used to resolve cache misses. + /// cache_limit specifies the size (number of items) of the cache + pub fn new(resolver: Box<(dyn CacheResolver + Send + Sync)>, cache_limit: usize) -> Self { Self { cached: Mutex::new(LruCache::new(cache_limit)), wait_map: Mutex::new(HashMap::new()), cache_limit, + resolver, } } - pub async fn get( - &self, - callback: &(dyn CacheCallback + Send + Sync + 'static), - key: K, - ) -> Result { + /// Get a value from the cache. + pub async fn get(&self, key: K) -> CacheResult { let mut locked_cache = self.cached.lock().await; if let Some(value) = locked_cache.get(&key).cloned() { - tracing::info!("FOUND in cache: {:?}", &value); return value; } @@ -98,9 +99,10 @@ where let (tx, _rx) = broadcast::channel(1); locked_wait_map.insert(key.clone(), tx.clone()); drop(locked_wait_map); - // This is the potentially high duration operation + // This is the potentially high duration operation where we ask our resolver to + // resolve the key (retrieve a value) for us // No cache locks are held here - let value = callback.delegated_get(key.clone()).await; + let value = self.resolver.retrieve(key.clone()).await; // Update our cache let mut locked_cache = self.cached.lock().await; locked_cache.put(key.clone(), value.clone()); @@ -116,88 +118,12 @@ where .expect("there is always at least one receiver alive, the _rx guard; qed") }) .await?; - tracing::info!("NOT FOUND in cache: {:?}", &value); - value - } - } - } - - pub async fn get_with>>( - &self, - callback: impl FnOnce(K) -> Fut, - key: K, - ) -> Result { - let mut locked_cache = self.cached.lock().await; - if let Some(value) = locked_cache.get(&key).cloned() { - tracing::info!("FOUND in cache: {:?}", &value); - return value; - } - - // Holding a lock across the delegated get is a bad idea because - // the delegate get() calls into v8 for processing of the plan. - // This would block all other get() requests for a potentially - // long time. - // Alternatively, if we don't hold the lock, there is a risk - // that we will do the work multiple times. This is also - // sub-optimal. - - // To work around this, we keep a list of keys we are currently - // processing in the delegate. If we try to get a key on this - // list, we block and wait for it to complete and then retry. - // - // This is more complex than either of the two simple - // alternatives but succeeds in providing a mechanism where each - // client only waits for uncached QueryPlans they are going to - // use AND avoids generating the plan multiple times. - - let mut locked_wait_map = self.wait_map.lock().await; - - // We must only drop the locked cache after we have locked the - // wait map. Otherwise,we might get a race that causes us to - // miss a broadcast. 
- drop(locked_cache); - - match locked_wait_map.get_mut(&key) { - Some(waiter) => { - // Register interest in key - let mut receiver = waiter.subscribe(); - drop(locked_wait_map); - // Our use case is very specific, so we are sure - // that we won't get any errors here. - let (recv_key, recv_plan) = receiver.recv().await.expect( - "the sender won't ever be dropped before all the receivers finish; qed", - ); - debug_assert_eq!(recv_key, key); - recv_plan - } - None => { - let (tx, _rx) = broadcast::channel(1); - locked_wait_map.insert(key.clone(), tx.clone()); - drop(locked_wait_map); - // This is the potentially high duration operation - // No cache locks are held here - let value = (callback)(key.clone()).await; - // Update our cache - let mut locked_cache = self.cached.lock().await; - locked_cache.put(key.clone(), value.clone()); - // Update our wait list - let mut locked_wait_map = self.wait_map.lock().await; - locked_wait_map.remove(&key); - // Let our waiters know - let broadcast_value = value.clone(); - // Our use case is very specific, so we are sure that - // we won't get any errors here. - tokio::task::spawn_blocking(move || { - tx.send((key, broadcast_value)) - .expect("there is always at least one receiver alive, the _rx guard; qed") - }) - .await?; - tracing::info!("NOT FOUND in cache: {:?}", &value); value } } } + /// Get the top 20% of most recently (LRU) used keys pub async fn get_hot_keys(&self) -> Vec { let locked_cache = self.cached.lock().await; locked_cache @@ -211,18 +137,39 @@ where #[cfg(test)] mod tests { use super::*; - use crate::QueryPlannerError; + use crate::CacheResolverError; + use async_trait::async_trait; use test_log::test; + struct HasACache { + cm: CachingMap, + } + + struct HasACacheResolver {} + + impl HasACache { + fn new(limit: usize) -> Self { + let resolver = Box::new(HasACacheResolver {}); + let cm = CachingMap::new(resolver, limit); + Self { cm } + } + } + + #[async_trait] + impl CacheResolver for HasACacheResolver { + async fn retrieve(&self, key: usize) -> Result { + Ok(key) + } + } + #[test(tokio::test)] async fn it_should_enforce_cache_limits() { - let cm: CachingMap = CachingMap::new(13); + let cache = HasACache::new(13); - let q = |key: usize| async move { Ok(key) }; for i in 0..14 { - cm.get_with(q, i).await.expect("gets the value"); + cache.cm.get(i).await.expect("gets the value"); } - let guard = cm.cached.lock().await; + let guard = cache.cm.cached.lock().await; println!("{:?}", guard); assert_eq!(guard.len(), 13); } diff --git a/apollo-router-core/src/error.rs b/apollo-router-core/src/error.rs index 7d27a81de6..b7ad3a71a6 100644 --- a/apollo-router-core/src/error.rs +++ b/apollo-router-core/src/error.rs @@ -150,6 +150,28 @@ impl From for FetchError { } } +/// Error types for CacheResolver +#[derive(Error, Debug, Display, Clone)] +pub enum CacheResolverError { + /// Value retrieval failed: {0} + RetrievalError(Arc), + + /// Cache update failed: {0} + JoinError(Arc), +} + +impl From for CacheResolverError { + fn from(err: JoinError) -> Self { + CacheResolverError::JoinError(Arc::new(err)) + } +} + +impl From for CacheResolverError { + fn from(err: QueryPlannerError) -> Self { + CacheResolverError::RetrievalError(Arc::new(err)) + } +} + /// An error while processing JSON data. 
#[derive(Debug, Error, Display)] pub enum JsonExtError { @@ -167,6 +189,9 @@ pub enum QueryPlannerError { /// Query planning panicked: {0} JoinError(Arc), + + /// Cache resolution failed: {0} + CacheResolverError(Arc), } impl From for QueryPlannerError { @@ -181,6 +206,12 @@ impl From for QueryPlannerError { } } +impl From for QueryPlannerError { + fn from(err: CacheResolverError) -> Self { + QueryPlannerError::CacheResolverError(Arc::new(err)) + } +} + impl From for ResponseStream { fn from(err: QueryPlannerError) -> Self { stream::once(future::ready(FetchError::from(err).to_response(true))).boxed() diff --git a/apollo-router-core/src/query_cache.rs b/apollo-router-core/src/query_cache.rs index 3b38ad7019..4248553cdf 100644 --- a/apollo-router-core/src/query_cache.rs +++ b/apollo-router-core/src/query_cache.rs @@ -1,17 +1,21 @@ use crate::prelude::graphql::*; -use crate::CacheCallback; +use crate::CacheResolver; use std::sync::Arc; /// A cache for parsed GraphQL queries. #[derive(Debug)] pub struct QueryCache { - cm: CachingMap>>, + cm: CachingMap>>, +} + +/// A resolver for cache misses +struct QueryCacheResolver { schema: Arc, } #[async_trait::async_trait] -impl CacheCallback>> for QueryCache { - async fn delegated_get(&self, key: String) -> Result>, QueryPlannerError> { +impl CacheResolver>> for QueryCacheResolver { + async fn retrieve(&self, key: String) -> Result>, CacheResolverError> { let query_parsing_future = { let schema = Arc::clone(&self.schema); tokio::task::spawn_blocking(move || Query::parse(key, &schema)) @@ -32,33 +36,16 @@ impl CacheCallback>> for QueryCache impl QueryCache { /// Instantiate a new cache for parsed GraphQL queries. pub fn new(cache_limit: usize, schema: Arc) -> Self { - let cm = CachingMap::new(cache_limit); - Self { cm, schema } + let resolver = QueryCacheResolver { schema }; + let cm = CachingMap::new(Box::new(resolver), cache_limit); + Self { cm } } /// Attempt to parse a string to a [`Query`] using cache if possible. pub async fn get_query(&self, query: impl AsRef) -> Option> { let key = query.as_ref().to_string(); - /* - let q = |key: String| async move { - let query_parsing_future = { - let schema = Arc::clone(&self.schema); - tokio::task::spawn_blocking(move || Query::parse(key, &schema)) - }; - let parsed_query = match query_parsing_future.await { - Ok(res) => res.map(Arc::new), - // Silently ignore cancelled tasks (never happen for blocking tasks). - Err(err) if err.is_cancelled() => None, - Err(err) => { - failfast_debug!("Parsing query task failed: {}", err); - None - } - }; - Ok(parsed_query) - }; - */ - match self.cm.get(self, key).await { + match self.cm.get(key).await { Ok(v) => v, Err(err) => { failfast_debug!("Parsing query task failed: {}", err); diff --git a/apollo-router-core/src/query_planner/caching_query_planner.rs b/apollo-router-core/src/query_planner/caching_query_planner.rs index 971e78c61f..8e89ea920a 100644 --- a/apollo-router-core/src/query_planner/caching_query_planner.rs +++ b/apollo-router-core/src/query_planner/caching_query_planner.rs @@ -1,7 +1,7 @@ use crate::prelude::graphql::*; -use crate::CacheCallback; +use crate::CacheResolver; use async_trait::async_trait; -use std::fmt; +use std::marker::PhantomData; use std::sync::Arc; type PlanResult = Result, QueryPlannerError>; @@ -9,36 +9,41 @@ type PlanResult = Result, QueryPlannerError>; /// A query planner wrapper that caches results. /// /// The query planner performs LRU caching. 
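+///
+/// A sketch of the intended call pattern (`planner` stands in for any
+/// concrete `QueryPlanner`; illustrative only):
+///
+/// ```ignore
+/// let caching = CachingQueryPlanner::new(planner, 100);
+/// let plan = caching.get(query, operation, options).await?;
+/// ```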
+#[derive(Debug)] pub struct CachingQueryPlanner { - delegate: T, - cm: CachingMap>, + cm: CachingMap>, + phantom: PhantomData, } -impl fmt::Debug for CachingQueryPlanner { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CachingQueryPlanner").finish() - } +/// A resolver for cache misses +struct CachingQueryPlannerResolver { + delegate: T, } -impl CachingQueryPlanner { - /// Creates a new query planner that cache the results of another [`QueryPlanner`]. +impl CachingQueryPlanner { + /// Creates a new query planner that caches the results of another [`QueryPlanner`]. pub fn new(delegate: T, plan_cache_limit: usize) -> CachingQueryPlanner { - let cm = CachingMap::new(plan_cache_limit); - Self { delegate, cm } + let resolver = CachingQueryPlannerResolver { delegate }; + let cm = CachingMap::new(Box::new(resolver), plan_cache_limit); + Self { + cm, + phantom: PhantomData, + } } } #[async_trait] -impl CacheCallback> - for CachingQueryPlanner -{ - async fn delegated_get(&self, key: QueryKey) -> Result, QueryPlannerError> { - self.delegate.get(key.0, key.1, key.2).await +impl CacheResolver> for CachingQueryPlannerResolver { + async fn retrieve(&self, key: QueryKey) -> Result, CacheResolverError> { + self.delegate + .get(key.0, key.1, key.2) + .await + .map_err(|err| err.into()) } } #[async_trait] -impl QueryPlanner for CachingQueryPlanner { +impl QueryPlanner for CachingQueryPlanner { async fn get( &self, query: String, @@ -46,7 +51,7 @@ impl QueryPlanner for CachingQueryPlanner { options: QueryPlanOptions, ) -> PlanResult { let key = (query, operation, options); - self.cm.get(self, key).await + self.cm.get(key).await.map_err(|err| err.into()) } async fn get_hot_keys(&self) -> Vec { diff --git a/apollo-router-core/src/traits.rs b/apollo-router-core/src/traits.rs index 58f970165d..f920f93425 100644 --- a/apollo-router-core/src/traits.rs +++ b/apollo-router-core/src/traits.rs @@ -4,9 +4,13 @@ use futures::prelude::*; use std::sync::Arc; use std::{fmt::Debug, pin::Pin}; -#[async_trait::async_trait] -pub trait CacheCallback { - async fn delegated_get(&self, key: K) -> Result; +/// A cache resolution trait. +/// +/// Clients of CachingMap are required to provider a resolver during Map creation. The resolver +/// will be used to find values for cache misses. A Result is expected, because retrieval may fail. +#[async_trait] +pub trait CacheResolver { + async fn retrieve(&self, key: K) -> Result; } /// A planner key. @@ -57,7 +61,7 @@ pub trait QueryPlanner: Send + Sync + Debug { /// Adds with_caching to any query planner. pub trait WithCaching: QueryPlanner where - Self: Sized + QueryPlanner, + Self: Sized + QueryPlanner + 'static, { /// Wrap this query planner in a caching decorator. /// The original query planner is consumed. @@ -66,7 +70,7 @@ where } } -impl WithCaching for T where T: QueryPlanner + Sized {} +impl WithCaching for T where T: QueryPlanner + Sized + 'static {} /// An object that accepts a [`Request`] and allow creating [`PreparedQuery`]'s. /// From 87c7001274e89f987103083910f094c04002687d Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Fri, 3 Dec 2021 08:44:13 +0000 Subject: [PATCH 4/8] Fix merge with main Make sure code compiles. 
--- .../src/query_planner/caching_query_planner.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/apollo-router-core/src/query_planner/caching_query_planner.rs b/apollo-router-core/src/query_planner/caching_query_planner.rs index 5f848ec331..cb841312a6 100644 --- a/apollo-router-core/src/query_planner/caching_query_planner.rs +++ b/apollo-router-core/src/query_planner/caching_query_planner.rs @@ -30,6 +30,10 @@ impl CachingQueryPlanner { phantom: PhantomData, } } + + pub async fn get_hot_keys(&self) -> Vec { + self.cm.get_hot_keys().await + } } #[async_trait] @@ -55,11 +59,6 @@ impl QueryPlanner for CachingQueryPlanner { } } - async fn get_hot_keys(&self) -> Vec { - self.cm.get_hot_keys().await - } -} - #[cfg(test)] mod tests { use super::*; From 64bd3ebb7d2011df4c50a97d6d6fe26dcb06ca47 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Fri, 3 Dec 2021 08:58:08 +0000 Subject: [PATCH 5/8] Use derivative crate for CachingMap Debug impl As per code review comment. --- apollo-router-core/src/cache.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/apollo-router-core/src/cache.rs b/apollo-router-core/src/cache.rs index fabee6e91f..38ccddb069 100644 --- a/apollo-router-core/src/cache.rs +++ b/apollo-router-core/src/cache.rs @@ -1,4 +1,5 @@ use crate::{CacheResolver, CacheResolverError}; +use derivative::Derivative; use futures::lock::Mutex; use lru::LruCache; use std::cmp::Eq; @@ -16,22 +17,19 @@ pub type CacheResult = Result; /// the cache relies on the resolver to provide values. There is no way to manually remove, update /// or otherwise invalidate a cache value at this time. Values will be evicted from the cache once /// the cache_limit is reached. +#[derive(Derivative)] +#[derivative(Debug)] pub struct CachingMap { + #[derivative(Debug = "ignore")] cached: Mutex>>, #[allow(clippy::type_complexity)] + #[derivative(Debug = "ignore")] wait_map: Mutex)>>>, cache_limit: usize, + #[derivative(Debug = "ignore")] resolver: Box + Send + Sync>, } -impl fmt::Debug for CachingMap { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CachingMap") - .field("cache_limit", &self.cache_limit) - .finish() - } -} - impl CachingMap where K: Clone + fmt::Debug + Eq + Hash + Send + Sync + 'static, From 2511d2e24754a71d47e611a391c3271d95ec4b2f Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Fri, 3 Dec 2021 16:03:25 +0000 Subject: [PATCH 6/8] Add concurrency unit test suggested in review Mock the retrieve trait and perform a concurrent retrieve test. --- apollo-router-core/src/cache.rs | 63 ++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/apollo-router-core/src/cache.rs b/apollo-router-core/src/cache.rs index 38ccddb069..1eea1f1bb5 100644 --- a/apollo-router-core/src/cache.rs +++ b/apollo-router-core/src/cache.rs @@ -57,9 +57,8 @@ where } // Holding a lock across the delegated get is a bad idea because - // the delegate get() calls into v8 for processing of the plan. - // This would block all other get() requests for a potentially - // long time. + // the delegate get() could take a long time during which all + // other get() requests are blocked. // Alternatively, if we don't hold the lock, there is a risk // that we will do the work multiple times. This is also // sub-optimal. 
@@ -70,8 +69,8 @@ where // // This is more complex than either of the two simple // alternatives but succeeds in providing a mechanism where each - // client only waits for uncached QueryPlans they are going to - // use AND avoids generating the plan multiple times. + // client only waits for uncached values that they are going to + // use AND avoids generating the value multiple times. let mut locked_wait_map = self.wait_map.lock().await; @@ -87,11 +86,11 @@ where drop(locked_wait_map); // Our use case is very specific, so we are sure // that we won't get any errors here. - let (recv_key, recv_plan) = receiver.recv().await.expect( + let (recv_key, recv_value) = receiver.recv().await.expect( "the sender won't ever be dropped before all the receivers finish; qed", ); debug_assert_eq!(recv_key, key); - recv_plan + recv_value } None => { let (tx, _rx) = broadcast::channel(1); @@ -137,6 +136,8 @@ mod tests { use super::*; use crate::CacheResolverError; use async_trait::async_trait; + use futures::future::join_all; + use mockall::mock; use test_log::test; struct HasACache { @@ -146,11 +147,19 @@ mod tests { struct HasACacheResolver {} impl HasACache { - fn new(limit: usize) -> Self { - let resolver = Box::new(HasACacheResolver {}); - let cm = CachingMap::new(resolver, limit); + // fn new(resolver: limit: usize) -> Self { + fn new( + resolver: Box<(dyn CacheResolver + Send + Sync)>, + cache_limit: usize, + ) -> Self { + // let resolver = Box::new(HasACacheResolver {}); + let cm = CachingMap::new(resolver, cache_limit); Self { cm } } + + async fn get(&self, key: usize) -> Result { + self.cm.get(key).await + } } #[async_trait] @@ -160,15 +169,43 @@ mod tests { } } + mock! { + HasACacheResolver {} + + #[async_trait] + impl CacheResolver for HasACacheResolver { + async fn retrieve(&self, key: usize) -> Result; + } + } + #[test(tokio::test)] async fn it_should_enforce_cache_limits() { - let cache = HasACache::new(13); + let cache = HasACache::new(Box::new(HasACacheResolver {}), 13); for i in 0..14 { - cache.cm.get(i).await.expect("gets the value"); + cache.get(i).await.expect("gets the value"); } let guard = cache.cm.cached.lock().await; - println!("{:?}", guard); assert_eq!(guard.len(), 13); } + + #[test(tokio::test)] + async fn it_should_only_delegate_once_per_key() { + let mut mock = MockHasACacheResolver::new(); + + mock.expect_retrieve().times(1).return_const(Ok(1)); + + let cache = HasACache::new(Box::new(mock), 10); + + // Let's trigger 100 concurrent gets of the same value and ensure only + // one delegated retrieve is made + let computations = (0..100).map(|_| cache.get(1)); + let _ = join_all(computations) + .await + .into_iter() + .map(|res| assert_eq!(1, res.unwrap())); + // To be really sure, check there is only one value in the cache + let guard = cache.cm.cached.lock().await; + assert_eq!(guard.len(), 1); + } } From 6a63ab412a7b4fb6c8b9ab76520dfb6d4ca785c0 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Fri, 3 Dec 2021 17:08:34 +0000 Subject: [PATCH 7/8] Try to increase concurrency in concurrency testing By using FuturesUnordered. 
This should be the most concurrent thing you can do in rust - (https://fasterthanli.me/articles/understanding-rust-futures-by-going-way-too-deep) --- apollo-router-core/src/cache.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/apollo-router-core/src/cache.rs b/apollo-router-core/src/cache.rs index 1eea1f1bb5..490797ada3 100644 --- a/apollo-router-core/src/cache.rs +++ b/apollo-router-core/src/cache.rs @@ -136,7 +136,7 @@ mod tests { use super::*; use crate::CacheResolverError; use async_trait::async_trait; - use futures::future::join_all; + use futures::stream::{FuturesUnordered, StreamExt}; use mockall::mock; use test_log::test; @@ -199,11 +199,12 @@ mod tests { // Let's trigger 100 concurrent gets of the same value and ensure only // one delegated retrieve is made - let computations = (0..100).map(|_| cache.get(1)); - let _ = join_all(computations) - .await - .into_iter() - .map(|res| assert_eq!(1, res.unwrap())); + let mut computations: FuturesUnordered<_> = (0..100).map(|_| cache.get(1)).collect(); + + while let Some(result) = computations.next().await { + result.expect("result retrieved"); + } + // To be really sure, check there is only one value in the cache let guard = cache.cm.cached.lock().await; assert_eq!(guard.len(), 1); From 84a043d61076bfa66746868501519addb17f2193 Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Mon, 6 Dec 2021 09:30:16 +0000 Subject: [PATCH 8/8] Address review comments In particular: - Remove the type definition for CacheResult - Remove the error propagation for spawn task when sending broadcast --- apollo-router-core/src/cache.rs | 14 ++++++-------- apollo-router-core/src/error.rs | 9 --------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/apollo-router-core/src/cache.rs b/apollo-router-core/src/cache.rs index 490797ada3..030addb545 100644 --- a/apollo-router-core/src/cache.rs +++ b/apollo-router-core/src/cache.rs @@ -8,9 +8,6 @@ use std::fmt; use std::hash::Hash; use tokio::sync::broadcast::{self, Sender}; -/// A result from a cache get(). -pub type CacheResult = Result; - /// A caching map optimised for slow value resolution. /// /// The CachingMap hold values in an LruCache. Values are loaded into the cache on a cache miss and @@ -21,10 +18,10 @@ pub type CacheResult = Result; #[derivative(Debug)] pub struct CachingMap { #[derivative(Debug = "ignore")] - cached: Mutex>>, + cached: Mutex>>, #[allow(clippy::type_complexity)] #[derivative(Debug = "ignore")] - wait_map: Mutex)>>>, + wait_map: Mutex)>>>, cache_limit: usize, #[derivative(Debug = "ignore")] resolver: Box + Send + Sync>, @@ -34,7 +31,7 @@ impl CachingMap where K: Clone + fmt::Debug + Eq + Hash + Send + Sync + 'static, V: fmt::Debug + Send + Sync + 'static, - CacheResult: Clone, + Result: Clone, { /// Create a new CachingMap. /// @@ -50,7 +47,7 @@ where } /// Get a value from the cache. 
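+    ///
+    /// On a miss the resolver runs at most once per key: concurrent callers
+    /// for the same key wait on a broadcast channel and each receives a
+    /// clone of the same result.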
-    pub async fn get(&self, key: K) -> CacheResult<V> {
+    pub async fn get(&self, key: K) -> Result<V, CacheResolverError> {
         let mut locked_cache = self.cached.lock().await;
         if let Some(value) = locked_cache.get(&key).cloned() {
             return value;
@@ -114,7 +111,8 @@ where
                     tx.send((key, broadcast_value))
                         .expect("there is always at least one receiver alive, the _rx guard; qed")
                 })
-                .await?;
+                .await
+                .expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed");
                 value
             }
         }
diff --git a/apollo-router-core/src/error.rs b/apollo-router-core/src/error.rs
index 760e09aeff..0d964964c3 100644
--- a/apollo-router-core/src/error.rs
+++ b/apollo-router-core/src/error.rs
@@ -155,15 +155,6 @@ impl From<QueryPlannerError> for FetchError {
 pub enum CacheResolverError {
     /// Value retrieval failed: {0}
     RetrievalError(Arc<QueryPlannerError>),
-
-    /// Cache update failed: {0}
-    JoinError(Arc<JoinError>),
-}
-
-impl From<JoinError> for CacheResolverError {
-    fn from(err: JoinError) -> Self {
-        CacheResolverError::JoinError(Arc::new(err))
-    }
 }
 
 impl From<QueryPlannerError> for CacheResolverError {