Skip to content

Commit

Permalink
Allow tracking/reporting and closing of "lost" connections.
Browse files Browse the repository at this point in the history
ConnectionPool keeps a WeakSet of in_use connections, allowing lost ones to be collected.
Collection produces a warning and closes the underlying transport.
  • Loading branch information
kristjanvalur committed Oct 12, 2023
1 parent 5391c5f commit 5a848e9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
1 change: 1 addition & 0 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ def __del__(
_grl().call_exception_handler(context)
except RuntimeError:
pass
self.connection._close()

async def aclose(self, close_connection_pool: Optional[bool] = None) -> None:
"""
Expand Down
23 changes: 21 additions & 2 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import socket
import ssl
import sys
import warnings
import weakref
from abc import abstractmethod
from itertools import chain
Expand Down Expand Up @@ -204,9 +205,27 @@ def __init__(
raise ConnectionError("protocol must be either 2 or 3")
self.protocol = protocol

def __del__(self, _warnings: Any = warnings):
# For some reason, the individual streams don't get properly garbage
# collected and therefore produce no resource warnings. We add one
# here, in the same style as those from the stdlib.
if self._writer:
_warnings.warn(
f"unclosed Connection {self!r}", ResourceWarning, source=self
)
self._close()

def _close(self):
"""
Internal method to silently close the connection without waiting
"""
if self._writer:
self._writer.close()
self._writer = self._reader = None

def __repr__(self):
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
return f"{self.__class__.__name__}<{repr_args}>"
return f"<{self.__class__.__module__}.{self.__class__.__name__}({repr_args})>"

@abstractmethod
def repr_pieces(self):
Expand Down Expand Up @@ -1017,7 +1036,7 @@ def __repr__(self):

def reset(self):
self._available_connections = []
self._in_use_connections = set()
self._in_use_connections = weakref.WeakSet()

def can_get_connection(self) -> bool:
"""Return True if a connection can be retrieved from the pool."""
Expand Down

0 comments on commit 5a848e9

Please sign in to comment.