Skip to content

Commit

Permalink
test(replication test): check data only after replica finished execut…
Browse files Browse the repository at this point in the history
…ion (#746)

Signed-off-by: adi_holden <adi@dragonflydb.io>
  • Loading branch information
adiholden authored and ashotland committed Feb 2, 2023
1 parent 6f737cd commit 7dec995
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
2 changes: 2 additions & 0 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ error_code Replica::InitiateDflySync() {
// We do the following operations regardless of outcome.
JoinAllFlows();
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
state_mask_ &= ~R_SYNCING;
};

// Initialize MultiShardExecution.
Expand Down Expand Up @@ -497,6 +498,7 @@ error_code Replica::InitiateDflySync() {
JournalExecutor{&service_}.FlushAll();

// Start full sync flows.
state_mask_ |= R_SYNCING;
{
auto partition = Partition(num_df_flows_);
auto shard_cb = [&](unsigned index, auto*) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ bool Transaction::RunInShard(EngineShard* shard) {

/*************************************************************************/

if (!was_suspended && is_concluding) // Check last hop & non suspended.
if (is_concluding) // Check last hop
LogAutoJournalOnShard(shard);

// at least the coordinator thread owns the reference.
Expand Down
29 changes: 18 additions & 11 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,33 @@ async def run_replication(c_replica):
), "Weak testcase. Increase number of streamed iterations to surpass full sync"
await stream_task

async def check_replica_finished_exec(c_replica):
info_stats = await c_replica.execute_command("INFO")
tc1 = info_stats['total_commands_processed']
await asyncio.sleep(0.1)
info_stats = await c_replica.execute_command("INFO")
tc2 = info_stats['total_commands_processed']
return tc1+1 == tc2 # Replica processed only the info command on above sleep.

async def check_all_replicas_finished():
while True:
await asyncio.sleep(1.0)
is_finished_arr = await asyncio.gather(*(asyncio.create_task(check_replica_finished_exec(c))
for c in c_replicas))
if all(is_finished_arr):
break

# Check data after full sync
await asyncio.sleep(4.0)
await check_all_replicas_finished()
await check_data(seeder, replicas, c_replicas)

# Stream more data in stable state
await seeder.run(target_ops=2000)

# Check data after stable state stream
await asyncio.sleep(3.0)
await check_all_replicas_finished()
await check_data(seeder, replicas, c_replicas)

# Issue lots of deletes
# TODO: Enable after stable state is faster
# seeder.target(100)
# await seeder.run(target_deviation=0.1)

# Check data after deletes
# await asyncio.sleep(2.0)
# await check_data(seeder, replicas, c_replicas)


async def check_data(seeder, replicas, c_replicas):
capture = await seeder.capture()
Expand Down

0 comments on commit 7dec995

Please sign in to comment.