Skip to content

Commit

Permalink
Merge pull request #167 from Kakadus/2472redis-add-dynamic-startup-no…
Browse files Browse the repository at this point in the history
…des-flag-to-async-valkey-cluster

Add dynamic_startup_nodes parameter to async ValkeyCluster
  • Loading branch information
mkmkme authored Jan 13, 2025
2 parents e0151c1 + 4be949a commit bcd5e2c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
21 changes: 21 additions & 0 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2598,6 +2598,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
assert rc.get_node(host=default_host, port=7001) is not None
assert rc.get_node(host=default_host, port=7002) is not None

@pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
rc = await get_mocked_valkey_client(
host="my@DNS.com",
port=7000,
cluster_slots=default_cluster_slots,
dynamic_startup_nodes=dynamic_startup_nodes,
)
# Nodes are taken from default_cluster_slots
discovered_nodes = [
"127.0.0.1:7000",
"127.0.0.1:7001",
"127.0.0.1:7002",
"127.0.0.1:7003",
]
startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
if dynamic_startup_nodes is True:
assert startup_nodes.sort() == discovered_nodes.sort()
else:
assert startup_nodes == ["my@DNS.com:7000"]


class TestClusterPipeline:
"""Tests for the ClusterPipeline class."""
Expand Down
19 changes: 17 additions & 2 deletions valkey/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ class ValkeyCluster(AbstractValkey, AbstractValkeyCluster, AsyncValkeyClusterCom
| Enable read from replicas in READONLY mode. You can read possibly stale data.
When set to true, read commands will be assigned between the primary and
its replications in a Round-Robin manner.
:param dynamic_startup_nodes:
| Set the ValkeyCluster's startup nodes to all the discovered nodes.
If true (default value), the cluster's discovered nodes will be used to
determine the cluster nodes-slots mapping in the next topology refresh.
It will remove the initial passed startup nodes if their endpoints aren't
listed in the CLUSTER SLOTS output.
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
specific IP addresses, it is best to set it to false.
:param reinitialize_steps:
| Specifies the number of MOVED errors that need to occur before reinitializing
the whole cluster topology. If a MOVED error occurs and the cluster does not
Expand Down Expand Up @@ -237,6 +245,7 @@ def __init__(
startup_nodes: Optional[List["ClusterNode"]] = None,
require_full_coverage: bool = True,
read_from_replicas: bool = False,
dynamic_startup_nodes: bool = True,
reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
connection_error_retry_attempts: int = 3,
Expand Down Expand Up @@ -389,6 +398,7 @@ def __init__(
startup_nodes,
require_full_coverage,
kwargs,
dynamic_startup_nodes=dynamic_startup_nodes,
address_remap=address_remap,
)
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
Expand Down Expand Up @@ -1142,6 +1152,7 @@ class NodesManager:
"require_full_coverage",
"slots_cache",
"startup_nodes",
"_dynamic_startup_nodes",
"address_remap",
)

Expand All @@ -1150,11 +1161,13 @@ def __init__(
startup_nodes: List["ClusterNode"],
require_full_coverage: bool,
connection_kwargs: Dict[str, Any],
dynamic_startup_nodes: bool = True,
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
) -> None:
self.startup_nodes = {node.name: node for node in startup_nodes}
self.require_full_coverage = require_full_coverage
self.connection_kwargs = connection_kwargs
self._dynamic_startup_nodes = dynamic_startup_nodes
self.address_remap = address_remap

self.default_node: "ClusterNode" = None
Expand Down Expand Up @@ -1387,8 +1400,10 @@ async def initialize(self) -> None:
# Set the tmp variables to the real variables
self.slots_cache = tmp_slots
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
# Populate the startup nodes with all discovered nodes
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

if self._dynamic_startup_nodes:
# Populate the startup nodes with all discovered nodes
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
Expand Down

0 comments on commit bcd5e2c

Please sign in to comment.