diff --git a/.changesets/feat_enhanced_observability.md b/.changesets/feat_enhanced_observability.md new file mode 100644 index 0000000000..703e0a6918 --- /dev/null +++ b/.changesets/feat_enhanced_observability.md @@ -0,0 +1,13 @@ +### New `apollo.router.cache.storage.estimated_size` gauge ([PR #5770](https://github.com/apollographql/router/pull/5770)) + +The router supports the new metric `apollo.router.cache.storage.estimated_size` that helps users understand and monitor the amount of memory that query planner cache entries consume. + +The `apollo.router.cache.storage.estimated_size` metric gives the total estimated size in bytes of all cache entries. It has the following attributes: +- `kind`: `query planner`. +- `storage`: `memory`. + +Before using the estimate to decide whether to update the cache, users should validate that the estimate correlates with their pod's memory usage. + +To learn how to troubleshoot with this metric, see the [Pods terminating due to memory pressure](https://www.apollographql.com/docs/router/containerization/kubernetes#pods-terminating-due-to-memory-pressure) guide in docs. + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/5770 \ No newline at end of file diff --git a/.changesets/fix_missing_cache_gauge.md b/.changesets/fix_missing_cache_gauge.md new file mode 100644 index 0000000000..1b71523210 --- /dev/null +++ b/.changesets/fix_missing_cache_gauge.md @@ -0,0 +1,5 @@ +### Fix missing `apollo_router_cache_size` metric ([PR #5770](https://github.com/apollographql/router/pull/5770)) + +Previously, if the in-memory cache wasn't mutated, the `apollo_router_cache_size` metric wouldn't be available. This has been fixed in this release. 
+ +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/5770 diff --git a/apollo-router/src/cache/mod.rs b/apollo-router/src/cache/mod.rs index 80daa1d8a0..6e1ef01cb8 100644 --- a/apollo-router/src/cache/mod.rs +++ b/apollo-router/src/cache/mod.rs @@ -14,7 +14,9 @@ use self::storage::ValueType; use crate::configuration::RedisCache; pub(crate) mod redis; +mod size_estimation; pub(crate) mod storage; +pub(crate) use size_estimation::estimate_size; type WaitMap = Arc>>>; pub(crate) const DEFAULT_CACHE_CAPACITY: NonZeroUsize = match NonZeroUsize::new(512) { @@ -37,7 +39,7 @@ where pub(crate) async fn with_capacity( capacity: NonZeroUsize, redis: Option, - caller: &str, + caller: &'static str, ) -> Result { Ok(Self { wait_map: Arc::new(Mutex::new(HashMap::new())), @@ -47,7 +49,7 @@ where pub(crate) async fn from_configuration( config: &crate::configuration::Cache, - caller: &str, + caller: &'static str, ) -> Result { Self::with_capacity(config.in_memory.limit, config.redis.clone(), caller).await } diff --git a/apollo-router/src/cache/redis.rs b/apollo-router/src/cache/redis.rs index f0973551f4..ae0697b3cf 100644 --- a/apollo-router/src/cache/redis.rs +++ b/apollo-router/src/cache/redis.rs @@ -593,12 +593,19 @@ mod test { use url::Url; + use crate::cache::storage::ValueType; + #[test] fn ensure_invalid_payload_serialization_doesnt_fail() { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct Stuff { time: SystemTime, } + impl ValueType for Stuff { + fn estimated_size(&self) -> Option { + None + } + } let invalid_json_payload = super::RedisValue(Stuff { // this systemtime is invalid, serialization will fail diff --git a/apollo-router/src/cache/size_estimation.rs b/apollo-router/src/cache/size_estimation.rs new file mode 100644 index 0000000000..885e8d5c13 --- /dev/null +++ b/apollo-router/src/cache/size_estimation.rs @@ -0,0 +1,438 @@ +use std::fmt::Debug; +use std::fmt::Display; +use std::fmt::Formatter; + +use 
serde::ser; +use serde::ser::SerializeMap; +use serde::ser::SerializeSeq; +use serde::ser::SerializeStruct; +use serde::ser::SerializeStructVariant; +use serde::ser::SerializeTuple; +use serde::ser::SerializeTupleStruct; +use serde::ser::SerializeTupleVariant; +use serde::Serialize; + +pub(crate) fn estimate_size(s: &T) -> usize { + let ser = s + .serialize(CountingSerializer::default()) + .expect("mut be able to serialize"); + ser.count +} + +pub(crate) struct Error; + +impl Debug for Error { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + unreachable!() + } +} + +impl Display for Error { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + unreachable!() + } +} + +impl std::error::Error for Error {} + +impl ser::Error for Error { + fn custom(_msg: T) -> Self { + unreachable!() + } +} + +/// This is a special serializer that doesn't store the serialized data, instead it counts the bytes +/// Yes, it's inaccurate, but we're looking for something that is relatively cheap to compute. +/// It doesn't take into account shared datastructures occurring multiple times and will give the +/// full estimated serialized cost. 
+#[derive(Default, Debug)] +struct CountingSerializer { + count: usize, +} + +impl ser::Serializer for CountingSerializer { + type Ok = Self; + type Error = Error; + type SerializeSeq = Self; + type SerializeTuple = Self; + type SerializeTupleStruct = Self; + type SerializeTupleVariant = Self; + type SerializeMap = Self; + type SerializeStruct = Self; + type SerializeStructVariant = Self; + + fn serialize_bool(mut self, _v: bool) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_i8(mut self, _v: i8) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_i16(mut self, _v: i16) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_i32(mut self, _v: i32) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_i64(mut self, _v: i64) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_u8(mut self, _v: u8) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_u16(mut self, _v: u16) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_u32(mut self, _v: u32) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_u64(mut self, _v: u64) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_f32(mut self, _v: f32) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_f64(mut self, _v: f64) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_char(mut self, _v: char) -> Result { + self.count += std::mem::size_of::(); + Ok(self) + } + + fn serialize_str(mut self, v: &str) -> Result { + //ptr + 8 bytes length + 8 bytes capacity + self.count += 24 + v.len(); + Ok(self) + } + + fn serialize_bytes(mut self, v: &[u8]) -> Result { + self.count += v.len(); + Ok(self) + } + + fn serialize_none(self) -> Result { + Ok(self) + } + + fn serialize_some(self, value: &T) -> 
Result + where + T: ?Sized + Serialize, + { + Ok(value.serialize(self).expect("failed to serialize")) + } + + fn serialize_unit(self) -> Result { + Ok(self) + } + + fn serialize_unit_struct(self, _name: &'static str) -> Result { + Ok(self) + } + + fn serialize_unit_variant( + self, + _name: &'static str, + _variant_index: u32, + _variant: &'static str, + ) -> Result { + Ok(self) + } + + fn serialize_newtype_struct( + self, + _name: &'static str, + value: &T, + ) -> Result + where + T: ?Sized + Serialize, + { + Ok(value.serialize(self).expect("failed to serialize")) + } + + fn serialize_newtype_variant( + self, + _name: &'static str, + _variant_index: u32, + _variant: &'static str, + value: &T, + ) -> Result + where + T: ?Sized + Serialize, + { + Ok(value.serialize(self).expect("failed to serialize")) + } + + fn serialize_seq(self, _len: Option) -> Result { + Ok(self) + } + + fn serialize_tuple(self, _len: usize) -> Result { + Ok(self) + } + + fn serialize_tuple_struct( + self, + _name: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_tuple_variant( + self, + _name: &'static str, + _variant_index: u32, + _variant: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_map(self, _len: Option) -> Result { + Ok(self) + } + + fn serialize_struct( + self, + _name: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_struct_variant( + self, + _name: &'static str, + _variant_index: u32, + _variant: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } +} +impl SerializeStructVariant for CountingSerializer { + type Ok = Self; + type Error = Error; + + fn serialize_field(&mut self, _key: &'static str, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let ser = value + .serialize(CountingSerializer::default()) + .expect("must be able to serialize"); + self.count += ser.count; + Ok(()) + } + + fn end(self) -> Result { + Ok(self) + } +} +impl SerializeSeq for 
CountingSerializer { + type Ok = Self; + type Error = Error; + + fn serialize_element(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let ser = value + .serialize(CountingSerializer::default()) + .expect("must be able to serialize"); + self.count += ser.count; + Ok(()) + } + + fn end(self) -> Result { + Ok(self) + } +} +impl SerializeTuple for CountingSerializer { + type Ok = Self; + type Error = Error; + + fn serialize_element(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let ser = value + .serialize(CountingSerializer::default()) + .expect("must be able to serialize"); + self.count += ser.count; + Ok(()) + } + + fn end(self) -> Result { + Ok(self) + } +} + +impl SerializeStruct for CountingSerializer { + type Ok = Self; + type Error = Error; + + fn serialize_field(&mut self, _key: &'static str, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let ser = value + .serialize(CountingSerializer::default()) + .expect("must be able to serialize"); + self.count += ser.count; + Ok(()) + } + + fn end(self) -> Result { + Ok(self) + } +} + +impl SerializeMap for CountingSerializer { + type Ok = Self; + type Error = Error; + + fn serialize_key(&mut self, key: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let ser = key + .serialize(CountingSerializer::default()) + .expect("must be able to serialize"); + self.count += ser.count; + Ok(()) + } + + fn serialize_value(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let ser = value + .serialize(CountingSerializer::default()) + .expect("must be able to serialize"); + self.count += ser.count; + Ok(()) + } + + fn end(self) -> Result { + Ok(self) + } +} + +impl SerializeTupleVariant for CountingSerializer { + type Ok = Self; + type Error = Error; + + fn serialize_field(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let ser = 
value + .serialize(CountingSerializer::default()) + .expect("must be able to serialize"); + self.count += ser.count; + Ok(()) + } + + fn end(self) -> Result { + Ok(self) + } +} + +impl SerializeTupleStruct for CountingSerializer { + type Ok = Self; + type Error = Error; + + fn serialize_field(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let ser = value + .serialize(CountingSerializer::default()) + .expect("must be able to serialize"); + self.count += ser.count; + Ok(()) + } + + fn end(self) -> Result { + Ok(self) + } +} + +#[cfg(test)] +mod test { + use serde::Serialize; + + use crate::cache::estimate_size; + + #[test] + fn test_estimate_size() { + #[derive(Serialize)] + struct Test { + string: String, + u8: u8, + embedded: TestEmbedded, + } + + #[derive(Serialize)] + struct TestEmbedded { + string: String, + u8: u8, + } + + // Baseline + let s = estimate_size(&Test { + string: "".to_string(), + u8: 0, + embedded: TestEmbedded { + string: "".to_string(), + u8: 0, + }, + }); + assert_eq!(s, 50); + + // Test modifying the root struct + let s = estimate_size(&Test { + string: "test".to_string(), + u8: 0, + embedded: TestEmbedded { + string: "".to_string(), + u8: 0, + }, + }); + assert_eq!(s, 54); + + // Test modifying the embedded struct + let s = estimate_size(&Test { + string: "".to_string(), + u8: 0, + embedded: TestEmbedded { + string: "test".to_string(), + u8: 0, + }, + }); + assert_eq!(s, 54); + } +} diff --git a/apollo-router/src/cache/storage.rs b/apollo-router/src/cache/storage.rs index b72ad9d378..7cfa37a0ad 100644 --- a/apollo-router/src/cache/storage.rs +++ b/apollo-router/src/cache/storage.rs @@ -2,9 +2,16 @@ use std::fmt::Display; use std::fmt::{self}; use std::hash::Hash; use std::num::NonZeroUsize; +use std::sync::atomic::AtomicI64; +use std::sync::atomic::Ordering; use std::sync::Arc; use lru::LruCache; +use opentelemetry::metrics::MeterProvider; +use opentelemetry_api::metrics::Meter; +use 
opentelemetry_api::metrics::ObservableGauge; +use opentelemetry_api::metrics::Unit; +use opentelemetry_api::KeyValue; use serde::de::DeserializeOwned; use serde::Serialize; use tokio::sync::Mutex; @@ -13,6 +20,8 @@ use tower::BoxError; use super::redis::*; use crate::configuration::RedisCache; +use crate::metrics; +use crate::plugins::telemetry::config_new::instruments::METER_NAME; pub(crate) trait KeyType: Clone + fmt::Debug + fmt::Display + Hash + Eq + Send + Sync @@ -21,6 +30,10 @@ pub(crate) trait KeyType: pub(crate) trait ValueType: Clone + fmt::Debug + Send + Sync + Serialize + DeserializeOwned { + /// Returns an estimated size of the cache entry in bytes. + fn estimated_size(&self) -> Option { + None + } } // Blanket implementation which satisfies the compiler @@ -32,15 +45,6 @@ where // It has the functions it needs already } -// Blanket implementation which satisfies the compiler -impl ValueType for V -where - V: Clone + fmt::Debug + Send + Sync + Serialize + DeserializeOwned, -{ - // Nothing to implement, since V already supports the other traits. - // It has the functions it needs already -} - pub(crate) type InMemoryCache = Arc>>; // placeholder storage module @@ -52,6 +56,10 @@ pub(crate) struct CacheStorage { caller: String, inner: Arc>>, redis: Option, + cache_size: Arc, + cache_estimated_storage: Arc, + _cache_size_gauge: ObservableGauge, + _cache_estimated_storage_gauge: ObservableGauge, } impl CacheStorage @@ -62,9 +70,19 @@ where pub(crate) async fn new( max_capacity: NonZeroUsize, config: Option, - caller: &str, + caller: &'static str, ) -> Result { + // Because calculating the cache size is expensive we do this as we go rather than iterating. 
This means storing the values for the gauges + let meter: opentelemetry::metrics::Meter = metrics::meter_provider().meter(METER_NAME); + let (cache_size, cache_size_gauge) = Self::create_cache_size_gauge(&meter, caller); + let (cache_estimated_storage, cache_estimated_storage_gauge) = + Self::create_cache_estimated_storage_size_gauge(&meter, caller); + Ok(Self { + _cache_size_gauge: cache_size_gauge, + _cache_estimated_storage_gauge: cache_estimated_storage_gauge, + cache_size, + cache_estimated_storage, caller: caller.to_string(), inner: Arc::new(Mutex::new(LruCache::new(max_capacity))), redis: if let Some(config) = config { @@ -89,6 +107,56 @@ where }) } + fn create_cache_size_gauge( + meter: &Meter, + caller: &'static str, + ) -> (Arc, ObservableGauge) { + let current_cache_size = Arc::new(AtomicI64::new(0)); + let current_cache_size_for_gauge = current_cache_size.clone(); + let cache_size_gauge = meter + // TODO move to dot naming convention + .i64_observable_gauge("apollo_router_cache_size") + .with_description("Cache size") + .with_callback(move |i| { + i.observe( + current_cache_size_for_gauge.load(Ordering::SeqCst), + &[ + KeyValue::new("kind", caller), + KeyValue::new("type", "memory"), + ], + ) + }) + .init(); + (current_cache_size, cache_size_gauge) + } + + fn create_cache_estimated_storage_size_gauge( + meter: &Meter, + caller: &'static str, + ) -> (Arc, ObservableGauge) { + let cache_estimated_storage = Arc::new(AtomicI64::new(0)); + let cache_estimated_storage_for_gauge = cache_estimated_storage.clone(); + let cache_estimated_storage_gauge = meter + .i64_observable_gauge("apollo.router.cache.storage.estimated_size") + .with_description("Estimated cache storage") + .with_unit(Unit::new("bytes")) + .with_callback(move |i| { + // If there's no storage then don't bother updating the gauge + let value = cache_estimated_storage_for_gauge.load(Ordering::SeqCst); + if value > 0 { + i.observe( + cache_estimated_storage_for_gauge.load(Ordering::SeqCst), + &[ + 
KeyValue::new("kind", caller), + KeyValue::new("type", "memory"), + ], + ) + } + }) + .init(); + (cache_estimated_storage, cache_estimated_storage_gauge) + } + /// `init_from_redis` is called with values newly deserialized from Redis cache /// if an error is returned, the value is ignored and considered a cache miss. pub(crate) async fn get( @@ -143,7 +211,7 @@ where }); match redis_value { Some(v) => { - self.inner.lock().await.put(key.clone(), v.0.clone()); + self.insert_in_memory(key.clone(), v.0.clone()).await; tracing::info!( monotonic_counter.apollo_router_cache_hit_count = 1u64, @@ -187,25 +255,33 @@ where .await; } - let mut in_memory = self.inner.lock().await; - in_memory.put(key, value); - let size = in_memory.len() as u64; - tracing::info!( - value.apollo_router_cache_size = size, - kind = %self.caller, - storage = &tracing::field::display(CacheStorageName::Memory), - ); + self.insert_in_memory(key, value).await; } - pub(crate) async fn insert_in_memory(&self, key: K, value: V) { - let mut in_memory = self.inner.lock().await; - in_memory.put(key, value); - let size = in_memory.len() as u64; - tracing::info!( - value.apollo_router_cache_size = size, - kind = %self.caller, - storage = &tracing::field::display(CacheStorageName::Memory), - ); + pub(crate) async fn insert_in_memory(&self, key: K, value: V) + where + V: ValueType, + { + // Update the cache size and estimated storage size + // This is cheaper than trying to estimate the cache storage size by iterating over the cache + let new_value_size = value.estimated_size().unwrap_or(0) as i64; + + let (old_value, length) = { + let mut in_memory = self.inner.lock().await; + (in_memory.push(key, value), in_memory.len()) + }; + + let size_delta = match old_value { + Some((_, old_value)) => { + let old_value_size = old_value.estimated_size().unwrap_or(0) as i64; + new_value_size - old_value_size + } + None => new_value_size, + }; + self.cache_estimated_storage + .fetch_add(size_delta, Ordering::SeqCst); + + 
self.cache_size.store(length as i64, Ordering::SeqCst); } pub(crate) fn in_memory_cache(&self) -> InMemoryCache { @@ -231,3 +307,184 @@ impl Display for CacheStorageName { } } } + +impl ValueType for String { + fn estimated_size(&self) -> Option { + Some(self.len()) + } +} + +impl ValueType for crate::graphql::Response { + fn estimated_size(&self) -> Option { + None + } +} + +impl ValueType for usize { + fn estimated_size(&self) -> Option { + Some(std::mem::size_of::()) + } +} + +#[cfg(test)] +mod test { + use std::num::NonZeroUsize; + + use crate::cache::estimate_size; + use crate::cache::storage::CacheStorage; + use crate::cache::storage::ValueType; + use crate::metrics::FutureMetricsExt; + + #[tokio::test] + async fn test_metrics() { + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] + struct Stuff {} + impl ValueType for Stuff { + fn estimated_size(&self) -> Option { + Some(1) + } + } + + async { + let cache: CacheStorage = + CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test") + .await + .unwrap(); + + cache.insert("test".to_string(), Stuff {}).await; + assert_gauge!( + "apollo.router.cache.storage.estimated_size", + 1, + "kind" = "test", + "type" = "memory" + ); + assert_gauge!( + "apollo_router_cache_size", + 1, + "kind" = "test", + "type" = "memory" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + #[should_panic] + async fn test_metrics_not_emitted_where_no_estimated_size() { + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] + struct Stuff {} + impl ValueType for Stuff { + fn estimated_size(&self) -> Option { + None + } + } + + async { + let cache: CacheStorage = + CacheStorage::new(NonZeroUsize::new(10).unwrap(), None, "test") + .await + .unwrap(); + + cache.insert("test".to_string(), Stuff {}).await; + // This metric won't exist + assert_gauge!( + "apollo_router_cache_size", + 0, + "kind" = "test", + "type" = "memory" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn 
test_metrics_eviction() { + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] + struct Stuff { + test: String, + } + impl ValueType for Stuff { + fn estimated_size(&self) -> Option { + Some(estimate_size(self)) + } + } + + async { + // note that the cache size is 1 + // so the second insert will always evict + let cache: CacheStorage = + CacheStorage::new(NonZeroUsize::new(1).unwrap(), None, "test") + .await + .unwrap(); + + cache + .insert( + "test".to_string(), + Stuff { + test: "test".to_string(), + }, + ) + .await; + assert_gauge!( + "apollo.router.cache.storage.estimated_size", + 28, + "kind" = "test", + "type" = "memory" + ); + assert_gauge!( + "apollo_router_cache_size", + 1, + "kind" = "test", + "type" = "memory" + ); + + // Insert something slightly larger + cache + .insert( + "test".to_string(), + Stuff { + test: "test_extended".to_string(), + }, + ) + .await; + assert_gauge!( + "apollo.router.cache.storage.estimated_size", + 37, + "kind" = "test", + "type" = "memory" + ); + assert_gauge!( + "apollo_router_cache_size", + 1, + "kind" = "test", + "type" = "memory" + ); + + // Even though this is a new cache entry, we should get back to where we initially were + cache + .insert( + "test2".to_string(), + Stuff { + test: "test".to_string(), + }, + ) + .await; + assert_gauge!( + "apollo.router.cache.storage.estimated_size", + 28, + "kind" = "test", + "type" = "memory" + ); + assert_gauge!( + "apollo_router_cache_size", + 1, + "kind" = "test", + "type" = "memory" + ); + } + .with_metrics() + .await; + } +} diff --git a/apollo-router/src/plugins/cache/entity.rs b/apollo-router/src/plugins/cache/entity.rs index 1bdb3c6614..d0ecdaae90 100644 --- a/apollo-router/src/plugins/cache/entity.rs +++ b/apollo-router/src/plugins/cache/entity.rs @@ -36,6 +36,7 @@ use crate::batching::BatchQuery; use crate::cache::redis::RedisCacheStorage; use crate::cache::redis::RedisKey; use crate::cache::redis::RedisValue; +use crate::cache::storage::ValueType; use 
crate::configuration::subgraph::SubgraphConfiguration; use crate::configuration::RedisCache; use crate::error::FetchError; @@ -884,6 +885,12 @@ struct CacheEntry { data: Value, } +impl ValueType for CacheEntry { + fn estimated_size(&self) -> Option { + None + } +} + async fn cache_store_root_from_response( cache: RedisCacheStorage, subgraph_ttl: Option, diff --git a/apollo-router/src/plugins/file_uploads/rearrange_query_plan.rs b/apollo-router/src/plugins/file_uploads/rearrange_query_plan.rs index c7bfdc1ec4..22bcf3fdb6 100644 --- a/apollo-router/src/plugins/file_uploads/rearrange_query_plan.rs +++ b/apollo-router/src/plugins/file_uploads/rearrange_query_plan.rs @@ -45,6 +45,7 @@ pub(super) fn rearrange_query_plan( formatted_query_plan: query_plan.formatted_query_plan.clone(), query: query_plan.query.clone(), query_metrics: query_plan.query_metrics, + estimated_size: Default::default(), }) } diff --git a/apollo-router/src/query_planner/bridge_query_planner.rs b/apollo-router/src/query_planner/bridge_query_planner.rs index d4fcb1331c..f5772b4cbf 100644 --- a/apollo-router/src/query_planner/bridge_query_planner.rs +++ b/apollo-router/src/query_planner/bridge_query_planner.rs @@ -624,6 +624,7 @@ impl BridgeQueryPlanner { formatted_query_plan, query: Arc::new(selections), query_metrics, + estimated_size: Default::default(), }), }) } diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 9d063e1652..f6718143ec 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -24,7 +24,9 @@ use tower_service::Service; use tracing::Instrument; use super::fetch::QueryHash; +use crate::cache::estimate_size; use crate::cache::storage::InMemoryCache; +use crate::cache::storage::ValueType; use crate::cache::DeduplicatingCache; use crate::error::CacheResolverError; use crate::error::QueryPlannerError; @@ -687,6 +689,17 @@ 
pub(crate) struct WarmUpCachingQueryKey { pub(crate) introspection: bool, } +impl ValueType for Result> { + fn estimated_size(&self) -> Option { + match self { + Ok(QueryPlannerContent::Plan { plan }) => Some(plan.estimated_size()), + Ok(QueryPlannerContent::Response { response }) => Some(estimate_size(response)), + Ok(QueryPlannerContent::IntrospectionDisabled) => None, + Err(e) => Some(estimate_size(e)), + } + } +} + #[cfg(test)] mod tests { use mockall::mock; @@ -838,6 +851,7 @@ mod tests { .into(), query: Arc::new(Query::empty()), query_metrics: Default::default(), + estimated_size: Default::default(), }; let qp_content = QueryPlannerContent::Plan { plan: Arc::new(query_plan), diff --git a/apollo-router/src/query_planner/plan.rs b/apollo-router/src/query_planner/plan.rs index bf4471e23b..447adb7ba7 100644 --- a/apollo-router/src/query_planner/plan.rs +++ b/apollo-router/src/query_planner/plan.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; use apollo_compiler::validation::Valid; @@ -9,6 +11,7 @@ use serde::Serialize; pub(crate) use self::fetch::OperationKind; use super::fetch; use super::subscription::SubscriptionNode; +use crate::cache::estimate_size; use crate::configuration::Batching; use crate::error::CacheResolverError; use crate::error::ValidationErrors; @@ -42,6 +45,10 @@ pub struct QueryPlan { pub(crate) formatted_query_plan: Option>, pub(crate) query: Arc, pub(crate) query_metrics: OperationLimits, + + /// The estimated size in bytes of the query plan + #[serde(default)] + pub(crate) estimated_size: Arc, } /// This default impl is useful for test users @@ -64,6 +71,7 @@ impl QueryPlan { formatted_query_plan: Default::default(), query: Arc::new(Query::empty()), query_metrics: Default::default(), + estimated_size: Default::default(), } } } @@ -89,6 +97,14 @@ impl QueryPlan { self.root .query_hashes(batching_config, operation, variables, &self.query) } + + pub(crate) fn estimated_size(&self) -> 
usize { + if self.estimated_size.load(Ordering::SeqCst) == 0 { + self.estimated_size + .store(estimate_size(self), Ordering::SeqCst); + } + self.estimated_size.load(Ordering::SeqCst) + } } /// Query plans are composed of a set of nodes. @@ -607,3 +623,17 @@ pub(crate) struct DeferredNode { pub(crate) struct Depends { pub(crate) id: String, } + +#[cfg(test)] +mod test { + use crate::query_planner::QueryPlan; + + #[test] + fn test_estimated_size() { + let query_plan = QueryPlan::fake_builder().build(); + let size1 = query_plan.estimated_size(); + let size2 = query_plan.estimated_size(); + assert!(size1 > 0); + assert_eq!(size1, size2); + } +} diff --git a/apollo-router/src/query_planner/tests.rs b/apollo-router/src/query_planner/tests.rs index fd7fb6d8b6..cfd44d6d08 100644 --- a/apollo-router/src/query_planner/tests.rs +++ b/apollo-router/src/query_planner/tests.rs @@ -87,6 +87,7 @@ async fn mock_subgraph_service_withf_panics_should_be_reported_as_service_closed referenced_fields_by_type: Default::default(), } .into(), + estimated_size: Default::default(), }; let mut mock_products_service = plugin::test::MockSubgraphService::new(); @@ -142,6 +143,7 @@ async fn fetch_includes_operation_name() { .into(), query: Arc::new(Query::empty()), query_metrics: Default::default(), + estimated_size: Default::default(), }; let succeeded: Arc = Default::default(); @@ -202,6 +204,7 @@ async fn fetch_makes_post_requests() { .into(), query: Arc::new(Query::empty()), query_metrics: Default::default(), + estimated_size: Default::default(), }; let succeeded: Arc = Default::default(); @@ -329,7 +332,8 @@ async fn defer() { referenced_fields_by_type: Default::default(), }.into(), query: Arc::new(Query::empty()), - query_metrics: Default::default() + query_metrics: Default::default(), + estimated_size: Default::default(), }; let mut mock_x_service = plugin::test::MockSubgraphService::new(); @@ -460,6 +464,7 @@ async fn defer_if_condition() { ), formatted_query_plan: None, query_metrics: 
Default::default(), + estimated_size: Default::default(), }; let mocked_accounts = MockSubgraph::builder() @@ -642,6 +647,7 @@ async fn dependent_mutations() { .into(), query: Arc::new(Query::empty()), query_metrics: Default::default(), + estimated_size: Default::default(), }; let mut mock_a_service = plugin::test::MockSubgraphService::new(); @@ -1826,6 +1832,7 @@ fn broken_plan_does_not_panic() { .into(), query: Arc::new(Query::empty()), query_metrics: Default::default(), + estimated_size: Default::default(), }; let subgraph_schema = apollo_compiler::Schema::parse_and_validate(subgraph_schema, "").unwrap(); let mut subgraph_schemas = HashMap::new(); diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index a5ea8403d5..dec84074f8 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -454,6 +454,7 @@ async fn subscription_task( formatted_query_plan: query_plan.formatted_query_plan.clone(), query: query_plan.query.clone(), query_metrics: query_plan.query_metrics, + estimated_size: Default::default(), }) }), _ => { diff --git a/docs/source/configuration/in-memory-caching.mdx b/docs/source/configuration/in-memory-caching.mdx index 2acf126c59..de15cbaf6c 100644 --- a/docs/source/configuration/in-memory-caching.mdx +++ b/docs/source/configuration/in-memory-caching.mdx @@ -72,15 +72,18 @@ supergraph: To get more information on the planning and warm-up process use the following metrics (where `` can be `redis` for distributed cache or `memory`): * counters: - * `apollo_router_cache_size{kind="query planner", storage="}`: current size of the cache (only for in-memory cache) - * `apollo_router_cache_hit_count{kind="query planner", storage="}` - * `apollo_router_cache_miss_count{kind="query planner", storage="}` + * `apollo_router_cache_hit_count{kind="query planner", storage=""}` + * `apollo_router_cache_miss_count{kind="query planner", storage=""}` 
* histograms: * `apollo.router.query_planning.plan.duration`: time spent planning queries * `apollo_router_schema_loading_time`: time spent loading a schema - * `apollo_router_cache_hit_time{kind="query planner", storage="}`: time to get a value from the cache - * `apollo_router_cache_miss_time{kind="query planner", storage="}` + * `apollo_router_cache_hit_time{kind="query planner", storage=""}`: time to get a value from the cache + * `apollo_router_cache_miss_time{kind="query planner", storage=""}` + +* gauges + * `apollo_router_cache_size{kind="query planner", storage="memory"}`: current size of the cache (only for in-memory cache) + * `apollo.router.cache.storage.estimated_size{kind="query planner", storage="memory"}`: estimated storage size of the cache (only for in-memory query planner cache) Typically, we would look at `apollo_router_cache_size` and the cache hit rate to define the right size of the in memory cache, then look at `apollo_router_schema_loading_time` and `apollo.router.query_planning.plan.duration` to decide how much time we want to spend warming up queries. diff --git a/docs/source/configuration/telemetry/instrumentation/standard-instruments.mdx b/docs/source/configuration/telemetry/instrumentation/standard-instruments.mdx index 1a3358a38e..37c63e8b57 100644 --- a/docs/source/configuration/telemetry/instrumentation/standard-instruments.mdx +++ b/docs/source/configuration/telemetry/instrumentation/standard-instruments.mdx @@ -38,6 +38,7 @@ These instruments can be consumed by configuring a [metrics exporter](../exporte - `apollo_router_cache_miss_count` - Number of cache misses - `apollo_router_cache_hit_time` - Time to hit the cache in seconds - `apollo_router_cache_miss_time` - Time to miss the cache in seconds +- `apollo.router.cache.storage.estimated_size` - The estimated storage size of the cache in bytes (query planner in memory only). 
All cache metrics listed above have the following attributes: diff --git a/docs/source/containerization/kubernetes.mdx b/docs/source/containerization/kubernetes.mdx index a2393c5225..591e30db0d 100644 --- a/docs/source/containerization/kubernetes.mdx +++ b/docs/source/containerization/kubernetes.mdx @@ -285,3 +285,26 @@ The gateway propagates subgraph errors to clients, but the router doesn't by def include_subgraph_errors: all: true ``` + +## Troubleshooting + +### Pods terminating due to memory pressure + +If your deployment of routers is terminating due to memory pressure, you can add router cache metrics to monitor and remediate your system: + +1. Add and track the following metrics to your monitoring system: + + * `apollo.router.cache.storage.estimated_size` + * `apollo_router_cache_size` + * ratio of `apollo_router_cache_hit_count` to `apollo_router_cache_miss_count` + +2. Observe and monitor the metrics: + + * Observe the `apollo.router.cache.storage.estimated_size` to see if it grows over time and correlates with pod memory usage. + * Observe the ratio of cache hits to misses to determine if the cache is being effective. + +3. Based on your observations, try some remediating adjustments: + + * Lower the cache size if the cache reaches near 100% hit-rate but the cache size is still growing. + * Increase the pod memory if the cache hit rate is low and the cache size is still growing. + * Lower the cache size if the latency of query planning cache misses is acceptable and memory availability is limited.