Skip to content

Commit

Permalink
fix(tests): Small fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed Jan 5, 2023
1 parent 6969517 commit 0e0efc1
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 35 deletions.
17 changes: 10 additions & 7 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from itertools import chain, repeat

from .utility import *
from . import dfly_args


BASE_PORT = 1111
Expand All @@ -29,7 +30,7 @@
(4, [1] * 10, dict(keys=500, dbcount=2)),
]


@dfly_args({"logtostdout":""})
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replicas, seeder_config", replication_cases)
async def test_replication_all(df_local_factory, t_master, t_replicas, seeder_config):
Expand All @@ -53,6 +54,7 @@ async def test_replication_all(df_local_factory, t_master, t_replicas, seeder_co

# Start data stream
stream_task = asyncio.create_task(seeder.run(target_times=3))
await asyncio.sleep(0.0)

# Start replication
async def run_replication(c_replica):
Expand All @@ -67,23 +69,24 @@ async def run_replication(c_replica):
await stream_task

# Check data after full sync
await asyncio.sleep(0.1)
await asyncio.sleep(3.0)
await check_data(seeder, replicas, c_replicas)

# Stream more data in stable state
await seeder.run(target_times=2)

# Check data after stable state stream
await asyncio.sleep(0.5)
await asyncio.sleep(3.0)
await check_data(seeder, replicas, c_replicas)

# Issue lots of deletes
seeder.target(100)
await seeder.run(target_deviation=0.1)
# TODO: Enable after stable state is faster
#seeder.target(100)
#await seeder.run(target_deviation=0.1)

# Check data after deletes
await asyncio.sleep(0.5)
await check_data(seeder, replicas, c_replicas)
#await asyncio.sleep(2.0)
#await check_data(seeder, replicas, c_replicas)


async def check_data(seeder, replicas, c_replicas):
Expand Down
14 changes: 14 additions & 0 deletions tests/dragonfly/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import asyncio
import aioredis

from utility import DflySeeder


async def main():
c = aioredis.Redis()
await c.flushall()

s = DflySeeder()
await s.run(target_deviation=0.01)

asyncio.run(main())
2 changes: 1 addition & 1 deletion tests/dragonfly/server_family_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_connection_name(client):
def test_scan(client):
def gen_test_data():
for i in range(10):
yield "key-"+str(i), "value-"+str(i)
yield f"key-{i}", f"value-{i}"

for key, val in gen_test_data():
res = client.set(key, val)
Expand Down
58 changes: 31 additions & 27 deletions tests/dragonfly/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class ValueType(Enum):
HSET = 3
ZSET = 4

@staticmethod
def randomize():
return random.choice([t for t in ValueType])


class CommandGenerator:
"""Class for generating complex command sequences"""
Expand Down Expand Up @@ -87,14 +91,11 @@ def add_key(self, t: ValueType):
self.set_for_type(t).add(k)
return k

def randomize_type(self):
return random.choice([t for t in ValueType])

def randomize_nonempty_set(self):
if not any(self.key_sets):
return None, None

t = self.randomize_type()
t = ValueType.randomize()
s = self.set_for_type(t)

if len(s) == 0:
Expand All @@ -119,33 +120,34 @@ def randomize_key(self, t=None, pop=False):

def generate_val(self, t: ValueType):
def rand_str(k=3, s=''):
# Use small k value to reduce mem usage and increase number of ops
return s.join(random.choices(string.ascii_letters, k=k))

if t == ValueType.STRING:
# Random string for MSET
return rand_str(self.val_size)
elif t == ValueType.LIST:
# Random sequence of single letter elements for LPUSH
return rand_str(self.val_size//2, ' ')
# Random sequence k-letter elements for LPUSH
return ' '.join(rand_str() for _ in range(self.val_size//4))
elif t == ValueType.SET:
# Random sequence of 3 letter elements for SADD
# Random sequence of k-letter elements for SADD
return ' '.join(rand_str() for _ in range(self.val_size//4))
elif t == ValueType.HSET:
# Random sequence of 3 letter keys + int and two start values for HSET
# Random sequence of k-letter keys + int and two start values for HSET
return 'v0 0 v1 0 ' + ' '.join(
rand_str() + ' ' + str(random.randint(0, self.val_size))
for _ in range(self.val_size//5)
)
else:
# Random sequnce of 3 letter keys and int score for ZSET
# Random sequnce of k-letter keys and int score for ZSET
return ' '.join(str(random.randint(0, self.val_size)) + ' ' + rand_str()
for _ in range(self.val_size//4))

def make_shrink(self):
# Simulate shrinking by deleting a few random keys
keys_gen = (self.randomize_key(pop=True)
for _ in range(random.randint(1, self.max_multikey)))
keys = [str(k) for k, _ in keys_gen if k is not None]
keys = [f"k{k}" for k, _ in keys_gen if k is not None]

if len(keys) == 0:
return None, 0
Expand All @@ -158,8 +160,8 @@ def make_shrink(self):
('LPOP {k}', ValueType.LIST),
#('SADD {k} {val}', ValueType.SET),
#('SPOP {k}', ValueType.SET),
('HSETNX {k} v0 {val}', ValueType.HSET),
('HINCRBY {k} v1 1', ValueType.HSET),
#('HSETNX {k} v0 {val}', ValueType.HSET),
#('HINCRBY {k} v1 1', ValueType.HSET),
#('ZPOPMIN {k} 1', ValueType.ZSET),
#('ZADD {k} 0 {val}', ValueType.ZSET)
]
Expand All @@ -169,7 +171,7 @@ def make_no_change(self):
cmd, t = random.choice(self.NO_CHANGE_ACTIONS)
k, _ = self.randomize_key(t)
val = ''.join(random.choices(string.ascii_letters, k=4))
return cmd.format(k=str(k), val=val) if k is not None else None, 0
return cmd.format(k=f"k{k}", val=val) if k is not None else None, 0

GROW_ACTINONS = {
ValueType.STRING: 'MSET',
Expand All @@ -182,14 +184,14 @@ def make_no_change(self):
def make_grow(self):
# Simulate growing memory usage by using MSET, LPUSH, SADD, HMSET or ZADD with large values
# TODO: Implement COPY in Dragonfly.
t = self.randomize_type()
t = ValueType.randomize()
if t == ValueType.STRING:
count = random.randint(1, self.max_multikey)
else:
count = 1

keys = (self.add_key(t) for _ in range(count))
payload = " ".join(str(k) + " " + self.generate_val(t) for k in keys)
payload = " ".join(f"k{k}" + " " + self.generate_val(t) for k in keys)
return self.GROW_ACTINONS[t] + " " + payload, count

def make(self, action):
Expand Down Expand Up @@ -304,7 +306,7 @@ async def run(self, target_times=None, target_deviation=None):
producer = asyncio.create_task(self._generator_task(
queues, target_times=target_times, target_deviation=target_deviation))
consumers = [
asyncio.create_task(self._cosumer_task(i, queue))
asyncio.create_task(self._consumer_task(i, queue))
for i, queue in enumerate(queues)
]

Expand Down Expand Up @@ -350,23 +352,25 @@ def target(self, key_cnt):
async def _generator_task(self, queues, target_times=None, target_deviation=None):
cpu_time = 0
submitted = 0
deviation = 0.0

while True:
def should_run():
if self.stop_flag:
break
return False
if target_times is not None and submitted >= target_times:
return False
if target_deviation is not None and abs(1-deviation) < target_deviation:
return False
return True

while should_run():
start_time = time.time()
blob, deviation = self.gen.generate()
cpu_time += (time.time() - start_time)

await asyncio.gather(*(q.put(blob) for q in queues))
submitted += 1

if target_times is not None and submitted >= target_times:
break
if target_deviation is not None and abs(1-deviation) < target_deviation:
break

await asyncio.sleep(0.0)

print("cpu time", cpu_time, "batches", submitted)
Expand All @@ -375,7 +379,7 @@ async def _generator_task(self, queues, target_times=None, target_deviation=None
for q in queues:
await q.join()

async def _cosumer_task(self, db, queue):
async def _consumer_task(self, db, queue):
client = aioredis.Redis(port=self.port, db=db)
pipe_time = 0
while True:
Expand Down Expand Up @@ -415,17 +419,17 @@ async def _cosumer_task(self, db, queue):

async def _capture_entries(self, client, keys):
def tostr(b):
return str(b) if isinstance(b, int) else b.decode("utf-8")
return b.decode("utf-8") if isinstance(b, bytes) else str(b)

entries = []
for group in chunked(self.gen.batch_size * 2, keys):
pipe = client.pipeline(transaction=False)
for k, t in group:
self.CAPTURE_COMMANDS[t](pipe, k)
self.CAPTURE_COMMANDS[t](pipe, f"k{k}")

results = await pipe.execute()
for (k, t), res in zip(group, results):
out = f"{t.name} {k}: " + \
out = f"{t.name} k{k}: " + \
' '.join(self.CAPTURE_EXTRACTORS[t](res, tostr))
entries.append(out)

Expand Down

0 comments on commit 0e0efc1

Please sign in to comment.