Pytests overhaul #569

Merged: 16 commits, Jan 9, 2023
Changes from 4 commits
52 changes: 34 additions & 18 deletions tests/dragonfly/__init__.py
@@ -1,5 +1,4 @@
import pytest
import typing
import time
import subprocess

@@ -8,6 +7,9 @@

from dataclasses import dataclass

START_DELAY = 0.4
START_GDB_DELAY = 3.0


@dataclass
class DflyParams:
@@ -29,24 +31,13 @@ def __init__(self, params: DflyParams, args):
self.proc = None

def start(self):
arglist = DflyInstance.format_args(self.args)

print(f"Starting instance on {self.port} with arguments {arglist}")

args = [self.params.path, *arglist]
if self.params.gdb:
args = ["gdb", "--ex", "r", "--args"] + args

self.proc = subprocess.Popen(args, cwd=self.params.cwd)
self._start()

# Give Dragonfly time to start and detect possible failure causes
# Gdb starts slowly
time.sleep(0.4 if not self.params.gdb else 3.0)
time.sleep(START_DELAY if not self.params.gdb else START_GDB_DELAY)

return_code = self.proc.poll()
if return_code is not None:
raise Exception(
f"Failed to start instance, return code {return_code}")
self._check_started()

def stop(self, kill=False):
proc, self.proc = self.proc, None
@@ -59,11 +50,26 @@ def stop(self, kill=False):
proc.kill()
else:
proc.terminate()
outs, errs = proc.communicate(timeout=15)
proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
print("Unable to terminate DragonflyDB gracefully, it was killed")
outs, errs = proc.communicate()
print(outs, errs)
proc.kill()

def _start(self):
arglist = DflyInstance.format_args(self.args)
print(f"Starting instance on {self.port} with arguments {arglist}")

args = [self.params.path, *arglist]
if self.params.gdb:
args = ["gdb", "--ex", "r", "--args"] + args

self.proc = subprocess.Popen(args, cwd=self.params.cwd)

def _check_started(self):
return_code = self.proc.poll()
if return_code is not None:
raise Exception(
f"Failed to start instance, return code {return_code}")

def __getitem__(self, k):
return self.args.get(k)
@@ -99,6 +105,16 @@ def create(self, **kwargs) -> DflyInstance:
self.instances.append(instance)
return instance

def start_all(self, instances):
Contributor:
Maybe we can use Python test containers instead of running them as subprocesses (if I understand correctly, that is what happens here: it spins up DF subprocesses). They rejected my PR for supporting DF, but you can start a DF container with the right parameters.

Contributor Author:
This is inconvenient (increased memory requirements) and requires rebuilding a container just to test a change.
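For reference, a rough sketch of the test-containers approach discussed above, assuming the testcontainers-python package and the public Dragonfly Docker image; this PR does not take that route and keeps plain subprocesses.

```python
# Hypothetical sketch only: this PR keeps plain subprocesses. Assumes
# `pip install testcontainers`, a running Docker daemon, and the public
# Dragonfly image name below (not defined anywhere in this repository).
from testcontainers.core.container import DockerContainer


def start_dragonfly_container(proactor_threads: int = 4) -> DockerContainer:
    # Build and start a Dragonfly container with the requested thread count.
    container = (
        DockerContainer("docker.dragonflydb.io/dragonflydb/dragonfly")
        .with_command(f"--proactor_threads={proactor_threads}")
        .with_exposed_ports(6379)
    )
    container.start()
    return container


# Usage in a test: read the mapped host/port back from the container.
# df = start_dragonfly_container()
# redis_host = df.get_container_host_ip()
# redis_port = int(df.get_exposed_port(6379))
```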

for instance in instances:
instance._start()

delay = START_DELAY if not self.params.gdb else START_GDB_DELAY
time.sleep(delay * (1 + len(instances) / 2))

for instance in instances:
instance._check_started()

def stop_all(self):
"""Stop all lanched instances."""
for instance in self.instances:
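As a quick orientation before the next file, a small sketch of how tests move from per-instance startup to the new batched helper. The fixture name and the port/thread values are illustrative; replication_test.py below uses the same pattern.

```python
# Sketch based on the diffs in this PR; ports and thread counts are illustrative.
replicas = [
    df_local_factory.create(port=1112 + i, proactor_threads=2)
    for i in range(3)
]

# Old pattern: each start() slept START_DELAY (or START_GDB_DELAY) on its own.
# for replica in replicas:
#     replica.start()

# New pattern: spawn all processes first, wait once (scaled by instance count),
# then verify each process is still alive via _check_started().
df_local_factory.start_all(replicas)
```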
151 changes: 76 additions & 75 deletions tests/dragonfly/replication_test.py
@@ -3,10 +3,9 @@
import asyncio
import aioredis
import random
from itertools import count, chain, repeat
from itertools import chain, repeat

from .utility import *
from . import dfly_args


BASE_PORT = 1111
@@ -21,73 +20,77 @@

# 1. Number of master threads
# 2. Number of threads for each replica
# 3. Number of keys stored and sent in full sync
# 4. Number of keys overwritten during full sync
# 3. Seeder config
replication_cases = [
(8, [8], 20000, 5000),
(8, [8], 10000, 10000),
(8, [2, 2, 2, 2], 20000, 5000),
(6, [6, 6, 6], 30000, 15000),
(4, [1] * 12, 10000, 4000),
(8, [8], dict(keys=10_000, dbcount=4)),
(6, [6, 6, 6], dict(keys=4_000, dbcount=4)),
(8, [2, 2, 2, 2], dict(keys=4_000, dbcount=4)),
(4, [8, 8], dict(keys=4_000, dbcount=4)),
(4, [1] * 10, dict(keys=500, dbcount=2)),
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replicas, n_keys, n_stream_keys", replication_cases)
async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n_stream_keys):
@pytest.mark.parametrize("t_master, t_replicas, seeder_config", replication_cases)
async def test_replication_all(df_local_factory, t_master, t_replicas, seeder_config):
master = df_local_factory.create(port=1111, proactor_threads=t_master)
replicas = [
df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t)
for i, t in enumerate(t_replicas)
]

# Start master and fill with test data
# Start master
master.start()
c_master = aioredis.Redis(port=master.port)
await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=1))

# Fill master with test data
seeder = DflySeeder(port=master.port, **seeder_config)
await seeder.run(target_deviation=0.1)

# Start replicas
for replica in replicas:
replica.start()
df_local_factory.start_all(replicas)

c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]

async def stream_data():
""" Stream data during stable state replication phase and afterwards """
gen = gen_test_data(n_stream_keys, seed=2)
for chunk in grouper(3, gen):
await c_master.mset({k: v for k, v in chunk})
# Start data stream
stream_task = asyncio.create_task(seeder.run(target_times=3))

# Start replication
async def run_replication(c_replica):
await c_replica.execute_command("REPLICAOF localhost " + str(master.port))

async def check_replication(c_replica):
""" Check that static and streamed data arrived """
await wait_available_async(c_replica)
# Check range [n_stream_keys, n_keys] is of seed 1
await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1))
# Check range [0, n_stream_keys] is of seed 2
await asyncio.sleep(0.2)
await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2))

# Start streaming data and run REPLICAOF in parallel
stream_fut = asyncio.create_task(stream_data())
await asyncio.gather(*(asyncio.create_task(run_replication(c))
for c in c_replicas))

assert not stream_fut.done(
), "Weak testcase. Increase number of streamed keys to surpass full sync"
await stream_fut
# Wait for streaming to finish
assert not stream_task.done(
), "Weak testcase. Increase number of streamed iterations to surpass full sync"
await stream_task

# Check full sync results
await asyncio.gather(*(check_replication(c) for c in c_replicas))
# Check data after full sync
await asyncio.sleep(0.1)
await check_data(seeder, replicas, c_replicas)

# Check stable state streaming
await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3))
# Stream more data in stable state
await seeder.run(target_times=2)

# Check data after stable state stream
await asyncio.sleep(0.5)
await asyncio.gather(*(batch_check_data_async(c, gen_test_data(n_keys, seed=3))
for c in c_replicas))
await check_data(seeder, replicas, c_replicas)

# Issue lots of deletes
seeder.target(100)
await seeder.run(target_deviation=0.1)

# Check data after deletes
await asyncio.sleep(0.5)
await check_data(seeder, replicas, c_replicas)


async def check_data(seeder, replicas, c_replicas):
capture = await seeder.capture()
for (replica, c_replica) in zip(replicas, c_replicas):
await wait_available_async(c_replica)
assert await seeder.compare(capture, port=replica.port)


"""
Expand All @@ -109,16 +112,14 @@ async def check_replication(c_replica):
# 5. Number of distinct keys that are constantly streamed
disconnect_cases = [
# balanced
(8, [4, 4], [4, 4], [4], 10000),
(8, [2] * 6, [2] * 6, [2, 2], 10000),
(8, [4, 4], [4, 4], [4], 4_000),
(8, [2] * 6, [2] * 6, [2, 2], 2_000),
# full sync heavy
(8, [4] * 6, [], [], 10000),
(8, [2] * 12, [], [], 10000),
(8, [4] * 6, [], [], 4_000),
# stable state heavy
(8, [], [4] * 6, [], 10000),
(8, [], [2] * 12, [], 10000),
(8, [], [4] * 6, [], 4_000),
# disconnect only
(8, [], [], [2] * 6, 10000)
(8, [], [], [2] * 6, 4_000)
]


@@ -142,9 +143,10 @@ async def test_disconnect_replica(df_local_factory, t_master, t_crash_fs, t_cras
master.start()
c_master = aioredis.Redis(port=master.port, single_connection_client=True)

seeder = DflySeeder(port=master.port, keys=n_keys, dbcount=2)

# Start replicas and create clients
for replica, _ in replicas:
replica.start()
df_local_factory.start_all([replica for replica, _ in replicas])

c_replicas = [
(replica, aioredis.Redis(port=replica.port), crash_type)
@@ -158,13 +160,7 @@ def replicas_of_type(tfunc):
]

# Start data fill loop
async def fill_loop():
local_c = aioredis.Redis(
port=master.port, single_connection_client=True)
for seed in count(1):
await batch_fill_data_async(local_c, gen_test_data(n_keys, seed=seed))

fill_task = asyncio.create_task(fill_loop())
fill_task = asyncio.create_task(seeder.run())

# Run full sync
async def full_sync(replica, c_replica, crash_type):
@@ -204,18 +200,19 @@ async def stable_sync(replica, c_replica, crash_type):
assert await c_replica.ping()

# Stop streaming
fill_task.cancel()
seeder.stop()
await fill_task

# Check master survived all crashes
assert await c_master.ping()

# Check phase 3 replicas are up-to-date and there is no gap or lag
def check_gen(): return gen_test_data(n_keys//5, seed=0)

await batch_fill_data_async(c_master, check_gen())
await seeder.run(target_times=2)
await asyncio.sleep(1.0)
for _, c_replica, _ in replicas_of_type(lambda t: t > 1):
await batch_check_data_async(c_replica, check_gen())

capture = await seeder.capture()
for replica, _, _ in replicas_of_type(lambda t: t > 1):
assert await seeder.compare(capture, port=replica.port)

# Check disconnects
async def disconnect(replica, c_replica, crash_type):
Expand All @@ -228,9 +225,9 @@ async def disconnect(replica, c_replica, crash_type):
await asyncio.sleep(0.5)

# Check phase 3 replica survived
for _, c_replica, _ in replicas_of_type(lambda t: t == 2):
for replica, c_replica, _ in replicas_of_type(lambda t: t == 2):
assert await c_replica.ping()
await batch_check_data_async(c_replica, check_gen())
assert await seeder.compare(capture, port=replica.port)

# Check master survived all disconnects
assert await c_master.ping()
@@ -254,10 +251,10 @@ async def disconnect(replica, c_replica, crash_type):
# 3. Number of times a random crash happens
# 4. Number of keys transferred (the more, the higher the probability to not miss full sync)
master_crash_cases = [
(4, [4], 3, 1000),
(8, [8], 3, 5000),
(6, [6, 6, 6], 3, 5000),
(4, [2] * 8, 3, 5000),
(4, [4], 3, 2_000),
(8, [8], 3, 2_000),
(6, [6, 6, 6], 3, 2_000),
(4, [2] * 8, 3, 2_000),
]


@@ -271,8 +268,9 @@ async def test_disconnect_master(df_local_factory, t_master, t_replicas, n_rando
for i, t in enumerate(t_replicas)
]

for replica in replicas:
replica.start()
seeder = DflySeeder(port=master.port, keys=n_keys, dbcount=2)

df_local_factory.start_all(replicas)

c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]

@@ -292,7 +290,8 @@ async def start_master():
master.start()
c_master = aioredis.Redis(port=master.port)
assert await c_master.ping()
await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=0))
seeder.reset()
await seeder.run(target_deviation=0.1)

await start_master()

@@ -307,15 +306,17 @@

await start_master()
await asyncio.sleep(1 + len(replicas) * 0.5) # Replicas check every 500ms.
for c_replica in c_replicas:
capture = await seeder.capture()
for replica, c_replica in zip(replicas, c_replicas):
await wait_available_async(c_replica)
await batch_check_data_async(c_replica, gen_test_data(n_keys, seed=0))
assert await seeder.compare(capture, port=replica.port)

# Crash master during stable state
master.stop(kill=True)

await start_master()
await asyncio.sleep(1 + len(replicas) * 0.5)
capture = await seeder.capture()
for replica, c_replica in zip(replicas, c_replicas):
await wait_available_async(c_replica)
await batch_check_data_async(c_replica, gen_test_data(n_keys, seed=0))
assert await seeder.compare(capture, port=replica.port)