Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(replica): test journal rewrite with multi shards #720

Merged
merged 1 commit into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,9 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v

db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
if (journal_update && op_args.shard->journal()) {
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
if (overwrite) {
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
}
vector<string_view> mapped(vals.size() + 1);
mapped[0] = key;
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
Expand Down
100 changes: 89 additions & 11 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def gen_test_data(start, end):
"""


@dfly_args({"proactor_threads": 1})
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_rewrites(df_local_factory):
CLOSE_TIMESTAMP = (int(time.time()) + 100)
Expand All @@ -393,25 +393,48 @@ async def test_rewrites(df_local_factory):
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)

# Make sure journal writer sends its first SELECT command on its only shard
await c_master.set("no-select-please", "ok")
await asyncio.sleep(0.5)

# Create monitor and bind utility functions
m_replica = c_replica.monitor()

async def check_rsp(rx):
print("waiting on", rx)
async def get_next_command():
mcmd = (await m_replica.next_command())['command']
print("Got:", mcmd, "Regex", rx)
assert re.match(rx, mcmd)
# skip select command
if (mcmd == "SELECT 0"):
print("Got:", mcmd)
mcmd = (await m_replica.next_command())['command']
print("Got:", mcmd)
return mcmd

async def is_match_rsp(rx):
mcmd = (await get_next_command())
print("Regex:", rx)
return re.match(rx, mcmd)

async def skip_cmd():
await check_rsp(r".*")
await is_match_rsp(r".*")

async def check(cmd, rx):
await c_master.execute_command(cmd)
await check_rsp(rx)
match = (await is_match_rsp(rx))
assert match

async def check_list(cmd, rx_list):
print("master cmd:", cmd)
await c_master.execute_command(cmd)
for rx in rx_list:
match = (await is_match_rsp(rx))
assert match

async def check_list_ooo(cmd, rx_list):
print("master cmd:", cmd)
await c_master.execute_command(cmd)
expected_cmds = len(rx_list)
for i in range(expected_cmds):
mcmd = (await get_next_command())
#check command matches one regex from list
match_rx = list(filter(lambda rx: re.match(rx, mcmd), rx_list))
assert len(match_rx) == 1
rx_list.remove(match_rx[0])

async def check_expire(key):
ttl1 = await c_master.ttl(key)
Expand Down Expand Up @@ -458,3 +481,58 @@ async def check_expire(key):
await check_expire("k")
await check("GETEX k EX 100", r"PEXPIREAT k (.*?)")
await check_expire("k")

# Check SDIFFSTORE turns into DEL and SADD
await c_master.sadd("set1", "v1", "v2", "v3")
await c_master.sadd("set2", "v1", "v2")
await skip_cmd()
await skip_cmd()
await check_list("SDIFFSTORE k set1 set2", [r"DEL k", r"SADD k v3"])

# Check SINTERSTORE turns into DEL and SADD
await check_list("SINTERSTORE k set1 set2", [r"DEL k", r"SADD k (.*?)"])

# Check SMOVE turns into SREM and SADD
await check_list_ooo("SMOVE set1 set2 v3", [r"SREM set1 v3", r"SADD set2 v3"])

# Check SUNIONSTORE turns into DEL and SADD
await check_list_ooo("SUNIONSTORE k set1 set2", [r"DEL k", r"SADD k (.*?)"])

await c_master.set("k1", "1000")
await c_master.set("k2", "1100")
await skip_cmd()
await skip_cmd()
# Check BITOP turns into SET
await check("BITOP OR kdest k1 k2", r"SET kdest 1100")

# Check there is no rewrite for LMOVE on single shard
await c_master.lpush("list", "v1", "v2", "v3", "v4")
await skip_cmd()
await check("LMOVE list list LEFT RIGHT", r"LMOVE list list LEFT RIGHT")

# Check there is no rewrite for RPOPLPUSH on single shard
await check("RPOPLPUSH list list", r"RPOPLPUSH list list")
# Check there is no rewrite for BRPOPLPUSH on single shard
await check("BRPOPLPUSH list list 0", r"BRPOPLPUSH list list 0")


await c_master.lpush("list1s", "v1", "v2", "v3", "v4")
await skip_cmd()
# Check LMOVE turns into LPUSH LPOP on multi shard
await check_list_ooo("LMOVE list1s list2s LEFT LEFT", [r"LPUSH list2s v4", r"LPOP list1s"])
# Check RPOPLPUSH turns into LPUSH RPOP on multi shard
await check_list_ooo("RPOPLPUSH list1s list2s", [r"LPUSH list2s v1", r"RPOP list1s"])
# Check BRPOPLPUSH turns into LPUSH RPOP on multi shard
await check_list_ooo("BRPOPLPUSH list1s list2s 0", [r"LPUSH list2s v2", r"RPOP list1s"])

# MOVE runs as global command, check only one journal entry is sent
await check("MOVE list2s 2", r"MOVE list2s 2")

await c_master.set("renamekey", "1000", px=50000)
await skip_cmd()
# Check RENAME turns into DEL SET and PEXPIREAT
await check_list_ooo("RENAME renamekey renamed", [r"DEL renamekey", r"SET renamed 1000", r"PEXPIREAT renamed (.*?)"])
await check_expire("renamed")
# Check RENAMENX turns into DEL SET and PEXPIREAT
await check_list_ooo("RENAMENX renamed renamekey", [r"DEL renamed", r"SET renamekey 1000", r"PEXPIREAT renamekey (.*?)"])
await check_expire("renamekey")