From 9771d0472956c483b94f9cbf2fc741f1af5aed6d Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Fri, 23 Dec 2022 22:07:54 +0300 Subject: [PATCH] feat(testing): Generator improvements Signed-off-by: Vladislav Oleshko --- tests/dragonfly/generator.py | 301 +++++++++++++++++++--------- tests/dragonfly/replication_test.py | 121 ++++++----- tests/dragonfly/snapshot_test.py | 40 ++-- tests/dragonfly/utility.py | 40 ---- 4 files changed, 295 insertions(+), 207 deletions(-) diff --git a/tests/dragonfly/generator.py b/tests/dragonfly/generator.py index d344c1f58099..2020184374e2 100644 --- a/tests/dragonfly/generator.py +++ b/tests/dragonfly/generator.py @@ -4,13 +4,17 @@ import string import itertools import time -from collections import deque +import difflib +import sys from enum import Enum +from .utility import grouper -from utility import grouper, batch_fill_data_async, gen_test_data +def eprint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) -class Action(Enum): + +class SizeChange(Enum): SHRINK = 0 NO_CHANGE = 1 GROW = 2 @@ -20,136 +24,186 @@ class ValueType(Enum): STRING = 0 LIST = 1 SET = 2 + HSET = 3 class TestingGenerator: - def __init__(self, val_size, batch_size, target_keys, max_multikey): + """Class for generating complex command sequences""" + + def __init__(self, target_keys, val_size, batch_size, max_multikey): + self.key_cnt_target = target_keys self.val_size = val_size self.batch_size = batch_size - self.key_cnt_target = target_keys self.max_multikey = max_multikey + # Key management self.key_sets = [set() for _ in ValueType] self.key_cursor = 0 self.key_cnt = 0 - self.str_filler = ''.join(random.choices( + # Random string for MSET + self.str_filler = lambda: ''.join(random.choices( string.ascii_letters, k=val_size)) - self.list_filler = ' '.join( + # Random sequence of single letter elements for LPUSH + self.list_filler = lambda: ' '.join( random.choices(string.ascii_letters, k=val_size//2)) - self.set_filler = ' '.join( + # Random sequence of 3 letter elements for SADD + self.set_filler = lambda: ' '.join( (''.join(random.choices(string.ascii_letters, k=3)) for _ in range(val_size//4)) ) - def action_weigts(self): - diff = self.key_cnt_target - self.key_cnt - relative_diff = diff / self.key_cnt_target - return [ - max(0.1 - 4 * relative_diff, 0.05), - 1.0, - max(0.15 + 8 * relative_diff, 0.05) - ] - - def set_of_type(self, t: ValueType): - return self.key_sets[t.value] + self.hset_filler = lambda: 'v0 0 v1 0 ' + ' '.join( + (''.join(random.choices(string.ascii_letters, k=3)) + + ' ' + str(random.randint(0, 100)) + for _ in range(val_size//5)) + ) def keys(self): return itertools.chain(*self.key_sets) + def keys_and_types(self): + out = [] + for t in list(ValueType): + out.extend((k, t) for k in self.set_for_type(t)) + return out + + def set_for_type(self, t: ValueType): + """Get backing keyset for value type""" + + return self.key_sets[t.value] + def add_key(self, t: ValueType): - k = self.key_cursor - self.key_cursor += 1 - self.set_of_type(t).add(k) + k, self.key_cursor = self.key_cursor, self.key_cursor + 1 + self.set_for_type(t).add(k) return k def randomize_type(self): - return random.choice(list(ValueType)) + types = [t for t in ValueType if t != ValueType.SET] + return random.choice(types) def randomize_nonempty_set(self): if not any(self.key_sets): return None, None t = self.randomize_type() - s = self.set_of_type(t) + s = self.set_for_type(t) if len(s) == 0: return self.randomize_nonempty_set() else: return s, t - def randomize_key(self, t=None, 
pop=False):
+    def randomize_key(self, t=None, pop=False) -> tuple:
         if t is None:
             s, t = self.randomize_nonempty_set()
         else:
-            s = self.set_of_type(t)
+            s = self.set_for_type(t)
 
         if s is None or len(s) == 0:
-            return None
+            return None, None
 
         k = s.pop()
         if not pop:
             s.add(k)
 
-        return k
+        return k, t
 
     def make_shrink(self):
+        # Simulate shrinking by deleting a few random keys
         keys_gen = (self.randomize_key(pop=True)
                     for _ in range(random.randint(1, self.max_multikey)))
-        keys = [str(k) for k in keys_gen if k is not None]
+        keys = [str(k) for k, _ in keys_gen if k is not None]
+
         if len(keys) == 0:
             return None, 0
 
         return "DEL " + " ".join(keys), -len(keys)
 
     def make_no_change(self):
+        # Simulate no changes to memory size by issuing one of the following commands:
         NO_CHANGE_ACTIONS = [
             ('APPEND {k} {val}', ValueType.STRING),
             ('SETRANGE {k} 10 {val}', ValueType.STRING),
             ('LPUSH {k} {val}', ValueType.LIST),
             ('LPOP {k}', ValueType.LIST),
             ('SADD {k} {val}', ValueType.SET),
-            ('SPOP {k}', ValueType.SET)
+            ('SPOP {k}', ValueType.SET),
+            ('HSETNX {k} v0 {val}', ValueType.HSET),
+            ('HINCRBY {k} v1 1', ValueType.HSET)
         ]
 
         cmd, t = random.choice(NO_CHANGE_ACTIONS)
-        k = self.randomize_key(t)
+        k, _ = self.randomize_key(t)
         val = ''.join(random.choices(string.ascii_letters, k=4))
 
         return (cmd.format(k=str(k), val=val) if k is not None else None), 0
 
     def make_grow(self):
+        # Simulate growing memory usage by issuing MSET, LPUSH, SADD or HMSET
+        # with large values, or COPY
+
+        # TODO: Implement COPY in Dragonfly.
+        # 50% chance of copy
+        # if self.key_cnt > 0 and random.random() < 0.5:
+        #    k_old, t_old = self.randomize_key()
+        #    if k_old is not None:
+        #        k_new = self.add_key(t_old)
+        #        return f"COPY {k_old} {k_new}", 1
+
+        # Equal chances for all types
         t = self.randomize_type()
         if t == ValueType.STRING:
             keys = (self.add_key(t)
                     for _ in range(random.randint(1, self.max_multikey)))
-            pairs = [str(k) + " " + self.str_filler for k in keys]
+            pairs = [str(k) + " " + self.str_filler() for k in keys]
             return "MSET " + " ".join(pairs), len(pairs)
         elif t == ValueType.LIST:
             k = self.add_key(t)
-            return f"LPUSH {k} {self.list_filler}", 1
+            return f"LPUSH {k} {self.list_filler()}", 1
+        elif t == ValueType.SET:
+            k = self.add_key(t)
+            return f"SADD {k} {self.set_filler()}", 1
         else:
             k = self.add_key(t)
-            return f"SADD {k} {self.set_filler}", 1
 
     def make(self, action):
-        if action == Action.SHRINK:
+            return f"HMSET {k} {self.hset_filler()}", 1
+
+    def make(self, action):
+        """Create the command for the given action and return it together with the number of keys added (or removed)"""
+
+        if action == SizeChange.SHRINK:
             return self.make_shrink()
-        elif action == Action.NO_CHANGE:
+        elif action == SizeChange.NO_CHANGE:
             return self.make_no_change()
         else:
             return self.make_grow()
 
-    def generate(self):
-        actions = []
+    def reset(self):
+        self.key_sets = [set() for _ in ValueType]
+        self.key_cursor = 0
+        self.key_cnt = 0
 
+    def size_change_probs(self):
+        """Calculate probabilities of size change actions"""
+        # Relative distance to key target
+        dist = (self.key_cnt_target - self.key_cnt) / self.key_cnt_target
+        return [
+            max(0.15 - 4 * dist, 0.05),
+            1.0,
+            max(0.15 + 8 * dist, 0.05)
+        ]
+
+    def generate(self):
+        """Generate the next batch of commands and return it together with the ratio of current keys to the target"""
+        changes = []
         cmds = []
         while len(cmds) < self.batch_size:
-            if len(actions) == 0:
-                actions = random.choices(
-                    list(Action), weights=self.action_weigts(), k=50)
+            # Generating changes in small batches avoids re-calculating probabilities for every command
+            if len(changes) == 0:
+                changes = random.choices(
+                    list(SizeChange), 
weights=self.size_change_probs(), k=50) - cmd, delta = self.make(actions.pop()) + cmd, delta = self.make(changes.pop()) if cmd is not None: cmds.append(cmd) self.key_cnt += delta @@ -157,26 +211,54 @@ def generate(self): return cmds, self.key_cnt/self.key_cnt_target -class TestingExecutor: - def __init__(self, pool, target_bytes, dbcount): - self.pool = pool +class DataCapture: + def __init__(self, entries): + self.entries = entries + + def compare(self, other): + if self.entries == other.entries: + return True + + self._print_diff(other) + return False - max_multikey = 5 - batch_size = 1_000 - val_size = 50 - target_keys = 500_000 + def _print_diff(self, other): + eprint("=== DIFF ===") + printed = 0 + diff = difflib.ndiff(self.entries, other.entries) + for line in diff: + if line.startswith(' '): + if printed >= 10: + eprint("... omitted ...") + break + continue + eprint(line) + printed += 1 + eprint("=== END DIFF ===") - print(target_keys * dbcount) +class DflySeeder: + """Data seeder that supports strings, lists and sets""" + + def __init__(self, port=6379, keys=1000, val_size=50, batch_size=1000, max_multikey=5, dbcount=1): self.gen = TestingGenerator( - val_size, batch_size, target_keys, max_multikey) + keys, val_size, batch_size, max_multikey + ) + self.port = port self.dbcount = dbcount - - async def run(self, **kwargs): - queues = [asyncio.Queue(maxsize=30) for _ in range(self.dbcount)] - producer = asyncio.create_task(self.generator_task(queues, **kwargs)) + self.stop_flag = False + + async def run(self, target_times=None, target_deviation=None): + """ + Run a seeding cycle on all dbs until either a fixed number of batches (target_times) + or until reaching an allowed deviation from the target number of keys (target_deviation) + """ + self.stop_flag = False + queues = [asyncio.Queue(maxsize=3) for _ in range(self.dbcount)] + producer = asyncio.create_task(self._generator_task( + queues, target_times=target_times, target_deviation=target_deviation)) consumers = [ - asyncio.create_task(self.cosumer_task(i, queue)) + asyncio.create_task(self._cosumer_task(i, queue)) for i, queue in enumerate(queues) ] @@ -184,11 +266,71 @@ async def run(self, **kwargs): for consumer in consumers: consumer.cancel() - async def generator_task(self, queues, target_times=None, target_deviation=None): + def stop(self): + self.stop_flag = True + + def reset(self): + self.gen.reset() + + async def capture(self, port=None, target_db=0, keys=None): + def tostr(b): return str(b) if isinstance( + b, int) else b.decode("utf-8") + + if port is None: + port = self.port + + if keys is None: + keys = sorted(list(self.gen.keys_and_types())) + + client = aioredis.Redis(port=port, db=target_db) + fragments = [] + for group in grouper(100, keys): + pipe = client.pipeline() + for k, t in group: + if t == ValueType.STRING: + pipe.get(k) + elif t == ValueType.LIST: + pipe.lrange(k, start=0, end=-1) + elif t == ValueType.SET: + pipe.sscan(k) + else: + pipe.hgetall(k) + + results = await pipe.execute() + for (k, t), res in zip(group, results): + out = f"{t.name} {k}: " + if t == ValueType.STRING: + out += tostr(res) + elif t == ValueType.LIST: + out += ' '.join(tostr(s) for s in res) + elif t == ValueType.SET: + out += ' '.join(sorted(tostr(s) for s in res[1])) + else: + out += ' '.join( + tostr(k) + '=' + tostr(v) + for k, v in res.items() + ) + fragments.append(out) + + return DataCapture(fragments) + + async def compare(self, initial_capture, port=6379): + keys = sorted(list(self.gen.keys_and_types())) + for db in 
range(self.dbcount): + capture = await self.capture(port=port, target_db=db, keys=keys) + if not initial_capture.compare(capture): + eprint(f">>> Inconsistent data on port {port}, db {db}") + return False + return True + + async def _generator_task(self, queues, target_times=None, target_deviation=None): cpu_time = 0 - submitted = 0 + while True: + if self.stop_flag: + break + start_time = time.time() blob, deviation = self.gen.generate() cpu_time += (time.time() - start_time) @@ -203,15 +345,13 @@ async def generator_task(self, queues, target_times=None, target_deviation=None) await asyncio.sleep(0.0) - print("done") + print("cpu time ", cpu_time) for q in queues: await q.join() - print("cpu time", cpu_time) - - async def cosumer_task(self, db, queue): - client = aioredis.Redis(db=db) + async def _cosumer_task(self, db, queue): + client = aioredis.Redis(port=self.port, db=db) while True: cmds = await queue.get() pipe = client.pipeline(transaction=False) @@ -220,41 +360,20 @@ async def cosumer_task(self, db, queue): await pipe.execute() queue.task_done() - async def checkhash(self, client=None): - if client is None: - client = self.client - - all_keys = sorted(list(self.gen.keys())) - hashes = [] - - for chunk in grouper(self.gen.batch_size, all_keys): - pipe = client.pipeline(transaction=False) - for k in chunk: - pipe.execute_command(f"DUMP {k}") - res = await pipe.execute() - hashes.append(hash(tuple(res))) - - return hash(tuple(hashes)) async def main(): + random.seed(100) + client = aioredis.Redis() await client.flushall() - pool = aioredis.ConnectionPool() - - #random.seed(11111) - - # Ask for 20 mb - ex = TestingExecutor(pool, 1000, 16) + ex = DflySeeder(keys=10_000, batch_size=2500, dbcount=5) - # Fill with max target deviation of 10% + # Fill with max key deviation of 10% await ex.run(target_deviation=0.1) - # Run a few times - # await ex.run(target_times=5) - - # print("hash: ", await ex.checkhash()) - print((await client.info("MEMORY"))['used_memory_human']) -asyncio.run(main()) + # capture = await ex.capture() + +# asyncio.run(main()) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index d9d47794f0ca..b2a2870359ef 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -6,7 +6,7 @@ from itertools import count, chain, repeat from .utility import * -from . import dfly_args +from .generator import DflySeeder BASE_PORT = 1111 @@ -22,29 +22,30 @@ # 1. Number of master threads # 2. Number of threads for each replica # 3. Number of keys stored and sent in full sync -# 4. Number of keys overwritten during full sync +# 4. 
Number of databases replication_cases = [ - (8, [8], 20000, 5000), - (8, [8], 10000, 10000), - (8, [2, 2, 2, 2], 20000, 5000), - (6, [6, 6, 6], 30000, 15000), - (4, [1] * 12, 10000, 4000), + (8, [8], 5_000, 5), + (8, [2, 2, 2, 2], 5_000, 5), + (6, [6, 6, 6], 5_000, 5), + (4, [1] * 12, 1_000, 5), ] @pytest.mark.asyncio -@pytest.mark.parametrize("t_master, t_replicas, n_keys, n_stream_keys", replication_cases) -async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n_stream_keys): +@pytest.mark.parametrize("t_master, t_replicas, n_keys, n_dbs", replication_cases) +async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n_dbs): master = df_local_factory.create(port=1111, proactor_threads=t_master) replicas = [ df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t) for i, t in enumerate(t_replicas) ] - # Start master and fill with test data + # Start master master.start() - c_master = aioredis.Redis(port=master.port) - await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=1)) + + # Fill master with test data + seeder = DflySeeder(port=master.port, keys=n_keys, dbcount=n_dbs) + await seeder.run(target_deviation=0.1) # Start replicas for replica in replicas: @@ -52,42 +53,36 @@ async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas] - async def stream_data(): - """ Stream data during stable state replication phase and afterwards """ - gen = gen_test_data(n_stream_keys, seed=2) - for chunk in grouper(3, gen): - await c_master.mset({k: v for k, v in chunk}) + # Start data stream + stream_task = asyncio.create_task(seeder.run(target_times=5)) + # Start replication async def run_replication(c_replica): await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) - async def check_replication(c_replica): - """ Check that static and streamed data arrived """ - await wait_available_async(c_replica) - # Check range [n_stream_keys, n_keys] is of seed 1 - await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1)) - # Check range [0, n_stream_keys] is of seed 2 - await asyncio.sleep(0.2) - await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2)) - - # Start streaming data and run REPLICAOF in parallel - stream_fut = asyncio.create_task(stream_data()) await asyncio.gather(*(asyncio.create_task(run_replication(c)) for c in c_replicas)) - assert not stream_fut.done( - ), "Weak testcase. Increase number of streamed keys to surpass full sync" - await stream_fut + # Wait for streaming to finish + assert not stream_task.done( + ), "Weak testcase. 
Increase number of streamed iterations to surpass full sync" + await stream_task - # Check full sync results - await asyncio.gather(*(check_replication(c) for c in c_replicas)) + # Check data after full sync + capture = await seeder.capture() + for (replica, c_replica) in zip(replicas, c_replicas): + await wait_available_async(c_replica) + assert await seeder.compare(capture, port=replica.port) - # Check stable state streaming - await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3)) + # Stream more data in stable state + await seeder.run(target_times=2) + # Check data after stable state stream await asyncio.sleep(0.5) - await asyncio.gather(*(batch_check_data_async(c, gen_test_data(n_keys, seed=3)) - for c in c_replicas)) + capture = await seeder.capture() + for (replica, c_replica) in zip(replicas, c_replicas): + await wait_available_async(c_replica) + assert await seeder.compare(capture, port=replica.port) """ @@ -109,16 +104,16 @@ async def check_replication(c_replica): # 5. Number of distinct keys that are constantly streamed disconnect_cases = [ # balanced - (8, [4, 4], [4, 4], [4], 10000), - (8, [2] * 6, [2] * 6, [2, 2], 10000), + (8, [4, 4], [4, 4], [4], 10_000), + (8, [2] * 6, [2] * 6, [2, 2], 10_000), # full sync heavy - (8, [4] * 6, [], [], 10000), - (8, [2] * 12, [], [], 10000), + (8, [4] * 6, [], [], 10_000), + (8, [2] * 12, [], [], 10_000), # stable state heavy - (8, [], [4] * 6, [], 10000), - (8, [], [2] * 12, [], 10000), + (8, [], [4] * 6, [], 10_000), + (8, [], [2] * 12, [], 10_000), # disconnect only - (8, [], [], [2] * 6, 10000) + (8, [], [], [2] * 6, 10_000) ] @@ -142,6 +137,8 @@ async def test_disconnect_replica(df_local_factory, t_master, t_crash_fs, t_cras master.start() c_master = aioredis.Redis(port=master.port, single_connection_client=True) + seeder = DflySeeder(port=master.port, keys=n_keys, dbcount=2) + # Start replicas and create clients for replica, _ in replicas: replica.start() @@ -158,13 +155,7 @@ def replicas_of_type(tfunc): ] # Start data fill loop - async def fill_loop(): - local_c = aioredis.Redis( - port=master.port, single_connection_client=True) - for seed in count(1): - await batch_fill_data_async(local_c, gen_test_data(n_keys, seed=seed)) - - fill_task = asyncio.create_task(fill_loop()) + fill_task = asyncio.create_task(seeder.run()) # Run full sync async def full_sync(replica, c_replica, crash_type): @@ -204,7 +195,8 @@ async def stable_sync(replica, c_replica, crash_type): assert await c_replica.ping() # Stop streaming - fill_task.cancel() + seeder.stop() + await fill_task # Check master survived all crashes assert await c_master.ping() @@ -212,10 +204,12 @@ async def stable_sync(replica, c_replica, crash_type): # Check phase 3 replicas are up-to-date and there is no gap or lag def check_gen(): return gen_test_data(n_keys//5, seed=0) - await batch_fill_data_async(c_master, check_gen()) + await seeder.run(target_times=2) await asyncio.sleep(1.0) - for _, c_replica, _ in replicas_of_type(lambda t: t > 1): - await batch_check_data_async(c_replica, check_gen()) + + capture = await seeder.capture() + for replica, _, _ in replicas_of_type(lambda t: t > 1): + assert await seeder.compare(capture, port=replica.port) # Check disconnects async def disconnect(replica, c_replica, crash_type): @@ -228,9 +222,9 @@ async def disconnect(replica, c_replica, crash_type): await asyncio.sleep(0.5) # Check phase 3 replica survived - for _, c_replica, _ in replicas_of_type(lambda t: t == 2): + for replica, c_replica, _ in replicas_of_type(lambda t: t == 
2):
         assert await c_replica.ping()
-        await batch_check_data_async(c_replica, check_gen())
+        assert await seeder.compare(capture, port=replica.port)
 
     # Check master survived all disconnects
     assert await c_master.ping()
@@ -271,6 +265,8 @@ async def test_disconnect_master(df_local_factory, t_master, t_replicas, n_rando
         for i, t in enumerate(t_replicas)
     ]
 
+    seeder = DflySeeder(port=master.port, keys=n_keys)
+
     for replica in replicas:
         replica.start()
 
@@ -292,7 +288,8 @@ async def start_master():
         master.start()
         c_master = aioredis.Redis(port=master.port)
         assert await c_master.ping()
-        await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=0))
+        seeder.reset()
+        await seeder.run(target_deviation=0.1)
 
     await start_master()
 
@@ -307,15 +304,17 @@ async def start_master():
     await start_master()
     await asyncio.sleep(1 + len(replicas) * 0.5)  # Replicas check every 500ms.
 
-    for c_replica in c_replicas:
+    capture = await seeder.capture()
+    for replica, c_replica in zip(replicas, c_replicas):
         await wait_available_async(c_replica)
-        await batch_check_data_async(c_replica, gen_test_data(n_keys, seed=0))
+        assert await seeder.compare(capture, port=replica.port)
 
     # Crash master during stable state
     master.stop(kill=True)
 
     await start_master()
     await asyncio.sleep(1 + len(replicas) * 0.5)
 
-    for c_replica in c_replicas:
+    capture = await seeder.capture()
+    for replica, c_replica in zip(replicas, c_replicas):
         await wait_available_async(c_replica)
-        await batch_check_data_async(c_replica, gen_test_data(n_keys, seed=0))
+        assert await seeder.compare(capture, port=replica.port)
diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py
index db607f02c2ac..1214c2566e54 100644
--- a/tests/dragonfly/snapshot_test.py
+++ b/tests/dragonfly/snapshot_test.py
@@ -7,7 +7,7 @@
 from pathlib import Path
 
 from . import dfly_args
-from .utility import batch_check_data, batch_fill_data, gen_test_data
+from .generator import DflySeeder
 
 BASIC_ARGS = {"dir": "{DRAGONFLY_TMP}/"}
 NUM_KEYS = 100
@@ -33,15 +33,19 @@ class TestRdbSnapshot(SnapshotTestBase):
     def setup(self, tmp_dir: Path):
         super().setup(tmp_dir)
 
-    def test_snapshot(self, client: redis.Redis):
-        batch_fill_data(client, gen_test_data(NUM_KEYS))
+    @pytest.mark.asyncio
+    async def test_snapshot(self, async_client, df_server):
+        seeder = DflySeeder(port=df_server.port, keys=2_000, dbcount=5)
+        await seeder.run(target_deviation=0.1)
+
+        start_capture = await seeder.capture()
 
         # save + flush + load
-        client.execute_command("SAVE")
-        assert client.flushall()
-        client.execute_command("DEBUG LOAD " + super().get_main_file("rdb"))
+        await async_client.execute_command("SAVE")
+        assert await async_client.flushall()
+        await async_client.execute_command("DEBUG LOAD " + super().get_main_file("rdb"))
 
-        batch_check_data(client, gen_test_data(NUM_KEYS))
+        assert await seeder.compare(start_capture)
 
 
 @dfly_args({**BASIC_ARGS, "dbfilename": "test"})
@@ -54,15 +58,19 @@ def setup(self, tmp_dir: Path):
         for file in files:
             os.remove(file)
 
-    def test_snapshot(self, client: redis.Redis):
-        batch_fill_data(client, gen_test_data(NUM_KEYS))
+    @pytest.mark.asyncio
+    async def test_snapshot(self, async_client, df_server):
+        seeder = DflySeeder(port=df_server.port, keys=2_000, dbcount=5)
+        await seeder.run(target_deviation=0.1)
+
+        start_capture = await seeder.capture()
 
         # save + flush + load
-        client.execute_command("SAVE DF")
-        assert client.flushall()
-        client.execute_command("DEBUG LOAD " + super().get_main_file("dfs"))
+        await 
async_client.execute_command("DEBUG LOAD " + super().get_main_file("dfs")) - batch_check_data(client, gen_test_data(NUM_KEYS)) + assert await seeder.compare(start_capture) @dfly_args({**BASIC_ARGS, "dbfilename": "test.rdb", "save_schedule": "*:*"}) @@ -72,8 +80,10 @@ class TestPeriodicSnapshot(SnapshotTestBase): def setup(self, tmp_dir: Path): super().setup(tmp_dir) - def test_snapshot(self, client: redis.Redis): - batch_fill_data(client, gen_test_data(NUM_KEYS)) + @pytest.mark.asyncio + async def test_snapshot(self, df_server): + seeder = DflySeeder(port=df_server.port, keys=5_000, dbcount=5) + await seeder.run(target_deviation=0.1) time.sleep(60) diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 25d569e5d8f2..08c80441c3b0 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -28,46 +28,6 @@ def batch_fill_data(client: redis.Redis, gen): client.mset({k: v for k, v, in group}) -async def batch_fill_data_async(client: aioredis.Redis, gen): - for group in grouper(BATCH_SIZE, gen): - await client.mset({k: v for k, v in group}) - - -def as_str_val(v) -> str: - if isinstance(v, str): - return v - elif isinstance(v, bytes): - return v.decode() - else: - return str(v) - - -def batch_check_data(client: redis.Redis, gen): - for group in grouper(BATCH_SIZE, gen): - vals = [as_str_val(v) for v in client.mget(k for k, _ in group)] - gvals = [v for _, v in group] - assert vals == gvals - -async def batch_check_data_async(client: aioredis.Redis, gen): - for group in grouper(BATCH_SIZE, gen): - vals = [as_str_val(v) for v in await client.mget(k for k, _ in group)] - gvals = [v for _, v in group] - assert vals == gvals - -def wait_available(client: redis.Redis): - its = 0 - while True: - try: - client.get('key') - print("wait_available iterations:", its) - return - except redis.ResponseError as e: - assert "Can not execute during LOADING" in str(e) - - time.sleep(0.01) - its += 1 - - async def wait_available_async(client: aioredis.Redis): its = 0 while True: