Skip to content

Commit

Permalink
Merge pull request #20805 from bashtanov/migrations-infra-ab
Browse files Browse the repository at this point in the history
Migrations infra, part 2: coordination/reconciliation logic for partition ops
  • Loading branch information
mmaslankaprv authored Jul 4, 2024
2 parents 26b6202 + 9a9de57 commit 5e277ca
Show file tree
Hide file tree
Showing 25 changed files with 1,635 additions and 182 deletions.
2 changes: 2 additions & 0 deletions src/v/cluster/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ v_cc_library(
data_migration_types.cc
data_migration_table.cc
data_migration_frontend.cc
data_migration_worker.cc
data_migrated_resources.cc
data_migration_irpc_frontend.cc
data_migration_service_handler.cc
data_migration_backend.cc
DEPS
Expand Down
27 changes: 19 additions & 8 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
#include "cluster/data_migrated_resources.h"
#include "cluster/data_migration_backend.h"
#include "cluster/data_migration_frontend.h"
#include "cluster/data_migration_irpc_frontend.h"
#include "cluster/data_migration_table.h"
#include "cluster/data_migration_types.h"
#include "cluster/data_migration_worker.h"
#include "cluster/ephemeral_credential_frontend.h"
#include "cluster/feature_backend.h"
#include "cluster/feature_manager.h"
Expand Down Expand Up @@ -80,6 +83,7 @@
#include "ssx/future-util.h"

#include <seastar/core/future.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/thread.hh>
Expand Down Expand Up @@ -303,6 +307,8 @@ ss::future<> controller::start(
std::ref(_connections),
std::ref(_as));

co_await _data_migration_worker.start(
_raft0->self().id(), std::ref(_partition_leaders), std::ref(_as));
{
limiter_configuration limiter_conf{
config::shard_local_cfg().enable_controller_log_rate_limiting.bind(),
Expand Down Expand Up @@ -749,11 +755,19 @@ ss::future<> controller::start(
}
}

_data_migration_backend = std::make_unique<data_migrations::backend>(
co_await _data_migration_backend.start_on(
data_migrations::data_migrations_shard,
std::ref(*_data_migration_table),
std::ref(_data_migration_frontend.local()),
std::ref(_data_migration_worker),
std::ref(_partition_leaders.local()),
std::ref(_tp_state.local()),
std::ref(_shard_table.local()),
std::ref(_as.local()));
co_await _data_migration_backend->start();
co_await _data_migration_backend.invoke_on_instance(
&data_migrations::backend::start);
co_await _data_migration_irpc_frontend.start(
std::ref(_feature_table), std::ref(_data_migration_backend));
}

ss::future<> controller::set_ready() {
Expand Down Expand Up @@ -802,12 +816,8 @@ ss::future<> controller::stop() {
}
return ss::make_ready_future();
})
.then([this] {
if (_data_migration_backend) {
return _data_migration_backend->stop();
}
return ss::make_ready_future();
})
.then([this] { return _data_migration_irpc_frontend.stop(); })
.then([this] { return _data_migration_backend.stop(); })
.then([this] {
if (_recovery_backend) {
return _recovery_backend->stop_and_wait();
Expand All @@ -823,6 +833,7 @@ ss::future<> controller::stop() {
.then([this] { return _hm_backend.stop(); })
.then([this] { return _health_manager.stop(); })
.then([this] { return _members_backend.stop(); })
.then([this] { return _data_migration_worker.stop(); })
.then([this] { return _data_migration_frontend.stop(); })
.then([this] { return _config_manager.stop(); })
.then([this] { return _api.stop(); })
Expand Down
10 changes: 9 additions & 1 deletion src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "raft/fwd.h"
#include "rpc/fwd.h"
#include "security/fwd.h"
#include "ssx/single_sharded.h"
#include "storage/api.h"
#include "storage/fwd.h"

Expand Down Expand Up @@ -147,6 +148,11 @@ class controller {
return _data_migration_frontend;
}

ss::sharded<data_migrations::irpc_frontend>&
get_data_migration_irpc_frontend() {
return _data_migration_irpc_frontend;
}

std::optional<std::reference_wrapper<cloud_metadata::uploader>>
metadata_uploader() {
if (_metadata_uploader) {
Expand Down Expand Up @@ -314,7 +320,9 @@ class controller {
ss::sharded<client_quota::frontend> _quota_frontend; // instance per core
ss::sharded<client_quota::store> _quota_store; // instance per core
ss::sharded<client_quota::backend> _quota_backend; // single instance
std::unique_ptr<data_migrations::backend> _data_migration_backend;
ss::sharded<data_migrations::worker> _data_migration_worker;
ssx::single_sharded<data_migrations::backend> _data_migration_backend;
ss::sharded<data_migrations::irpc_frontend> _data_migration_irpc_frontend;
ss::gate _gate;
consensus_ptr _raft0;
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
Expand Down
Loading

0 comments on commit 5e277ca

Please sign in to comment.