Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cluster): add migration finalization #2507

Merged
merged 6 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ using SlotId = uint16_t;
using SlotSet = absl::flat_hash_set<SlotId>;

// MigrationState constants are ordered in state changing order
enum class MigrationState : uint8_t { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC };
enum class MigrationState : uint8_t {
C_NO_STATE,
C_CONNECTING,
// Initial state assigned by an outgoing per-shard SliceSlotMigration.
C_FULL_SYNC,
// Reported by an outgoing shard flow once it is in C_FULL_SYNC and its streamer's snapshot
// has finished; required before SLOT-MIGRATION-FINALIZE is accepted.
C_STABLE_SYNC,
// Set on an incoming migration after all of its shard flows have been finalized;
// migrations in this state are dropped by RemoveFinishedIncomingMigrations().
C_FINISHED
};

class ClusterConfig {
public:
Expand Down
62 changes: 58 additions & 4 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
} else if (sub_cmd == "START-SLOT-MIGRATION") {
return DflyClusterStartSlotMigration(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
return DflySlotMigrationStatus(args, cntx);
return DflyClusterSlotMigrationStatus(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-FINALIZE") {
return DflyClusterMigrationFinalize(args, cntx);
}

return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
Expand Down Expand Up @@ -531,6 +533,9 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
tracker.TrackOnThread();
};

// TODO think about another place for it
RemoveFinishedIncomingMigrations();

server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);

Expand Down Expand Up @@ -629,7 +634,7 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon
}
node->Start(cntx);

return cntx->SendOk();
return cntx->SendLong(node->GetSyncId());
}

static std::string_view state_to_str(MigrationState state) {
Expand All @@ -642,12 +647,14 @@ static std::string_view state_to_str(MigrationState state) {
return "FULL_SYNC"sv;
case MigrationState::C_STABLE_SYNC:
return "STABLE_SYNC"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
}
DCHECK(false) << "Unknown State value " << static_cast<underlying_type_t<MigrationState>>(state);
return "UNDEFINED_STATE"sv;
}

void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser(args);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());

Expand Down Expand Up @@ -689,6 +696,40 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
return rb->SendSimpleString(state_to_str(MigrationState::C_NO_STATE));
}

// Handles DFLYCLUSTER SLOT-MIGRATION-FINALIZE <sync_id>.
//
// Looks up the outgoing migration identified by sync_id and, if it is in STABLE_SYNC,
// asks every shard flow to finalize (which writes a journal FIN entry to the target),
// then cancels the flows and forgets the migration.
//
// Replies:
//   - the parser error, if sync_id is missing or malformed;
//   - kIdNotFound, if there is no outgoing migration with this sync_id;
//   - an error, if the migration has not reached STABLE_SYNC;
//   - OK otherwise.
void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionContext* cntx) {
  CmdArgParser parser{args};
  const auto sync_id = parser.Next<uint32_t>();
  if (auto parse_err = parser.Error(); parse_err) {
    cntx->SendError(parse_err->MakeReply());
    return;
  }

  auto migration = GetOutgoingMigration(sync_id);
  if (!migration) {
    cntx->SendError(kIdNotFound);
    return;
  }

  const bool in_stable_sync = migration->GetState() == MigrationState::C_STABLE_SYNC;
  if (!in_stable_sync) {
    cntx->SendError("Migration process is not in STABLE_SYNC state");
    return;
  }

  // Runs on every pool thread; only threads that own an engine shard have a flow to finalize.
  auto finalize_on_shard = [migration](auto*) {
    const auto* shard = EngineShard::tlocal();
    if (shard)
      migration->Finalize(shard->shard_id());
  };
  shard_set->pool()->AwaitFiberOnAll(std::move(finalize_on_shard));

  // TODO do next after ACK
  // Until an acknowledgement exists, a fixed pause separates finalization from cancellation.
  util::ThisFiber::SleepFor(500ms);

  // Same per-shard dispatch as above, this time tearing the flows down.
  auto cancel_on_shard = [migration](auto*) {
    const auto* shard = EngineShard::tlocal();
    if (shard)
      migration->Cancel(shard->shard_id());
  };
  shard_set->pool()->AwaitFiberOnAll(std::move(cancel_on_shard));

  RemoveOutgoingMigration(sync_id);

  cntx->SendOk();
}

void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
Expand Down Expand Up @@ -718,6 +759,19 @@ ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t
.get();
}

// Drops every incoming migration job whose state is C_FINISHED.
// The job list is modified while holding migration_mu_.
void ClusterFamily::RemoveFinishedIncomingMigrations() {
  lock_guard lk(migration_mu_);

  const auto is_finished = [](const auto& job) {
    return job->GetState() == MigrationState::C_FINISHED;
  };

  // Erase-remove: compact the unfinished jobs to the front, then cut off the tail.
  auto& jobs = incoming_migrations_jobs_;
  jobs.erase(std::remove_if(jobs.begin(), jobs.end(), is_finished), jobs.end());
}

// Forgets the outgoing migration registered under sync_id, if there is one.
// The job container is modified while holding migration_mu_.
void ClusterFamily::RemoveOutgoingMigration(uint32_t sync_id) {
  lock_guard guard{migration_mu_};
  // Erasing by key; the result (number of removed entries) is intentionally unused.
  outgoing_migration_jobs_.erase(sync_id);
}

void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Create slot migration config";
CmdArgParser parser{args};
Expand Down Expand Up @@ -793,6 +847,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(kIdNotFound);

cntx->conn()->Migrate(shard_set->pool()->at(shard_id));
server_family_->journal()->StartInThread();

cntx->SendOk();

Expand Down Expand Up @@ -825,7 +880,6 @@ void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* c

(*migration_it)->SetStableSyncForFlow(shard_id);
if ((*migration_it)->GetState() == MigrationState::C_STABLE_SYNC) {
(*migration_it)->Stop();
LOG(INFO) << "STABLE-SYNC state is set for sync_id " << sync_id;
}

Expand Down
6 changes: 5 additions & 1 deletion src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class ClusterFamily {

private: // Slots migration section
void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx);
void DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
void DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
void DflyClusterMigrationFinalize(CmdArgList args, ConnectionContext* cntx);

// DFLYMIGRATE is an internal command that defines several steps in the slot migration process
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);
Expand All @@ -74,6 +75,9 @@ class ClusterFamily {
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);

void RemoveFinishedIncomingMigrations();
void RemoveOutgoingMigration(uint32_t sync_id);

// store info about migration and create unique session id
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
Expand Down
10 changes: 7 additions & 3 deletions src/server/cluster/cluster_shard_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ void ClusterShardMigration::FullSyncShardFb(Context* cntx) {

TouchIoTime();

if (!tx_data->is_ping) {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
} else {
if (tx_data->opcode == journal::Op::FIN) {
VLOG(2) << "Flow " << source_shard_id_ << " is finalized";
is_finalized_ = true;
break;
} else if (tx_data->opcode == journal::Op::PING) {
// TODO check about ping logic
} else {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/server/cluster/cluster_shard_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@ class ClusterShardMigration : public ProtocolClient {
return is_stable_sync_.load();
}

// Returns true once this flow has read a journal FIN entry (see FullSyncShardFb).
// NOTE(review): is_finalized_ is a plain bool, unlike the atomic is_stable_sync_;
// presumably this is only read after the flow fiber has been joined — confirm.
bool IsFinalized() {
return is_finalized_;
}

void JoinFlow();

private:
void FullSyncShardFb(Context* cntx);
void JoinFlow();

void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx);
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx);
Expand All @@ -45,6 +50,7 @@ class ClusterShardMigration : public ProtocolClient {
std::unique_ptr<JournalExecutor> executor_;
Fiber sync_fb_;
std::atomic_bool is_stable_sync_ = false;
bool is_finalized_ = false;
};

} // namespace dfly
16 changes: 16 additions & 0 deletions src/server/cluster/cluster_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ void ClusterSlotMigration::SetStableSyncForFlow(uint32_t flow) {
}
}

// Returns true when every shard flow reports that it has been finalized
// (trivially true if there are no flows).
bool ClusterSlotMigration::IsFinalized() const {
  for (const auto& flow : shard_flows_) {
    if (!flow->IsFinalized())
      return false;
  }
  return true;
}

void ClusterSlotMigration::Stop() {
for (auto& flow : shard_flows_) {
flow->Cancel();
Expand All @@ -129,6 +134,10 @@ void ClusterSlotMigration::MainMigrationFb() {
LOG(WARNING) << "Error syncing with " << server().Description() << " " << ec << " "
<< ec.message();
}

if (IsFinalized()) {
state_ = MigrationState::C_FINISHED;
}
}

std::error_code ClusterSlotMigration::InitiateSlotsMigration() {
Expand All @@ -137,6 +146,13 @@ std::error_code ClusterSlotMigration::InitiateSlotsMigration() {
shard_flows_[i].reset(new ClusterShardMigration(server(), i, sync_id_, &service_));
}

absl::Cleanup cleanup = [this]() {
// We do the following operations regardless of outcome.
for (auto& flow : shard_flows_) {
flow->JoinFlow();
}
};

// Switch to new error handler that closes flow sockets.
auto err_handler = [this](const auto& ge) mutable {
// Make sure the flows are not in a state transition
Expand Down
4 changes: 4 additions & 0 deletions src/server/cluster/cluster_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ClusterSlotMigration : ProtocolClient {
}

void SetStableSyncForFlow(uint32_t flow);

void Stop();

private:
Expand All @@ -45,6 +46,9 @@ class ClusterSlotMigration : ProtocolClient {
// Creates flows, one per shard on the source node, and manages the migration process
std::error_code InitiateSlotsMigration();

// may be called after we finish all flows
bool IsFinalized() const;
BorysTheDev marked this conversation as resolved.
Show resolved Hide resolved

private:
Service& service_;
Mutex flows_op_mu_;
Expand Down
18 changes: 17 additions & 1 deletion src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ class OutgoingMigration::SliceSlotMigration {
state_ = MigrationState::C_FULL_SYNC;
}

~SliceSlotMigration() {
// Stops this shard's flow by cancelling its streamer.
void Cancel() {
streamer_.Cancel();
}

// Asks the streamer to send the finalization marker for this shard's flow.
void Finalize() {
streamer_.SendFinalize();
}

MigrationState GetState() const {
return state_ == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished()
? MigrationState::C_STABLE_SYNC
Expand Down Expand Up @@ -56,8 +60,20 @@ void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Jou
std::make_unique<SliceSlotMigration>(slice, std::move(sset), sync_id, journal, &cntx_, dest);
}

// Finalizes the flow that belongs to the given shard.
// NOTE(review): the entry is dereferenced without a null check, while GetStateImpl()
// skips empty entries — confirm a flow was started for every shard before this is called.
void OutgoingMigration::Finalize(uint32_t shard_id) {
  const auto& shard_flow = slot_migrations_[shard_id];
  shard_flow->Finalize();
}

// Cancels the flow that belongs to the given shard.
// NOTE(review): the entry is dereferenced without a null check, while GetStateImpl()
// skips empty entries — confirm a flow was started for every shard before this is called.
void OutgoingMigration::Cancel(uint32_t shard_id) {
  const auto& shard_flow = slot_migrations_[shard_id];
  shard_flow->Cancel();
}

// Returns the aggregated migration state, computed by GetStateImpl() while holding flows_mu_.
MigrationState OutgoingMigration::GetState() const {
  std::lock_guard guard{flows_mu_};
  const MigrationState aggregated = GetStateImpl();
  return aggregated;
}

MigrationState OutgoingMigration::GetStateImpl() const {
MigrationState min_state = MigrationState::C_STABLE_SYNC;
for (const auto& slot_migration : slot_migrations_) {
if (slot_migration)
Expand Down
6 changes: 5 additions & 1 deletion src/server/cluster/outgoing_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Journal;

class DbSlice;

// Whole slots migration process information
// Whole outgoing slots migration manager
class OutgoingMigration {
public:
OutgoingMigration() = default;
Expand All @@ -25,6 +25,9 @@ class OutgoingMigration {

void StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest);

void Finalize(uint32_t shard_id);
void Cancel(uint32_t shard_id);

MigrationState GetState() const;

const std::string& GetHostIp() const {
Expand All @@ -35,6 +38,7 @@ class OutgoingMigration {
};

private:
MigrationState GetStateImpl() const;
// SliceSlotMigration manages state and data transfer for the corresponding shard
class SliceSlotMigration;

Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
entry.dbid = dbid_;
entry.opcode = opcode;

if (opcode == journal::Op::PING) {
if (opcode == journal::Op::PING || opcode == journal::Op::FIN) {
return entry;
}

Expand Down
9 changes: 9 additions & 0 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ void RestoreStreamer::Start(io::Sink* dest) {
});
}

void RestoreStreamer::SendFinalize() {
VLOG(2) << "DFLYMIGRATE FINALIZE for " << sync_id_ << " : " << db_slice_->shard_id();
journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/);

JournalWriter writer{this};
writer.Write(entry);
NotifyWritten(true);
}

void RestoreStreamer::Cancel() {
fiber_cancellation_.Cancel();
snapshot_fb_.JoinIfNeeded();
Expand Down
2 changes: 2 additions & 0 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class RestoreStreamer : public JournalStreamer {
// Cancel() must be called if Start() is called
void Cancel() override;

void SendFinalize();

bool IsSnapshotFinished() const {
return snapshot_finished_;
}
Expand Down
6 changes: 4 additions & 2 deletions src/server/journal/tx_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ void MultiShardExecution::CancelAllBlockingEntities() {

bool TransactionData::AddEntry(journal::ParsedEntry&& entry) {
++journal_rec_count;
opcode = entry.opcode;

switch (entry.opcode) {
case journal::Op::PING:
is_ping = true;
return true;
case journal::Op::FIN:
return true;
case journal::Op::EXPIRED:
case journal::Op::COMMAND:
Expand Down Expand Up @@ -112,7 +114,7 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read
// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
res->opcode == journal::Op::PING)
res->opcode == journal::Op::PING || res->opcode == journal::Op::FIN)
return TransactionData::FromSingle(std::move(res.value()));

// Otherwise, continue building multi command.
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/tx_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct TransactionData {
uint32_t shard_cnt{0};
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
bool is_ping = false; // For Op::PING entries.
journal::Op opcode = journal::Op::NOOP;
};

// Utility for reading TransactionData from a journal reader.
Expand Down
1 change: 1 addition & 0 deletions src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum class Op : uint8_t {
MULTI_COMMAND = 11,
EXEC = 12,
PING = 13,
FIN = 14
};

struct EntryBase {
Expand Down
Loading
Loading