Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scan iter bug: dev branch #2

Draft
wants to merge 44 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
4397ec4
fix scan iter command issued to different replicas
agnesnatasya Apr 26, 2024
3d5e674
add tests
agnesnatasya Apr 26, 2024
b9d2338
reorder
agnesnatasya Apr 27, 2024
3c1b372
remove ignore
agnesnatasya Apr 27, 2024
cadd357
lint and format
agnesnatasya Apr 27, 2024
e2aa893
better inline
agnesnatasya Apr 29, 2024
1758a9b
backward compatible typing
agnesnatasya Apr 29, 2024
fac2ecd
test inline docs
agnesnatasya May 10, 2024
6751a04
add tests for all scan iter family
agnesnatasya May 10, 2024
f667e34
lint
agnesnatasya May 10, 2024
847ea71
implement in sync client
agnesnatasya Jul 9, 2024
93a2b91
more features for ConnectionsINdexer
agnesnatasya Jul 14, 2024
2f3c887
add _same_addres methods for sentinels
agnesnatasya Jul 14, 2024
992d192
fix connect_to args
agnesnatasya Jul 14, 2024
8d6ca1f
fix tests
agnesnatasya Jul 14, 2024
b2d9d93
add self
agnesnatasya Jul 14, 2024
5838500
convert ConnectionsIndexer to list before indexing
agnesnatasya Jul 14, 2024
e101bdf
convert ConnectionsIndexer to list before indexing
agnesnatasya Jul 14, 2024
96222dd
fix typo
agnesnatasya Jul 14, 2024
3482723
fix
agnesnatasya Jul 14, 2024
ae1b09a
fix connect_to_address
agnesnatasya Jul 14, 2024
6886f71
cleanup in sync client
agnesnatasya Jul 16, 2024
52a1d58
rename kwargs to no underscore for consistency
agnesnatasya Jul 16, 2024
b37fa0c
add cleanup tests for pipeline
agnesnatasya Jul 16, 2024
2f9964e
remove test for pipeline
agnesnatasya Jul 16, 2024
f436f60
lints
agnesnatasya Jul 16, 2024
a2ed1ac
reformat
agnesnatasya Jul 16, 2024
a9f2160
def cleanup in base class
agnesnatasya Jul 16, 2024
6940526
fix some tests
agnesnatasya Jul 16, 2024
e8c7a8b
rename iter_req_id properly
agnesnatasya Jul 16, 2024
dda3b61
fix tests
agnesnatasya Jul 16, 2024
e98c770
set fix address as a property of SentinelManagedConnection
agnesnatasya Jul 19, 2024
e32df58
lint
agnesnatasya Jul 20, 2024
08d3428
make mock class have same behavior as actual class
agnesnatasya Jul 20, 2024
d1db9f6
define _connect_to_sentinel in async server
agnesnatasya Jul 20, 2024
4c59821
mock can_read_destructive for parser
agnesnatasya Jul 20, 2024
25777cf
skip test sentinel managed connection if hirediswq
agnesnatasya Jul 20, 2024
0ee1b85
undo ensure_connection deduplication in BlockingConnectionPool
agnesnatasya Jul 20, 2024
5edff2b
import HIREDIS
agnesnatasya Jul 20, 2024
8d9c735
polymorphism for reset available connections instead
agnesnatasya Jul 20, 2024
c6c7bf7
merge
agnesnatasya Jul 20, 2024
868b499
lint
agnesnatasya Jul 20, 2024
5e249fd
fix inline comments + rename
agnesnatasya Jul 20, 2024
8643185
lint
agnesnatasya Jul 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,10 @@ async def execute_command(self, *args, **options):
finally:
if not self.connection:
await pool.release(conn)
# Do additional cleanup if this is part of a SCAN ITER family command.
# It's possible that this is just a pure SCAN family command though.
if "SCAN" in command_name.upper():
pool.cleanup(iter_req_id=options.get("iter_req_id", None))

async def parse_response(
self, connection: Connection, command_name: Union[str, bytes], **options
Expand Down
18 changes: 16 additions & 2 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,14 @@ class ConnectionPool:
``connection_class``.
"""

def cleanup(self, **options):
    """
    Hook for pool-specific cleanup once a command has been executed.

    The base pool keeps no per-request state, so this is a no-op.
    Subclasses that do track such state override it: the Sentinel-aware
    pool in ``redis/asyncio/sentinel.py`` uses it to drop the replica
    address it recorded for a SCAN ITER request id.

    :param options: keyword options forwarded by the client; the client
        currently passes ``iter_req_id`` (which may be ``None``).
    """
    pass

@classmethod
def from_url(cls: Type[_CP], url: str, **kwargs) -> _CP:
"""
Expand Down Expand Up @@ -1118,7 +1126,7 @@ def __init__(
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections

self._available_connections: List[AbstractConnection] = []
self._available_connections = self.reset_available_connections()
self._in_use_connections: Set[AbstractConnection] = set()
self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)

Expand All @@ -1129,9 +1137,12 @@ def __repr__(self):
)

def reset(self):
self._available_connections = []
self._available_connections = self.reset_available_connections()
self._in_use_connections = weakref.WeakSet()

def reset_available_connections(self):
    """
    Build the empty container that will hold this pool's idle connections.

    Subclasses may override this to supply a different container type.
    """
    idle_connections = []
    return idle_connections

def can_get_connection(self) -> bool:
"""Return True if a connection can be retrieved from the pool."""
return (
Expand Down Expand Up @@ -1324,3 +1335,6 @@ async def release(self, connection: AbstractConnection):
async with self._condition:
await super().release(connection)
self._condition.notify()

def cleanup(self, **options):
    """No additional cleanup is performed here; ``options`` is ignored."""
    return None
111 changes: 110 additions & 1 deletion redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import asyncio
import random
import weakref
from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
from typing import (
Any,
AsyncIterator,
Iterable,
Mapping,
Optional,
Sequence,
Tuple,
Type,
)

from redis.asyncio.client import Redis
from redis.asyncio.connection import (
Expand All @@ -12,6 +21,7 @@
)
from redis.commands import AsyncSentinelCommands
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
from redis.sentinel import ConnectionsIndexer
from redis.utils import str_if_bytes


Expand All @@ -26,6 +36,10 @@ class SlaveNotFoundError(ConnectionError):
class SentinelManagedConnection(Connection):
def __init__(self, **kwargs):
    # The owning pool is passed in through kwargs; take it out before the
    # remaining kwargs are handed to the base connection.
    owning_pool = kwargs.pop("connection_pool")
    self.connection_pool = owning_pool
    # When True, connecting reuses the host and port already stored on
    # this connection instead of asking the pool which server to use.
    self._is_address_set = False
    super().__init__(**kwargs)

def __repr__(self):
Expand All @@ -39,6 +53,14 @@ def __repr__(self):
s += host_info
return s + ")>"

def set_address(self, address):
    """
    Pin this connection to ``address``.

    Stores the ``(host, port)`` pair on the connection and marks the
    address as set, so the next connect uses it as-is instead of
    resolving a server through the pool.
    """
    host, port = address
    self.host = host
    self.port = port
    self._is_address_set = True

async def connect_to(self, address):
self.host, self.port = address
await super().connect()
Expand All @@ -50,6 +72,14 @@ async def connect_to(self, address):
async def _connect_retry(self):
    # A reader already exists, so the connection is established.
    if self._reader:
        return
    if not self._is_address_set:
        # No pinned address: let the pool decide which server to use.
        await self._connect_to_sentinel()
        return
    # The address was pinned: connect to the host and port stored on
    # this connection rather than resolving a new one.
    await self.connect_to((self.host, self.port))

async def _connect_to_sentinel(self):
if self.connection_pool.is_master:
await self.connect_to(await self.connection_pool.get_master_address())
else:
Expand Down Expand Up @@ -122,6 +152,7 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
self.sentinel_manager = sentinel_manager
self.master_address = None
self.slave_rr_counter = None
self._iter_req_id_to_replica_address = {}

def __repr__(self):
return (
Expand All @@ -134,6 +165,9 @@ def reset(self):
self.master_address = None
self.slave_rr_counter = None

def reset_available_connections(self):
    """
    Return a fresh ``ConnectionsIndexer`` to hold idle connections.

    ``get_connection`` relies on this container to look an idle
    connection up by host and port.
    """
    indexer = ConnectionsIndexer()
    return indexer

def owns_connection(self, connection: Connection):
check = not self.is_master or (
self.is_master and self.master_address == (connection.host, connection.port)
Expand Down Expand Up @@ -167,6 +201,81 @@ async def rotate_slaves(self) -> AsyncIterator:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")

def cleanup(self, **options):
    """
    Forget the replica address recorded for a SCAN ITER request.

    The request id is read from ``options["iter_req_id"]``; an absent or
    unknown id is ignored.
    """
    request_id = options.get("iter_req_id")
    self._iter_req_id_to_replica_address.pop(request_id, None)

async def get_connection(
    self, command_name: str, *keys: Any, **options: Any
) -> SentinelManagedConnection:
    """
    Get a connection from the pool.

    The 'xxxscan_iter' commands ('scan_iter', 'hscan_iter', 'sscan_iter',
    'zscan_iter') need special handling. When the pool is in replica mode,
    every 'scan' call issued on behalf of one 'xxxscan_iter' command has
    to go to the same Redis replica: each server positions its keys
    differently and the cursor acts as the offset of the scan, so a
    cursor obtained from one replica is not meaningful on another.

    Such calls carry an ``iter_req_id`` option. The replica used for the
    first scan of a request is recorded under that id and reused for the
    following scans until ``cleanup`` removes the entry.

    :param command_name: name of the command about to be executed.
    :param keys: forwarded unchanged to the superclass implementation.
    :param options: command options; a truthy ``iter_req_id`` marks the
        call as part of an iter request.
    :return: a connection that has been connected (first scan) or
        ensured to be connected to the recorded replica (later scans).
    :raises BaseException: anything raised while connecting is re-raised
        after the connection has been released back to the pool.
    """
    iter_req_id = options.get("iter_req_id", None)
    # Not an iter command, or master mode: use the superclass' implementation.
    if not iter_req_id or self.is_master:
        return await super().get_connection(command_name, *keys, **options)

    # Look up the replica this iter request was directed to, if any.
    server_host, server_port = self._iter_req_id_to_replica_address.get(
        iter_req_id, (None, None)
    )
    # Decide once whether this is the first scan of the request, so that
    # picking the connection and connecting it can never disagree.
    is_first_scan = server_host is None or server_port is None

    if is_first_scan:
        # Any idle connection will do; create one if the pool is empty.
        try:
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
    else:
        # Prefer an idle connection that already has the same host and port.
        connection = self._available_connections.get_connection(
            host=server_host, port=server_port
        )
        # Otherwise create a new connection object; its host and port are
        # set below through ``set_address``.
        if not connection:
            connection = self.make_connection()
    self._in_use_connections.add(connection)

    try:
        if is_first_scan:
            # No address is pinned yet, so the connection resolves the
            # replica to use on its own while connecting.
            await connection.connect()
        else:
            # Pin the connection to the replica used by the earlier scans
            # before making sure it is connected.
            connection.set_address((server_host, server_port))
            await self.ensure_connection(connection)
    except BaseException:
        # Release the connection back to the pool so that we don't leak it.
        await self.release(connection)
        raise

    # Record which replica serves this iter request.
    self._iter_req_id_to_replica_address[iter_req_id] = (
        connection.host,
        connection.port,
    )
    return connection


class Sentinel(AsyncSentinelCommands):
"""
Expand Down
4 changes: 4 additions & 0 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,10 @@ def execute_command(self, *args, **options):
finally:
if not self.connection:
pool.release(conn)
# Do additional cleanup if this is part of a SCAN ITER family command.
# It's possible that this is just a pure SCAN family command though.
if "SCAN" in command_name.upper():
pool.cleanup(iter_req_id=options.get("iter_req_id", None))

def parse_response(self, connection, command_name, **options):
"""Parses a response from the Redis server"""
Expand Down
Loading
Loading