Skip to content

Commit

Permalink
fix(cluster): add mutex for migrations removing
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Feb 1, 2024
1 parent 82c48f6 commit 5d5546a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
28 changes: 16 additions & 12 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
};

// TODO think about another place for it
RemoveFinishedMigrations();
RemoveFinishedIncomingMigrations();

server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);
Expand Down Expand Up @@ -704,29 +704,28 @@ void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionCont
return cntx->SendError(err->MakeReply());
}

auto migration_it = outgoing_migration_jobs_.find(sync_id);
if (migration_it == outgoing_migration_jobs_.end()) {
return cntx->SendError(absl::StrCat("finalize invalid session id ", sync_id));
}
auto migration = GetOutgoingMigration(sync_id);
if (!migration)
return cntx->SendError(kIdNotFound);

if (migration_it->second->GetState() != MigrationState::C_STABLE_SYNC) {
if (migration->GetState() != MigrationState::C_STABLE_SYNC) {
return cntx->SendError("Migration process is not in STABLE_SYNC state");
}

shard_set->pool()->AwaitFiberOnAll([migration_it](auto*) {
shard_set->pool()->AwaitFiberOnAll([migration](auto*) {
if (const auto* shard = EngineShard::tlocal(); shard)
migration_it->second->Finalize(shard->shard_id());
migration->Finalize(shard->shard_id());
});

// TODO do next after ACK
util::ThisFiber::SleepFor(500ms);

shard_set->pool()->AwaitFiberOnAll([migration_it](auto*) {
shard_set->pool()->AwaitFiberOnAll([migration](auto*) {
if (const auto* shard = EngineShard::tlocal(); shard)
migration_it->second->Cancel(shard->shard_id());
migration->Cancel(shard->shard_id());
});

outgoing_migration_jobs_.erase(migration_it);
RemoveOutgoingMigration(sync_id);

return cntx->SendOk();
}
Expand Down Expand Up @@ -760,14 +759,19 @@ ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t
.get();
}

void ClusterFamily::RemoveFinishedMigrations() {
void ClusterFamily::RemoveFinishedIncomingMigrations() {
lock_guard lk(migration_mu_);
auto removed_items_it =
std::remove_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[](const auto& m) { return m->GetState() == MigrationState::C_FINISHED; });
incoming_migrations_jobs_.erase(removed_items_it, incoming_migrations_jobs_.end());
}

void ClusterFamily::RemoveOutgoingMigration(uint32_t sync_id) {
lock_guard lk(migration_mu_);
outgoing_migration_jobs_.erase(sync_id);
}

void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Create slot migration config";
CmdArgParser parser{args};
Expand Down
3 changes: 2 additions & 1 deletion src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class ClusterFamily {
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);

void RemoveFinishedMigrations();
void RemoveFinishedIncomingMigrations();
void RemoveOutgoingMigration(uint32_t sync_id);

// store info about migration and create unique session id
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
Expand Down

0 comments on commit 5d5546a

Please sign in to comment.