From 74fefedf993779b46dc69833f135dfaf681ef89d Mon Sep 17 00:00:00 2001 From: Dongkeun Lee <3315213+zakaf@users.noreply.github.com> Date: Mon, 5 Feb 2024 02:04:10 +0900 Subject: [PATCH] release already acquired connections on ClusterPipeline, when get_connection raises an exception (#3133) Signed-off-by: zach.lee --- redis/cluster.py | 2 ++ tests/test_cluster.py | 26 ++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/redis/cluster.py b/redis/cluster.py index c36665eb5c..ba25b92246 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -2143,6 +2143,8 @@ def _send_cluster_commands( try: connection = get_connection(redis_node, c.args) except ConnectionError: + for n in nodes.values(): + n.connection_pool.release(n.connection) # Connection retries are being handled in the node's # Retry object. Reinitialize the node -> slot table. self.nodes_manager.initialize() diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 854b64c563..8a44d45ea3 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -10,6 +10,7 @@ from unittest.mock import DEFAULT, Mock, call, patch import pytest +import redis from redis import Redis from redis._parsers import CommandsParser from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff @@ -3250,6 +3251,31 @@ def raise_ask_error(): assert ask_node.redis_connection.connection.read_response.called assert res == ["MOCK_OK"] + def test_return_previously_acquired_connections(self, r): + # in order to ensure that a pipeline will make use of connections + # from different nodes + assert r.keyslot("a") != r.keyslot("b") + + orig_func = redis.cluster.get_connection + with patch("redis.cluster.get_connection") as get_connection: + + def raise_error(target_node, *args, **kwargs): + if get_connection.call_count == 2: + raise ConnectionError("mocked error") + else: + return orig_func(target_node, *args, **kwargs) + + get_connection.side_effect = raise_error + + r.pipeline().get("a").get("b").execute() + + # 4 = 2 get_connections per execution * 2 executions + assert get_connection.call_count == 4 + for cluster_node in r.nodes_manager.nodes_cache.values(): + connection_pool = cluster_node.redis_connection.connection_pool + num_of_conns = len(connection_pool._available_connections) + assert num_of_conns == connection_pool._created_connections + def test_empty_stack(self, r): """ If pipeline is executed with no commands it should