Skip to content

Commit

Permalink
chore: monotonically increasing ports for cluster tests (#4268)
Browse files Browse the repository at this point in the history
We have cascading failures in cluster tests because on assertion failures the nodes are not properly cleaned up and subsequent test cases that use the same ports fail. I added a monotonically increasing port generator to mitigate this effect.
  • Loading branch information
kostasrim authored Dec 6, 2024
1 parent 63ccbbc commit f9f93b1
Showing 1 changed file with 56 additions and 47 deletions.
103 changes: 56 additions & 47 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@
BASE_PORT = 30001


def monotonically_increasing_port_number():
port = BASE_PORT
while True:
yield port
port = port + 1


# Create a generator object
next_port = monotonically_increasing_port_number()


class RedisClusterNode:
def __init__(self, port):
self.port = port
Expand Down Expand Up @@ -279,8 +290,8 @@ def is_local_host(ip: str) -> bool:
# are hidden from users, see https://github.com/dragonflydb/dragonfly/issues/4173
@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated", "managed_service_info": "true"})
async def test_emulated_cluster_with_replicas(df_factory):
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
replicas = [df_factory.create(port=next(next_port), logtostdout=True) for i in range(1, 3)]

df_factory.start_all([master, *replicas])

Expand Down Expand Up @@ -379,8 +390,8 @@ async def test_emulated_cluster_with_replicas(df_factory):

@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_managed_service_info(df_factory):
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 100)
replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 101)
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
replica = df_factory.create(port=next(next_port), admin_port=next(next_port))

df_factory.start_all([master, replica])

Expand Down Expand Up @@ -561,7 +572,7 @@ async def test_cluster_nodes(df_server, async_client):

@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_node_id": "inigo montoya"})
async def test_cluster_node_id(df_factory: DflyInstanceFactory):
node = df_factory.create(port=BASE_PORT)
node = df_factory.create(port=next(next_port))
df_factory.start_all([node])

conn = node.client()
Expand All @@ -571,9 +582,7 @@ async def test_cluster_node_id(df_factory: DflyInstanceFactory):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
# Start and configure cluster with 2 nodes
nodes = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
]
nodes = [df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)]

df_factory.start_all(nodes)

Expand Down Expand Up @@ -640,7 +649,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
await c_nodes[1].set("KEY1", "value")
assert False, "Should not be able to set key on non-owner cluster node"
except redis.exceptions.ResponseError as e:
assert e.args[0] == "MOVED 5259 localhost:30001"
assert e.args[0] == f"MOVED 5259 localhost:{nodes[0].port}"

# And that node1 only has 1 key ("KEY2")
assert await c_nodes[1].execute_command("DBSIZE") == 1
Expand All @@ -664,7 +673,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
await c_nodes[0].set("KEY1", "value")
assert False, "Should not be able to set key on non-owner cluster node"
except redis.exceptions.ResponseError as e:
assert e.args[0] == "MOVED 5259 localhost:30002"
assert e.args[0] == f"MOVED 5259 localhost:{nodes[1].port}"

# And node1 should own it and allow using it
assert await c_nodes[1].set("KEY1", "value")
Expand Down Expand Up @@ -699,8 +708,8 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFactory):
# Start and configure cluster with 1 master and 1 replica, both own all slots
master = df_factory.create(admin_port=BASE_PORT + 1000)
replica = df_factory.create(admin_port=BASE_PORT + 1001)
master = df_factory.create(admin_port=next(next_port))
replica = df_factory.create(admin_port=next(next_port))
df_factory.start_all([master, replica])

async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin:
Expand Down Expand Up @@ -807,8 +816,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceFactory):
# Start and configure cluster with 1 master and 1 replica, both own all slots
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
replica = df_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001)
master = df_factory.create(port=next(next_port), admin_port=next(next_port))
replica = df_factory.create(port=next(next_port), admin_port=next(next_port))
df_factory.start_all([master, replica])

c_master = master.client()
Expand Down Expand Up @@ -958,7 +967,7 @@ async def test_cluster_blocking_command(df_server):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]

df_factory.start_all(instances)
Expand Down Expand Up @@ -987,11 +996,11 @@ async def test_blocking_commands_cancel(df_factory, df_seeder_factory):

with pytest.raises(aioredis.ResponseError) as set_e_info:
await set_task
assert "MOVED 3037 127.0.0.1:30002" == str(set_e_info.value)
assert f"MOVED 3037 127.0.0.1:{instances[1].port}" == str(set_e_info.value)

with pytest.raises(aioredis.ResponseError) as list_e_info:
await list_task
assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)
assert f"MOVED 7141 127.0.0.1:{instances[1].port}" == str(list_e_info.value)


@pytest.mark.parametrize("set_cluster_node_id", [True, False])
Expand All @@ -1004,8 +1013,8 @@ async def test_cluster_native_client(
# Start and configure cluster with 3 masters and 3 replicas
masters = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
cluster_node_id=f"master{i}" if set_cluster_node_id else "",
)
for i in range(3)
Expand All @@ -1017,10 +1026,10 @@ async def test_cluster_native_client(

replicas = [
df_factory.create(
port=BASE_PORT + 100 + i,
admin_port=BASE_PORT + i + 1100,
port=next(next_port),
admin_port=next(next_port),
cluster_node_id=f"replica{i}" if set_cluster_node_id else "",
replicaof=f"localhost:{BASE_PORT + i}",
replicaof=f"localhost:{masters[i].port}",
)
for i in range(3)
]
Expand Down Expand Up @@ -1195,7 +1204,7 @@ async def test_random_keys():
async def test_config_consistency(df_factory: DflyInstanceFactory):
# Check slot migration from one node to another
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]

df_factory.start_all(instances)
Expand Down Expand Up @@ -1245,8 +1254,8 @@ async def test_cluster_flushall_during_migration(
# Check data migration from one node to another
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9",
logtostdout=True,
)
Expand Down Expand Up @@ -1298,8 +1307,8 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
# Check data migration from one node to another
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
)
for i in range(2)
Expand Down Expand Up @@ -1378,7 +1387,7 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"})
async def test_migration_with_key_ttl(df_factory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]

df_factory.start_all(instances)
Expand Down Expand Up @@ -1427,7 +1436,7 @@ async def test_migration_with_key_ttl(df_factory):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "serialization_max_chunk_size": 0})
async def test_network_disconnect_during_migration(df_factory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]

df_factory.start_all(instances)
Expand Down Expand Up @@ -1496,8 +1505,8 @@ async def test_cluster_fuzzymigration(
):
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
serialization_max_chunk_size=huge_values,
replication_stream_output_limit=10,
Expand Down Expand Up @@ -1632,7 +1641,7 @@ async def test_all_finished():
async def test_cluster_config_reapply(df_factory: DflyInstanceFactory):
"""Check data migration from one node to another."""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1690,7 +1699,7 @@ async def test_cluster_replication_migration(
and make sure the captures on the replicas are equal.
"""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(4)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(4)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1767,7 +1776,7 @@ async def test_start_replication_during_migration(
in the end master_1 and replica_1 should have the same data
"""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(3)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(3)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1834,7 +1843,7 @@ async def test_snapshoting_during_migration(
The result should be the same: snapshot contains all the data that existed before migration
"""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1904,7 +1913,7 @@ async def start_save():
async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
"""Check data migration from one node to another."""
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -1965,7 +1974,7 @@ async def node1size0():
@pytest.mark.asyncio
async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
]
df_factory.start_all(instances)

Expand Down Expand Up @@ -2027,9 +2036,9 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact
Send traffic before replication start and while replicating.
Promote the replica to master and check data consistency between cluster and single node.
"""
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
cluster_nodes = [
df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2)
]

# Start instances and connect clients
Expand Down Expand Up @@ -2114,9 +2123,9 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_
Promote replica to master
Compare cluster data and replica data
"""
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")
cluster_nodes = [
df_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
df_factory.create(admin_port=next(next_port), cluster_mode="yes") for i in range(2)
]

# Start instances and connect clients
Expand Down Expand Up @@ -2228,7 +2237,7 @@ async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_fact
Send traffic before replication start and while replicating.
Promote the replica to master and check data consistency between cluster and single dragonfly node.
"""
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")

# Start instances and connect clients
df_factory.start_all([replica])
Expand Down Expand Up @@ -2286,7 +2295,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
Send more traffic
Promote the replica to master and check data consistency between cluster and single dragonfly node.
"""
replica = df_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
replica = df_factory.create(admin_port=next(next_port), cluster_mode="emulated")

# Start instances and connect clients
df_factory.start_all([replica])
Expand Down Expand Up @@ -2371,8 +2380,8 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact
instances = [
df_factory.create(
maxmemory="15G",
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
vmodule="streamer=9",
)
for i in range(3)
Expand Down Expand Up @@ -2429,8 +2438,8 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see
# Timeout set to 3 seconds because we must first saturate the socket before we get the timeout
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
port=next(next_port),
admin_port=next(next_port),
replication_timeout=3000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=2",
)
Expand Down

0 comments on commit f9f93b1

Please sign in to comment.