refactor: remove FULL-SYNC-CUT cmd #2687 (#2688)
* refactor: remove FULL-SYNC-CUT cmd #2687
BorysTheDev authored Mar 6, 2024
1 parent 66b87e1 commit dfedaf7
Showing 9 changed files with 62 additions and 140 deletions.
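
What the diff amounts to: previously, each shard's RestoreStreamer ran its snapshot in a private fiber and, when the snapshot finished, wrote a DFLYMIGRATE FULL-SYNC-CUT <sync_id> <shard_id> command into the stream; the target's DflyMigrateFullSyncCut handler counted those cuts and flipped the migration into STABLE_SYNC. With this commit, RestoreStreamer::Start runs synchronously, SliceSlotMigration owns the fiber it runs on, and the source detects snapshot completion locally by joining all per-shard fibers in OutgoingMigration::SyncFb, so no cross-node notification is needed. Below is a self-contained sketch of the new completion flow; std::thread stands in for helio's Fiber, and MigrationState is reduced to the enumerators visible in the diff.

#include <atomic>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

enum class MigrationState { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC, C_FINISHED, C_MAX_INVALID };

class SliceSlotMigration {
 public:
  SliceSlotMigration() {
    state_.store(MigrationState::C_FULL_SYNC, std::memory_order_relaxed);
    // Start() is synchronous now, so run it on a dedicated thread (fiber).
    snapshot_ = std::thread([] {
      // ... RestoreStreamer::Start(dest): traverse buckets, stream them ...
    });
  }

  void WaitForSnapshotFinished() {
    if (snapshot_.joinable())  // JoinIfNeeded() analogue
      snapshot_.join();
  }

  MigrationState GetState() const { return state_.load(std::memory_order_relaxed); }

 private:
  std::atomic<MigrationState> state_{MigrationState::C_CONNECTING};
  std::thread snapshot_;
};

int main() {
  std::vector<std::unique_ptr<SliceSlotMigration>> flows;
  for (int i = 0; i < 4; ++i)
    flows.push_back(std::make_unique<SliceSlotMigration>());

  // OutgoingMigration::SyncFb(): the source learns about snapshot completion
  // locally instead of waiting for the target to ack a FULL-SYNC-CUT.
  for (auto& f : flows)
    f->WaitForSnapshotFinished();
  std::cout << "all shard snapshots finished\n";
}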
36 changes: 0 additions & 36 deletions src/server/cluster/cluster_family.cc
@@ -701,10 +701,6 @@ void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionCont
if (!migration)
return cntx->SendError(kIdNotFound);

if (migration->GetState() != MigrationState::C_STABLE_SYNC) {
return cntx->SendError("Migration process is not in STABLE_SYNC state");
}

// TODO implement blocking on migrated slots only

bool is_block_active = true;
@@ -742,8 +738,6 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
MigrationConf(args, cntx);
} else if (sub_cmd == "FLOW") {
DflyMigrateFlow(args, cntx);
} else if (sub_cmd == "FULL-SYNC-CUT") {
DflyMigrateFullSyncCut(args, cntx);
} else if (sub_cmd == "ACK") {
DflyMigrateAck(args, cntx);
} else {
@@ -864,36 +858,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
info->StartFlow(&shard->db_slice(), sync_id, server_family_->journal(), cntx->conn()->socket());
}

void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx) {
CHECK(cntx->slot_migration_id != 0);
CmdArgParser parser{args};
auto [sync_id, shard_id] = parser.Next<uint32_t, uint32_t>();

if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
}

VLOG(1) << "Full sync cut "
<< " sync_id: " << sync_id << " shard_id: " << shard_id << " shard";

std::lock_guard lck(migration_mu_);
auto migration_it = std::find_if(
incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[cntx](const auto& el) { return cntx->slot_migration_id == el->GetLocalSyncId(); });

if (migration_it == incoming_migrations_jobs_.end()) {
LOG(WARNING) << "Couldn't find migration id";
return cntx->SendError(kIdNotFound);
}

(*migration_it)->SetStableSyncForFlow(shard_id);
if ((*migration_it)->GetState() == MigrationState::C_STABLE_SYNC) {
LOG(INFO) << "STABLE-SYNC state is set for sync_id " << sync_id;
}

cntx->SendOk();
}

void ClusterFamily::FinalizeIncomingMigration(uint32_t local_sync_id) {
lock_guard lk(migration_mu_);
auto it =
2 changes: 0 additions & 2 deletions src/server/cluster/cluster_family.h
@@ -71,8 +71,6 @@ class ClusterFamily {
// source for migration
void DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx);

void DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx);

void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);

// create a ClusterSlotMigration entity which will execute migration
10 changes: 0 additions & 10 deletions src/server/cluster/cluster_slot_migration.cc
@@ -110,16 +110,6 @@ ClusterSlotMigration::Info ClusterSlotMigration::GetInfo() const {
return {ctx.host, ctx.port};
}

void ClusterSlotMigration::SetStableSyncForFlow(uint32_t flow) {
DCHECK(shard_flows_.size() > flow);
shard_flows_[flow]->SetStableSync();

if (std::all_of(shard_flows_.begin(), shard_flows_.end(),
[](const auto& el) { return el->IsStableSync(); })) {
state_ = MigrationState::C_STABLE_SYNC;
}
}

bool ClusterSlotMigration::IsFinalized() const {
return std::all_of(shard_flows_.begin(), shard_flows_.end(),
[](const auto& el) { return el->IsFinalized(); });
2 changes: 0 additions & 2 deletions src/server/cluster/cluster_slot_migration.h
@@ -42,8 +42,6 @@ class ClusterSlotMigration : private ProtocolClient {
return state_;
}

void SetStableSyncForFlow(uint32_t flow);

void Stop();

const SlotRanges& GetSlots() const {
43 changes: 33 additions & 10 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -6,6 +6,7 @@

#include <atomic>

#include "base/logging.h"
#include "server/db_slice.h"
#include "server/journal/streamer.h"

@@ -17,46 +17,58 @@ class OutgoingMigration::SliceSlotMigration {
SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
Context* cntx, io::Sink* dest)
: streamer_(slice, std::move(slots), sync_id, journal, cntx) {
streamer_.Start(dest);
state_.store(MigrationState::C_FULL_SYNC, memory_order_relaxed);
sync_fb_ = Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); });
}

void Cancel() {
streamer_.Cancel();
}

void WaitForSnapshotFinished() {
sync_fb_.JoinIfNeeded();
}

void Finalize() {
streamer_.SendFinalize();
state_.store(MigrationState::C_FINISHED, memory_order_relaxed);
}

MigrationState GetState() const {
auto state = state_.load(memory_order_relaxed);
return state == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished()
? MigrationState::C_STABLE_SYNC
: state;
return state_.load(memory_order_relaxed);
}

private:
RestoreStreamer streamer_;
// Atomic to make reads safe from any thread; writes always happen on the owning thread
atomic<MigrationState> state_ = MigrationState::C_CONNECTING;
Fiber sync_fb_;
};

OutgoingMigration::OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port,
SlotRanges slots, Context::ErrHandler err_handler)
: host_ip_(ip), port_(port), slots_(slots), cntx_(err_handler), slot_migrations_(flows_num) {
}

OutgoingMigration::~OutgoingMigration() = default;
OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}

void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal,
io::Sink* dest) {
const auto shard_id = slice->shard_id();

std::lock_guard lck(flows_mu_);
slot_migrations_[shard_id] =
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id, journal, &cntx_, dest);
MigrationState state = MigrationState::C_NO_STATE;
{
std::lock_guard lck(flows_mu_);
slot_migrations_[shard_id] =
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id, journal, &cntx_, dest);
state = GetStateImpl();
}

if (state == MigrationState::C_FULL_SYNC) {
main_sync_fb_ = Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
}
}

void OutgoingMigration::Finalize(uint32_t shard_id) {
@@ -75,10 +88,20 @@ MigrationState OutgoingMigration::GetState() const {
MigrationState OutgoingMigration::GetStateImpl() const {
MigrationState min_state = MigrationState::C_MAX_INVALID;
for (const auto& slot_migration : slot_migrations_) {
if (slot_migration)
if (slot_migration) {
min_state = std::min(min_state, slot_migration->GetState());
} else {
min_state = MigrationState::C_NO_STATE;
}
}
return min_state;
}

void OutgoingMigration::SyncFb() {
for (auto& migration : slot_migrations_) {
migration->WaitForSnapshotFinished();
}
VLOG(1) << "Migrations snapshot is finihed";
}

} // namespace dfly
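
A note on the min-fold in GetStateImpl above: declaration order of MigrationState doubles as progress order, so the overall migration is only as far along as its slowest flow, and a slot whose SliceSlotMigration has not been created yet pins the result at C_NO_STATE. A small standalone version of the fold (Aggregate and the optional-based representation are mine, not the repo's):

#include <algorithm>
#include <optional>
#include <vector>

enum class MigrationState { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC, C_FINISHED, C_MAX_INVALID };

// Unset optionals model shards whose flow isn't registered yet.
MigrationState Aggregate(const std::vector<std::optional<MigrationState>>& flows) {
  MigrationState min_state = MigrationState::C_MAX_INVALID;
  for (const auto& s : flows)
    min_state = s ? std::min(min_state, *s) : MigrationState::C_NO_STATE;
  return min_state;
}

static_assert(MigrationState::C_NO_STATE < MigrationState::C_FULL_SYNC,
              "enumerator order is the progress order the min-fold relies on");

This is also why StartFlow spawns the SyncFb fiber only once the aggregate reaches C_FULL_SYNC: that can first happen when the last shard registers its flow.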
4 changes: 4 additions & 0 deletions src/server/cluster/outgoing_slot_migration.h
@@ -47,13 +47,17 @@ class OutgoingMigration {
// SliceSlotMigration manages state and data transferring for the corresponding shard
class SliceSlotMigration;

void SyncFb();

private:
std::string host_ip_;
uint16_t port_;
SlotRanges slots_;
Context cntx_;
mutable Mutex flows_mu_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_ ABSL_GUARDED_BY(flows_mu_);

Fiber main_sync_fb_;
};

} // namespace dfly
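
One detail in this header worth noting: slot_migrations_ is declared with ABSL_GUARDED_BY(flows_mu_), so clang's -Wthread-safety analysis can flag any access that doesn't hold the mutex at compile time, which is what keeps the lock-scoped block in StartFlow honest. A minimal illustration of the annotation follows; the class and names here are hypothetical, and absl::Mutex is used to stay self-contained where the repo uses its own fiber-aware Mutex:

#include <cstddef>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class FlowRegistry {
 public:
  void Add(int flow) {
    absl::MutexLock lock(&mu_);
    flows_.push_back(flow);  // OK: mu_ is held
  }

  std::size_t UncheckedSize() const {
    return flows_.size();  // -Wthread-safety warns: mu_ not held
  }

 private:
  mutable absl::Mutex mu_;
  std::vector<int> flows_ ABSL_GUARDED_BY(mu_);
};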
53 changes: 21 additions & 32 deletions src/server/journal/streamer.cc
@@ -65,39 +65,30 @@ void RestoreStreamer::Start(io::Sink* dest) {

JournalStreamer::Start(dest);

DCHECK(!snapshot_fb_.IsJoinable());
snapshot_fb_ = fb2::Fiber("slot-snapshot", [this] {
PrimeTable::Cursor cursor;
uint64_t last_yield = 0;
PrimeTable* pt = &db_slice_->databases()[0]->prime;

do {
if (fiber_cancellation_.IsCancelled())
return;

bool written = false;
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
if (WriteBucket(it)) {
written = true;
}
});
if (written) {
NotifyWritten(true);
}
++last_yield;
PrimeTable::Cursor cursor;
uint64_t last_yield = 0;
PrimeTable* pt = &db_slice_->databases()[0]->prime;

if (last_yield >= 100) {
ThisFiber::Yield();
last_yield = 0;
do {
if (fiber_cancellation_.IsCancelled())
return;

bool written = false;
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
if (WriteBucket(it)) {
written = true;
}
} while (cursor);
});
if (written) {
NotifyWritten(true);
}
++last_yield;

VLOG(2) << "FULL-SYNC-CUT for " << sync_id_ << " : " << db_slice_->shard_id();
WriteCommand(make_pair("DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_),
absl::StrCat(db_slice_->shard_id())}));
NotifyWritten(true);
snapshot_finished_ = true;
});
if (last_yield >= 100) {
ThisFiber::Yield();
last_yield = 0;
}
} while (cursor);
}

void RestoreStreamer::SendFinalize() {
@@ -110,12 +101,10 @@ void RestoreStreamer::SendFinalize() {
}

RestoreStreamer::~RestoreStreamer() {
CHECK(!snapshot_fb_.IsJoinable());
}

void RestoreStreamer::Cancel() {
fiber_cancellation_.Cancel();
snapshot_fb_.JoinIfNeeded();
db_slice_->UnregisterOnChange(snapshot_version_);
JournalStreamer::Cancel();
}
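
The loop that survives in Start above is a standard cooperative-scheduling pattern: resume a cursor-based traversal of the prime table, flush whichever buckets produced data, and yield every 100 iterations so a long snapshot doesn't starve other fibers on the shard thread. A generic sketch of the pattern follows; the container, the callbacks, and YieldToScheduler are stand-ins I've invented for PrimeTable::Traverse, WriteBucket/NotifyWritten, and ThisFiber::Yield:

#include <cstddef>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

// Stand-in for ThisFiber::Yield(); a fiber runtime reschedules here.
inline void YieldToScheduler() { std::this_thread::yield(); }

// Resumable, cursor-style traversal with a fixed yield budget.
void SnapshotLoop(const std::vector<int>& buckets,
                  const std::function<bool(int)>& write_bucket,
                  const std::function<void()>& notify_written) {
  std::uint64_t since_yield = 0;
  for (std::size_t cursor = 0; cursor < buckets.size(); ++cursor) {
    if (write_bucket(buckets[cursor]))
      notify_written();  // flush what this step serialized
    if (++since_yield >= 100) {  // same budget as the diff
      YieldToScheduler();
      since_yield = 0;
    }
  }
}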
1 change: 0 additions & 1 deletion src/server/journal/streamer.h
@@ -82,7 +82,6 @@ class RestoreStreamer : public JournalStreamer {
uint64_t snapshot_version_ = 0;
SlotSet my_slots_;
uint32_t sync_id_;
Fiber snapshot_fb_;
Cancellation fiber_cancellation_;
bool snapshot_finished_ = false;
};
51 changes: 4 additions & 47 deletions tests/dragonfly/cluster_test.py
@@ -816,22 +816,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)

while (
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
)
!= "STABLE_SYNC"
):
await asyncio.sleep(0.05)

status = await c_nodes_admin[0].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[1].port)
)
assert "STABLE_SYNC" == status

status = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
assert ["out 127.0.0.1:30002 STABLE_SYNC"] == status

await asyncio.sleep(0.5)
try:
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER",
@@ -850,12 +835,6 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)

status = await c_nodes_admin[0].execute_command("DFLYCLUSTER SLOT-MIGRATION-STATUS")
assert ["out 127.0.0.1:30002 STABLE_SYNC"] == status

status = await c_nodes_admin[1].execute_command("DFLYCLUSTER SLOT-MIGRATION-STATUS")
assert ["in 127.0.0.1:31001 STABLE_SYNC"] == status

await close_clients(*c_nodes, *c_nodes_admin)


@@ -920,13 +899,7 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
assert await c_nodes[0].set("KEY0", "value")
assert await c_nodes[0].set("KEY1", "value")

while (
await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
)
!= "STABLE_SYNC"
):
await asyncio.sleep(0.05)
await asyncio.sleep(0.5)

assert await c_nodes[0].set("KEY4", "value")
assert await c_nodes[0].set("KEY5", "value")
@@ -1090,28 +1063,12 @@ async def generate_config():
keeping = node.slots[num_outgoing:]
node.next_slots.extend(keeping)

# Busy loop for migrations to finish - all in stable state
iterations = 0
while True:
for node in nodes:
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
print(states)
if not all(s.endswith("STABLE_SYNC") for s in states) and not states == "NO_STATE":
break
else:
break

iterations += 1
assert iterations < 100

await asyncio.sleep(0.1)

# Give seeder one more second
# some more time for the seeder
await asyncio.sleep(1.0)

# Stop seeder
seeder.stop()
await fill_task
await asyncio.sleep(1.0)

# Counter that pushes values to a list
async def list_counter(key, client: aioredis.RedisCluster):