From 64d474e75c0943e145cfc157e4a0beb65b079e91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 10 Oct 2023 18:22:35 +0000 Subject: [PATCH] Don't perform blocking connect inside the BlockingConnectionQueue Condition variable. --- redis/asyncio/connection.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 65fa58643b..1ef9960ff3 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -1027,7 +1027,18 @@ def can_get_connection(self) -> bool: ) async def get_connection(self, command_name, *keys, **options): - """Get a connection from the pool""" + """Get a connected connection from the pool""" + connection = self.get_available_connection() + try: + await self.ensure_connection(connection) + except BaseException: + await self.release(connection) + raise + + return connection + + def get_available_connection(self): + """Get a connection from the pool, without making sure it is connected""" try: connection = self._available_connections.pop() except IndexError: @@ -1035,13 +1046,6 @@ async def get_connection(self, command_name, *keys, **options): raise ConnectionError("Too many connections") from None connection = self.make_connection() self._in_use_connections.add(connection) - - try: - await self.ensure_connection(connection) - except BaseException: - await self.release(connection) - raise - return connection def get_encoder(self): @@ -1166,13 +1170,21 @@ def __init__( async def get_connection(self, command_name, *keys, **options): """Gets a connection from the pool, blocking until one is available""" try: - async with async_timeout(self.timeout): - async with self._condition: + async with self._condition: + async with async_timeout(self.timeout): await self._condition.wait_for(self.can_get_connection) - return await super().get_connection(command_name, *keys, **options) + connection = super().get_available_connection() except asyncio.TimeoutError as err: raise ConnectionError("No connection available.") from err + # We now perform the connection check outside of the lock. + try: + await self.ensure_connection(connection) + return connection + except BaseException: + await self.release(connection) + raise + async def release(self, connection: AbstractConnection): """Releases the connection back to the pool.""" async with self._condition: