Skip to content

Commit

Permalink
feat(replication): Do not auto replicate different master
Browse files Browse the repository at this point in the history
Until now, replicas would re-connect and re-replicate a master after the
master will restart. This is problematic in case the master loses its
data, which will cause the replica to flush all and lose its data as
well.

This is a breaking change though, in that whoever controls the replica
now has to explicitly issue a `REPLICAOF X Y` in order to re-establish
a connection to a new master. This is true even if the master loaded an
up to date RDB file.

It's not necessary if the replica lost connection to the master and the
master was always alive, and the connection is re-established.

Fixes #2636
  • Loading branch information
chakaz committed Mar 20, 2024
1 parent 31fabf2 commit c9bc918
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
11 changes: 9 additions & 2 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,17 @@ std::error_code Replica::HandleCapaDflyResp() {
}

// If we're syncing a different replication ID, drop the saved LSNs.
if (master_context_.master_repl_id != ToSV(LastResponseArgs()[0].GetBuf())) {
string_view master_repl_id = ToSV(LastResponseArgs()[0].GetBuf());
if (master_context_.master_repl_id != master_repl_id) {
if (!master_context_.master_repl_id.empty()) {
LOG(ERROR) << "Encountered different master repl id (" << master_repl_id << " vs "
<< master_context_.master_repl_id << ")";
state_mask_.store(0);
return make_error_code(errc::connection_aborted);
}
last_journal_LSNs_.reset();
}
master_context_.master_repl_id = ToSV(LastResponseArgs()[0].GetBuf());
master_context_.master_repl_id = master_repl_id;
master_context_.dfly_session_id = ToSV(LastResponseArgs()[1].GetBuf());
num_df_flows_ = param_num_flows;

Expand Down
51 changes: 47 additions & 4 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,13 @@ async def save_replica():
await disconnect_clients(c_master, *[c_replica])


async def is_replica_down(conn):
role = await conn.execute_command("INFO REPLICATION")
# fancy of way of extracting the field master_link_status
is_down = role.split("\r\n")[4].split(":")[1]
return is_down == "down"


@pytest.mark.asyncio
async def test_user_acl_replication(df_local_factory):
master = df_local_factory.create(proactor_threads=4)
Expand All @@ -2083,10 +2090,7 @@ async def test_user_acl_replication(df_local_factory):
await c_master.execute_command("ACL SETUSER tmp -replconf")
async with async_timeout.timeout(5):
while True:
role = await c_replica.execute_command("INFO REPLICATION")
# fancy of way of extracting the field master_link_status
is_down = role.split("\r\n")[4].split(":")[1]
if is_down == "down":
if await is_replica_down(c_replica):
break
await asyncio.sleep(1)

Expand All @@ -2096,3 +2100,42 @@ async def test_user_acl_replication(df_local_factory):
await c_master.execute_command("ACL SETUSER tmp +replconf")
await check_all_replicas_finished([c_replica], c_master, 5)
assert 2 == await c_replica.execute_command("DBSIZE")


@pytest.mark.asyncio
async def test_replica_reconnect(df_local_factory):
# Connect replica to master
master = df_local_factory.create(proactor_threads=1)
replica = df_local_factory.create(proactor_threads=1)
df_local_factory.start_all([master, replica])

c_master = master.client()
c_replica = replica.client()

await c_master.execute_command("set k 12345")
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)

assert not await is_replica_down(c_replica)

# kill existing master, create master with different repl_id but same port
master_port = master.port
master.stop()
assert await is_replica_down(c_replica)

master = df_local_factory.create(proactor_threads=1, port=master_port)
df_local_factory.start_all([master])

# Assert that replica did not reconnected to master with different repl_id
assert await c_master.execute_command("get k") == None
assert await c_replica.execute_command("get k") == "12345"
assert await c_master.execute_command("set k 6789")
assert await c_replica.execute_command("get k") == "12345"
assert await is_replica_down(c_replica)

# Force re-replication, assert that it worked
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
assert await c_replica.execute_command("get k") == "6789"

await disconnect_clients(c_master, *[c_replica])

0 comments on commit c9bc918

Please sign in to comment.