diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 644c7a12ccb4..1c84974ff607 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -534,7 +534,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) }; // TODO think about another place for it - RemoveFinishedMigrations(); + RemoveFinishedIncomingMigrations(); server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb)); DCHECK(tl_cluster_config != nullptr); @@ -704,29 +704,28 @@ void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionCont return cntx->SendError(err->MakeReply()); } - auto migration_it = outgoing_migration_jobs_.find(sync_id); - if (migration_it == outgoing_migration_jobs_.end()) { - return cntx->SendError(absl::StrCat("finalize invalid session id ", sync_id)); - } + auto migration = GetOutgoingMigration(sync_id); + if (!migration) + return cntx->SendError(kIdNotFound); - if (migration_it->second->GetState() != MigrationState::C_STABLE_SYNC) { + if (migration->GetState() != MigrationState::C_STABLE_SYNC) { return cntx->SendError("Migration process is not in STABLE_SYNC state"); } - shard_set->pool()->AwaitFiberOnAll([migration_it](auto*) { + shard_set->pool()->AwaitFiberOnAll([migration](auto*) { if (const auto* shard = EngineShard::tlocal(); shard) - migration_it->second->Finalize(shard->shard_id()); + migration->Finalize(shard->shard_id()); }); // TODO do next after ACK util::ThisFiber::SleepFor(500ms); - shard_set->pool()->AwaitFiberOnAll([migration_it](auto*) { + shard_set->pool()->AwaitFiberOnAll([migration](auto*) { if (const auto* shard = EngineShard::tlocal(); shard) - migration_it->second->Cancel(shard->shard_id()); + migration->Cancel(shard->shard_id()); }); - outgoing_migration_jobs_.erase(migration_it); + RemoveOutgoingMigration(sync_id); return cntx->SendOk(); } @@ -760,7 +759,7 @@ ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t .get(); } -void ClusterFamily::RemoveFinishedMigrations() { +void ClusterFamily::RemoveFinishedIncomingMigrations() { lock_guard lk(migration_mu_); auto removed_items_it = std::remove_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(), @@ -768,6 +767,11 @@ void ClusterFamily::RemoveFinishedMigrations() { incoming_migrations_jobs_.erase(removed_items_it, incoming_migrations_jobs_.end()); } +void ClusterFamily::RemoveOutgoingMigration(uint32_t sync_id) { + lock_guard lk(migration_mu_); + outgoing_migration_jobs_.erase(sync_id); +} + void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "Create slot migration config"; CmdArgParser parser{args}; diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index c8f495bbca64..687d56cb5dfe 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -75,7 +75,8 @@ class ClusterFamily { ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, std::vector slots); - void RemoveFinishedMigrations(); + void RemoveFinishedIncomingMigrations(); + void RemoveOutgoingMigration(uint32_t sync_id); // store info about migration and create unique session id uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,