diff --git a/.changesets/feat_geal_redis_fail_open.md b/.changesets/feat_geal_redis_fail_open.md
new file mode 100644
index 0000000000..175b68b4e4
--- /dev/null
+++ b/.changesets/feat_geal_redis_fail_open.md
@@ -0,0 +1,7 @@
+### Redis: add a fail open option ([Issue #4334](https://github.com/apollographql/router/issues/4334))
+
+This option configures the router's behavior when it cannot connect to Redis:
+- by default, the router will still start and handle requests in a degraded state, using only its in-memory caches
+- when enabled, this option prevents the router from starting if it cannot connect to Redis
+
+By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/4534
\ No newline at end of file
diff --git a/apollo-router/src/cache/mod.rs b/apollo-router/src/cache/mod.rs
index 8aaf10510a..4c487d8968 100644
--- a/apollo-router/src/cache/mod.rs
+++ b/apollo-router/src/cache/mod.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
 use tokio::sync::broadcast;
 use tokio::sync::oneshot;
 use tokio::sync::Mutex;
+use tower::BoxError;
 
 use self::storage::CacheStorage;
 use self::storage::KeyType;
@@ -36,17 +37,17 @@ where
         capacity: NonZeroUsize,
         redis: Option<RedisCache>,
         caller: &str,
-    ) -> Self {
-        Self {
+    ) -> Result<Self, BoxError> {
+        Ok(Self {
             wait_map: Arc::new(Mutex::new(HashMap::new())),
-            storage: CacheStorage::new(capacity, redis, caller).await,
-        }
+            storage: CacheStorage::new(capacity, redis, caller).await?,
+        })
     }
 
     pub(crate) async fn from_configuration(
         config: &crate::configuration::Cache,
         caller: &str,
-    ) -> Self {
+    ) -> Result<Self, BoxError> {
         Self::with_capacity(config.in_memory.limit, config.redis.clone(), caller).await
     }
 
@@ -206,8 +207,9 @@ mod tests {
     #[tokio::test]
     async fn example_cache_usage() {
         let k = "key".to_string();
-        let cache =
-            DeduplicatingCache::with_capacity(NonZeroUsize::new(1).unwrap(), None, "test").await;
+        let cache = DeduplicatingCache::with_capacity(NonZeroUsize::new(1).unwrap(), None, "test")
+            .await
+            .unwrap();
 
         let entry = cache.get(&k).await;
 
@@ -224,7 +226,9 @@ mod tests {
     #[test(tokio::test)]
     async fn it_should_enforce_cache_limits() {
         let cache: DeduplicatingCache<usize, usize> =
-            DeduplicatingCache::with_capacity(NonZeroUsize::new(13).unwrap(), None, "test").await;
+            DeduplicatingCache::with_capacity(NonZeroUsize::new(13).unwrap(), None, "test")
+                .await
+                .unwrap();
 
         for i in 0..14 {
             let entry = cache.get(&i).await;
@@ -247,7 +251,9 @@ mod tests {
         mock.expect_retrieve().times(1).return_const(1usize);
 
         let cache: DeduplicatingCache<usize, usize> =
-            DeduplicatingCache::with_capacity(NonZeroUsize::new(10).unwrap(), None, "test").await;
+            DeduplicatingCache::with_capacity(NonZeroUsize::new(10).unwrap(), None, "test")
+                .await
+                .unwrap();
 
         // Let's trigger 100 concurrent gets of the same value and ensure only
         // one delegated retrieve is made
diff --git a/apollo-router/src/cache/storage.rs b/apollo-router/src/cache/storage.rs
index 385c20d1cd..e8b4a9696d 100644
--- a/apollo-router/src/cache/storage.rs
+++ b/apollo-router/src/cache/storage.rs
@@ -9,6 +9,7 @@ use serde::de::DeserializeOwned;
 use serde::Serialize;
 use tokio::sync::Mutex;
 use tokio::time::Instant;
+use tower::BoxError;
 
 use super::redis::*;
 use crate::configuration::RedisCache;
@@ -60,18 +61,22 @@ where
         max_capacity: NonZeroUsize,
         config: Option<RedisCache>,
         caller: &str,
-    ) -> Self {
-        Self {
+    ) -> Result<Self, BoxError> {
+        Ok(Self {
             caller: caller.to_string(),
             inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
             redis: if let Some(config) = config {
+                let required_to_start = config.required_to_start;
                 match RedisCacheStorage::new(config).await {
                     Err(e) => {
                         tracing::error!(
-                            "could not open connection to Redis for {} caching: {:?}",
-                            caller,
-                            e
+                            cache = caller,
+                            e,
+                            "could not open connection to Redis for caching",
                         );
+                        if required_to_start {
+                            return Err(e);
+                        }
                         None
                     }
                     Ok(storage) => Some(storage),
@@ -79,7 +84,7 @@ where
             } else {
                 None
             },
-        }
+        })
     }
 
     pub(crate) async fn get(&self, key: &K) -> Option<V> {
diff --git a/apollo-router/src/configuration/mod.rs b/apollo-router/src/configuration/mod.rs
index 3c44b4d0b6..536b529b13 100644
--- a/apollo-router/src/configuration/mod.rs
+++ b/apollo-router/src/configuration/mod.rs
@@ -934,6 +934,14 @@ pub(crate) struct RedisCache {
     #[serde(default)]
     /// TLS client configuration
     pub(crate) tls: Option<TlsConfig>,
+
+    #[serde(default = "default_required_to_start")]
+    /// Prevents the router from starting if it cannot connect to Redis
+    pub(crate) required_to_start: bool,
+}
+
+fn default_required_to_start() -> bool {
+    false
 }
 
 /// TLS related configuration options.
diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap
index c1d1ab68cb..46e3461afb 100644
--- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap
+++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap
@@ -93,6 +93,11 @@ expression: "&schema"
               "type": "string",
               "nullable": true
             },
+            "required_to_start": {
+              "description": "Prevents the router from starting if it cannot connect to Redis",
+              "default": false,
+              "type": "boolean"
+            },
             "timeout": {
               "description": "Redis request timeout (default: 2ms)",
               "default": null,
@@ -2073,6 +2078,11 @@ expression: "&schema"
               "type": "string",
               "nullable": true
             },
+            "required_to_start": {
+              "description": "Prevents the router from starting if it cannot connect to Redis",
+              "default": false,
+              "type": "boolean"
+            },
             "timeout": {
               "description": "Redis request timeout (default: 2ms)",
               "default": null,
@@ -2552,6 +2562,11 @@ expression: "&schema"
               "type": "string",
               "nullable": true
             },
+            "required_to_start": {
+              "description": "Prevents the router from starting if it cannot connect to Redis",
+              "default": false,
+              "type": "boolean"
+            },
             "timeout": {
               "description": "Redis request timeout (default: 2ms)",
               "default": null,
diff --git a/apollo-router/src/error.rs b/apollo-router/src/error.rs
index 5e59bae73c..6ffba0fdc3 100644
--- a/apollo-router/src/error.rs
+++ b/apollo-router/src/error.rs
@@ -11,6 +11,7 @@ use serde::Deserialize;
 use serde::Serialize;
 use thiserror::Error;
 use tokio::task::JoinError;
+use tower::BoxError;
 
 pub(crate) use crate::configuration::ConfigurationError;
 pub(crate) use crate::graphql::Error;
@@ -210,7 +211,7 @@ impl From<QueryPlannerError> for CacheResolverError {
 
 /// Error types for service building.
 #[derive(Error, Debug, Display)]
 pub(crate) enum ServiceBuildError {
-    /// couldn't build Router Service: {0}
+    /// couldn't build Query Planner Service: {0}
     QueryPlannerError(QueryPlannerError),
 
     /// API schema generation failed: {0}
@@ -218,6 +219,9 @@ pub(crate) enum ServiceBuildError {
 
     /// schema error: {0}
     Schema(SchemaError),
+
+    /// couldn't build Router service: {0}
+    ServiceError(BoxError),
 }
 
 impl From<SchemaError> for ServiceBuildError {
@@ -244,6 +248,12 @@
     }
 }
 
+impl From<BoxError> for ServiceBuildError {
+    fn from(err: BoxError) -> Self {
+        ServiceBuildError::ServiceError(err)
+    }
+}
+
 /// Error types for QueryPlanner
 #[derive(Error, Debug, Display, Clone, Serialize, Deserialize)]
 pub(crate) enum QueryPlannerError {
diff --git a/apollo-router/src/introspection.rs b/apollo-router/src/introspection.rs
index 23357da081..d3af08418a 100644
--- a/apollo-router/src/introspection.rs
+++ b/apollo-router/src/introspection.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
 
 use router_bridge::introspect::IntrospectionError;
 use router_bridge::planner::Planner;
+use tower::BoxError;
 
 use crate::cache::storage::CacheStorage;
 use crate::graphql::Response;
@@ -23,14 +24,14 @@ impl Introspection {
     pub(crate) async fn with_capacity(
         planner: Arc<Planner<QueryPlanResult>>,
         capacity: NonZeroUsize,
-    ) -> Self {
-        Self {
-            cache: CacheStorage::new(capacity, None, "introspection").await,
+    ) -> Result<Self, BoxError> {
+        Ok(Self {
+            cache: CacheStorage::new(capacity, None, "introspection").await?,
             planner,
-        }
+        })
     }
 
-    pub(crate) async fn new(planner: Arc<Planner<QueryPlanResult>>) -> Self {
+    pub(crate) async fn new(planner: Arc<Planner<QueryPlanResult>>) -> Result<Self, BoxError> {
         Self::with_capacity(planner, DEFAULT_INTROSPECTION_CACHE_CAPACITY).await
     }
 
@@ -38,13 +39,13 @@ impl Introspection {
     pub(crate) async fn from_cache(
         planner: Arc<Planner<QueryPlanResult>>,
         cache: HashMap<String, Response>,
-    ) -> Self {
-        let this = Self::with_capacity(planner, cache.len().try_into().unwrap()).await;
+    ) -> Result<Self, BoxError> {
+        let this = Self::with_capacity(planner, cache.len().try_into().unwrap()).await?;
 
         for (query, response) in cache.into_iter() {
             this.cache.insert(query, response).await;
         }
-        this
+        Ok(this)
     }
 
     /// Execute an introspection and cache the response.
@@ -122,7 +123,7 @@ mod introspection_tests {
         .iter()
         .cloned()
         .collect();
-        let introspection = Introspection::from_cache(planner, cache).await;
+        let introspection = Introspection::from_cache(planner, cache).await.unwrap();
 
         assert_eq!(
             expected_data,
diff --git a/apollo-router/src/plugins/cache/entity.rs b/apollo-router/src/plugins/cache/entity.rs
index e54077c9d6..6b5f9761e9 100644
--- a/apollo-router/src/plugins/cache/entity.rs
+++ b/apollo-router/src/plugins/cache/entity.rs
@@ -46,7 +46,7 @@ pub(crate) const CONTEXT_CACHE_KEY: &str = "apollo_entity_cache::key";
 register_plugin!("apollo", "preview_entity_cache", EntityCache);
 
 pub(crate) struct EntityCache {
-    storage: RedisCacheStorage,
+    storage: Option<RedisCacheStorage>,
     subgraphs: Arc<HashMap<String, Subgraph>>,
     enabled: Option<bool>,
     metrics: Metrics,
@@ -112,7 +112,21 @@ impl Plugin for EntityCache {
     where
         Self: Sized,
     {
-        let storage = RedisCacheStorage::new(init.config.redis).await?;
+        let required_to_start = init.config.redis.required_to_start;
+        let storage = match RedisCacheStorage::new(init.config.redis).await {
+            Ok(storage) => Some(storage),
+            Err(e) => {
+                tracing::error!(
+                    cache = "entity",
+                    e,
+                    "could not open connection to Redis for caching",
+                );
+                if required_to_start {
+                    return Err(e);
+                }
+                None
+            }
+        };
 
         Ok(Self {
             storage,
@@ -142,19 +156,18 @@ impl Plugin for EntityCache {
         name: &str,
         mut service: subgraph::BoxService,
     ) -> subgraph::BoxService {
-        let storage = self.storage.clone();
+        let storage = match self.storage.clone() {
+            Some(storage) => storage,
+            None => return service,
+        };
 
         let (subgraph_ttl, subgraph_enabled) = if let Some(config) = self.subgraphs.get(name) {
             (
-                config
-                    .ttl
-                    .clone()
-                    .map(|t| t.0)
-                    .or_else(|| self.storage.ttl()),
+                config.ttl.clone().map(|t| t.0).or_else(|| storage.ttl()),
                 config.enabled.or(self.enabled).unwrap_or(false),
             )
         } else {
-            (self.storage.ttl(), self.enabled.unwrap_or(false))
+            (storage.ttl(), self.enabled.unwrap_or(false))
         };
 
         let name = name.to_string();
@@ -190,7 +203,7 @@ impl EntityCache {
         Self: Sized,
     {
         Ok(Self {
-            storage,
+            storage: Some(storage),
             enabled: Some(true),
             subgraphs: Arc::new(subgraphs),
             metrics: Metrics::default(),
diff --git a/apollo-router/src/query_planner/bridge_query_planner.rs b/apollo-router/src/query_planner/bridge_query_planner.rs
index 4e1e5a0d7b..5bde99904a 100644
--- a/apollo-router/src/query_planner/bridge_query_planner.rs
+++ b/apollo-router/src/query_planner/bridge_query_planner.rs
@@ -231,7 +231,7 @@ impl BridgeQueryPlanner {
         let schema = Arc::new(schema.with_api_schema(api_schema));
 
         let introspection = if configuration.supergraph.introspection {
-            Some(Arc::new(Introspection::new(planner.clone()).await))
+            Some(Arc::new(Introspection::new(planner.clone()).await?))
         } else {
             None
         };
@@ -287,7 +287,7 @@ impl BridgeQueryPlanner {
         let schema = Arc::new(Schema::parse(&schema, &configuration)?.with_api_schema(api_schema));
 
         let introspection = if configuration.supergraph.introspection {
-            Some(Arc::new(Introspection::new(planner.clone()).await))
+            Some(Arc::new(Introspection::new(planner.clone()).await?))
         } else {
             None
         };
diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs
index ba2f92dbbc..fac70d670e 100644
--- a/apollo-router/src/query_planner/caching_query_planner.rs
+++ b/apollo-router/src/query_planner/caching_query_planner.rs
@@ -13,6 +13,7 @@ use router_bridge::planner::Planner;
 use router_bridge::planner::UsageReporting;
 use sha2::Digest;
 use sha2::Sha256;
+use tower::BoxError;
 use tower::ServiceBuilder;
 use tower::ServiceExt;
 use tower_service::Service;
@@ -73,24 +74,24 @@ where
         schema: Arc<Schema>,
         configuration: &Configuration,
         plugins: Plugins,
-    ) -> CachingQueryPlanner<T> {
+    ) -> Result<CachingQueryPlanner<T>, BoxError> {
        let cache = Arc::new(
             DeduplicatingCache::from_configuration(
                 &configuration.supergraph.query_planning.cache,
                 "query planner",
             )
-            .await,
+            .await?,
         );
 
         let enable_authorization_directives =
             AuthorizationPlugin::enable_directives(configuration, &schema).unwrap_or(false);
-        Self {
+        Ok(Self {
             cache,
             delegate,
             schema,
             plugins: Arc::new(plugins),
             enable_authorization_directives,
-        }
+        })
     }
 
     pub(crate) async fn cache_keys(&self, count: Option<usize>) -> Vec<WarmUpCachingQueryKey> {
@@ -584,7 +585,9 @@ mod tests {
         );
 
         let mut planner =
-            CachingQueryPlanner::new(delegate, schema, &configuration, IndexMap::new()).await;
+            CachingQueryPlanner::new(delegate, schema, &configuration, IndexMap::new())
+                .await
+                .unwrap();
 
         let configuration = Configuration::default();
@@ -667,7 +670,8 @@ mod tests {
 
         let mut planner =
             CachingQueryPlanner::new(delegate, Arc::new(schema), &configuration, IndexMap::new())
-                .await;
+                .await
+                .unwrap();
 
         let context = Context::new();
         context.extensions().lock().insert::<ParsedDocument>(doc);
diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs
index 1e989f2e8b..428b9f8f38 100644
--- a/apollo-router/src/router_factory.rs
+++ b/apollo-router/src/router_factory.rs
@@ -460,7 +460,7 @@ pub async fn create_test_service_factory_from_yaml(schema: &str, configuration:
         .await;
     assert_eq!(
         service.map(|_| ()).unwrap_err().to_string().as_str(),
-        r#"couldn't build Router Service: couldn't instantiate query planner; invalid schema: schema validation errors: Error extracting subgraphs from the supergraph: this might be due to errors in subgraphs that were mistakenly ignored by federation 0.x versions but are rejected by federation 2.
+        r#"couldn't build Query Planner Service: couldn't instantiate query planner; invalid schema: schema validation errors: Error extracting subgraphs from the supergraph: this might be due to errors in subgraphs that were mistakenly ignored by federation 0.x versions but are rejected by federation 2.
 Please try composing your subgraphs with federation 2: this should help precisely pinpoint the problems and, once fixed, generate a correct federation 2 supergraph.
 
 Details:
diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs
index 11ee844629..94c18323f9 100644
--- a/apollo-router/src/services/router/service.rs
+++ b/apollo-router/src/services/router/service.rs
@@ -736,7 +736,7 @@ impl RouterCreator {
         let apq_layer = if configuration.apq.enabled {
             APQLayer::with_cache(
                 DeduplicatingCache::from_configuration(&configuration.apq.router.cache, "APQ")
-                    .await,
+                    .await?,
             )
         } else {
             APQLayer::disabled()
diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs
index e1bf96a21b..9aa70a023b 100644
--- a/apollo-router/src/services/supergraph/service.rs
+++ b/apollo-router/src/services/supergraph/service.rs
@@ -705,7 +705,7 @@ impl PluggableSupergraphServiceBuilder {
             &configuration,
             IndexMap::new(),
         )
-        .await;
+        .await?;
 
         // Activate the telemetry plugin.
         // We must NOT fail to go live with the new router from this point as the telemetry plugin activate interacts with globals.
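The integration test below exercises both behaviors against an unreachable Redis instance. As a sketch (the port and `in_memory.limit` values mirror the test's JSON configuration and are illustrative), the fail-closed case corresponds to a `router.yaml` along these lines:

```yaml
supergraph:
  query_planning:
    cache:
      in_memory:
        limit: 2
      redis:
        urls: ["redis://127.0.0.1:6378"] # unreachable port, as in the test
        required_to_start: true # startup now fails instead of degrading
```

Without `required_to_start: true` (the first half of the test), the same configuration starts successfully and only logs the connection error.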
diff --git a/apollo-router/tests/integration/redis.rs b/apollo-router/tests/integration/redis.rs
index 14c6f808bc..bab8b89a77 100644
--- a/apollo-router/tests/integration/redis.rs
+++ b/apollo-router/tests/integration/redis.rs
@@ -741,4 +741,58 @@ mod test {
         let _ = connection_task.await;
         Ok(())
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn connection_failure_blocks_startup() {
+        let _ = apollo_router::TestHarness::builder()
+            .with_subgraph_network_requests()
+            .configuration_json(json!({
+                "supergraph": {
+                    "query_planning": {
+                        "cache": {
+                            "in_memory": {
+                                "limit": 2
+                            },
+                            "redis": {
+                                // invalid port
+                                "urls": ["redis://127.0.0.1:6378"]
+                            }
+                        }
+                    }
+                }
+            }))
+            .unwrap()
+            .schema(include_str!("../fixtures/supergraph.graphql"))
+            .build_supergraph()
+            .await
+            .unwrap();
+
+        let e = apollo_router::TestHarness::builder()
+            .with_subgraph_network_requests()
+            .configuration_json(json!({
+                "supergraph": {
+                    "query_planning": {
+                        "cache": {
+                            "in_memory": {
+                                "limit": 2
+                            },
+                            "redis": {
+                                // invalid port
+                                "urls": ["redis://127.0.0.1:6378"],
+                                "required_to_start": true
+                            }
+                        }
+                    }
+                }
+            }))
+            .unwrap()
+            .schema(include_str!("../fixtures/supergraph.graphql"))
+            .build_supergraph()
+            .await
+            .unwrap_err();
+
+        assert_eq!(
+            e.to_string(),
+            "couldn't build Router service: IO Error: Os { code: 111, kind: ConnectionRefused, message: \"Connection refused\" }"
+        );
+    }
 }
diff --git a/docs/source/configuration/distributed-caching.mdx b/docs/source/configuration/distributed-caching.mdx
index c81d425ea8..c3a836a18d 100644
--- a/docs/source/configuration/distributed-caching.mdx
+++ b/docs/source/configuration/distributed-caching.mdx
@@ -95,10 +95,6 @@ supergraph:
     cache:
       redis: #highlight-line
         urls: ["redis://..."] #highlight-line
-        username: admin/123 # Optional, can be part of the urls directly, mainly useful if you have special character like '/' in your password that doesn't work in url. This field takes precedence over the username in the URL
-        password: admin # Optional, can be part of the urls directly, mainly useful if you have special character like '/' in your password that doesn't work in url. This field takes precedence over the password in the URL
-        timeout: 5ms # Optional, by default: 2ms
-        ttl: 24h # Optional, by default no expiration
 ```
 
 The value of `urls` is a list of URLs for all Redis instances in your cluster.
@@ -115,10 +111,48 @@ apq:
     cache:
       redis: #highlight-line
         urls: ["redis://..."] #highlight-line
-        timeout: 5ms # Optional, by default: 2ms
-        ttl: 24h # Optional, by default no expiration
 ```
 
 The value of `urls` is a list of URLs for all Redis instances in your cluster.
 
 All APQ cache entries will be prefixed with `apq` followed by a null byte character (referenced by the escape sequence `\0` in most programming languages) within the distributed cache.
+
+### Common Redis configuration
+
+Redis configuration is done in the same way for APQ caching, query plan caching, and [entity caching](./entity-caching).
+
+```yaml title="router.yaml"
+supergraph:
+  query_planning:
+    cache:
+      redis: #highlight-line
+        urls: ["redis://..."] #highlight-line
+        username: admin/123 # Optional, can be part of the urls directly, mainly useful if you have special characters like '/' in your password that don't work in a URL. This field takes precedence over the username in the URL
+        password: admin # Optional, can be part of the urls directly, mainly useful if you have special characters like '/' in your password that don't work in a URL. This field takes precedence over the password in the URL
+        timeout: 5ms # Optional, by default: 2ms
+        ttl: 24h # Optional, by default no expiration
+        namespace: "prefix" # Optional
+        #tls:
+        required_to_start: false # Optional, defaults to false
+```
+
+#### Timeout
+
+Connecting and sending commands to Redis are subject to a timeout, set to 2ms by default, which can be overridden with the `timeout` option.
+
+#### TTL
+
+The `ttl` option defines the default expiration for Redis entries.
+
+#### Namespace
+
+When using the same Redis instance for multiple purposes, the `namespace` option defines a prefix for all the keys created by the router.
+
+#### TLS
+
+TLS for Redis connections can be configured in the same way as for subgraphs, to [override the list of certificate authorities](./overview/#overriding-certificate-authorities-for-subgraphs) or [set up client authentication](./overview/#tls-client-authentication-for-subgraph-requests).
+
+#### Required to start
+
+When enabled, the `required_to_start` option prevents the router from starting if it cannot connect to Redis. By default, the router still starts without a connection to Redis, in which case APQ and query planning only use their in-memory caches, and entity caching sends requests to subgraphs undisturbed.
\ No newline at end of file
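Since the option is shared across all Redis cache sections, a minimal sketch of failing closed for APQ caching (reusing the `apq` cache section shown earlier; the Redis URL is a placeholder) looks like:

```yaml title="router.yaml"
apq:
  router:
    cache:
      redis:
        urls: ["redis://..."]
        required_to_start: true # the router refuses to start if it cannot reach Redis
```

Leaving the option at its default of `false` preserves the fail-open behavior described above.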