diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 65fa58643b..d949054c00 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -1027,15 +1027,8 @@ def can_get_connection(self) -> bool: ) async def get_connection(self, command_name, *keys, **options): - """Get a connection from the pool""" - try: - connection = self._available_connections.pop() - except IndexError: - if len(self._in_use_connections) >= self.max_connections: - raise ConnectionError("Too many connections") from None - connection = self.make_connection() - self._in_use_connections.add(connection) - + """Get a connected connection from the pool""" + connection = self.get_available_connection() try: await self.ensure_connection(connection) except BaseException: @@ -1044,6 +1037,16 @@ async def get_connection(self, command_name, *keys, **options): return connection + def get_available_connection(self): + """Get a connection from the pool, without making sure it is connected""" + try: + connection = self._available_connections.pop() + except IndexError: + if len(self._in_use_connections) >= self.max_connections: + raise ConnectionError("Too many connections") from None + connection = self.make_connection() + self._in_use_connections.add(connection) + def get_encoder(self): """Return an encoder based on encoding settings""" kwargs = self.connection_kwargs @@ -1169,10 +1172,19 @@ async def get_connection(self, command_name, *keys, **options): async with async_timeout(self.timeout): async with self._condition: await self._condition.wait_for(self.can_get_connection) - return await super().get_connection(command_name, *keys, **options) + connection = super().get_available_connection() + except asyncio.TimeoutError as err: raise ConnectionError("No connection available.") from err + # We now perform the connection check outside of the lock. + try: + await self.ensure_connection(connection) + return connection + except BaseException: + await self.release(connection) + raise + async def release(self, connection: AbstractConnection): """Releases the connection back to the pool.""" async with self._condition: