From b55936552cd66dd20cbebd84aae11c75936d2a4d Mon Sep 17 00:00:00 2001 From: frusciante Date: Mon, 9 Sep 2024 23:01:15 +0900 Subject: [PATCH] Add read_from_replicas support for Redis cluster configuration This closes #351 --- r2d2/CHANGELOG.md | 2 +- redis/CHANGELOG.md | 2 ++ redis/src/cluster/config.rs | 28 ++++++++++++++++++++++++---- redis/src/cluster/mod.rs | 15 +++++++++++---- redis/tests/redis_cluster.rs | 25 +++++++++++++++++++++++++ 5 files changed, 63 insertions(+), 9 deletions(-) diff --git a/r2d2/CHANGELOG.md b/r2d2/CHANGELOG.md index 077166d..ed43291 100644 --- a/r2d2/CHANGELOG.md +++ b/r2d2/CHANGELOG.md @@ -42,4 +42,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [0.4.0]: https://github.com/bikeshedder/deadpool/compare/deadpool-r2d2-v0.3.0...deadpool-r2d2-v0.4.0 [0.3.0]: https://github.com/bikeshedder/deadpool/compare/deadpool-r2d2-v0.2.0...deadpool-r2d2-v0.3.0 [0.2.0]: https://github.com/bikeshedder/deadpool/compare/deadpool-r2d2-v0.1.0...deadpool-r2d2-v0.2.0 -[0.1.0]: https://github.com/bikeshedder/deadpool/releases/tag/deadpool-r2d2-v0.1.0 \ No newline at end of file +[0.1.0]: https://github.com/bikeshedder/deadpool/releases/tag/deadpool-r2d2-v0.1.0 diff --git a/redis/CHANGELOG.md b/redis/CHANGELOG.md index 3378c73..5aed5b9 100644 --- a/redis/CHANGELOG.md +++ b/redis/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- __Breaking:__ Modified the `Config` struct to support `read_from_replicas` for Redis clusters, using the redis-rs crate's functionality. +- Add support for enabling `read_from_replicas` via the environment variable REDIS_CLUSTER__READ_FROM_REPLICAS. - Add support for `redis::sentinel` ## [0.16.0] - 2024-08-05 diff --git a/redis/src/cluster/config.rs b/redis/src/cluster/config.rs index d45b91e..cb7e042 100644 --- a/redis/src/cluster/config.rs +++ b/redis/src/cluster/config.rs @@ -11,6 +11,7 @@ use super::{CreatePoolError, Pool, PoolBuilder, PoolConfig, Runtime}; /// [`config`](https://crates.io/crates/config) crate as following: /// ```env /// REDIS_CLUSTER__URLS=redis://127.0.0.1:7000,redis://127.0.0.1:7001 +/// REDIS_CLUSTER__READ_FROM_REPLICAS=true /// REDIS_CLUSTER__POOL__MAX_SIZE=16 /// REDIS_CLUSTER__POOL__TIMEOUTS__WAIT__SECS=2 /// REDIS_CLUSTER__POOL__TIMEOUTS__WAIT__NANOS=0 @@ -48,6 +49,18 @@ pub struct Config { /// Pool configuration. pub pool: Option, + + /// Enables or disables reading from replica nodes in a Redis cluster. + /// + /// When set to `true`, read operations may be distributed across + /// replica nodes, which can help in load balancing read requests. + /// When set to `false`, all read operations will be directed to the + /// master node(s). This option is particularly useful in a high-availability + /// setup where read scalability is needed. + /// + /// Default is `false`. + #[serde(default)] + pub read_from_replicas: bool, } impl Config { @@ -71,11 +84,16 @@ impl Config { /// See [`ConfigError`] for details. pub fn builder(&self) -> Result { let manager = match (&self.urls, &self.connections) { - (Some(urls), None) => { - super::Manager::new(urls.iter().map(|url| url.as_str()).collect())? + (Some(urls), None) => super::Manager::new( + urls.iter().map(|url| url.as_str()).collect(), + self.read_from_replicas, + )?, + (None, Some(connections)) => { + super::Manager::new(connections.clone(), self.read_from_replicas)? + } + (None, None) => { + super::Manager::new(vec![ConnectionInfo::default()], self.read_from_replicas)? } - (None, Some(connections)) => super::Manager::new(connections.clone())?, - (None, None) => super::Manager::new(vec![ConnectionInfo::default()])?, (Some(_), Some(_)) => return Err(ConfigError::UrlAndConnectionSpecified), }; let pool_config = self.get_pool_config(); @@ -97,6 +115,7 @@ impl Config { urls: Some(urls.into()), connections: None, pool: None, + read_from_replicas: false, } } } @@ -107,6 +126,7 @@ impl Default for Config { urls: None, connections: Some(vec![ConnectionInfo::default()]), pool: None, + read_from_replicas: false, } } } diff --git a/redis/src/cluster/mod.rs b/redis/src/cluster/mod.rs index c2be660..9bb9dc0 100644 --- a/redis/src/cluster/mod.rs +++ b/redis/src/cluster/mod.rs @@ -10,7 +10,7 @@ use deadpool::managed; use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult}; use redis; -pub use redis::cluster::ClusterClient; +pub use redis::cluster::{ClusterClient, ClusterClientBuilder}; pub use redis::cluster_async::ClusterConnection; pub use self::config::{Config, ConfigError}; @@ -122,10 +122,17 @@ impl Manager { /// /// # Errors /// - /// If establishing a new [`ClusterClient`] fails. - pub fn new(params: Vec) -> RedisResult { + /// If establishing a new [`ClusterClientBuilder`] fails. + pub fn new( + params: Vec, + read_from_replicas: bool, + ) -> RedisResult { + let mut client = ClusterClientBuilder::new(params); + if read_from_replicas { + client = client.read_from_replicas(); + } Ok(Self { - client: ClusterClient::new(params)?, + client: client.build()?, ping_number: AtomicUsize::new(0), }) } diff --git a/redis/tests/redis_cluster.rs b/redis/tests/redis_cluster.rs index b20b7e3..f200edf 100644 --- a/redis/tests/redis_cluster.rs +++ b/redis/tests/redis_cluster.rs @@ -52,6 +52,31 @@ async fn test_pipeline() { assert_eq!(value, "42".to_string()); } +#[tokio::test] +async fn test_read_from_replicas() { + use deadpool_redis::redis::pipe; + let mut cfg = Config::from_env(); + cfg.redis_cluster.read_from_replicas = true; + assert_eq!(cfg.redis_cluster.read_from_replicas, true); + + let pool = cfg + .redis_cluster + .create_pool(Some(Runtime::Tokio1)) + .unwrap(); + let mut conn = pool.get().await.unwrap(); + let (value,): (String,) = pipe() + .cmd("SET") + .arg("deadpool/pipeline_test_key") + .arg("42") + .ignore() + .cmd("GET") + .arg("deadpool/pipeline_test_key") + .query_async(&mut conn) + .await + .unwrap(); + assert_eq!(value, "42".to_string()); +} + #[tokio::test] async fn test_high_level_commands() { use deadpool_redis::redis::AsyncCommands;