Skip to content

Commit

Permalink
Don't perform blocking connect inside the BlockingConnectionQueue Con…
Browse files Browse the repository at this point in the history
…dition variable.
  • Loading branch information
kristjanvalur committed Oct 10, 2023
1 parent 054caf3 commit 8907150
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,15 +1027,8 @@ def can_get_connection(self) -> bool:
)

async def get_connection(self, command_name, *keys, **options):
"""Get a connection from the pool"""
try:
connection = self._available_connections.pop()
except IndexError:
if len(self._in_use_connections) >= self.max_connections:
raise ConnectionError("Too many connections") from None
connection = self.make_connection()
self._in_use_connections.add(connection)

"""Get a connected connection from the pool"""
connection = self.get_available_connection()
try:
await self.ensure_connection(connection)
except BaseException:
Expand All @@ -1044,6 +1037,16 @@ async def get_connection(self, command_name, *keys, **options):

return connection

def get_available_connection(self):
"""Get a connection from the pool, without making sure it is connected"""
try:
connection = self._available_connections.pop()
except IndexError:
if len(self._in_use_connections) >= self.max_connections:
raise ConnectionError("Too many connections") from None
connection = self.make_connection()
self._in_use_connections.add(connection)

def get_encoder(self):
"""Return an encoder based on encoding settings"""
kwargs = self.connection_kwargs
Expand Down Expand Up @@ -1169,10 +1172,19 @@ async def get_connection(self, command_name, *keys, **options):
async with async_timeout(self.timeout):
async with self._condition:
await self._condition.wait_for(self.can_get_connection)
return await super().get_connection(command_name, *keys, **options)
connection = super().get_available_connection()

except asyncio.TimeoutError as err:
raise ConnectionError("No connection available.") from err

# We now perform the connection check outside of the lock.
try:
await self.ensure_connection(connection)
return connection
except BaseException:
await self.release(connection)
raise

async def release(self, connection: AbstractConnection):
"""Releases the connection back to the pool."""
async with self._condition:
Expand Down

0 comments on commit 8907150

Please sign in to comment.