Skip to content

Commit

Permalink
Make the connection callback methods public again, add documentation (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kristjanvalur authored and vladvildanov committed Sep 27, 2024
1 parent ce9eb83 commit e0c6035
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Connection.register_connect_callback() is made public.
* Fix async `read_response` to use `disable_decoding`.
* Add 'aclose()' methods to async classes, deprecate async close().
* Fix #2831, add auto_close_connection_pool=True arg to asyncio.Redis.from_url()
Expand Down
6 changes: 3 additions & 3 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ async def __aexit__(self, exc_type, exc_value, traceback):

def __del__(self):
if self.connection:
self.connection._deregister_connect_callback(self.on_connect)
self.connection.deregister_connect_callback(self.on_connect)

async def aclose(self):
# In case a connection property does not yet exist
Expand All @@ -796,7 +796,7 @@ async def aclose(self):
async with self._lock:
if self.connection:
await self.connection.disconnect()
self.connection._deregister_connect_callback(self.on_connect)
self.connection.deregister_connect_callback(self.on_connect)
await self.connection_pool.release(self.connection)
self.connection = None
self.channels = {}
Expand Down Expand Up @@ -859,7 +859,7 @@ async def connect(self):
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection._register_connect_callback(self.on_connect)
self.connection.register_connect_callback(self.on_connect)
else:
await self.connection.connect()
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
Expand Down
16 changes: 14 additions & 2 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,24 @@ def repr_pieces(self):
def is_connected(self):
return self._reader is not None and self._writer is not None

def _register_connect_callback(self, callback):
def register_connect_callback(self, callback):
"""
Register a callback to be called when the connection is established either
initially or reconnected. This allows listeners to issue commands that
are ephemeral to the connection, for example pub/sub subscription or
key tracking. The callback must be a _method_ and will be kept as
a weak reference.
"""
wm = weakref.WeakMethod(callback)
if wm not in self._connect_callbacks:
self._connect_callbacks.append(wm)

def _deregister_connect_callback(self, callback):
def deregister_connect_callback(self, callback):
"""
De-register a previously registered callback. It will no-longer receive
notifications on connection events. Calling this is not required when the
listener goes away, since the callbacks are kept as weak methods.
"""
try:
self._connect_callbacks.remove(weakref.WeakMethod(callback))
except ValueError:
Expand Down
4 changes: 2 additions & 2 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ def __del__(self) -> None:
def reset(self) -> None:
if self.connection:
self.connection.disconnect()
self.connection._deregister_connect_callback(self.on_connect)
self.connection.deregister_connect_callback(self.on_connect)
self.connection_pool.release(self.connection)
self.connection = None
self.health_check_response_counter = 0
Expand Down Expand Up @@ -814,7 +814,7 @@ def execute_command(self, *args):
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection._register_connect_callback(self.on_connect)
self.connection.register_connect_callback(self.on_connect)
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
self.connection._parser.set_push_handler(self.push_handler_func)
connection = self.connection
Expand Down
2 changes: 1 addition & 1 deletion redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ def execute_command(self, *args):
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection._register_connect_callback(self.on_connect)
self.connection.register_connect_callback(self.on_connect)
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
self.connection._parser.set_push_handler(self.push_handler_func)
connection = self.connection
Expand Down
16 changes: 14 additions & 2 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,24 @@ def _construct_command_packer(self, packer):
else:
return PythonRespSerializer(self._buffer_cutoff, self.encoder.encode)

def _register_connect_callback(self, callback):
def register_connect_callback(self, callback):
"""
Register a callback to be called when the connection is established either
initially or reconnected. This allows listeners to issue commands that
are ephemeral to the connection, for example pub/sub subscription or
key tracking. The callback must be a _method_ and will be kept as
a weak reference.
"""
wm = weakref.WeakMethod(callback)
if wm not in self._connect_callbacks:
self._connect_callbacks.append(wm)

def _deregister_connect_callback(self, callback):
def deregister_connect_callback(self, callback):
"""
De-register a previously registered callback. It will no-longer receive
notifications on connection events. Calling this is not required when the
listener goes away, since the callbacks are kept as weak methods.
"""
try:
self._connect_callbacks.remove(weakref.WeakMethod(callback))
except ValueError:
Expand Down

0 comments on commit e0c6035

Please sign in to comment.