Skip to content

Commit

Permalink
Redis: add a fail open option when starting the Router (#4534)
Browse files Browse the repository at this point in the history
Fix #4334

This option allows the router to start even if we cannot connect to
Redis.
  • Loading branch information
Geal authored Feb 22, 2024
1 parent 83d134b commit 3381128
Show file tree
Hide file tree
Showing 15 changed files with 209 additions and 52 deletions.
7 changes: 7 additions & 0 deletions .changesets/feat_geal_redis_fail_open.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### Redis: add a fail open option ([Issue #4334](https://github.com/apollographql/router/issues/4334))

This option configures the Router's behavior in case it cannot connect to Redis:
- by default, it will still start, so requests will still be handled in a degraded state
- when active, that option will prevent the router from starting if it cannot connect

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/4534
24 changes: 15 additions & 9 deletions apollo-router/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tower::BoxError;

use self::storage::CacheStorage;
use self::storage::KeyType;
Expand Down Expand Up @@ -36,17 +37,17 @@ where
capacity: NonZeroUsize,
redis: Option<RedisCache>,
caller: &str,
) -> Self {
Self {
) -> Result<Self, BoxError> {
Ok(Self {
wait_map: Arc::new(Mutex::new(HashMap::new())),
storage: CacheStorage::new(capacity, redis, caller).await,
}
storage: CacheStorage::new(capacity, redis, caller).await?,
})
}

pub(crate) async fn from_configuration(
config: &crate::configuration::Cache,
caller: &str,
) -> Self {
) -> Result<Self, BoxError> {
Self::with_capacity(config.in_memory.limit, config.redis.clone(), caller).await
}

Expand Down Expand Up @@ -206,8 +207,9 @@ mod tests {
#[tokio::test]
async fn example_cache_usage() {
let k = "key".to_string();
let cache =
DeduplicatingCache::with_capacity(NonZeroUsize::new(1).unwrap(), None, "test").await;
let cache = DeduplicatingCache::with_capacity(NonZeroUsize::new(1).unwrap(), None, "test")
.await
.unwrap();

let entry = cache.get(&k).await;

Expand All @@ -224,7 +226,9 @@ mod tests {
#[test(tokio::test)]
async fn it_should_enforce_cache_limits() {
let cache: DeduplicatingCache<usize, usize> =
DeduplicatingCache::with_capacity(NonZeroUsize::new(13).unwrap(), None, "test").await;
DeduplicatingCache::with_capacity(NonZeroUsize::new(13).unwrap(), None, "test")
.await
.unwrap();

for i in 0..14 {
let entry = cache.get(&i).await;
Expand All @@ -247,7 +251,9 @@ mod tests {
mock.expect_retrieve().times(1).return_const(1usize);

let cache: DeduplicatingCache<usize, usize> =
DeduplicatingCache::with_capacity(NonZeroUsize::new(10).unwrap(), None, "test").await;
DeduplicatingCache::with_capacity(NonZeroUsize::new(10).unwrap(), None, "test")
.await
.unwrap();

// Let's trigger 100 concurrent gets of the same value and ensure only
// one delegated retrieve is made
Expand Down
17 changes: 11 additions & 6 deletions apollo-router/src/cache/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::Mutex;
use tokio::time::Instant;
use tower::BoxError;

use super::redis::*;
use crate::configuration::RedisCache;
Expand Down Expand Up @@ -60,26 +61,30 @@ where
max_capacity: NonZeroUsize,
config: Option<RedisCache>,
caller: &str,
) -> Self {
Self {
) -> Result<Self, BoxError> {
Ok(Self {
caller: caller.to_string(),
inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
redis: if let Some(config) = config {
let required_to_start = config.required_to_start;
match RedisCacheStorage::new(config).await {
Err(e) => {
tracing::error!(
"could not open connection to Redis for {} caching: {:?}",
caller,
e
cache = caller,
e,
"could not open connection to Redis for caching",
);
if required_to_start {
return Err(e);
}
None
}
Ok(storage) => Some(storage),
}
} else {
None
},
}
})
}

pub(crate) async fn get(&self, key: &K) -> Option<V> {
Expand Down
8 changes: 8 additions & 0 deletions apollo-router/src/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,14 @@ pub(crate) struct RedisCache {
#[serde(default)]
/// TLS client configuration
pub(crate) tls: Option<TlsClient>,

#[serde(default = "default_required_to_start")]
/// Prevents the router from starting if it cannot connect to Redis
pub(crate) required_to_start: bool,
}

fn default_required_to_start() -> bool {
false
}

/// TLS related configuration options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ expression: "&schema"
"type": "string",
"nullable": true
},
"required_to_start": {
"description": "Prevents the router from starting if it cannot connect to Redis",
"default": false,
"type": "boolean"
},
"timeout": {
"description": "Redis request timeout (default: 2ms)",
"default": null,
Expand Down Expand Up @@ -2073,6 +2078,11 @@ expression: "&schema"
"type": "string",
"nullable": true
},
"required_to_start": {
"description": "Prevents the router from starting if it cannot connect to Redis",
"default": false,
"type": "boolean"
},
"timeout": {
"description": "Redis request timeout (default: 2ms)",
"default": null,
Expand Down Expand Up @@ -2552,6 +2562,11 @@ expression: "&schema"
"type": "string",
"nullable": true
},
"required_to_start": {
"description": "Prevents the router from starting if it cannot connect to Redis",
"default": false,
"type": "boolean"
},
"timeout": {
"description": "Redis request timeout (default: 2ms)",
"default": null,
Expand Down
12 changes: 11 additions & 1 deletion apollo-router/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;
use tokio::task::JoinError;
use tower::BoxError;

pub(crate) use crate::configuration::ConfigurationError;
pub(crate) use crate::graphql::Error;
Expand Down Expand Up @@ -210,14 +211,17 @@ impl From<QueryPlannerError> for CacheResolverError {
/// Error types for service building.
#[derive(Error, Debug, Display)]
pub(crate) enum ServiceBuildError {
/// couldn't build Router Service: {0}
/// couldn't build Query Planner Service: {0}
QueryPlannerError(QueryPlannerError),

/// API schema generation failed: {0}
ApiSchemaError(FederationError),

/// schema error: {0}
Schema(SchemaError),

/// couldn't build Router service: {0}
ServiceError(BoxError),
}

impl From<SchemaError> for ServiceBuildError {
Expand All @@ -244,6 +248,12 @@ impl From<router_bridge::error::Error> for ServiceBuildError {
}
}

impl From<BoxError> for ServiceBuildError {
fn from(err: BoxError) -> Self {
ServiceBuildError::ServiceError(err)
}
}

/// Error types for QueryPlanner
#[derive(Error, Debug, Display, Clone, Serialize, Deserialize)]
pub(crate) enum QueryPlannerError {
Expand Down
19 changes: 10 additions & 9 deletions apollo-router/src/introspection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use router_bridge::introspect::IntrospectionError;
use router_bridge::planner::Planner;
use tower::BoxError;

use crate::cache::storage::CacheStorage;
use crate::graphql::Response;
Expand All @@ -23,28 +24,28 @@ impl Introspection {
pub(crate) async fn with_capacity(
planner: Arc<Planner<QueryPlanResult>>,
capacity: NonZeroUsize,
) -> Self {
Self {
cache: CacheStorage::new(capacity, None, "introspection").await,
) -> Result<Self, BoxError> {
Ok(Self {
cache: CacheStorage::new(capacity, None, "introspection").await?,
planner,
}
})
}

pub(crate) async fn new(planner: Arc<Planner<QueryPlanResult>>) -> Self {
pub(crate) async fn new(planner: Arc<Planner<QueryPlanResult>>) -> Result<Self, BoxError> {
Self::with_capacity(planner, DEFAULT_INTROSPECTION_CACHE_CAPACITY).await
}

#[cfg(test)]
pub(crate) async fn from_cache(
planner: Arc<Planner<QueryPlanResult>>,
cache: HashMap<String, Response>,
) -> Self {
let this = Self::with_capacity(planner, cache.len().try_into().unwrap()).await;
) -> Result<Self, BoxError> {
let this = Self::with_capacity(planner, cache.len().try_into().unwrap()).await?;

for (query, response) in cache.into_iter() {
this.cache.insert(query, response).await;
}
this
Ok(this)
}

/// Execute an introspection and cache the response.
Expand Down Expand Up @@ -122,7 +123,7 @@ mod introspection_tests {
.iter()
.cloned()
.collect();
let introspection = Introspection::from_cache(planner, cache).await;
let introspection = Introspection::from_cache(planner, cache).await.unwrap();

assert_eq!(
expected_data,
Expand Down
33 changes: 23 additions & 10 deletions apollo-router/src/plugins/cache/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) const CONTEXT_CACHE_KEY: &str = "apollo_entity_cache::key";
register_plugin!("apollo", "preview_entity_cache", EntityCache);

pub(crate) struct EntityCache {
storage: RedisCacheStorage,
storage: Option<RedisCacheStorage>,
subgraphs: Arc<HashMap<String, Subgraph>>,
enabled: Option<bool>,
metrics: Metrics,
Expand Down Expand Up @@ -112,7 +112,21 @@ impl Plugin for EntityCache {
where
Self: Sized,
{
let storage = RedisCacheStorage::new(init.config.redis).await?;
let required_to_start = init.config.redis.required_to_start;
let storage = match RedisCacheStorage::new(init.config.redis).await {
Ok(storage) => Some(storage),
Err(e) => {
tracing::error!(
cache = "entity",
e,
"could not open connection to Redis for caching",
);
if required_to_start {
return Err(e);
}
None
}
};

Ok(Self {
storage,
Expand Down Expand Up @@ -142,19 +156,18 @@ impl Plugin for EntityCache {
name: &str,
mut service: subgraph::BoxService,
) -> subgraph::BoxService {
let storage = self.storage.clone();
let storage = match self.storage.clone() {
Some(storage) => storage,
None => return service,
};

let (subgraph_ttl, subgraph_enabled) = if let Some(config) = self.subgraphs.get(name) {
(
config
.ttl
.clone()
.map(|t| t.0)
.or_else(|| self.storage.ttl()),
config.ttl.clone().map(|t| t.0).or_else(|| storage.ttl()),
config.enabled.or(self.enabled).unwrap_or(false),
)
} else {
(self.storage.ttl(), self.enabled.unwrap_or(false))
(storage.ttl(), self.enabled.unwrap_or(false))
};
let name = name.to_string();

Expand Down Expand Up @@ -190,7 +203,7 @@ impl EntityCache {
Self: Sized,
{
Ok(Self {
storage,
storage: Some(storage),
enabled: Some(true),
subgraphs: Arc::new(subgraphs),
metrics: Metrics::default(),
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/src/query_planner/bridge_query_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl BridgeQueryPlanner {

let schema = Arc::new(schema.with_api_schema(api_schema));
let introspection = if configuration.supergraph.introspection {
Some(Arc::new(Introspection::new(planner.clone()).await))
Some(Arc::new(Introspection::new(planner.clone()).await?))
} else {
None
};
Expand Down Expand Up @@ -287,7 +287,7 @@ impl BridgeQueryPlanner {
let schema = Arc::new(Schema::parse(&schema, &configuration)?.with_api_schema(api_schema));

let introspection = if configuration.supergraph.introspection {
Some(Arc::new(Introspection::new(planner.clone()).await))
Some(Arc::new(Introspection::new(planner.clone()).await?))
} else {
None
};
Expand Down
Loading

0 comments on commit 3381128

Please sign in to comment.