diff --git a/src/server/replica.cc b/src/server/replica.cc index a5051a81739b..adc9a2f3426f 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -303,10 +303,17 @@ std::error_code Replica::HandleCapaDflyResp() { } // If we're syncing a different replication ID, drop the saved LSNs. - if (master_context_.master_repl_id != ToSV(LastResponseArgs()[0].GetBuf())) { + string_view master_repl_id = ToSV(LastResponseArgs()[0].GetBuf()); + if (master_context_.master_repl_id != master_repl_id) { + if (!master_context_.master_repl_id.empty()) { + LOG(ERROR) << "Encountered different master repl id (" << master_repl_id << " vs " + << master_context_.master_repl_id << ")"; + state_mask_.store(0); + return make_error_code(errc::connection_aborted); + } last_journal_LSNs_.reset(); } - master_context_.master_repl_id = ToSV(LastResponseArgs()[0].GetBuf()); + master_context_.master_repl_id = master_repl_id; master_context_.dfly_session_id = ToSV(LastResponseArgs()[1].GetBuf()); num_df_flows_ = param_num_flows; diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index aec109d0da98..70fc85c4e3fd 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2060,6 +2060,13 @@ async def save_replica(): await disconnect_clients(c_master, *[c_replica]) +async def is_replica_down(conn): + role = await conn.execute_command("INFO REPLICATION") + # fancy way of extracting the field master_link_status + is_down = role.split("\r\n")[4].split(":")[1] + return is_down == "down" + + @pytest.mark.asyncio async def test_user_acl_replication(df_local_factory): master = df_local_factory.create(proactor_threads=4) @@ -2083,10 +2090,7 @@ async def test_user_acl_replication(df_local_factory): await c_master.execute_command("ACL SETUSER tmp -replconf") async with async_timeout.timeout(5): while True: - role = await c_replica.execute_command("INFO REPLICATION") - # fancy of way of extracting the field master_link_status - 
is_down = role.split("\r\n")[4].split(":")[1] - if is_down == "down": + if await is_replica_down(c_replica): break await asyncio.sleep(1) @@ -2096,3 +2100,42 @@ async def test_user_acl_replication(df_local_factory): await c_master.execute_command("ACL SETUSER tmp +replconf") await check_all_replicas_finished([c_replica], c_master, 5) assert 2 == await c_replica.execute_command("DBSIZE") + + +@pytest.mark.asyncio +async def test_replica_reconnect(df_local_factory): + # Connect replica to master + master = df_local_factory.create(proactor_threads=1) + replica = df_local_factory.create(proactor_threads=1) + df_local_factory.start_all([master, replica]) + + c_master = master.client() + c_replica = replica.client() + + await c_master.execute_command("set k 12345") + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + assert not await is_replica_down(c_replica) + + # kill existing master, create master with different repl_id but same port + master_port = master.port + master.stop() + assert await is_replica_down(c_replica) + + master = df_local_factory.create(proactor_threads=1, port=master_port) + df_local_factory.start_all([master]) + + # Assert that replica did not reconnect to master with different repl_id + assert await c_master.execute_command("get k") == None + assert await c_replica.execute_command("get k") == "12345" + assert await c_master.execute_command("set k 6789") + assert await c_replica.execute_command("get k") == "12345" + assert await is_replica_down(c_replica) + + # Force re-replication, assert that it worked + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + assert await c_replica.execute_command("get k") == "6789" + + await disconnect_clients(c_master, *[c_replica])