From 07b25062b7e155aca746fe9d46f114836a4c47cb Mon Sep 17 00:00:00 2001 From: avifenesh Date: Sat, 9 Nov 2024 19:45:06 +0000 Subject: [PATCH 1/8] arrange typos and clean code Signed-off-by: avifenesh --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 35997b2282..7d5b08d8a2 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -332,7 +332,7 @@ where }) .map(|response| match response { Response::Single(value) => value, - Response::ClusterScanResult(..) | Response::Multiple(_) => unreachable!(), + _ => unreachable!(), }) } From 486ba40061f8489aea910ec969ec635752aac3bc Mon Sep 17 00:00:00 2001 From: avifenesh Date: Sat, 9 Nov 2024 19:49:28 +0000 Subject: [PATCH 2/8] Add support for re-authentication on NOAUTH error Signed-off-by: avifenesh --- .../redis/src/aio/multiplexed_connection.rs | 47 +++++----- glide-core/redis-rs/redis/src/cluster.rs | 1 + .../redis-rs/redis/src/cluster_async/mod.rs | 93 ++++++++++++++++++- glide-core/redis-rs/redis/src/parser.rs | 1 + glide-core/redis-rs/redis/src/types.rs | 10 ++ 5 files changed, 124 insertions(+), 28 deletions(-) diff --git a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs index b31c817817..25e874bafe 100644 --- a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs +++ b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs @@ -31,8 +31,8 @@ use std::time::Duration; #[cfg(feature = "tokio-comp")] use tokio_util::codec::Decoder; -// Default connection timeout in ms -const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250); +// Default connection timeout in seconds +const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(2); // Senders which the result of a single request are sent through type PipelineOutput = oneshot::Sender>; @@ -79,7 +79,7 @@ struct PipelineMessage { /// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream` /// and `Sink`. #[derive(Clone)] -pub(crate) struct Pipeline { +pub struct Pipeline { sender: mpsc::Sender>, push_manager: Arc>, is_stream_closed: Arc, @@ -473,7 +473,8 @@ impl MultiplexedConnection { pipeline.set_push_manager(pm.clone()).await; - let mut con = MultiplexedConnection::builder(pipeline) + let mut con = MultiplexedConnection::builder() + .with_pipeline(pipeline) .with_db(connection_info.redis.db) .with_response_timeout(response_timeout) .with_push_manager(pm) @@ -575,8 +576,7 @@ impl MultiplexedConnection { self.pipeline.set_push_manager(push_manager).await; } - /// Replace the password used to authenticate with the server. - /// If `None` is provided, the password will be removed. + /// Replace password of connection pub async fn update_connection_password( &mut self, password: Option, @@ -586,32 +586,27 @@ impl MultiplexedConnection { } /// Creates a new `MultiplexedConnectionBuilder` for constructing a `MultiplexedConnection`. - pub(crate) fn builder(pipeline: Pipeline>) -> MultiplexedConnectionBuilder { - MultiplexedConnectionBuilder::new(pipeline) + pub fn builder() -> MultiplexedConnectionBuilder { + MultiplexedConnectionBuilder::default() } } +#[derive(Default)] /// A builder for creating `MultiplexedConnection` instances. pub struct MultiplexedConnectionBuilder { - pipeline: Pipeline>, + pipeline: Option>>, db: Option, response_timeout: Option, push_manager: Option, protocol: Option, - password: Option, + password: Option>, } impl MultiplexedConnectionBuilder { - /// Creates a new builder with the required pipeline - pub(crate) fn new(pipeline: Pipeline>) -> Self { - Self { - pipeline, - db: None, - response_timeout: None, - push_manager: None, - protocol: None, - password: None, - } + /// Sets the pipeline for the `MultiplexedConnectionBuilder`. + pub fn with_pipeline(mut self, pipeline: Pipeline>) -> Self { + self.pipeline = Some(pipeline); + self } /// Sets the database index for the `MultiplexedConnectionBuilder`. @@ -640,22 +635,28 @@ impl MultiplexedConnectionBuilder { /// Sets the password for the `MultiplexedConnectionBuilder`. pub fn with_password(mut self, password: Option) -> Self { - self.password = password; + self.password = Some(password); self } /// Builds and returns a new `MultiplexedConnection` instance using the configured settings. pub async fn build(self) -> RedisResult { + let pipeline = self.pipeline.ok_or_else(|| { + RedisError::from(( + crate::ErrorKind::InvalidClientConfig, + "Pipeline is required", + )) + })?; let db = self.db.unwrap_or_default(); let response_timeout = self .response_timeout .unwrap_or(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT); let push_manager = self.push_manager.unwrap_or_default(); let protocol = self.protocol.unwrap_or_default(); - let password = self.password; + let password = self.password.unwrap_or_default(); let con = MultiplexedConnection { - pipeline: self.pipeline, + pipeline, db, response_timeout, push_manager, diff --git a/glide-core/redis-rs/redis/src/cluster.rs b/glide-core/redis-rs/redis/src/cluster.rs index 1107965bf3..1eb14d7b40 100644 --- a/glide-core/redis-rs/redis/src/cluster.rs +++ b/glide-core/redis-rs/redis/src/cluster.rs @@ -784,6 +784,7 @@ where return Err(err); } RetryMethod::RetryImmediately => {} + RetryMethod::ReAuthenticate => {} } } } diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 7d5b08d8a2..f117d7efc9 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -332,7 +332,7 @@ where }) .map(|response| match response { Response::Single(value) => value, - _ => unreachable!(), + Response::ClusterScanResult(..) | Response::Multiple(_) => unreachable!(), }) } @@ -373,7 +373,7 @@ where Response::ClusterScanResult(..) | Response::Single(_) => unreachable!(), }) } - /// Update the password used to authenticate with all cluster servers + /// Reset the password used to authenticate with all cluster servers pub async fn update_connection_password( &mut self, password: Option, @@ -390,7 +390,7 @@ where let (sender, receiver) = oneshot::channel(); self.0 .send(Message { - cmd: CmdArg::OperationRequest(operation_request), + cmd: CmdArg::OperationRequest { operation_request }, sender, }) .await @@ -473,7 +473,7 @@ where { self.cluster_params .read() - .map(|guard| f(&guard).clone()) + .map(|guard| f(&*guard).clone()) .map_err(|_| RedisError::from((ErrorKind::ClientError, MUTEX_READ_ERR))) } @@ -912,6 +912,11 @@ enum Next { // if not set, then a reconnect should happen without sending a request afterwards request: Option>, }, + ReAuth { + request: PendingRequest, + address: String, + error: Option, + }, Done, } @@ -1065,6 +1070,12 @@ impl Future for Request { self.respond(Err(err)); Next::Done.into() } + crate::types::RetryMethod::ReAuthenticate => Next::ReAuth { + request: this.request.take().unwrap(), + address, + error: Some(err), + } + .into(), } } } @@ -2094,7 +2105,7 @@ where Err(err) => Err((OperationTarget::FanOut, err)), } } - CmdArg::OperationRequest(operation_request) => match operation_request { + CmdArg::OperationRequest { operation_request } => match operation_request { Operation::UpdateConnectionPassword(password) => { core.set_cluster_param(|params| params.password = password) .expect(MUTEX_WRITE_ERR); @@ -2421,6 +2432,25 @@ where self.inner.pending_requests.lock().unwrap().push(request); } } + Next::ReAuth { + request, + address, + error, + } => { + let future = Self::re_auth_and_retry_request( + self.inner.clone(), + request.info.clone(), + address, + error, + ); + self.in_flight_requests.push(Box::pin(Request { + retry_params: retry_params.clone(), + request: Some(request), + future: RequestState::Future { + future: Box::pin(future), + }, + })); + } } } @@ -2435,6 +2465,59 @@ where } } + // The function is used to send an AUTH command to a node and then retry the original request. + // The function is used when the original request failed due to an AUTH error. + // Cases in which the function is helpful: + // 1. The password was changed, the connection was not re-established, and the request failed due to an AUTH error. + // 2. The protection method is allowing X hours of connection without authentication, and the request failed due to an AUTH error. + async fn re_auth_and_retry_request( + core: Core, + info: RequestInfo, + address: String, + error: Option, + ) -> OperationResult { + let password = core.get_cluster_param(|params| params.password.clone()); + let username = core.get_cluster_param(|params| params.username.clone()); + if password.is_ok() { + return Err(( + OperationTarget::Node { address }, + RedisError::from(( + ErrorKind::AuthenticationFailed, + "No password provided for AUTH", + format!("Original error={error:?}"), + )), + )); + } + let mut auth_cmd = crate::cmd("AUTH"); + if let Ok(Some(username)) = username { + auth_cmd.arg(username); + } + auth_cmd.arg(password.unwrap()); + let cmd = Arc::new(auth_cmd.to_owned()); + let routing = + InternalRoutingInfo::SingleNode(InternalSingleNodeRouting::ByAddress(address.clone())); + let auth_info = RequestInfo { + cmd: CmdArg::Cmd { cmd, routing }, + }; + let response = Self::try_request(auth_info, core.clone()).await; + match response { + Ok(response) => { + if let Response::Single(Value::Okay) = response { + Self::try_request(info, core.clone()).await + } else { + Err(( + OperationTarget::Node { address }, + RedisError::from(( + ErrorKind::AuthenticationFailed, + "Reauthentication attempt failed following a NOAUTH error", +format!("Reauthenticate error={response:?}\nOriginal NOAUTH error={error:?}") + )), + )) + } + } + Err(err) => Err((OperationTarget::Node { address }, err.1)), + } + } fn send_refresh_error(&mut self) { if self.refresh_error.is_some() { if let Some(mut request) = Pin::new(&mut self.in_flight_requests) diff --git a/glide-core/redis-rs/redis/src/parser.rs b/glide-core/redis-rs/redis/src/parser.rs index 1f42a774f1..69ee54465d 100644 --- a/glide-core/redis-rs/redis/src/parser.rs +++ b/glide-core/redis-rs/redis/src/parser.rs @@ -39,6 +39,7 @@ fn err_parser(line: &str) -> ServerError { "MASTERDOWN" => ServerErrorKind::MasterDown, "READONLY" => ServerErrorKind::ReadOnly, "NOTBUSY" => ServerErrorKind::NotBusy, + "NOAUTH" => ServerErrorKind::NoAuth, code => { return ServerError::ExtensionError { code: code.to_string(), diff --git a/glide-core/redis-rs/redis/src/types.rs b/glide-core/redis-rs/redis/src/types.rs index 4b6cdbb150..c0a09f49c8 100644 --- a/glide-core/redis-rs/redis/src/types.rs +++ b/glide-core/redis-rs/redis/src/types.rs @@ -153,6 +153,9 @@ pub enum ErrorKind { /// Not all slots are covered by the cluster NotAllSlotsCovered, + + /// The server requires authentication + NoAuth, } #[derive(PartialEq, Debug)] @@ -169,6 +172,7 @@ pub(crate) enum ServerErrorKind { MasterDown, ReadOnly, NotBusy, + NoAuth, } #[derive(PartialEq, Debug)] @@ -209,6 +213,7 @@ impl From for RedisError { ServerErrorKind::MasterDown => ErrorKind::MasterDown, ServerErrorKind::ReadOnly => ErrorKind::ReadOnly, ServerErrorKind::NotBusy => ErrorKind::NotBusy, + ServerErrorKind::NoAuth => ErrorKind::NoAuth, }; match detail { Some(detail) => RedisError::from((kind, desc, detail)), @@ -820,6 +825,7 @@ pub(crate) enum RetryMethod { AskRedirect, MovedRedirect, WaitAndRetryOnPrimaryRedirectOnReplica, + ReAuthenticate, } /// Indicates a general failure in the library. @@ -858,6 +864,7 @@ impl RedisError { ErrorKind::MasterDown => Some("MASTERDOWN"), ErrorKind::ReadOnly => Some("READONLY"), ErrorKind::NotBusy => Some("NOTBUSY"), + ErrorKind::NoAuth => Some("NOAUTH"), _ => match self.repr { ErrorRepr::ExtensionError(ref code, _) => Some(code), _ => None, @@ -900,6 +907,7 @@ impl RedisError { ErrorKind::RESP3NotSupported => "resp3 is not supported by server", ErrorKind::ParseError => "parse error", ErrorKind::NotAllSlotsCovered => "not all slots are covered", + ErrorKind::NoAuth => "authentication required", } } @@ -986,6 +994,7 @@ impl RedisError { RetryMethod::AskRedirect => false, RetryMethod::MovedRedirect => false, RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica => false, + RetryMethod::ReAuthenticate => false, } } @@ -1095,6 +1104,7 @@ impl RedisError { ErrorKind::NotAllSlotsCovered => RetryMethod::NoRetry, ErrorKind::FatalReceiveError => RetryMethod::Reconnect, ErrorKind::FatalSendError => RetryMethod::ReconnectAndRetry, + ErrorKind::NoAuth => RetryMethod::ReAuthenticate, } } } From d8dfeb37674e34b85c101696c5544a47928392a1 Mon Sep 17 00:00:00 2001 From: avifenesh Date: Sat, 9 Nov 2024 19:54:20 +0000 Subject: [PATCH 3/8] Add integration tests for password replacement in Redis connections Signed-off-by: avifenesh --- .../redis-rs/redis/src/cluster_async/mod.rs | 4 +- glide-core/redis-rs/redis/tests/auth.rs | 73 +++++++++++-------- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index f117d7efc9..d73cf5199c 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -390,7 +390,7 @@ where let (sender, receiver) = oneshot::channel(); self.0 .send(Message { - cmd: CmdArg::OperationRequest { operation_request }, + cmd: CmdArg::OperationRequest(operation_request), sender, }) .await @@ -2105,7 +2105,7 @@ where Err(err) => Err((OperationTarget::FanOut, err)), } } - CmdArg::OperationRequest { operation_request } => match operation_request { + CmdArg::OperationRequest(operation_request) => match operation_request { Operation::UpdateConnectionPassword(password) => { core.set_cluster_param(|params| params.password = password) .expect(MUTEX_WRITE_ERR); diff --git a/glide-core/redis-rs/redis/tests/auth.rs b/glide-core/redis-rs/redis/tests/auth.rs index 2bf9d250a2..4e745b79db 100644 --- a/glide-core/redis-rs/redis/tests/auth.rs +++ b/glide-core/redis-rs/redis/tests/auth.rs @@ -90,24 +90,21 @@ mod auth { } } - async fn kill_non_management_connections(con: &mut Connection) { + async fn kill_all_connections_but_caller(con: &mut ClusterConnection) { let mut kill_cmd = cmd("client"); - kill_cmd.arg("kill").arg("type").arg("normal"); - match con { - Connection::Cluster(cluster_conn) => { - cluster_conn - .route_command(&kill_cmd, ALL_SUCCESS_ROUTE) - .await - .unwrap(); - } - Connection::Standalone(standalone_conn) => { - kill_cmd.arg("skipme").arg("no"); - kill_cmd - .query_async::<_, ()>(standalone_conn) - .await - .unwrap(); - } - } + kill_cmd + .arg("kill") + .arg("type") + .arg("normal") + .arg("skipme") + .arg("yes"); + con.route_command(&kill_cmd, ALL_SUCCESS_ROUTE) + .await + .unwrap(); + } + + async fn reset_connection_standalone(con: &mut MultiplexedConnection) { + let _: () = cmd("reset").query_async(con).await.unwrap(); } #[tokio::test] @@ -116,7 +113,7 @@ mod auth { let cluster_context = TestClusterContext::new(3, 0); // Create a management connection to set the password - let management_connection = + let mut management_connection = match create_connection(None, ConnectionType::Cluster, Some(&cluster_context), None) .await .unwrap() @@ -134,8 +131,7 @@ mod auth { create_connection(None, ConnectionType::Cluster, Some(&cluster_context), None).await; assert!(connection_should_fail.is_err()); let err = connection_should_fail.err().unwrap(); - println!("{}", err.to_string()); - assert!(err.to_string().contains("Authentication required.")); + assert!(err.to_string().contains("NoAuth: Authentication required.")); // Test that we can connect with password let mut connection_should_succeed = match create_connection( @@ -166,8 +162,7 @@ mod auth { assert_eq!(res.unwrap(), Value::BulkString(b"bar".to_vec())); // Kill the connection to force reconnection - kill_non_management_connections(&mut Connection::Cluster(management_connection.clone())) - .await; + kill_all_connections_but_caller(&mut management_connection).await; // Attempt to get the value again to ensure reconnection works let should_be_ok: RedisResult = cmd("get") @@ -176,7 +171,7 @@ mod auth { .await; assert_eq!(should_be_ok.unwrap(), Value::BulkString(b"bar".to_vec())); - // Update the password in the connection + // Reset the password in the connection connection_should_succeed .update_connection_password(Some(NEW_PASSWORD.to_string())) .await @@ -198,13 +193,11 @@ mod auth { .await; assert!(connection_should_fail.is_err()); let err = connection_should_fail.err().unwrap(); - assert!(err - .to_string() - .contains("Password authentication failed- AuthenticationFailed")); + let detail = err.detail().unwrap(); + assert!(detail.contains("AuthenticationFailed")); // Kill the connection to force reconnection - let mut management_conn = Connection::Cluster(management_connection); - kill_non_management_connections(&mut management_conn).await; + kill_all_connections_but_caller(&mut management_connection).await; // Verify that the connection with new password still works let result_should_succeed: RedisResult = cmd("get") @@ -224,7 +217,7 @@ mod auth { let standalone_context = TestContext::new(); // Create a management connection to set the password - let management_connection = match create_connection( + let mut management_connection = match create_connection( None, ConnectionType::Standalone, None, @@ -276,7 +269,7 @@ mod auth { .await; assert_eq!(res.unwrap(), Value::Okay); - // Update the password in the connection + // Reset the password in the connection connection_should_succeed .update_connection_password(Some(NEW_PASSWORD.to_string())) .await @@ -289,7 +282,7 @@ mod auth { .unwrap(); // Reset the management connection - kill_non_management_connections(&mut management_conn).await; + reset_connection_standalone(&mut management_connection).await; // Test that we can't connect with the old password let connection_should_fail = create_connection( @@ -300,5 +293,23 @@ mod auth { ) .await; assert!(connection_should_fail.is_err()); + + // Verify that the management connection can't perform operations after reset + let result_should_fail: RedisResult = cmd("get") + .arg("foo") + .query_async(&mut management_connection) + .await; + assert!(result_should_fail.is_err()); + + // Verify that the connection with new password still works + let result_should_succeed: RedisResult = cmd("get") + .arg("foo") + .query_async(&mut connection_should_succeed) + .await; + assert!(result_should_succeed.is_ok()); + assert_eq!( + result_should_succeed.unwrap(), + Value::BulkString(b"bar".to_vec()) + ); } } From 73b158bfd3c02ce1376b3d1e11aa9419d8c79f02 Mon Sep 17 00:00:00 2001 From: avifenesh Date: Sat, 9 Nov 2024 19:54:47 +0000 Subject: [PATCH 4/8] Implement password replacement functionality in Client and StandaloneClient Signed-off-by: avifenesh --- .../redis-rs/redis/src/cluster_async/mod.rs | 17 +++--- glide-core/src/client/mod.rs | 2 +- glide-core/src/client/standalone_client.rs | 3 +- node/src/BaseClient.ts | 60 +++++++++---------- python/python/glide/async_commands/core.py | 17 +++--- python/python/glide/glide_client.py | 12 ++-- python/python/tests/test_auth.py | 32 +++++----- 7 files changed, 68 insertions(+), 75 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index d73cf5199c..f9d128a519 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -473,7 +473,7 @@ where { self.cluster_params .read() - .map(|guard| f(&*guard).clone()) + .map(|guard| f(&guard).clone()) .map_err(|_| RedisError::from((ErrorKind::ClientError, MUTEX_READ_ERR))) } @@ -1070,7 +1070,7 @@ impl Future for Request { self.respond(Err(err)); Next::Done.into() } - crate::types::RetryMethod::ReAuthenticate => Next::ReAuth { + RetryMethod::ReAuthenticate => Next::ReAuth { request: this.request.take().unwrap(), address, error: Some(err), @@ -2476,9 +2476,11 @@ where address: String, error: Option, ) -> OperationResult { - let password = core.get_cluster_param(|params| params.password.clone()); + let password = core + .get_cluster_param(|params| params.password.clone()) + .expect(MUTEX_READ_ERR); let username = core.get_cluster_param(|params| params.username.clone()); - if password.is_ok() { + let Some(password) = password else { return Err(( OperationTarget::Node { address }, RedisError::from(( @@ -2487,12 +2489,12 @@ where format!("Original error={error:?}"), )), )); - } + }; let mut auth_cmd = crate::cmd("AUTH"); if let Ok(Some(username)) = username { auth_cmd.arg(username); } - auth_cmd.arg(password.unwrap()); + auth_cmd.arg(password); let cmd = Arc::new(auth_cmd.to_owned()); let routing = InternalRoutingInfo::SingleNode(InternalSingleNodeRouting::ByAddress(address.clone())); @@ -2509,8 +2511,7 @@ where OperationTarget::Node { address }, RedisError::from(( ErrorKind::AuthenticationFailed, - "Reauthentication attempt failed following a NOAUTH error", -format!("Reauthenticate error={response:?}\nOriginal NOAUTH error={error:?}") + "After NOAUTH error, AUTH try failed", )), )) } diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index ffbdc60d4e..00b7db3af2 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -477,7 +477,7 @@ impl Client { .fetch_add(1, Ordering::SeqCst) } - /// Update the password used to authenticate with the servers. + /// Reset the password used to authenticate with the servers. /// If None is passed, the password will be removed. /// If `re_auth` is true, the new password will be used to re-authenticate with all of the nodes. pub async fn update_connection_password( diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index 2a6dbd0e77..cc2e1e179a 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -471,8 +471,7 @@ impl StandaloneClient { }); } - /// Update the password used to authenticate with the servers. - /// If the password is `None`, the password will be removed. + /// Replace password of the multiplexed connection. pub async fn update_connection_password( &mut self, password: Option, diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 665acb4fae..c978cbe10d 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -673,7 +673,7 @@ function toProtobufRoute( if (split.length !== 2) { throw new RequestError( "No port provided, expected host to be formatted as `{hostname}:{port}`. Received " + - host, + host, ); } @@ -992,30 +992,30 @@ export class BaseClient { ) { const message = Array.isArray(command) ? command_request.CommandRequest.create({ - callbackIdx, - transaction: command_request.Transaction.create({ - commands: command, - }), - }) + callbackIdx, + transaction: command_request.Transaction.create({ + commands: command, + }), + }) : command instanceof command_request.Command - ? command_request.CommandRequest.create({ + ? command_request.CommandRequest.create({ callbackIdx, singleCommand: command, }) - : command instanceof command_request.ClusterScan - ? command_request.CommandRequest.create({ - callbackIdx, - clusterScan: command, - }) - : command instanceof command_request.UpdateConnectionPassword - ? command_request.CommandRequest.create({ + : command instanceof command_request.ClusterScan + ? command_request.CommandRequest.create({ callbackIdx, - updateConnectionPassword: command, + clusterScan: command, }) - : command_request.CommandRequest.create({ - callbackIdx, - scriptInvocation: command, - }); + : command instanceof command_request.UpdateConnectionPassword + ? command_request.CommandRequest.create({ + callbackIdx, + updateConnectionPassword: command, + }) + : command_request.CommandRequest.create({ + callbackIdx, + scriptInvocation: command, + }); message.route = route; this.writeOrBufferRequest( @@ -5991,9 +5991,9 @@ export class BaseClient { ReadFrom, connection_request.ReadFrom > = { - primary: connection_request.ReadFrom.Primary, - preferReplica: connection_request.ReadFrom.PreferReplica, - }; + primary: connection_request.ReadFrom.Primary, + preferReplica: connection_request.ReadFrom.PreferReplica, + }; /** * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream. @@ -7302,8 +7302,8 @@ export class BaseClient { res === null ? null : res!.map((r) => { - return { key: r.key, elements: r.value }; - })[0], + return { key: r.key, elements: r.value }; + })[0], ); } @@ -7345,8 +7345,8 @@ export class BaseClient { res === null ? null : res!.map((r) => { - return { key: r.key, elements: r.value }; - })[0], + return { key: r.key, elements: r.value }; + })[0], ); } @@ -7553,11 +7553,11 @@ export class BaseClient { : connection_request.ReadFrom.Primary; const authenticationInfo = options.credentials !== undefined && - "password" in options.credentials + "password" in options.credentials ? { - password: options.credentials.password, - username: options.credentials.username, - } + password: options.credentials.password, + username: options.credentials.username, + } : undefined; const protocol = options.protocol as | connection_request.ProtocolVersion diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 4c29fbc3c9..3ce140d322 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -392,15 +392,13 @@ async def _cluster_scan( type: Optional[ObjectType] = ..., ) -> TResult: ... - async def _update_connection_password( - self, password: Optional[str], re_auth: bool + async def _replace_connection_password( + self, password: str, re_auth: bool ) -> TResult: ... - async def update_connection_password( - self, password: Optional[str], re_auth: bool - ) -> TOK: + async def replace_connection_password(self, password: str, re_auth: bool) -> TOK: """ - Update the current connection password with a new password. + Replace the current connection password with a new password. **Note:** This method updates the client's internal password configuration and does not perform password rotation on the server side. @@ -412,8 +410,7 @@ async def update_connection_password( handle reconnection seamlessly, preventing the loss of in-flight commands. Args: - password (Optional[str]): The new password to use for the connection, - if `None` the password will be removed. + password (str): The new password to replace the current password. re_auth (bool): - `True`: The client will re-authenticate immediately with the new password. - `False`: The new password will be used for the next connection attempt. @@ -422,10 +419,10 @@ async def update_connection_password( TOK: A simple OK response. Example: - >>> await client.update_connection_password("new_password", re_auth=True) + >>> await client.replace_connection_password("new_password", re_auth=True) 'OK' """ - return cast(TOK, await self._update_connection_password(password, re_auth)) + return cast(TOK, await self._replace_connection_password(password, re_auth)) async def set( self, diff --git a/python/python/glide/glide_client.py b/python/python/glide/glide_client.py index 2838ae288e..f82f5410a9 100644 --- a/python/python/glide/glide_client.py +++ b/python/python/glide/glide_client.py @@ -533,19 +533,19 @@ async def _reader_loop(self) -> None: else: await self._process_response(response=response) - async def _update_connection_password( - self, password: Optional[str], re_auth: bool + async def _replace_connection_password( + self, password: str, re_auth: bool ) -> TResult: request = CommandRequest() request.callback_idx = self._get_callback_index() - request.update_connection_password.password = password - request.update_connection_password.re_auth = re_auth + request.replace_connection_password.password = password + request.replace_connection_password.re_auth = re_auth response = await self._write_request_await_response(request) # Update the client binding side password if managed to change core configuration password if response is OK: if self.config.credentials is None: - self.config.credentials = ServerCredentials(password=password or "") - self.config.credentials.password = password or "" + self.config.credentials = ServerCredentials(password=password) + self.config.credentials.password = password return response diff --git a/python/python/tests/test_auth.py b/python/python/tests/test_auth.py index 694c8c345b..30261af4b1 100644 --- a/python/python/tests/test_auth.py +++ b/python/python/tests/test_auth.py @@ -30,18 +30,14 @@ async def config_set_new_password(client: TGlideClient, password): await client.config_set({"requirepass": password}, route=AllNodes()) -async def kill_connections(client: TGlideClient): +async def reset_connections(client: TGlideClient): """ - Kills all connections to the given TGlideClient server connected. + Resets the connections for the given TGlideClient server connected. """ if isinstance(client, GlideClient): - await client.custom_command( - ["CLIENT", "KILL", "TYPE", "normal", "skipme", "no"] - ) + await client.custom_command(["RESET"]) if isinstance(client, GlideClusterClient): - await client.custom_command( - ["CLIENT", "KILL", "TYPE", "normal", "skipme", "no"], route=AllNodes() - ) + await client.custom_command(["RESET"], route=AllNodes()) @pytest.mark.asyncio @@ -67,7 +63,7 @@ async def setup(self, glide_client: TGlideClient): @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_update_connection_password(self, glide_client: TGlideClient): + async def test_replace_connection_password(self, glide_client: TGlideClient): """ Test replacing the connection password without immediate re-authentication. Verifies that: @@ -77,7 +73,7 @@ async def test_update_connection_password(self, glide_client: TGlideClient): Currently, this test is only supported for cluster mode, since standalone mode dont have retry mechanism. """ - result = await glide_client.update_connection_password( + result = await glide_client.replace_connection_password( NEW_PASSWORD, re_auth=False ) assert result == OK @@ -86,14 +82,14 @@ async def test_update_connection_password(self, glide_client: TGlideClient): value = await glide_client.get("test_key") assert value == b"test_value" await config_set_new_password(glide_client, NEW_PASSWORD) - await kill_connections(glide_client) + await reset_connections(glide_client) # Verify that the client is able to reconnect with the new password value = await glide_client.get("test_key") assert value == b"test_value" @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_update_connection_password_no_server_auth( + async def test_replace_connection_password_no_server_auth( self, glide_client: TGlideClient ): """ @@ -102,17 +98,17 @@ async def test_update_connection_password_no_server_auth( password when the server has no password set. """ with pytest.raises(RequestError): - await glide_client.update_connection_password(WRONG_PASSWORD, re_auth=True) + await glide_client.replace_connection_password(WRONG_PASSWORD, re_auth=True) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_update_connection_password_long(self, glide_client: TGlideClient): + async def test_replace_connection_password_long(self, glide_client: TGlideClient): """ Test replacing connection password with a long password string. Verifies that the client can handle long passwords (1000 characters). """ long_password = "p" * 1000 - result = await glide_client.update_connection_password( + result = await glide_client.replace_connection_password( long_password, re_auth=False ) assert result == OK @@ -129,11 +125,11 @@ async def test_replace_password_reauth_wrong_password( """ await config_set_new_password(glide_client, NEW_PASSWORD) with pytest.raises(RequestError): - await glide_client.update_connection_password(WRONG_PASSWORD, re_auth=True) + await glide_client.replace_connection_password(WRONG_PASSWORD, re_auth=True) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_update_connection_password_with_reauth( + async def test_replace_connection_password_with_reauth( self, glide_client: TGlideClient ): """ @@ -143,7 +139,7 @@ async def test_update_connection_password_with_reauth( 2. The client remains operational after re-authentication """ await config_set_new_password(glide_client, NEW_PASSWORD) - result = await glide_client.update_connection_password( + result = await glide_client.replace_connection_password( NEW_PASSWORD, re_auth=True ) assert result == OK From 6a78298f00c157979fd11cd200fdbfd502a34973 Mon Sep 17 00:00:00 2001 From: avifenesh Date: Sat, 9 Nov 2024 19:55:51 +0000 Subject: [PATCH 5/8] Add replace_connection_password method to manage connection password updates Signed-off-by: avifenesh --- python/python/glide/async_commands/core.py | 17 ++++++++++------- python/python/glide/glide_client.py | 12 ++++++------ python/python/tests/test_async_client.py | 2 +- python/python/tests/test_auth.py | 18 +++++++++--------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 3ce140d322..4c29fbc3c9 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -392,13 +392,15 @@ async def _cluster_scan( type: Optional[ObjectType] = ..., ) -> TResult: ... - async def _replace_connection_password( - self, password: str, re_auth: bool + async def _update_connection_password( + self, password: Optional[str], re_auth: bool ) -> TResult: ... - async def replace_connection_password(self, password: str, re_auth: bool) -> TOK: + async def update_connection_password( + self, password: Optional[str], re_auth: bool + ) -> TOK: """ - Replace the current connection password with a new password. + Update the current connection password with a new password. **Note:** This method updates the client's internal password configuration and does not perform password rotation on the server side. @@ -410,7 +412,8 @@ async def replace_connection_password(self, password: str, re_auth: bool) -> TOK handle reconnection seamlessly, preventing the loss of in-flight commands. Args: - password (str): The new password to replace the current password. + password (Optional[str]): The new password to use for the connection, + if `None` the password will be removed. re_auth (bool): - `True`: The client will re-authenticate immediately with the new password. - `False`: The new password will be used for the next connection attempt. @@ -419,10 +422,10 @@ async def replace_connection_password(self, password: str, re_auth: bool) -> TOK TOK: A simple OK response. Example: - >>> await client.replace_connection_password("new_password", re_auth=True) + >>> await client.update_connection_password("new_password", re_auth=True) 'OK' """ - return cast(TOK, await self._replace_connection_password(password, re_auth)) + return cast(TOK, await self._update_connection_password(password, re_auth)) async def set( self, diff --git a/python/python/glide/glide_client.py b/python/python/glide/glide_client.py index f82f5410a9..2838ae288e 100644 --- a/python/python/glide/glide_client.py +++ b/python/python/glide/glide_client.py @@ -533,19 +533,19 @@ async def _reader_loop(self) -> None: else: await self._process_response(response=response) - async def _replace_connection_password( - self, password: str, re_auth: bool + async def _update_connection_password( + self, password: Optional[str], re_auth: bool ) -> TResult: request = CommandRequest() request.callback_idx = self._get_callback_index() - request.replace_connection_password.password = password - request.replace_connection_password.re_auth = re_auth + request.update_connection_password.password = password + request.update_connection_password.re_auth = re_auth response = await self._write_request_await_response(request) # Update the client binding side password if managed to change core configuration password if response is OK: if self.config.credentials is None: - self.config.credentials = ServerCredentials(password=password) - self.config.credentials.password = password + self.config.credentials = ServerCredentials(password=password or "") + self.config.credentials.password = password or "" return response diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 7560cc7b23..b9a792943b 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -185,7 +185,7 @@ async def test_can_connect_with_auth_requirepass( ["CONFIG", "SET", "requirepass", password] ) - with pytest.raises(ClosingError, match="NOAUTH"): + with pytest.raises(ClosingError, match="NoAuth"): # Creation of a new client without password should fail await create_client( request, diff --git a/python/python/tests/test_auth.py b/python/python/tests/test_auth.py index 30261af4b1..165f8d3ea4 100644 --- a/python/python/tests/test_auth.py +++ b/python/python/tests/test_auth.py @@ -63,7 +63,7 @@ async def setup(self, glide_client: TGlideClient): @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_replace_connection_password(self, glide_client: TGlideClient): + async def test_update_connection_password(self, glide_client: TGlideClient): """ Test replacing the connection password without immediate re-authentication. Verifies that: @@ -73,7 +73,7 @@ async def test_replace_connection_password(self, glide_client: TGlideClient): Currently, this test is only supported for cluster mode, since standalone mode dont have retry mechanism. """ - result = await glide_client.replace_connection_password( + result = await glide_client.update_connection_password( NEW_PASSWORD, re_auth=False ) assert result == OK @@ -89,7 +89,7 @@ async def test_replace_connection_password(self, glide_client: TGlideClient): @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_replace_connection_password_no_server_auth( + async def test_update_connection_password_no_server_auth( self, glide_client: TGlideClient ): """ @@ -98,17 +98,17 @@ async def test_replace_connection_password_no_server_auth( password when the server has no password set. """ with pytest.raises(RequestError): - await glide_client.replace_connection_password(WRONG_PASSWORD, re_auth=True) + await glide_client.update_connection_password(WRONG_PASSWORD, re_auth=True) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_replace_connection_password_long(self, glide_client: TGlideClient): + async def test_update_connection_password_long(self, glide_client: TGlideClient): """ Test replacing connection password with a long password string. Verifies that the client can handle long passwords (1000 characters). """ long_password = "p" * 1000 - result = await glide_client.replace_connection_password( + result = await glide_client.update_connection_password( long_password, re_auth=False ) assert result == OK @@ -125,11 +125,11 @@ async def test_replace_password_reauth_wrong_password( """ await config_set_new_password(glide_client, NEW_PASSWORD) with pytest.raises(RequestError): - await glide_client.replace_connection_password(WRONG_PASSWORD, re_auth=True) + await glide_client.update_connection_password(WRONG_PASSWORD, re_auth=True) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_replace_connection_password_with_reauth( + async def test_update_connection_password_with_reauth( self, glide_client: TGlideClient ): """ @@ -139,7 +139,7 @@ async def test_replace_connection_password_with_reauth( 2. The client remains operational after re-authentication """ await config_set_new_password(glide_client, NEW_PASSWORD) - result = await glide_client.replace_connection_password( + result = await glide_client.update_connection_password( NEW_PASSWORD, re_auth=True ) assert result == OK From 265d171d1e76599c0589aa3f05c4a3efc0876786 Mon Sep 17 00:00:00 2001 From: jhpung Date: Sun, 10 Nov 2024 23:57:30 +0900 Subject: [PATCH 6/8] Node: add repalceConnectionPassword method to manage connection password updates Signed-off-by: jhpung Signed-off-by: avifenesh --- .../redis/src/aio/multiplexed_connection.rs | 47 +++++----- glide-core/redis-rs/redis/src/cluster.rs | 1 - .../redis-rs/redis/src/cluster_async/mod.rs | 86 +------------------ glide-core/redis-rs/redis/src/parser.rs | 1 - glide-core/redis-rs/redis/src/types.rs | 10 --- glide-core/redis-rs/redis/tests/auth.rs | 73 +++++++--------- glide-core/src/client/mod.rs | 2 +- glide-core/src/client/standalone_client.rs | 3 +- node/src/BaseClient.ts | 60 ++++++------- python/python/tests/test_async_client.py | 2 +- python/python/tests/test_auth.py | 14 +-- 11 files changed, 98 insertions(+), 201 deletions(-) diff --git a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs index 25e874bafe..b31c817817 100644 --- a/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs +++ b/glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs @@ -31,8 +31,8 @@ use std::time::Duration; #[cfg(feature = "tokio-comp")] use tokio_util::codec::Decoder; -// Default connection timeout in seconds -const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(2); +// Default connection timeout in ms +const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250); // Senders which the result of a single request are sent through type PipelineOutput = oneshot::Sender>; @@ -79,7 +79,7 @@ struct PipelineMessage { /// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream` /// and `Sink`. #[derive(Clone)] -pub struct Pipeline { +pub(crate) struct Pipeline { sender: mpsc::Sender>, push_manager: Arc>, is_stream_closed: Arc, @@ -473,8 +473,7 @@ impl MultiplexedConnection { pipeline.set_push_manager(pm.clone()).await; - let mut con = MultiplexedConnection::builder() - .with_pipeline(pipeline) + let mut con = MultiplexedConnection::builder(pipeline) .with_db(connection_info.redis.db) .with_response_timeout(response_timeout) .with_push_manager(pm) @@ -576,7 +575,8 @@ impl MultiplexedConnection { self.pipeline.set_push_manager(push_manager).await; } - /// Replace password of connection + /// Replace the password used to authenticate with the server. + /// If `None` is provided, the password will be removed. pub async fn update_connection_password( &mut self, password: Option, @@ -586,27 +586,32 @@ impl MultiplexedConnection { } /// Creates a new `MultiplexedConnectionBuilder` for constructing a `MultiplexedConnection`. - pub fn builder() -> MultiplexedConnectionBuilder { - MultiplexedConnectionBuilder::default() + pub(crate) fn builder(pipeline: Pipeline>) -> MultiplexedConnectionBuilder { + MultiplexedConnectionBuilder::new(pipeline) } } -#[derive(Default)] /// A builder for creating `MultiplexedConnection` instances. pub struct MultiplexedConnectionBuilder { - pipeline: Option>>, + pipeline: Pipeline>, db: Option, response_timeout: Option, push_manager: Option, protocol: Option, - password: Option>, + password: Option, } impl MultiplexedConnectionBuilder { - /// Sets the pipeline for the `MultiplexedConnectionBuilder`. - pub fn with_pipeline(mut self, pipeline: Pipeline>) -> Self { - self.pipeline = Some(pipeline); - self + /// Creates a new builder with the required pipeline + pub(crate) fn new(pipeline: Pipeline>) -> Self { + Self { + pipeline, + db: None, + response_timeout: None, + push_manager: None, + protocol: None, + password: None, + } } /// Sets the database index for the `MultiplexedConnectionBuilder`. @@ -635,28 +640,22 @@ impl MultiplexedConnectionBuilder { /// Sets the password for the `MultiplexedConnectionBuilder`. pub fn with_password(mut self, password: Option) -> Self { - self.password = Some(password); + self.password = password; self } /// Builds and returns a new `MultiplexedConnection` instance using the configured settings. pub async fn build(self) -> RedisResult { - let pipeline = self.pipeline.ok_or_else(|| { - RedisError::from(( - crate::ErrorKind::InvalidClientConfig, - "Pipeline is required", - )) - })?; let db = self.db.unwrap_or_default(); let response_timeout = self .response_timeout .unwrap_or(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT); let push_manager = self.push_manager.unwrap_or_default(); let protocol = self.protocol.unwrap_or_default(); - let password = self.password.unwrap_or_default(); + let password = self.password; let con = MultiplexedConnection { - pipeline, + pipeline: self.pipeline, db, response_timeout, push_manager, diff --git a/glide-core/redis-rs/redis/src/cluster.rs b/glide-core/redis-rs/redis/src/cluster.rs index 1eb14d7b40..1107965bf3 100644 --- a/glide-core/redis-rs/redis/src/cluster.rs +++ b/glide-core/redis-rs/redis/src/cluster.rs @@ -784,7 +784,6 @@ where return Err(err); } RetryMethod::RetryImmediately => {} - RetryMethod::ReAuthenticate => {} } } } diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index f9d128a519..35997b2282 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -373,7 +373,7 @@ where Response::ClusterScanResult(..) | Response::Single(_) => unreachable!(), }) } - /// Reset the password used to authenticate with all cluster servers + /// Update the password used to authenticate with all cluster servers pub async fn update_connection_password( &mut self, password: Option, @@ -912,11 +912,6 @@ enum Next { // if not set, then a reconnect should happen without sending a request afterwards request: Option>, }, - ReAuth { - request: PendingRequest, - address: String, - error: Option, - }, Done, } @@ -1070,12 +1065,6 @@ impl Future for Request { self.respond(Err(err)); Next::Done.into() } - RetryMethod::ReAuthenticate => Next::ReAuth { - request: this.request.take().unwrap(), - address, - error: Some(err), - } - .into(), } } } @@ -2432,25 +2421,6 @@ where self.inner.pending_requests.lock().unwrap().push(request); } } - Next::ReAuth { - request, - address, - error, - } => { - let future = Self::re_auth_and_retry_request( - self.inner.clone(), - request.info.clone(), - address, - error, - ); - self.in_flight_requests.push(Box::pin(Request { - retry_params: retry_params.clone(), - request: Some(request), - future: RequestState::Future { - future: Box::pin(future), - }, - })); - } } } @@ -2465,60 +2435,6 @@ where } } - // The function is used to send an AUTH command to a node and then retry the original request. - // The function is used when the original request failed due to an AUTH error. - // Cases in which the function is helpful: - // 1. The password was changed, the connection was not re-established, and the request failed due to an AUTH error. - // 2. The protection method is allowing X hours of connection without authentication, and the request failed due to an AUTH error. - async fn re_auth_and_retry_request( - core: Core, - info: RequestInfo, - address: String, - error: Option, - ) -> OperationResult { - let password = core - .get_cluster_param(|params| params.password.clone()) - .expect(MUTEX_READ_ERR); - let username = core.get_cluster_param(|params| params.username.clone()); - let Some(password) = password else { - return Err(( - OperationTarget::Node { address }, - RedisError::from(( - ErrorKind::AuthenticationFailed, - "No password provided for AUTH", - format!("Original error={error:?}"), - )), - )); - }; - let mut auth_cmd = crate::cmd("AUTH"); - if let Ok(Some(username)) = username { - auth_cmd.arg(username); - } - auth_cmd.arg(password); - let cmd = Arc::new(auth_cmd.to_owned()); - let routing = - InternalRoutingInfo::SingleNode(InternalSingleNodeRouting::ByAddress(address.clone())); - let auth_info = RequestInfo { - cmd: CmdArg::Cmd { cmd, routing }, - }; - let response = Self::try_request(auth_info, core.clone()).await; - match response { - Ok(response) => { - if let Response::Single(Value::Okay) = response { - Self::try_request(info, core.clone()).await - } else { - Err(( - OperationTarget::Node { address }, - RedisError::from(( - ErrorKind::AuthenticationFailed, - "After NOAUTH error, AUTH try failed", - )), - )) - } - } - Err(err) => Err((OperationTarget::Node { address }, err.1)), - } - } fn send_refresh_error(&mut self) { if self.refresh_error.is_some() { if let Some(mut request) = Pin::new(&mut self.in_flight_requests) diff --git a/glide-core/redis-rs/redis/src/parser.rs b/glide-core/redis-rs/redis/src/parser.rs index 69ee54465d..1f42a774f1 100644 --- a/glide-core/redis-rs/redis/src/parser.rs +++ b/glide-core/redis-rs/redis/src/parser.rs @@ -39,7 +39,6 @@ fn err_parser(line: &str) -> ServerError { "MASTERDOWN" => ServerErrorKind::MasterDown, "READONLY" => ServerErrorKind::ReadOnly, "NOTBUSY" => ServerErrorKind::NotBusy, - "NOAUTH" => ServerErrorKind::NoAuth, code => { return ServerError::ExtensionError { code: code.to_string(), diff --git a/glide-core/redis-rs/redis/src/types.rs b/glide-core/redis-rs/redis/src/types.rs index c0a09f49c8..4b6cdbb150 100644 --- a/glide-core/redis-rs/redis/src/types.rs +++ b/glide-core/redis-rs/redis/src/types.rs @@ -153,9 +153,6 @@ pub enum ErrorKind { /// Not all slots are covered by the cluster NotAllSlotsCovered, - - /// The server requires authentication - NoAuth, } #[derive(PartialEq, Debug)] @@ -172,7 +169,6 @@ pub(crate) enum ServerErrorKind { MasterDown, ReadOnly, NotBusy, - NoAuth, } #[derive(PartialEq, Debug)] @@ -213,7 +209,6 @@ impl From for RedisError { ServerErrorKind::MasterDown => ErrorKind::MasterDown, ServerErrorKind::ReadOnly => ErrorKind::ReadOnly, ServerErrorKind::NotBusy => ErrorKind::NotBusy, - ServerErrorKind::NoAuth => ErrorKind::NoAuth, }; match detail { Some(detail) => RedisError::from((kind, desc, detail)), @@ -825,7 +820,6 @@ pub(crate) enum RetryMethod { AskRedirect, MovedRedirect, WaitAndRetryOnPrimaryRedirectOnReplica, - ReAuthenticate, } /// Indicates a general failure in the library. @@ -864,7 +858,6 @@ impl RedisError { ErrorKind::MasterDown => Some("MASTERDOWN"), ErrorKind::ReadOnly => Some("READONLY"), ErrorKind::NotBusy => Some("NOTBUSY"), - ErrorKind::NoAuth => Some("NOAUTH"), _ => match self.repr { ErrorRepr::ExtensionError(ref code, _) => Some(code), _ => None, @@ -907,7 +900,6 @@ impl RedisError { ErrorKind::RESP3NotSupported => "resp3 is not supported by server", ErrorKind::ParseError => "parse error", ErrorKind::NotAllSlotsCovered => "not all slots are covered", - ErrorKind::NoAuth => "authentication required", } } @@ -994,7 +986,6 @@ impl RedisError { RetryMethod::AskRedirect => false, RetryMethod::MovedRedirect => false, RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica => false, - RetryMethod::ReAuthenticate => false, } } @@ -1104,7 +1095,6 @@ impl RedisError { ErrorKind::NotAllSlotsCovered => RetryMethod::NoRetry, ErrorKind::FatalReceiveError => RetryMethod::Reconnect, ErrorKind::FatalSendError => RetryMethod::ReconnectAndRetry, - ErrorKind::NoAuth => RetryMethod::ReAuthenticate, } } } diff --git a/glide-core/redis-rs/redis/tests/auth.rs b/glide-core/redis-rs/redis/tests/auth.rs index 4e745b79db..2bf9d250a2 100644 --- a/glide-core/redis-rs/redis/tests/auth.rs +++ b/glide-core/redis-rs/redis/tests/auth.rs @@ -90,21 +90,24 @@ mod auth { } } - async fn kill_all_connections_but_caller(con: &mut ClusterConnection) { + async fn kill_non_management_connections(con: &mut Connection) { let mut kill_cmd = cmd("client"); - kill_cmd - .arg("kill") - .arg("type") - .arg("normal") - .arg("skipme") - .arg("yes"); - con.route_command(&kill_cmd, ALL_SUCCESS_ROUTE) - .await - .unwrap(); - } - - async fn reset_connection_standalone(con: &mut MultiplexedConnection) { - let _: () = cmd("reset").query_async(con).await.unwrap(); + kill_cmd.arg("kill").arg("type").arg("normal"); + match con { + Connection::Cluster(cluster_conn) => { + cluster_conn + .route_command(&kill_cmd, ALL_SUCCESS_ROUTE) + .await + .unwrap(); + } + Connection::Standalone(standalone_conn) => { + kill_cmd.arg("skipme").arg("no"); + kill_cmd + .query_async::<_, ()>(standalone_conn) + .await + .unwrap(); + } + } } #[tokio::test] @@ -113,7 +116,7 @@ mod auth { let cluster_context = TestClusterContext::new(3, 0); // Create a management connection to set the password - let mut management_connection = + let management_connection = match create_connection(None, ConnectionType::Cluster, Some(&cluster_context), None) .await .unwrap() @@ -131,7 +134,8 @@ mod auth { create_connection(None, ConnectionType::Cluster, Some(&cluster_context), None).await; assert!(connection_should_fail.is_err()); let err = connection_should_fail.err().unwrap(); - assert!(err.to_string().contains("NoAuth: Authentication required.")); + println!("{}", err.to_string()); + assert!(err.to_string().contains("Authentication required.")); // Test that we can connect with password let mut connection_should_succeed = match create_connection( @@ -162,7 +166,8 @@ mod auth { assert_eq!(res.unwrap(), Value::BulkString(b"bar".to_vec())); // Kill the connection to force reconnection - kill_all_connections_but_caller(&mut management_connection).await; + kill_non_management_connections(&mut Connection::Cluster(management_connection.clone())) + .await; // Attempt to get the value again to ensure reconnection works let should_be_ok: RedisResult = cmd("get") @@ -171,7 +176,7 @@ mod auth { .await; assert_eq!(should_be_ok.unwrap(), Value::BulkString(b"bar".to_vec())); - // Reset the password in the connection + // Update the password in the connection connection_should_succeed .update_connection_password(Some(NEW_PASSWORD.to_string())) .await @@ -193,11 +198,13 @@ mod auth { .await; assert!(connection_should_fail.is_err()); let err = connection_should_fail.err().unwrap(); - let detail = err.detail().unwrap(); - assert!(detail.contains("AuthenticationFailed")); + assert!(err + .to_string() + .contains("Password authentication failed- AuthenticationFailed")); // Kill the connection to force reconnection - kill_all_connections_but_caller(&mut management_connection).await; + let mut management_conn = Connection::Cluster(management_connection); + kill_non_management_connections(&mut management_conn).await; // Verify that the connection with new password still works let result_should_succeed: RedisResult = cmd("get") @@ -217,7 +224,7 @@ mod auth { let standalone_context = TestContext::new(); // Create a management connection to set the password - let mut management_connection = match create_connection( + let management_connection = match create_connection( None, ConnectionType::Standalone, None, @@ -269,7 +276,7 @@ mod auth { .await; assert_eq!(res.unwrap(), Value::Okay); - // Reset the password in the connection + // Update the password in the connection connection_should_succeed .update_connection_password(Some(NEW_PASSWORD.to_string())) .await @@ -282,7 +289,7 @@ mod auth { .unwrap(); // Reset the management connection - reset_connection_standalone(&mut management_connection).await; + kill_non_management_connections(&mut management_conn).await; // Test that we can't connect with the old password let connection_should_fail = create_connection( @@ -293,23 +300,5 @@ mod auth { ) .await; assert!(connection_should_fail.is_err()); - - // Verify that the management connection can't perform operations after reset - let result_should_fail: RedisResult = cmd("get") - .arg("foo") - .query_async(&mut management_connection) - .await; - assert!(result_should_fail.is_err()); - - // Verify that the connection with new password still works - let result_should_succeed: RedisResult = cmd("get") - .arg("foo") - .query_async(&mut connection_should_succeed) - .await; - assert!(result_should_succeed.is_ok()); - assert_eq!( - result_should_succeed.unwrap(), - Value::BulkString(b"bar".to_vec()) - ); } } diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 00b7db3af2..ffbdc60d4e 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -477,7 +477,7 @@ impl Client { .fetch_add(1, Ordering::SeqCst) } - /// Reset the password used to authenticate with the servers. + /// Update the password used to authenticate with the servers. /// If None is passed, the password will be removed. /// If `re_auth` is true, the new password will be used to re-authenticate with all of the nodes. pub async fn update_connection_password( diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index cc2e1e179a..2a6dbd0e77 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -471,7 +471,8 @@ impl StandaloneClient { }); } - /// Replace password of the multiplexed connection. + /// Update the password used to authenticate with the servers. + /// If the password is `None`, the password will be removed. pub async fn update_connection_password( &mut self, password: Option, diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index c978cbe10d..665acb4fae 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -673,7 +673,7 @@ function toProtobufRoute( if (split.length !== 2) { throw new RequestError( "No port provided, expected host to be formatted as `{hostname}:{port}`. Received " + - host, + host, ); } @@ -992,30 +992,30 @@ export class BaseClient { ) { const message = Array.isArray(command) ? command_request.CommandRequest.create({ - callbackIdx, - transaction: command_request.Transaction.create({ - commands: command, - }), - }) + callbackIdx, + transaction: command_request.Transaction.create({ + commands: command, + }), + }) : command instanceof command_request.Command - ? command_request.CommandRequest.create({ + ? command_request.CommandRequest.create({ callbackIdx, singleCommand: command, }) - : command instanceof command_request.ClusterScan - ? command_request.CommandRequest.create({ + : command instanceof command_request.ClusterScan + ? command_request.CommandRequest.create({ + callbackIdx, + clusterScan: command, + }) + : command instanceof command_request.UpdateConnectionPassword + ? command_request.CommandRequest.create({ callbackIdx, - clusterScan: command, + updateConnectionPassword: command, }) - : command instanceof command_request.UpdateConnectionPassword - ? command_request.CommandRequest.create({ - callbackIdx, - updateConnectionPassword: command, - }) - : command_request.CommandRequest.create({ - callbackIdx, - scriptInvocation: command, - }); + : command_request.CommandRequest.create({ + callbackIdx, + scriptInvocation: command, + }); message.route = route; this.writeOrBufferRequest( @@ -5991,9 +5991,9 @@ export class BaseClient { ReadFrom, connection_request.ReadFrom > = { - primary: connection_request.ReadFrom.Primary, - preferReplica: connection_request.ReadFrom.PreferReplica, - }; + primary: connection_request.ReadFrom.Primary, + preferReplica: connection_request.ReadFrom.PreferReplica, + }; /** * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream. @@ -7302,8 +7302,8 @@ export class BaseClient { res === null ? null : res!.map((r) => { - return { key: r.key, elements: r.value }; - })[0], + return { key: r.key, elements: r.value }; + })[0], ); } @@ -7345,8 +7345,8 @@ export class BaseClient { res === null ? null : res!.map((r) => { - return { key: r.key, elements: r.value }; - })[0], + return { key: r.key, elements: r.value }; + })[0], ); } @@ -7553,11 +7553,11 @@ export class BaseClient { : connection_request.ReadFrom.Primary; const authenticationInfo = options.credentials !== undefined && - "password" in options.credentials + "password" in options.credentials ? { - password: options.credentials.password, - username: options.credentials.username, - } + password: options.credentials.password, + username: options.credentials.username, + } : undefined; const protocol = options.protocol as | connection_request.ProtocolVersion diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index b9a792943b..7560cc7b23 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -185,7 +185,7 @@ async def test_can_connect_with_auth_requirepass( ["CONFIG", "SET", "requirepass", password] ) - with pytest.raises(ClosingError, match="NoAuth"): + with pytest.raises(ClosingError, match="NOAUTH"): # Creation of a new client without password should fail await create_client( request, diff --git a/python/python/tests/test_auth.py b/python/python/tests/test_auth.py index 165f8d3ea4..694c8c345b 100644 --- a/python/python/tests/test_auth.py +++ b/python/python/tests/test_auth.py @@ -30,14 +30,18 @@ async def config_set_new_password(client: TGlideClient, password): await client.config_set({"requirepass": password}, route=AllNodes()) -async def reset_connections(client: TGlideClient): +async def kill_connections(client: TGlideClient): """ - Resets the connections for the given TGlideClient server connected. + Kills all connections to the given TGlideClient server connected. """ if isinstance(client, GlideClient): - await client.custom_command(["RESET"]) + await client.custom_command( + ["CLIENT", "KILL", "TYPE", "normal", "skipme", "no"] + ) if isinstance(client, GlideClusterClient): - await client.custom_command(["RESET"], route=AllNodes()) + await client.custom_command( + ["CLIENT", "KILL", "TYPE", "normal", "skipme", "no"], route=AllNodes() + ) @pytest.mark.asyncio @@ -82,7 +86,7 @@ async def test_update_connection_password(self, glide_client: TGlideClient): value = await glide_client.get("test_key") assert value == b"test_value" await config_set_new_password(glide_client, NEW_PASSWORD) - await reset_connections(glide_client) + await kill_connections(glide_client) # Verify that the client is able to reconnect with the new password value = await glide_client.get("test_key") assert value == b"test_value" From 934f422118381962af65b076aea861dfa336ad3a Mon Sep 17 00:00:00 2001 From: umit Date: Mon, 11 Nov 2024 16:10:51 +0300 Subject: [PATCH 7/8] Go: add ReplaceConnectionPassword function in ServerManagementClusterCommands Signed-off-by: umit Signed-off-by: avifenesh --- go/api/base_client.go | 74 ++++++++++++++++++ go/api/commands.go | 92 +++++++++++++++++++++++ go/api/glide_client.go | 79 -------------------- go/integTest/glide_test_suite_test.go | 30 ++++++++ go/integTest/shared_commands_test.go | 51 +++++++++++++ go/integTest/standalone_commands_test.go | 54 +++++++++++++- go/src/lib.rs | 95 ++++++++++++++++++++++++ 7 files changed, 394 insertions(+), 81 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 5347f711f1..bd37a2223f 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -25,6 +25,7 @@ type BaseClient interface { HashCommands ListCommands ConnectionManagementCommands + ServerManagementClusterCommands // Close terminates the client by closing all associated resources. Close() } @@ -130,6 +131,32 @@ func (client *baseClient) executeCommand(requestType C.RequestType, args []strin return payload.value, nil } +func (client *baseClient) executeCommandRequest(message proto.Message) (*C.struct_CommandResponse, error) { + msg, err := proto.Marshal(message) + if err != nil { + return nil, err + } + + resultChannel := make(chan payload) + resultChannelPtr := uintptr(unsafe.Pointer(&resultChannel)) + + requestBytes := C.CBytes(msg) + requestLen := len(msg) + + C.send_command_request( + client.coreClient, + C.uintptr_t(resultChannelPtr), + (*C.uchar)(requestBytes), + C.uintptr_t(requestLen), + ) + payload := <-resultChannel + if payload.error != nil { + return nil, payload.error + } + + return payload.value, nil +} + // Zero copying conversion from go's []string into C pointers func toCStrings(args []string) ([]C.uintptr_t, []C.ulong) { cStrings := make([]C.uintptr_t, len(args)) @@ -540,3 +567,50 @@ func (client *baseClient) PingWithMessage(message string) (string, error) { } return response.Value(), nil } + +func (client *baseClient) ConfigGet(args []string) (map[Result[string]]Result[string], error) { + res, err := client.executeCommand(C.ConfigGet, args) + if err != nil { + return nil, err + } + + return handleStringToStringMapResponse(res) +} + +func (client *baseClient) ConfigSet(parameters map[string]string) (Result[string], error) { + result, err := client.executeCommand(C.ConfigSet, utils.MapToString(parameters)) + if err != nil { + return CreateNilStringResult(), err + } + + return handleStringResponse(result) +} + +func (client *baseClient) CustomCommand(args []string) (interface{}, error) { + res, err := client.executeCommand(C.CustomCommand, args) + if err != nil { + return nil, err + } + + resString, err := handleStringOrNullResponse(res) + if err != nil { + return nil, err + } + + return resString.Value(), err +} + +func (client *baseClient) UpdateConnectionPassword(password *string, reAuth bool) (Result[string], error) { + request := protobuf.CommandRequest{ + Command: &protobuf.CommandRequest_UpdateConnectionPassword{ + UpdateConnectionPassword: &protobuf.UpdateConnectionPassword{Password: password, ReAuth: reAuth}, + }, + } + + res, err := client.executeCommandRequest(&request) + if err != nil { + return CreateNilStringResult(), err + } + + return handleStringResponse(res) +} diff --git a/go/api/commands.go b/go/api/commands.go index 3c422dd687..14dd757fbb 100644 --- a/go/api/commands.go +++ b/go/api/commands.go @@ -731,4 +731,96 @@ type ConnectionManagementCommands interface { // // [valkey.io]: https://valkey.io/commands/ping/ PingWithMessage(message string) (string, error) + + // UpdateConnectionPassword updates the connection password and optionally re-authenticates. + // + // See [valkey.io] for details. + // + // Parameters: + // password - The new password to set, or null to remove the password. + // reAuth - If true, re-authenticates the connection with the new password. + // + // Return value: + // A Result containing the operation status, or an error if the password replacement fails. + // + // For example: + // result, err := client.UpdateConnectionPassword("newpass123", true) + // // result equals api.CreateStringResult("OK") + // + // [valkey.io]: https://valkey.io/commands/auth/ + UpdateConnectionPassword(password *string, reAuth bool) (Result[string], error) +} + +// ServerManagementClusterCommands provides cluster management operations. +// +// See [valkey.io] for details. +// +// [valkey.io]: https://valkey.io/commands/?group=server +type ServerManagementClusterCommands interface { + // ConfigGet returns values for the specified configuration parameters. + // + // See [valkey.io] for details. + // + // Parameters: + // args - A slice of configuration parameter names to retrieve values for. + // + // Return value: + // A map of parameter names to their values, both wrapped in Result types, or an error if the operation fails. + // + // For example: + // params, err := client.ConfigGet([]string{"timeout", "maxmemory"}) + // // timeout equals api.CreateStringResult("1000") + // // maxmemory equals api.CreateStringResult("1GB") + // // params equals map[api.Result[string]]api.Result[string]{timeout: timeout, maxmemory: maxmemory} + // + // [valkey.io]: https://valkey.io/commands/config-get/ + ConfigGet(args []string) (map[Result[string]]Result[string], error) + + // ConfigSet updates the specified configuration parameters with new values. + // + // See [valkey.io] for details. + // + // Parameters: + // parameters - A map where keys are configuration parameter names and values are their new settings. + // + // Returns: + // - Result[string]: A Result containing "OK" if all parameters were set successfully + // - error: An error if the operation fails for any parameter + // + // Example: + // result, err := client.ConfigSet(map[string]string{ + // "timeout": "1000", + // "maxmemory": "1GB", + // }) + // // If successful: + // // result.Value() equals "OK" + // // err equals nil + // + // Common configurations: + // - "timeout": Connection timeout in milliseconds + // - "maxmemory": Maximum memory limit (e.g., "1GB", "512MB") + // - "maxclients": Maximum number of client connections + // - "requirepass": Authentication password + // + // [valkey.io]: https://valkey.io/commands/config-set/ + ConfigSet(parameters map[string]string) (Result[string], error) + + // CustomCommand executes a custom server command with the provided arguments. + // + // See [valkey.io] for details. + // + // Parameters: + // args - A slice containing the command name and its arguments. + // + // Return value: + // An interface{} that must be type asserted to the expected return type, or an error if the command execution fails. + // + // For example: + // result, err := client.CustomCommand([]string{"MYCOMMAND", "arg1", "arg2"}) + // // Assuming command returns a string + // // strResult := result.(string) + // // strResult equals "command response" + // + // [valkey.io]: https://valkey.io/commands/ + CustomCommand(args []string) (interface{}, error) } diff --git a/go/api/glide_client.go b/go/api/glide_client.go index 27f3dbac75..f08f429f6e 100644 --- a/go/api/glide_client.go +++ b/go/api/glide_client.go @@ -5,7 +5,6 @@ package api // #cgo LDFLAGS: -L../target/release -lglide_rs // #include "../lib.h" import "C" -import "github.com/valkey-io/valkey-glide/go/glide/utils" // GlideClient is a client used for connection in Standalone mode. type GlideClient struct { @@ -21,81 +20,3 @@ func NewGlideClient(config *GlideClientConfiguration) (*GlideClient, error) { return &GlideClient{client}, nil } - -// CustomCommand executes a single command, specified by args, without checking inputs. Every part of the command, including -// the command name and subcommands, should be added as a separate value in args. The returning value depends on the executed -// command. -// -// This function should only be used for single-response commands. Commands that don't return complete response and awaits -// (such as SUBSCRIBE), or that return potentially more than a single response (such as XREAD), or that change the client's -// behavior (such as entering pub/sub mode on RESP2 connections) shouldn't be called using this function. -// -// For example, to return a list of all pub/sub clients: -// -// client.CustomCommand([]string{"CLIENT", "LIST","TYPE", "PUBSUB"}) -// -// TODO: Add support for complex return types. -func (client *GlideClient) CustomCommand(args []string) (interface{}, error) { - res, err := client.executeCommand(C.CustomCommand, args) - if err != nil { - return nil, err - } - resString, err := handleStringOrNullResponse(res) - if err != nil { - return nil, err - } - return resString.Value(), err -} - -// Sets configuration parameters to the specified values. -// -// Note: Prior to Version 7.0.0, only one parameter can be send. -// -// Parameters: -// -// parameters - A map consisting of configuration parameters and their respective values to set. -// -// Return value: -// -// A api.Result[string] containing "OK" if all configurations have been successfully set. Otherwise, raises an error. -// -// For example: -// -// result, err := client.ConfigSet(map[string]string{"timeout": "1000", "maxmemory": "1GB"}) -// result.Value(): "OK" -// -// [valkey.io]: https://valkey.io/commands/config-set/ -func (client *GlideClient) ConfigSet(parameters map[string]string) (Result[string], error) { - result, err := client.executeCommand(C.ConfigSet, utils.MapToString(parameters)) - if err != nil { - return CreateNilStringResult(), err - } - return handleStringResponse(result) -} - -// Gets the values of configuration parameters. -// -// Note: Prior to Version 7.0.0, only one parameter can be send. -// -// Parameters: -// -// args - A slice of configuration parameter names to retrieve values for. -// -// Return value: -// -// A map of api.Result[string] corresponding to the configuration parameters. -// -// For example: -// -// result, err := client.ConfigGet([]string{"timeout" , "maxmemory"}) -// result[api.CreateStringResult("timeout")] = api.CreateStringResult("1000") -// result[api.CreateStringResult"maxmemory")] = api.CreateStringResult("1GB") -// -// [valkey.io]: https://valkey.io/commands/config-get/ -func (client *GlideClient) ConfigGet(args []string) (map[Result[string]]Result[string], error) { - res, err := client.executeCommand(C.ConfigGet, args) - if err != nil { - return nil, err - } - return handleStringToStringMapResponse(res) -} diff --git a/go/integTest/glide_test_suite_test.go b/go/integTest/glide_test_suite_test.go index fd4e0d92d5..9cab7dc83d 100644 --- a/go/integTest/glide_test_suite_test.go +++ b/go/integTest/glide_test_suite_test.go @@ -186,6 +186,18 @@ func (suite *GlideTestSuite) runWithClients(clients []api.BaseClient, test func( } } +func (suite *GlideTestSuite) runWithDefaultClient(test func(client *api.GlideClient)) { + suite.T().Run("Testing with default client", func(t *testing.T) { + test(suite.defaultClient()) + }) +} + +func (suite *GlideTestSuite) runWithClusterClient(test func(client api.BaseClient)) { + suite.T().Run("Testing with cluster client", func(t *testing.T) { + test(suite.defaultClusterClient()) + }) +} + func (suite *GlideTestSuite) verifyOK(result api.Result[string], err error) { assert.Nil(suite.T(), err) assert.Equal(suite.T(), api.OK, result.Value()) @@ -196,3 +208,21 @@ func (suite *GlideTestSuite) SkipIfServerVersionLowerThanBy(version string) { suite.T().Skipf("This feature is added in version %s", version) } } + +func (suite *GlideTestSuite) addAuthConfig(client api.BaseClient) { + config, err := client.ConfigSet(map[string]string{"requirepass": "pass"}) + suite.verifyOK(config, err) + + auth, err := client.CustomCommand([]string{"AUTH", "pass"}) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), auth.(string), api.OK) +} + +func (suite *GlideTestSuite) removeAuthConfig(client api.BaseClient) { + config, err := client.ConfigSet(map[string]string{"requirepass": ""}) + suite.verifyOK(config, err) + + reset, err := client.CustomCommand([]string{"RESET"}) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), reset.(string), "RESET") +} diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 6c0735ab9a..4ecb4d22d3 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4,6 +4,7 @@ package integTest import ( "math" + "strings" "time" "github.com/google/uuid" @@ -1248,3 +1249,53 @@ func (suite *GlideTestSuite) TestRPush() { assert.IsType(suite.T(), &api.RequestError{}, err) }) } + +func (suite *GlideTestSuite) TestUpdateConnectionPassword_With_Cluster_Client() { + suite.runWithClusterClient(func(client api.BaseClient) { + suite.addAuthConfig(client) + + newPass := "newpass" + res, err := client.UpdateConnectionPassword(&newPass, false) + suite.verifyOK(res, err) + + key := uuid.NewString() + value := uuid.NewString() + + set, err := client.Set(key, value) + suite.verifyOK(set, err) + + get, err := client.Get(key) + suite.verifyOK(set, err) + assert.Equal(suite.T(), get.Value(), value) + + suite.removeAuthConfig(client) + }) +} + +func (suite *GlideTestSuite) TestUpdateConnectionPassword_No_Server_Auth_With_ClusterClient() { + suite.runWithClusterClient(func(client api.BaseClient) { + suite.addAuthConfig(client) + + newPass := "newpass" + res, err := client.UpdateConnectionPassword(&newPass, true) + + assert.NotNil(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.Empty(suite.T(), res.Value()) + + suite.removeAuthConfig(client) + }) +} + +func (suite *GlideTestSuite) TestUpdateConnectionPassword_Password_Long_With_ClusterClient() { + suite.runWithClusterClient(func(client api.BaseClient) { + suite.addAuthConfig(client) + + password := strings.Repeat("p", 1000) + + res, err := client.UpdateConnectionPassword(&password, false) + suite.verifyOK(res, err) + + suite.removeAuthConfig(client) + }) +} diff --git a/go/integTest/standalone_commands_test.go b/go/integTest/standalone_commands_test.go index 4186c6b184..8718e397d8 100644 --- a/go/integTest/standalone_commands_test.go +++ b/go/integTest/standalone_commands_test.go @@ -6,9 +6,9 @@ import ( "fmt" "strings" - "github.com/valkey-io/valkey-glide/go/glide/api" - + "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/valkey-io/valkey-glide/go/glide/api" ) func (suite *GlideTestSuite) TestCustomCommandInfo() { @@ -124,3 +124,53 @@ func (suite *GlideTestSuite) TestConfigSetAndGet_invalidArgs() { assert.Equal(suite.T(), map[api.Result[string]]api.Result[string]{}, result2) assert.Nil(suite.T(), err) } + +func (suite *GlideTestSuite) TestUpdateConnectionPassword() { + suite.runWithDefaultClient(func(client *api.GlideClient) { + suite.addAuthConfig(client) + + newPass := "newpass" + res, err := client.UpdateConnectionPassword(&newPass, false) + suite.verifyOK(res, err) + + key := uuid.NewString() + value := uuid.NewString() + + set, err := client.Set(key, value) + suite.verifyOK(set, err) + + get, err := client.Get(key) + suite.verifyOK(set, err) + assert.Equal(suite.T(), get.Value(), value) + + suite.removeAuthConfig(client) + }) +} + +func (suite *GlideTestSuite) TestUpdateConnectionPassword_No_Server_Auth() { + suite.runWithDefaultClient(func(client *api.GlideClient) { + suite.addAuthConfig(client) + + newPass := "newpass" + res, err := client.UpdateConnectionPassword(&newPass, true) + + assert.NotNil(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.Empty(suite.T(), res.Value()) + + suite.removeAuthConfig(client) + }) +} + +func (suite *GlideTestSuite) TestUpdateConnectionPassword_Password_long() { + suite.runWithDefaultClient(func(client *api.GlideClient) { + suite.addAuthConfig(client) + + password := strings.Repeat("p", 1000) + + res, err := client.UpdateConnectionPassword(&password, false) + suite.verifyOK(res, err) + + suite.removeAuthConfig(client) + }) +} diff --git a/go/src/lib.rs b/go/src/lib.rs index 344dac6e45..f9244e4044 100644 --- a/go/src/lib.rs +++ b/go/src/lib.rs @@ -4,6 +4,7 @@ #![deny(unsafe_op_in_unsafe_fn)] use glide_core::client::Client as GlideClient; +use glide_core::command_request; use glide_core::connection_request; use glide_core::errors; use glide_core::errors::RequestErrorType; @@ -543,3 +544,97 @@ pub unsafe extern "C" fn command( } }); } + +/// Executes a command request received from Go through the FFI interface. +/// +/// # Arguments +/// +/// * `client_adapter_ptr` - Pointer to the ClientAdapter instance +/// * `channel` - Channel identifier for callback routing +/// * `request_bytes` - Pointer to protobuf-encoded command request bytes +/// * `request_len` - Length of the request bytes array +/// +/// # Safety +/// +#[no_mangle] +pub unsafe extern "C" fn send_command_request( + client_adapter_ptr: *const c_void, + channel: usize, + request_bytes: *const u8, + request_len: usize, +) { + let client_adapter = + unsafe { Box::leak(Box::from_raw(client_adapter_ptr as *mut ClientAdapter)) }; + // The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before + // all operations have completed. + let ptr_address = client_adapter_ptr as usize; + + let request_bytes = unsafe { std::slice::from_raw_parts(request_bytes, request_len) }; + + let request = match command_request::CommandRequest::parse_from_bytes(request_bytes) { + Ok(resp) => resp, + Err(err) => { + let message = err.to_string(); + let c_err_str = CString::into_raw( + CString::new(message).expect("Couldn't convert error message to CString"), + ); + unsafe { + (client_adapter.failure_callback)(channel, c_err_str, RequestErrorType::Unspecified) + }; + return; + } + }; + + let mut client_clone = client_adapter.client.clone(); + client_adapter.runtime.spawn(async move { + let result = match request.command { + Some(command_request::command_request::Command::UpdateConnectionPassword( + update_connection_password, + )) => { + client_clone + .update_connection_password( + update_connection_password + .password + .map(|chars| chars.to_string()), + update_connection_password.re_auth, + ) + .await + } + // TODO: Add support for other return types. + _ => todo!(), + }; + + let client_adapter = unsafe { Box::leak(Box::from_raw(ptr_address as *mut ClientAdapter)) }; + let value = match result { + Ok(value) => value, + Err(err) => { + let message = errors::error_message(&err); + let error_type = errors::error_type(&err); + + let c_err_str = CString::into_raw( + CString::new(message).expect("Couldn't convert error message to CString"), + ); + unsafe { (client_adapter.failure_callback)(channel, c_err_str, error_type) }; + return; + } + }; + + let result: RedisResult = valkey_value_to_command_response(value); + unsafe { + match result { + Ok(message) => { + (client_adapter.success_callback)(channel, Box::into_raw(Box::new(message))) + } + Err(err) => { + let message = errors::error_message(&err); + let error_type = errors::error_type(&err); + + let c_err_str = CString::into_raw( + CString::new(message).expect("Couldn't convert error message to CString"), + ); + (client_adapter.failure_callback)(channel, c_err_str, error_type); + } + }; + } + }); +} From 5460c67edd4182253340134ad0f7482d977cc0cb Mon Sep 17 00:00:00 2001 From: umit Date: Wed, 13 Nov 2024 17:55:45 +0300 Subject: [PATCH 8/8] Go: add new commands: ServerManagementCommands, GenericCommands, ServerManagementClusterCommands and GenericClusterCommands and implement related functions. Signed-off-by: umit Signed-off-by: avifenesh Signed-off-by: umit Signed-off-by: avifenesh --- CHANGELOG.md | 1 + go/api/base_client.go | 1 - go/api/commands.go | 160 +++++++++++++++++++---- go/api/glide_client.go | 60 ++++++++- go/api/glide_cluster_client.go | 64 ++++++++- go/integTest/glide_test_suite_test.go | 68 ++++++---- go/integTest/shared_commands_test.go | 18 +-- go/integTest/standalone_commands_test.go | 18 +-- 8 files changed, 290 insertions(+), 100 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6d5968117..0fb33ba509 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ #### Changes +* Go: Adding support for replacing connection configured password ([#2687](https://github.com/valkey-io/valkey-glide/pull/2687)) * Node, Python: Adding support for replacing connection configured password ([#2651](https://github.com/valkey-io/valkey-glide/pull/2651)) * Node: Add FT._ALIASLIST command([#2652](https://github.com/valkey-io/valkey-glide/pull/2652)) * Python: Python: `FT._ALIASLIST` command added([#2638](https://github.com/valkey-io/valkey-glide/pull/2638)) diff --git a/go/api/base_client.go b/go/api/base_client.go index bd37a2223f..86d38d79ff 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -25,7 +25,6 @@ type BaseClient interface { HashCommands ListCommands ConnectionManagementCommands - ServerManagementClusterCommands // Close terminates the client by closing all associated resources. Close() } diff --git a/go/api/commands.go b/go/api/commands.go index 14dd757fbb..5ffb44ebd3 100644 --- a/go/api/commands.go +++ b/go/api/commands.go @@ -737,7 +737,7 @@ type ConnectionManagementCommands interface { // See [valkey.io] for details. // // Parameters: - // password - The new password to set, or null to remove the password. + // password - The new password to update, or null to remove the password. // reAuth - If true, re-authenticates the connection with the new password. // // Return value: @@ -751,56 +751,51 @@ type ConnectionManagementCommands interface { UpdateConnectionPassword(password *string, reAuth bool) (Result[string], error) } -// ServerManagementClusterCommands provides cluster management operations. +// ServerManagementClusterCommands supports commands and transactions for the "Server Management Commands" group for +// a cluster client. // // See [valkey.io] for details. // // [valkey.io]: https://valkey.io/commands/?group=server type ServerManagementClusterCommands interface { - // ConfigGet returns values for the specified configuration parameters. + // ConfigGet retrieves the values of configuration parameters. + // The command will be sent to a random node. // // See [valkey.io] for details. // // Parameters: - // args - A slice of configuration parameter names to retrieve values for. + // parameters - An array of configuration parameter names to retrieve values for // - // Return value: - // A map of parameter names to their values, both wrapped in Result types, or an error if the operation fails. + // Returns: + // - map[Result[string]]Result[string]: A map of values corresponding to the configuration parameters + // - error: An error if the operation fails // // For example: - // params, err := client.ConfigGet([]string{"timeout", "maxmemory"}) - // // timeout equals api.CreateStringResult("1000") - // // maxmemory equals api.CreateStringResult("1GB") - // // params equals map[api.Result[string]]api.Result[string]{timeout: timeout, maxmemory: maxmemory} + // configParams, err := client.ConfigGet([]string{"timeout", "maxmemory"}) + // // configParams["timeout"].Value() equals "1000" + // // configParams["maxmemory"].Value() equals "1GB" // // [valkey.io]: https://valkey.io/commands/config-get/ ConfigGet(args []string) (map[Result[string]]Result[string], error) - // ConfigSet updates the specified configuration parameters with new values. + // ConfigSet sets configuration parameters to the specified values. + // The command will be sent to all nodes. // // See [valkey.io] for details. // // Parameters: - // parameters - A map where keys are configuration parameter names and values are their new settings. + // parameters - A map consisting of configuration parameters and their respective values to set // // Returns: - // - Result[string]: A Result containing "OK" if all parameters were set successfully - // - error: An error if the operation fails for any parameter - // - // Example: - // result, err := client.ConfigSet(map[string]string{ - // "timeout": "1000", - // "maxmemory": "1GB", - // }) - // // If successful: - // // result.Value() equals "OK" - // // err equals nil - // - // Common configurations: - // - "timeout": Connection timeout in milliseconds - // - "maxmemory": Maximum memory limit (e.g., "1GB", "512MB") - // - "maxclients": Maximum number of client connections - // - "requirepass": Authentication password + // - Result[string]: "OK" if all configurations have been successfully set + // - error: An error if the operation fails + // + // For example: + // resp, err := client.ConfigSet(map[string]string{ + // "timeout": "1000", + // "maxmemory": "1GB", + // }) + // // resp.Value() equals "OK" // // [valkey.io]: https://valkey.io/commands/config-set/ ConfigSet(parameters map[string]string) (Result[string], error) @@ -824,3 +819,110 @@ type ServerManagementClusterCommands interface { // [valkey.io]: https://valkey.io/commands/ CustomCommand(args []string) (interface{}, error) } + +// ServerManagementCommands supports commands and transactions for the "Server Management" group for a +// standalone client. +// +// See [valkey.io] for details. +// +// [valkey.io]: https://valkey.io/commands/?group=server +type ServerManagementCommands interface { + // ConfigGet retrieves the values of configuration parameters. + // + // See [valkey.io] for details. + // + // Parameters: + // parameters - An array of configuration parameter names to retrieve values for + // + // Returns: + // - map[Result[string]]Result[string]: A map of values corresponding to the configuration parameters + // - error: An error if the operation fails + // + // For example: + // configParams, err := client.ConfigGet([]string{"timeout", "maxmemory"}) + // // configParams["timeout"].Value() equals "1000" + // // configParams["maxmemory"].Value() equals "1GB" + // + // [valkey.io]: https://valkey.io/commands/config-get/ + ConfigGet(args []string) (map[Result[string]]Result[string], error) + + // ConfigSet sets configuration parameters to the specified values. + // + // See [valkey.io] for details. + // + // Parameters: + // parameters - A map consisting of configuration parameters and their respective values to set + // + // Returns: + // - Result[string]: "OK" if all configurations have been successfully set + // - error: An error if the operation fails + // + // For example: + // resp, err := client.ConfigSet(map[string]string{ + // "timeout": "1000", + // "maxmemory": "1GB", + // }) + // // resp.Value() equals "OK" + // + // [valkey.io]: https://valkey.io/commands/config-set/ + ConfigSet(parameters map[string]string) (Result[string], error) +} + +// GenericClusterCommands supports generic commands for cluster mode. +// +// See [valkey.io] for details. +// +// [valkey.io]: https://valkey.io/commands/?group=generic +type GenericClusterCommands interface { + // CustomCommand executes a single command without checking inputs. Every part of the command, including subcommands, + // should be added as a separate value in args. The command will be routed automatically based on the passed + // command's default request policy. + // + // See [wiki] for details. + // + // Parameters: + // args - Arguments for the custom command including the command name. For example: ["ping"] + // + // Returns: + // - interface{}: The returned value for the custom command. For example: "PONG" for ping command + // - error: An error if the operation fails + // + // For example: + // resp, err := client.CustomCommand([]string{"ping"}) + // // If successful: + // // resp equals "PONG" + // // err equals nil + // + // [wiki]: https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#custom-command + CustomCommand(args []string) (interface{}, error) +} + +// GenericCommands supports generic commands and transactions for standalone mode. +// +// See [valkey.io] for details. +// +// [valkey.io]: https://valkey.io/commands/?group=generic +type GenericCommands interface { + // CustomCommand executes a single command without checking inputs. Every part of the command, + // including subcommands, should be added as a separate value in args. + // + // See [wiki] for details. + // + // Parameters: + // args - Arguments for the custom command + // + // Returns: + // - interface{}: The returned value for the custom command + // - error: An error if the operation fails + // + // For example: + // // Simple ping command + // resp, err := client.CustomCommand([]string{"ping", "GLIDE"}) + // // resp equals "GLIDE" + // + // // Get a list of all pub/sub clients + // result, err := client.CustomCommand([]string{"CLIENT", "LIST", "TYPE", "PUBSUB"}) + // + // [wiki]: https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#custom-command + CustomCommand(args []string) (interface{}, error) +} diff --git a/go/api/glide_client.go b/go/api/glide_client.go index f08f429f6e..d69fe3af9b 100644 --- a/go/api/glide_client.go +++ b/go/api/glide_client.go @@ -6,17 +6,65 @@ package api // #include "../lib.h" import "C" -// GlideClient is a client used for connection in Standalone mode. -type GlideClient struct { - *baseClient -} +import ( + "github.com/valkey-io/valkey-glide/go/glide/utils" +) + +// GlideClient interface compliance check. +var _ GlideClient = (*glideClient)(nil) + +type ( + // GlideClient is a client used for connection in Standalone mode. + GlideClient interface { + BaseClient + ServerManagementCommands + GenericCommands + } + + // glideClient implements standalone mode operations by extending baseClient functionality. + glideClient struct { + *baseClient + } +) // NewGlideClient creates a [GlideClient] in standalone mode using the given [GlideClientConfiguration]. -func NewGlideClient(config *GlideClientConfiguration) (*GlideClient, error) { +func NewGlideClient(config *GlideClientConfiguration) (GlideClient, error) { client, err := createClient(config) if err != nil { return nil, err } - return &GlideClient{client}, nil + return &glideClient{client}, nil +} + +func (client *glideClient) ConfigGet(args []string) (map[Result[string]]Result[string], error) { + res, err := client.executeCommand(C.ConfigGet, args) + if err != nil { + return nil, err + } + + return handleStringToStringMapResponse(res) +} + +func (client *glideClient) ConfigSet(parameters map[string]string) (Result[string], error) { + result, err := client.executeCommand(C.ConfigSet, utils.MapToString(parameters)) + if err != nil { + return CreateNilStringResult(), err + } + + return handleStringResponse(result) +} + +func (client *glideClient) CustomCommand(args []string) (interface{}, error) { + cmdResp, err := client.executeCommand(C.CustomCommand, args) + if err != nil { + return nil, err + } + + res, err := handleStringOrNullResponse(cmdResp) + if err != nil { + return nil, err + } + + return res.Value(), err } diff --git a/go/api/glide_cluster_client.go b/go/api/glide_cluster_client.go index 7ab186d7c2..c4e86be5d8 100644 --- a/go/api/glide_cluster_client.go +++ b/go/api/glide_cluster_client.go @@ -2,17 +2,69 @@ package api -// GlideClusterClient is a client used for connection in cluster mode. -type GlideClusterClient struct { - *baseClient -} +// #cgo LDFLAGS: -L../target/release -lglide_rs +// #include "../lib.h" +import "C" + +import ( + "github.com/valkey-io/valkey-glide/go/glide/utils" +) + +// GlideClusterClient interface compliance check. +var _ GlideClusterClient = (*glideClusterClient)(nil) + +type ( + // GlideClusterClient is a client used for connection in cluster mode. + GlideClusterClient interface { + BaseClient + ServerManagementClusterCommands + GenericClusterCommands + } + + // glideClusterClient implements cluster operations by extending baseClient functionality. + glideClusterClient struct { + *baseClient + } +) // NewGlideClusterClient creates a [GlideClusterClient] in cluster mode using the given [GlideClusterClientConfiguration]. -func NewGlideClusterClient(config *GlideClusterClientConfiguration) (*GlideClusterClient, error) { +func NewGlideClusterClient(config *GlideClusterClientConfiguration) (GlideClusterClient, error) { client, err := createClient(config) if err != nil { return nil, err } - return &GlideClusterClient{client}, nil + return &glideClusterClient{client}, nil +} + +func (client *glideClusterClient) ConfigGet(args []string) (map[Result[string]]Result[string], error) { + res, err := client.executeCommand(C.ConfigGet, args) + if err != nil { + return nil, err + } + + return handleStringToStringMapResponse(res) +} + +func (client *glideClusterClient) ConfigSet(parameters map[string]string) (Result[string], error) { + result, err := client.executeCommand(C.ConfigSet, utils.MapToString(parameters)) + if err != nil { + return CreateNilStringResult(), err + } + + return handleStringResponse(result) +} + +func (client *glideClusterClient) CustomCommand(args []string) (interface{}, error) { + cmdResp, err := client.executeCommand(C.CustomCommand, args) + if err != nil { + return nil, err + } + + res, err := handleStringOrNullResponse(cmdResp) + if err != nil { + return nil, err + } + + return res.Value(), err } diff --git a/go/integTest/glide_test_suite_test.go b/go/integTest/glide_test_suite_test.go index 9cab7dc83d..22d285e3d9 100644 --- a/go/integTest/glide_test_suite_test.go +++ b/go/integTest/glide_test_suite_test.go @@ -22,8 +22,8 @@ type GlideTestSuite struct { standalonePorts []int clusterPorts []int serverVersion string - clients []*api.GlideClient - clusterClients []*api.GlideClusterClient + clients []api.GlideClient + clusterClients []api.GlideClusterClient } func (suite *GlideTestSuite) SetupSuite() { @@ -144,14 +144,14 @@ func (suite *GlideTestSuite) getDefaultClients() []api.BaseClient { return []api.BaseClient{suite.defaultClient(), suite.defaultClusterClient()} } -func (suite *GlideTestSuite) defaultClient() *api.GlideClient { +func (suite *GlideTestSuite) defaultClient() api.GlideClient { config := api.NewGlideClientConfiguration(). WithAddress(&api.NodeAddress{Port: suite.standalonePorts[0]}). WithRequestTimeout(5000) return suite.client(config) } -func (suite *GlideTestSuite) client(config *api.GlideClientConfiguration) *api.GlideClient { +func (suite *GlideTestSuite) client(config *api.GlideClientConfiguration) api.GlideClient { client, err := api.NewGlideClient(config) assert.Nil(suite.T(), err) @@ -161,14 +161,14 @@ func (suite *GlideTestSuite) client(config *api.GlideClientConfiguration) *api.G return client } -func (suite *GlideTestSuite) defaultClusterClient() *api.GlideClusterClient { +func (suite *GlideTestSuite) defaultClusterClient() api.GlideClusterClient { config := api.NewGlideClusterClientConfiguration(). WithAddress(&api.NodeAddress{Port: suite.clusterPorts[0]}). WithRequestTimeout(5000) return suite.clusterClient(config) } -func (suite *GlideTestSuite) clusterClient(config *api.GlideClusterClientConfiguration) *api.GlideClusterClient { +func (suite *GlideTestSuite) clusterClient(config *api.GlideClusterClientConfiguration) api.GlideClusterClient { client, err := api.NewGlideClusterClient(config) assert.Nil(suite.T(), err) @@ -186,15 +186,45 @@ func (suite *GlideTestSuite) runWithClients(clients []api.BaseClient, test func( } } -func (suite *GlideTestSuite) runWithDefaultClient(test func(client *api.GlideClient)) { +func (suite *GlideTestSuite) runWithDefaultClientAndAuth(test func(client api.GlideClient)) { suite.T().Run("Testing with default client", func(t *testing.T) { - test(suite.defaultClient()) + client := suite.defaultClient() + config, err := client.ConfigSet(map[string]string{"requirepass": "pass"}) + suite.verifyOK(config, err) + + auth, err := client.CustomCommand([]string{"AUTH", "pass"}) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), auth.(string), api.OK) + + test(client) + + emptyConfig, err := client.ConfigSet(map[string]string{"requirepass": ""}) + suite.verifyOK(emptyConfig, err) + + reset, err := client.CustomCommand([]string{"RESET"}) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), reset.(string), "RESET") }) } -func (suite *GlideTestSuite) runWithClusterClient(test func(client api.BaseClient)) { +func (suite *GlideTestSuite) runWithClusterClient(test func(client api.GlideClusterClient)) { suite.T().Run("Testing with cluster client", func(t *testing.T) { - test(suite.defaultClusterClient()) + client := suite.defaultClusterClient() + config, err := client.ConfigSet(map[string]string{"requirepass": "pass"}) + suite.verifyOK(config, err) + + auth, err := client.CustomCommand([]string{"AUTH", "pass"}) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), auth.(string), api.OK) + + test(client) + + emptyConfig, err := client.ConfigSet(map[string]string{"requirepass": ""}) + suite.verifyOK(emptyConfig, err) + + reset, err := client.CustomCommand([]string{"RESET"}) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), reset.(string), "RESET") }) } @@ -208,21 +238,3 @@ func (suite *GlideTestSuite) SkipIfServerVersionLowerThanBy(version string) { suite.T().Skipf("This feature is added in version %s", version) } } - -func (suite *GlideTestSuite) addAuthConfig(client api.BaseClient) { - config, err := client.ConfigSet(map[string]string{"requirepass": "pass"}) - suite.verifyOK(config, err) - - auth, err := client.CustomCommand([]string{"AUTH", "pass"}) - assert.Nil(suite.T(), err) - assert.Equal(suite.T(), auth.(string), api.OK) -} - -func (suite *GlideTestSuite) removeAuthConfig(client api.BaseClient) { - config, err := client.ConfigSet(map[string]string{"requirepass": ""}) - suite.verifyOK(config, err) - - reset, err := client.CustomCommand([]string{"RESET"}) - assert.Nil(suite.T(), err) - assert.Equal(suite.T(), reset.(string), "RESET") -} diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 4ecb4d22d3..f0016eff88 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -1251,9 +1251,7 @@ func (suite *GlideTestSuite) TestRPush() { } func (suite *GlideTestSuite) TestUpdateConnectionPassword_With_Cluster_Client() { - suite.runWithClusterClient(func(client api.BaseClient) { - suite.addAuthConfig(client) - + suite.runWithClusterClient(func(client api.GlideClusterClient) { newPass := "newpass" res, err := client.UpdateConnectionPassword(&newPass, false) suite.verifyOK(res, err) @@ -1267,35 +1265,25 @@ func (suite *GlideTestSuite) TestUpdateConnectionPassword_With_Cluster_Client() get, err := client.Get(key) suite.verifyOK(set, err) assert.Equal(suite.T(), get.Value(), value) - - suite.removeAuthConfig(client) }) } func (suite *GlideTestSuite) TestUpdateConnectionPassword_No_Server_Auth_With_ClusterClient() { - suite.runWithClusterClient(func(client api.BaseClient) { - suite.addAuthConfig(client) - + suite.runWithClusterClient(func(client api.GlideClusterClient) { newPass := "newpass" res, err := client.UpdateConnectionPassword(&newPass, true) assert.NotNil(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) assert.Empty(suite.T(), res.Value()) - - suite.removeAuthConfig(client) }) } func (suite *GlideTestSuite) TestUpdateConnectionPassword_Password_Long_With_ClusterClient() { - suite.runWithClusterClient(func(client api.BaseClient) { - suite.addAuthConfig(client) - + suite.runWithClusterClient(func(client api.GlideClusterClient) { password := strings.Repeat("p", 1000) res, err := client.UpdateConnectionPassword(&password, false) suite.verifyOK(res, err) - - suite.removeAuthConfig(client) }) } diff --git a/go/integTest/standalone_commands_test.go b/go/integTest/standalone_commands_test.go index 8718e397d8..63518ac57c 100644 --- a/go/integTest/standalone_commands_test.go +++ b/go/integTest/standalone_commands_test.go @@ -126,9 +126,7 @@ func (suite *GlideTestSuite) TestConfigSetAndGet_invalidArgs() { } func (suite *GlideTestSuite) TestUpdateConnectionPassword() { - suite.runWithDefaultClient(func(client *api.GlideClient) { - suite.addAuthConfig(client) - + suite.runWithDefaultClientAndAuth(func(client api.GlideClient) { newPass := "newpass" res, err := client.UpdateConnectionPassword(&newPass, false) suite.verifyOK(res, err) @@ -142,35 +140,25 @@ func (suite *GlideTestSuite) TestUpdateConnectionPassword() { get, err := client.Get(key) suite.verifyOK(set, err) assert.Equal(suite.T(), get.Value(), value) - - suite.removeAuthConfig(client) }) } func (suite *GlideTestSuite) TestUpdateConnectionPassword_No_Server_Auth() { - suite.runWithDefaultClient(func(client *api.GlideClient) { - suite.addAuthConfig(client) - + suite.runWithDefaultClientAndAuth(func(client api.GlideClient) { newPass := "newpass" res, err := client.UpdateConnectionPassword(&newPass, true) assert.NotNil(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) assert.Empty(suite.T(), res.Value()) - - suite.removeAuthConfig(client) }) } func (suite *GlideTestSuite) TestUpdateConnectionPassword_Password_long() { - suite.runWithDefaultClient(func(client *api.GlideClient) { - suite.addAuthConfig(client) - + suite.runWithDefaultClientAndAuth(func(client api.GlideClient) { password := strings.Repeat("p", 1000) res, err := client.UpdateConnectionPassword(&password, false) suite.verifyOK(res, err) - - suite.removeAuthConfig(client) }) }