diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index 78bca414caca3..f5b2810bad52c 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -241,7 +241,9 @@ v_cc_library( data_migration_types.cc data_migration_table.cc data_migration_frontend.cc + data_migration_worker.cc data_migrated_resources.cc + data_migration_irpc_frontend.cc data_migration_service_handler.cc data_migration_backend.cc DEPS diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index d6cf21da13b91..9ce8abd1a4d37 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -33,7 +33,10 @@ #include "cluster/data_migrated_resources.h" #include "cluster/data_migration_backend.h" #include "cluster/data_migration_frontend.h" +#include "cluster/data_migration_irpc_frontend.h" #include "cluster/data_migration_table.h" +#include "cluster/data_migration_types.h" +#include "cluster/data_migration_worker.h" #include "cluster/ephemeral_credential_frontend.h" #include "cluster/feature_backend.h" #include "cluster/feature_manager.h" @@ -80,6 +83,7 @@ #include "ssx/future-util.h" #include +#include #include #include #include @@ -303,6 +307,8 @@ ss::future<> controller::start( std::ref(_connections), std::ref(_as)); + co_await _data_migration_worker.start( + _raft0->self().id(), std::ref(_partition_leaders), std::ref(_as)); { limiter_configuration limiter_conf{ config::shard_local_cfg().enable_controller_log_rate_limiting.bind(), @@ -749,11 +755,19 @@ ss::future<> controller::start( } } - _data_migration_backend = std::make_unique( + co_await _data_migration_backend.start_on( + data_migrations::data_migrations_shard, std::ref(*_data_migration_table), std::ref(_data_migration_frontend.local()), + std::ref(_data_migration_worker), + std::ref(_partition_leaders.local()), + std::ref(_tp_state.local()), + std::ref(_shard_table.local()), std::ref(_as.local())); - co_await _data_migration_backend->start(); + co_await _data_migration_backend.invoke_on_instance( + &data_migrations::backend::start); + co_await _data_migration_irpc_frontend.start( + std::ref(_feature_table), std::ref(_data_migration_backend)); } ss::future<> controller::set_ready() { @@ -802,12 +816,8 @@ ss::future<> controller::stop() { } return ss::make_ready_future(); }) - .then([this] { - if (_data_migration_backend) { - return _data_migration_backend->stop(); - } - return ss::make_ready_future(); - }) + .then([this] { return _data_migration_irpc_frontend.stop(); }) + .then([this] { return _data_migration_backend.stop(); }) .then([this] { if (_recovery_backend) { return _recovery_backend->stop_and_wait(); @@ -823,6 +833,7 @@ ss::future<> controller::stop() { .then([this] { return _hm_backend.stop(); }) .then([this] { return _health_manager.stop(); }) .then([this] { return _members_backend.stop(); }) + .then([this] { return _data_migration_worker.stop(); }) .then([this] { return _data_migration_frontend.stop(); }) .then([this] { return _config_manager.stop(); }) .then([this] { return _api.stop(); }) diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index 9261d65404daa..9c43222784e56 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -24,6 +24,7 @@ #include "raft/fwd.h" #include "rpc/fwd.h" #include "security/fwd.h" +#include "ssx/single_sharded.h" #include "storage/api.h" #include "storage/fwd.h" @@ -147,6 +148,11 @@ class controller { return _data_migration_frontend; } + ss::sharded& + get_data_migration_irpc_frontend() { + return 
_data_migration_irpc_frontend; + } + std::optional> metadata_uploader() { if (_metadata_uploader) { @@ -314,7 +320,9 @@ class controller { ss::sharded _quota_frontend; // instance per core ss::sharded _quota_store; // instance per core ss::sharded _quota_backend; // single instance - std::unique_ptr _data_migration_backend; + ss::sharded _data_migration_worker; + ssx::single_sharded _data_migration_backend; + ss::sharded _data_migration_irpc_frontend; ss::gate _gate; consensus_ptr _raft0; ss::sharded& _cloud_storage_api; diff --git a/src/v/cluster/data_migration_backend.cc b/src/v/cluster/data_migration_backend.cc index dd62120ac40d5..7380676cb50a9 100644 --- a/src/v/cluster/data_migration_backend.cc +++ b/src/v/cluster/data_migration_backend.cc @@ -10,33 +10,769 @@ */ #include "cluster/data_migration_backend.h" +#include "cluster/partition_leaders_table.h" +#include "config/node_config.h" +#include "data_migration_frontend.h" #include "data_migration_types.h" +#include "data_migration_worker.h" #include "fwd.h" #include "logger.h" +#include "model/fundamental.h" +#include "model/metadata.h" +#include "ssx/async_algorithm.h" +#include "ssx/future-util.h" +#include "types.h" #include +#include +#include +#include + +using namespace std::chrono_literals; + namespace cluster::data_migrations { backend::backend( - migrations_table& table, frontend& frontend, ss::abort_source& as) - : _table(table) + migrations_table& table, + frontend& frontend, + ss::sharded& worker, + partition_leaders_table& leaders_table, + topic_table& topic_table, + shard_table& shard_table, + ss::abort_source& as) + : _self(*config::node().node_id()) + , _table(table) , _frontend(frontend) + , _worker(worker) + , _leaders_table(leaders_table) + , _topic_table(topic_table) + , _shard_table(shard_table) , _as(as) {} -ss::future<> backend::start() { - _id = _table.register_notification( - [this](id id) { handle_migration_update(id); }); - co_return; +void backend::start() { + vassert( + ss::this_shard_id() == data_migrations_shard, "Called on wrong shard"); + + _is_raft0_leader = _is_coordinator + = _self == _leaders_table.get_leader(model::controller_ntp); + _plt_raft0_leadership_notification_id + = _leaders_table.register_leadership_change_notification( + model::controller_ntp, + [this](model::ntp, model::term_id, model::node_id leader_node_id) { + _is_raft0_leader = leader_node_id == _self; + if (_is_raft0_leader != _is_coordinator) { + ssx::spawn_with_gate( + _gate, [this]() { return handle_raft0_leadership_update(); }); + } + }); + _table_notification_id = _table.register_notification([this](id id) { + ssx::spawn_with_gate( + _gate, [this, id]() { return handle_migration_update(id); }); + }); + _topic_table_notification_id = _topic_table.register_delta_notification( + [this](topic_table::delta_range_t deltas) { + _unprocessed_deltas.reserve( + _unprocessed_deltas.size() + deltas.size()); + for (const auto& delta : deltas) { + _unprocessed_deltas.push_back(delta); + } + wakeup(); + }); + + _shard_notification_id = _shard_table.register_notification( + [this]( + const model::ntp& ntp, + raft::group_id g, + std::optional shard) { + handle_shard_update(ntp, g, shard); + }); + + ssx::repeat_until_gate_closed(_gate, [this]() { return loop_once(); }); } ss::future<> backend::stop() { - _table.unregister_notification(_id); - co_return; + _mutex.broken(); + _sem.broken(); + _timer.cancel(); + _shard_table.unregister_delta_notification(_shard_notification_id); + 
_topic_table.unregister_delta_notification(_topic_table_notification_id); + _leaders_table.unregister_leadership_change_notification( + model::controller_ntp, _plt_raft0_leadership_notification_id); + _table.unregister_notification(_table_notification_id); + co_await _gate.close(); +} + +ss::future<> backend::loop_once() { + { + auto units = co_await _mutex.get_units(_as); + co_await work_once(); + } + co_await _sem.wait(_as); + _sem.consume(_sem.available_units()); +} + +ss::future<> backend::work_once() { + // process pending deltas + auto unprocessed_deltas = std::move(_unprocessed_deltas); + for (auto&& delta : unprocessed_deltas) { + co_await process_delta(std::move(delta)); + } + + // process RPC responses + for (const auto& [node_id, response] : _rpc_responses) { + co_await ssx::async_for_each( + response.actual_states, [this](const auto& ntp_resp) { + auto rs_it = _migration_states.find(ntp_resp.migration); + if (rs_it == _migration_states.end()) { + // migration gone, ignore + return; + } + migration_reconciliation_state& rs = rs_it->second; + if (rs.sought_state > ntp_resp.state) { + // migration advanced since then, ignore + return; + } + mark_migration_step_done_for_ntp(rs, ntp_resp.ntp); + if (rs.outstanding_topics.empty()) { + to_advance(ntp_resp.migration, rs.sought_state); + _migration_states.erase(rs_it); + } + }); + } + + auto next_tick = model::timeout_clock::time_point::max(); + + // prepare RPC requests + chunked_vector to_send_rpc; + auto now = model::timeout_clock::now(); + for (const auto& [node_id, deadline] : _nodes_to_retry) { + if (deadline <= now) { + to_send_rpc.push_back(node_id); + } else { + next_tick = std::min(deadline, next_tick); + } + } + + // defer RPC retries + // todo: configure timeout + auto new_deadline = now + 5s; + for (const auto& node_id : _rpc_responses | std::views::keys) { + if (_node_states.contains(node_id)) { + _nodes_to_retry.try_emplace(node_id, new_deadline); + next_tick = std::min(next_tick, new_deadline); + } + } + _rpc_responses.clear(); + + // schedule fibers + for (auto node_id : to_send_rpc) { + _nodes_to_retry.erase(node_id); + co_await send_rpc(node_id); + } + spawn_advances(); + if (next_tick == model::timeout_clock::time_point::max()) { + _timer.cancel(); + } else { + _timer.rearm(next_tick); + } } -void backend::handle_migration_update(id id) { +void backend::wakeup() { _sem.signal(1 - _sem.available_units()); } + +void backend::mark_migration_step_done_for_ntp( + migration_reconciliation_state& rs, const model::ntp& ntp) { + auto& rs_topics = rs.outstanding_topics; + auto rs_topic_it = rs_topics.find({ntp.ns, ntp.tp.topic}); + if (rs_topic_it != rs_topics.end()) { + auto& rs_parts = rs_topic_it->second.outstanding_partitions; + auto rs_part_it = rs_parts.find(ntp.tp.partition); + if (rs_part_it != rs_parts.end()) { + for (const auto& affected_node_id : rs_part_it->second) { + auto nstate_it = _node_states.find(affected_node_id); + nstate_it->second.erase(ntp); + if (nstate_it->second.empty()) { + _node_states.erase(nstate_it); + _nodes_to_retry.erase(affected_node_id); + } + } + rs_parts.erase(rs_part_it); + if (rs_parts.empty()) { + rs_topics.erase(rs_topic_it); + } + } + } +} + +ss::future<> backend::send_rpc(model::node_id node_id) { + check_ntp_states_request req; + co_await ssx::async_for_each( + _node_states[node_id], [this, &req](const auto& pair) { + auto& [ntp, migration_id] = pair; + req.sought_states.push_back( + {.ntp = ntp, + .migration = migration_id, + .state + = 
_migration_states.find(migration_id)->second.sought_state}); + }); + + ssx::spawn_with_gate( + _gate, [this, node_id, req = std::move(req)]() mutable { + vlog(dm_log.debug, "sending RPC to node {}: {}", node_id, req); + ss::future reply + = (_self == node_id) ? check_ntp_states_locally(std::move(req)) + : _frontend.check_ntp_states_on_foreign_node( + node_id, std::move(req)); + return reply.then([node_id, this](check_ntp_states_reply&& reply) { + vlog( + dm_log.debug, "got RPC response from {}: {}", node_id, reply); + _rpc_responses[node_id] = std::move(reply); + return wakeup(); + }); + }); +} + +void backend::to_advance(id migration_id, state sought_state) { + auto [it, ins] = _advance_requests.try_emplace(migration_id, sought_state); + if (!ins && it->second.sought_state < sought_state) { + it->second = advance_info(sought_state); + } +} + +void backend::spawn_advances() { + for (auto& [migration_id, advance_info] : _advance_requests) { + if (advance_info.sent) { + continue; + } + advance_info.sent = true; + auto& sought_state = advance_info.sought_state; + ssx::spawn_with_gate(_gate, [this, migration_id, sought_state]() { + return _frontend.update_migration_state(migration_id, sought_state) + .then([migration_id, sought_state](std::error_code ec) { + vlogl( + dm_log, + (ec == make_error_code(errc::success)) + ? ss::log_level::debug + : ss::log_level::warn, + "request to advance migration {} into state {} has " + "been processed with error code {}", + migration_id, + sought_state, + ec); + }); + }); + } +} + +ss::future<> backend::handle_raft0_leadership_update() { + auto units = co_await _mutex.get_units(_as); + if (_is_raft0_leader == _is_coordinator) { + co_return; + } + _is_coordinator = _is_raft0_leader; + if (_is_coordinator) { + vlog(dm_log.debug, "stepping up as a coordinator"); + // start coordinating + for (auto& [id, mrstate] : _migration_states) { + for (auto& [nt, tstate] : mrstate.outstanding_topics) { + co_await reconcile_topic( + nt, tstate, id, mrstate.sought_state, false); + } + } + // resend advance requests + for (auto& [migration_id, advance_info] : _advance_requests) { + advance_info.sent = false; + } + wakeup(); + } else { + vlog(dm_log.debug, "stepping down as a coordinator"); + // stop coordinating + for (auto& [id, mrstate] : _migration_states) { + for (auto& [id, tstate] : mrstate.outstanding_topics) { + tstate.outstanding_partitions.clear(); + } + } + _nodes_to_retry.clear(); + _node_states.clear(); + } +} + +ss::future<> backend::handle_migration_update(id id) { vlog(dm_log.debug, "received data migration {} notification", id); + + bool need_wakeup = false; + + auto new_maybe_metadata = _table.get_migration(id); + auto new_state = new_maybe_metadata ? 
std::make_optional( + new_maybe_metadata->get().state) + : std::nullopt; + vlog(dm_log.debug, "migration {} new state is {}", id, new_state); + + // forget about the migration if it went forward or is gone + auto old_it = std::as_const(_migration_states).find(id); + if (old_it != _migration_states.cend()) { + const migration_reconciliation_state& old_mrstate = old_it->second; + vlog( + dm_log.debug, + "migration {} old sought state is {}", + id, + old_mrstate.sought_state); + if (!new_maybe_metadata || new_state >= old_mrstate.sought_state) { + vlog( + dm_log.debug, "dropping migration {} reconciliation state", id); + drop_migration_reconciliation_rstate(old_it); + } + } + // create new state if needed + if (new_maybe_metadata) { + const auto& new_metadata = new_maybe_metadata->get(); + auto sought_state = new_metadata.next_replica_state(); + if (sought_state.has_value()) { + vlog( + dm_log.debug, "creating migration {} reconciliation state", id); + auto new_it = _migration_states.emplace_hint( + old_it, id, sought_state.value()); + co_await reconcile_migration(new_it->second, new_metadata); + need_wakeup = true; + } + } + // delete old advance requests + if (auto it = _advance_requests.find(id); it != _advance_requests.end()) { + if (!new_state || it->second.sought_state <= new_state) { + _advance_requests.erase(it); + } + } + + if (_is_coordinator && need_wakeup) { + wakeup(); + } +} + +ss::future<> backend::process_delta(cluster::topic_table_delta&& delta) { + vlog(dm_log.debug, "processing topic table delta={}", delta); + model::topic_namespace nt{delta.ntp.ns, delta.ntp.tp.topic}; + auto it = _topic_migration_map.find(nt); + if (it == _topic_migration_map.end()) { + co_return; + } + auto migration_id = it->second; + + // coordination + vassert( + delta.type == topic_table_delta_type::replicas_updated + || delta.type == topic_table_delta_type::disabled_flag_updated, + "topic {} altered with topic_table_delta_type={} during " + "migration {}", + nt, + delta.type, + migration_id); + auto& mrstate = _migration_states.find(migration_id)->second; + auto& tstate = mrstate.outstanding_topics[nt]; + clear_tstate_belongings(nt, tstate); + tstate.outstanding_partitions.clear(); + // We potentially re-enqueue an already coordinated partition here. + // The first RPC reply will clear it. 
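+        // schedule_local_work is false here: the local replica work state for
+        // this ntp is created or torn down explicitly in the block below.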
+ co_await reconcile_topic( + nt, tstate, migration_id, mrstate.sought_state, false); + + // local work + if (has_local_replica(delta.ntp)) { + _work_states[nt].try_emplace( + delta.ntp.tp.partition, + migration_id, + _migration_states.find(migration_id)->second.sought_state); + } else { + auto topic_work_it = _work_states.find(nt); + if (topic_work_it != _work_states.end()) { + auto& topic_work_state = topic_work_it->second; + auto rwstate_it = topic_work_state.find(delta.ntp.tp.partition); + if (rwstate_it != topic_work_state.end()) { + auto& rwstate = rwstate_it->second; + if (rwstate.shard) { + stop_partition_work(delta.ntp, rwstate); + } + topic_work_state.erase(rwstate_it); + if (topic_work_state.empty()) { + _work_states.erase(topic_work_it); + } + } + } + } +} + +void backend::handle_shard_update( + const model::ntp& ntp, raft::group_id, std::optional shard) { + if (auto maybe_rwstate = get_replica_work_state(ntp)) { + auto& rwstate = maybe_rwstate->get(); + if (rwstate.status == migrated_replica_status::can_run) { + update_partition_shard(ntp, rwstate, shard); + } + } +} + +ss::future +backend::check_ntp_states_locally(check_ntp_states_request&& req) { + vlog(dm_log.debug, "processing node request {}", req); + check_ntp_states_reply reply; + for (const auto& ntp_req : req.sought_states) { + vlog( + dm_log.trace, + "received an RPC to promote ntp {} to state {} for migration {}", + ntp_req.ntp, + ntp_req.state, + ntp_req.migration); + // due to async notification processing we may get fresher state + // than we have in rwstate; this is fine + const auto maybe_migration = _table.get_migration(ntp_req.migration); + if (!maybe_migration) { + // migration either not yet there or gone, and we cannot tell + // for sure => no reply + vlog( + dm_log.trace, + "migration {} not found, ignoring", + ntp_req.migration); + continue; + } + + const auto& metadata = maybe_migration->get(); + if (metadata.state >= ntp_req.state) { + vlog( + dm_log.trace, + "migration {} already in state {}, no partition work needed", + ntp_req.migration, + metadata.state); + // report progress migration-wise, whether or not made by us + reply.actual_states.push_back( + {.ntp = ntp_req.ntp, + .migration = metadata.id, + .state = metadata.state}); + continue; + } + + auto maybe_rwstate = get_replica_work_state(ntp_req.ntp); + if (!maybe_rwstate) { + vlog( + dm_log.warn, + "migration_id={} got RPC to move ntp {} to state {}, but " + "missing " + "partition state for it", + ntp_req.migration, + ntp_req.ntp, + ntp_req.state); + continue; + } + auto& rwstate = maybe_rwstate->get(); + if (ntp_req.state != rwstate.sought_state) { + vlog( + dm_log.warn, + "migration_id={} got RPC to move ntp {} to state {}, but in " + "raft0 its desired state is {}, ignoring", + ntp_req.migration, + ntp_req.ntp, + ntp_req.state, + metadata.state); + continue; + } + vlog( + dm_log.trace, + "migration_id={} got both RPC and raft0 message to move ntp {} " + "to " + "state {}, replica state is {}", + ntp_req.migration, + ntp_req.ntp, + ntp_req.state, + rwstate.status); + // raft0 and RPC agree => time to do it! 
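+        // waiting_for_rpc replicas get promoted to can_run and assigned their
+        // current shard; replicas that are already done are simply reported
+        // back so the coordinator can advance the migration.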
+ switch (rwstate.status) { + case migrated_replica_status::waiting_for_rpc: + rwstate.status = migrated_replica_status::can_run; + [[fallthrough]]; + case migrated_replica_status::can_run: { + auto new_shard = _shard_table.shard_for(ntp_req.ntp); + update_partition_shard(ntp_req.ntp, rwstate, new_shard); + } break; + case migrated_replica_status::done: + reply.actual_states.push_back( + {.ntp = ntp_req.ntp, + .migration = metadata.id, + .state = ntp_req.state}); + } + } + + vlog(dm_log.debug, "node request reply: {}", reply); + return ssx::now(std::move(reply)); +} + +void backend::update_partition_shard( + const model::ntp& ntp, + replica_work_state& rwstate, + std::optional new_shard) { + vlog( + dm_log.trace, + "for ntp {} for migration {} seeking state {} updating shard: {} => " + "{}", + ntp, + rwstate.migration_id, + rwstate.sought_state, + rwstate.shard, + new_shard); + if (new_shard != rwstate.shard) { + if (rwstate.shard) { + stop_partition_work(ntp, rwstate); + } + rwstate.shard = new_shard; + if (new_shard) { + start_partition_work(ntp, rwstate); + } + } +} + +void backend::clear_tstate_belongings( + const model::topic_namespace& nt, const topic_reconciliation_state& tstate) { + const auto& partitions = tstate.outstanding_partitions; + for (const auto& [partition, nodes] : partitions) { + for (const model::node_id& node : nodes) { + auto ns_it = _node_states.find(node); + ns_it->second.erase({nt.ns, nt.tp, partition}); + if (ns_it->second.empty()) { + _nodes_to_retry.erase(node); + _node_states.erase(ns_it); + } + } + } +} + +void backend::drop_migration_reconciliation_rstate( + migration_reconciliation_states_t::const_iterator rs_it) { + const auto& topics = rs_it->second.outstanding_topics; + for (const auto& [nt, tstate] : topics) { + clear_tstate_belongings(nt, tstate); + _work_states.erase(nt); + _topic_migration_map.erase(nt); + } + _migration_states.erase(rs_it); +} + +ss::future<> backend::reconcile_topic( + const model::topic_namespace& nt, + topic_reconciliation_state& tstate, + id migration, + state sought_state, + bool schedule_local_work) { + if (!schedule_local_work && !_is_coordinator) { + vlog( + dm_log.debug, + "not tracking topic {} transition towards state {} as part of " + "migration {}", + nt, + sought_state, + migration); + co_return; + } + vlog( + dm_log.debug, + "tracking topic {} transition towards state {} as part of " + "migration {}, schedule_local_work={}, _is_coordinator={}", + nt, + sought_state, + migration, + schedule_local_work, + _is_coordinator); + auto maybe_assignments = _topic_table.get_topic_assignments(nt); + if (!maybe_assignments) { + co_return; + } + auto assignments = *maybe_assignments | std::views::values; + auto now = model::timeout_clock::now(); + co_await ssx::async_for_each( + assignments, + [this, nt, &tstate, sought_state, migration, now, schedule_local_work]( + const auto& assignment) { + model::ntp ntp{nt.ns, nt.tp, assignment.id}; + auto nodes = assignment.replicas + | std::views::transform(&model::broker_shard::node_id); + if (_is_coordinator) { + auto [it, ins] = tstate.outstanding_partitions.emplace( + std::piecewise_construct, + std::tuple{assignment.id}, + std::tuple{nodes.begin(), nodes.end()}); + vassert( + ins, + "tried to repeatedly track partition {} " + "as part of migration {}", + ntp, + migration); + } + for (const auto& node_id : nodes) { + if (_is_coordinator) { + auto [it, ins] = _node_states[node_id].emplace( + ntp, migration); + vassert( + ins, + "tried to track partition {} on node {} as part of " + 
"migration {}, while it is already tracked as part " + "of migration {}", + ntp, + node_id, + migration, + it->second); + _nodes_to_retry.insert_or_assign(node_id, now); + } + if (schedule_local_work && _self == node_id) { + vlog( + dm_log.debug, + "tracking ntp {} transition towards state {} as part " + "of " + "migration {}", + ntp, + sought_state, + migration); + auto& topic_work_state = _work_states[nt]; + auto [it, _] = topic_work_state.try_emplace( + assignment.id, migration, sought_state); + auto& rwstate = it->second; + if ( + rwstate.sought_state != sought_state + || rwstate.migration_id != migration) { + if (it->second.shard) { + stop_partition_work(ntp, rwstate); + } + rwstate = {migration, sought_state}; + } + } + } + }); +} + +ss::future<> backend::reconcile_migration( + migration_reconciliation_state& mrstate, const migration_metadata& metadata) { + vlog( + dm_log.debug, + "tracking migration {} transition towards state {}", + metadata.id, + mrstate.sought_state); + co_await std::visit( + [this, &metadata, &mrstate](const auto& migration) mutable { + return ss::do_with( + migration.topic_nts(), + [this, &metadata, &mrstate](const auto& nts) { + return ssx::async_for_each( + nts, + [this, &metadata, &mrstate]( + const model::topic_namespace& nt) { + auto& tstate = mrstate.outstanding_topics[nt]; + _topic_migration_map.emplace(nt, metadata.id); + return reconcile_topic( + nt, tstate, metadata.id, mrstate.sought_state, true); + }); + }); + }, + metadata.migration); +} + +std::optional> +backend::get_replica_work_state(const model::ntp& ntp) { + model::topic_namespace nt{ntp.ns, ntp.tp.topic}; + if (auto it = _work_states.find(nt); it != _work_states.end()) { + auto& topic_work_state = it->second; + auto rwstate_it = topic_work_state.find(ntp.tp.partition); + if (rwstate_it != topic_work_state.end()) { + return rwstate_it->second; + } + } + return std::nullopt; +} + +void backend::start_partition_work( + const model::ntp& ntp, const backend::replica_work_state& rwstate) { + vlog( + dm_log.trace, + "while working on migration {}, asking worker on shard " + "{} to advance ntp {} to state {}; tmp: node_id={}, ntp_hash%3={}", + rwstate.migration_id, + rwstate.shard, + ntp, + rwstate.sought_state, + _self, + std::hash()(ntp) % 3); + ssx::spawn_with_gate(_gate, [this, &ntp, &rwstate]() mutable { + return _worker + .invoke_on( + *rwstate.shard, + &worker::perform_partition_work, + model::ntp{ntp}, + rwstate.migration_id, + rwstate.sought_state, + _self % 3 == std::hash()(ntp) % 3) + .then([this, ntp = ntp, rwstate](errc ec) mutable { + if (ec == errc::success) { + vlog( + dm_log.trace, + "as part of migration {} worker on shard {} has " + "advanced " + "ntp {} to state {}", + rwstate.migration_id, + rwstate.shard, + ntp, + rwstate.sought_state); + on_partition_work_completed( + std::move(ntp), rwstate.migration_id, rwstate.sought_state); + } else { + // worker should always retry unless we instructed + // it to abort or it is shutting down + vlog( + dm_log.warn, + "while working on migration {} worker on shard " + "{} stopped trying to advance ntp {} to state {}", + rwstate.migration_id, + rwstate.shard, + std::move(ntp), + rwstate.sought_state); + } + }); + }); +} + +void backend::stop_partition_work( + const model::ntp& ntp, const backend::replica_work_state& rwstate) { + vlog( + dm_log.info, + "while working on migration {}, asking worker on shard " + "{} to stop trying to advance ntp {} to state {}", + rwstate.migration_id, + rwstate.shard, + ntp, + rwstate.sought_state); + 
ssx::spawn_with_gate(_gate, [this, &rwstate, &ntp]() { + return _worker.invoke_on( + *rwstate.shard, &worker::abort_partition_work, model::ntp{ntp}); + }); +} + +void backend::on_partition_work_completed( + model::ntp&& ntp, id migration, state state) { + auto maybe_rwstate = get_replica_work_state(ntp); + if (!maybe_rwstate) { + return; + } + auto& rwstate = maybe_rwstate->get(); + if (rwstate.migration_id == migration && rwstate.sought_state == state) { + rwstate.status = migrated_replica_status::done; + rwstate.shard = std::nullopt; + } +} + +bool backend::has_local_replica(const model::ntp& ntp) { + auto maybe_assignment = _topic_table.get_partition_assignment(ntp); + if (!maybe_assignment) { + return false; + } + for (const auto& replica : maybe_assignment->replicas) { + if (_self == replica.node_id) { + return true; + } + } + return false; } } // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_backend.h b/src/v/cluster/data_migration_backend.h index 30960f1307d5d..13a65a763968d 100644 --- a/src/v/cluster/data_migration_backend.h +++ b/src/v/cluster/data_migration_backend.h @@ -10,32 +10,185 @@ */ #pragma once #include "cluster/data_migration_table.h" +#include "cluster/shard_table.h" #include "container/chunked_hash_map.h" #include "data_migration_types.h" #include "fwd.h" +#include "ssx/semaphore.h" +#include "utils/mutex.h" #include +#include namespace cluster::data_migrations { +/* + * Cluster-wide coordinator for migrations, + * as well as node coordinator for local partition-specific actions + */ class backend { public: - backend(migrations_table& table, frontend& frontend, ss::abort_source&); + backend( + migrations_table& table, + frontend& frontend, + ss::sharded& worker, + partition_leaders_table& leaders_table, + topic_table& topic_table, + shard_table& shard_table, + ss::abort_source& as); - ss::future<> start(); + void start(); ss::future<> stop(); private: - struct reconciliation_state {}; - void handle_migration_update(id id); + struct topic_reconciliation_state { + chunked_hash_map> + outstanding_partitions; + }; + struct migration_reconciliation_state { + explicit migration_reconciliation_state(state sought_state) + : sought_state(sought_state){}; + state sought_state; + chunked_hash_map + outstanding_topics; + }; + using migration_reconciliation_states_t + = absl::flat_hash_map; - ss::future<> reconcile_data_migration(id id); + struct replica_work_state { + id migration_id; + state sought_state; + // shard may only be assigned if replica_status is can_run + std::optional shard; + migrated_replica_status status + = migrated_replica_status::waiting_for_rpc; + replica_work_state(id migration_id, state sought_state) + : migration_id(migration_id) + , sought_state(sought_state) {} + }; - chunked_hash_map _states; - ss::gate _gate; +private: + /* loop management */ + ss::future<> loop_once(); + ss::future<> work_once(); + void wakeup(); + + /* event handlers outside main loop */ + ss::future<> handle_raft0_leadership_update(); + ss::future<> handle_migration_update(id id); + void handle_shard_update( + const model::ntp& ntp, raft::group_id, std::optional shard); + + /* RPC and raft0 actions */ + ss::future<> send_rpc(model::node_id node_id); + ss::future + check_ntp_states_locally(check_ntp_states_request&& req); + void to_advance(id migration_id, state sought_state); + void spawn_advances(); + + /* communication with workers */ + void start_partition_work( + const model::ntp& ntp, const replica_work_state& rwstate); + void stop_partition_work( 
+ const model::ntp& ntp, const replica_work_state& rwstate); + void + on_partition_work_completed(model::ntp&& ntp, id migration, state state); + + /* deferred event handlers */ + ss::future<> process_delta(cluster::topic_table_delta&& delta); + + /* helpers */ + void update_partition_shard( + const model::ntp& ntp, + replica_work_state& rwstate, + std::optional new_shard); + void mark_migration_step_done_for_ntp( + migration_reconciliation_state& rs, const model::ntp& ntp); + void drop_migration_reconciliation_rstate( + migration_reconciliation_states_t::const_iterator rs_it); + void clear_tstate_belongings( + const model::topic_namespace& nt, + const topic_reconciliation_state& tstate); + + ss::future<> reconcile_migration( + migration_reconciliation_state& mrstate, + const migration_metadata& metadata); + + ss::future<> reconcile_topic( + const model::topic_namespace& nt, + topic_reconciliation_state& tstate, + id migration, + state sought_state, + bool schedule_local_work); + + std::optional> + get_replica_work_state(const model::ntp& ntp); + bool has_local_replica(const model::ntp& ntp); + + /* + * Reconciliation-related data. + * + * When we are not the coordinator, _mrstates stores sought states and + * topics only, but no partitions; _nstates and _nodes_to_retry are + * empty. + * + * When we are the coordinator: + * - _mrstates and _nstates store the same set of migration-ntp + * combinations. + * - For each node there is no more than one RPC in flight at a time. + * - Nodes in _nstates = nodes in _nodes_to_retry ⊔ nodes of in-flight + * RPCs. + * + * - _advance_requests is only modified by the synchronous part of + * work_once() + * - _mrstates, _nstates and _nodes_to_retry are only modified by the + * synchronous part of work_once() and by handle_migration_update + */ + migration_reconciliation_states_t _migration_states; + // reverse map for topics in mrstates + using topic_migration_map_t = chunked_hash_map; + topic_migration_map_t _topic_migration_map; + using node_state = chunked_hash_map; + chunked_hash_map _node_states; + using deadline_t = model::timeout_clock::time_point; + chunked_hash_map _nodes_to_retry; + struct advance_info { + state sought_state; + bool sent = false; + explicit advance_info(state sought_state) + : sought_state(sought_state) {} + }; + absl::flat_hash_map _advance_requests; + chunked_vector _unprocessed_deltas; + + /* Node-local data */ + using topic_work_state_t + = chunked_hash_map; + chunked_hash_map _work_states; + + chunked_hash_map _rpc_responses; + + model::node_id _self; migrations_table& _table; frontend& _frontend; + ss::sharded& _worker; + partition_leaders_table& _leaders_table; + topic_table& _topic_table; + shard_table& _shard_table; ss::abort_source& _as; - migrations_table::notification_id _id; + + ss::gate _gate; + ssx::semaphore _sem{0, "c/data-migration-be"}; + mutex _mutex{"c/data-migration-be::lock"}; + ss::timer _timer{[this]() { wakeup(); }}; + + bool _is_raft0_leader; + bool _is_coordinator; + migrations_table::notification_id _table_notification_id; + cluster::notification_id_type _plt_raft0_leadership_notification_id; + cluster::notification_id_type _topic_table_notification_id; + cluster::notification_id_type _shard_notification_id; + + friend irpc_frontend; }; } // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_frontend.cc b/src/v/cluster/data_migration_frontend.cc index 0d134859ceac6..30b79a2bc5e95 100644 --- a/src/v/cluster/data_migration_frontend.cc +++ 
b/src/v/cluster/data_migration_frontend.cc @@ -214,6 +214,29 @@ frontend::remove_migration(id id, can_dispatch_to_leader can_dispatch) { }); } +ss::future frontend::check_ntp_states_on_foreign_node( + model::node_id node, check_ntp_states_request&& req) { + vlog(dm_log.debug, "dispatching node request {} to node {}", req, node); + + return _connections.local() + .with_node_client( + _self, + ss::this_shard_id(), + node, + _operation_timeout, + [req = std::move(req), + this](data_migrations_client_protocol client) mutable { + return client + .check_ntp_states( + std::move(req), rpc::client_opts(_operation_timeout)) + .then(&rpc::get_ctx_data); + }) + .then([](result res) { + return res.has_value() ? std::move(res.assume_value()) + : check_ntp_states_reply{}; + }); +} + ss::future> frontend::do_create_migration(data_migration migration) { validate_migration_shard(); auto ec = co_await insert_barrier(); @@ -287,6 +310,7 @@ frontend::do_update_migration_state(id id, state state) { validate_migration_shard(); auto ec = co_await insert_barrier(); if (ec) { + vlog(dm_log.warn, "failed waiting for barrier: {}", ec); co_return ec; } /** @@ -294,10 +318,17 @@ frontend::do_update_migration_state(id id, state state) { */ auto migration = _table.get_migration(id); if (!migration) { + vlog(dm_log.warn, "migration id {} not found", id); co_return errc::data_migration_not_exists; } - if (!migrations_table::is_valid_state_transition( - migration.value().get().state, state)) { + auto cur_state = migration.value().get().state; + if (!migrations_table::is_valid_state_transition(cur_state, state)) { + vlog( + dm_log.warn, + "migration {} cannot be transitioned from state {} to {}", + id, + cur_state, + state); co_return errc::invalid_data_migration_state; } ec = co_await replicate_and_wait( @@ -308,8 +339,18 @@ frontend::do_update_migration_state(id id, state state) { _operation_timeout + model::timeout_clock::now()); if (ec) { + vlog( + dm_log.warn, + "failed to send update_data_migration_state_cmd: {}", + ec); co_return ec; } + vlog( + dm_log.debug, + "successfully sent migration {} transition request from state {} to {}", + id, + cur_state, + state); co_return errc::success; } diff --git a/src/v/cluster/data_migration_frontend.h b/src/v/cluster/data_migration_frontend.h index fccf1c83aa456..6dc302ea3c95c 100644 --- a/src/v/cluster/data_migration_frontend.h +++ b/src/v/cluster/data_migration_frontend.h @@ -43,6 +43,8 @@ class frontend : public ss::peering_sharded_service { ss::future remove_migration( id, can_dispatch_to_leader dispatch = can_dispatch_to_leader::yes); + ss::future check_ntp_states_on_foreign_node( + model::node_id node, check_ntp_states_request&& req); ss::future> list_migrations(); private: @@ -54,6 +56,7 @@ class frontend : public ss::peering_sharded_service { ss::future do_remove_migration(id); ss::future insert_barrier(); + template< typename Request, typename Reply, diff --git a/src/v/cluster/data_migration_irpc_frontend.cc b/src/v/cluster/data_migration_irpc_frontend.cc new file mode 100644 index 0000000000000..f7367119da1cc --- /dev/null +++ b/src/v/cluster/data_migration_irpc_frontend.cc @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "cluster/data_migration_irpc_frontend.h" + +#include "cluster/data_migration_backend.h" +#include "features/feature_table.h" + +#include + +namespace cluster::data_migrations { + +irpc_frontend::irpc_frontend( + ss::sharded& features, + ssx::single_sharded& backend) + : _features(features) + , _backend(backend) {} + +ss::future +irpc_frontend::check_ntp_states(check_ntp_states_request&& req) { + if (!_features.local().is_active(features::feature::data_migrations)) { + return ssx::now({}); + } + + return _backend.invoke_on_instance( + &backend::check_ntp_states_locally, std::move(req)); +} + +} // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_irpc_frontend.h b/src/v/cluster/data_migration_irpc_frontend.h new file mode 100644 index 0000000000000..20a5855cb6c87 --- /dev/null +++ b/src/v/cluster/data_migration_irpc_frontend.h @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once + +#include "cluster/data_migration_types.h" +#include "cluster/fwd.h" +#include "features/fwd.h" +#include "ssx/single_sharded.h" + +#include + +namespace cluster::data_migrations { + +class irpc_frontend : public ss::peering_sharded_service { +public: + irpc_frontend( + ss::sharded&, ssx::single_sharded&); + + ss::future + check_ntp_states(check_ntp_states_request&& req); + +private: + ss::sharded& _features; + ssx::single_sharded& _backend; +}; + +} // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_rpc.json b/src/v/cluster/data_migration_rpc.json index e80c902dafff5..455a08202e0ad 100644 --- a/src/v/cluster/data_migration_rpc.json +++ b/src/v/cluster/data_migration_rpc.json @@ -19,6 +19,11 @@ "name": "remove_migration", "input_type": "remove_migration_request", "output_type": "remove_migration_reply" + }, + { + "name": "check_ntp_states", + "input_type": "check_ntp_states_request", + "output_type": "check_ntp_states_reply" } ] } \ No newline at end of file diff --git a/src/v/cluster/data_migration_service_handler.cc b/src/v/cluster/data_migration_service_handler.cc index 41af2c6c4c6a8..a9c868d133334 100644 --- a/src/v/cluster/data_migration_service_handler.cc +++ b/src/v/cluster/data_migration_service_handler.cc @@ -19,9 +19,11 @@ namespace cluster::data_migrations { service_handler::service_handler( ss::scheduling_group sc, ss::smp_service_group ssg, - ss::sharded& frontend) + ss::sharded& frontend, + ss::sharded& irpc_frontend) : data_migrations_service(sc, ssg) - , _frontend(frontend) {} + , _frontend(frontend) + , _irpc_frontend(irpc_frontend) {} ss::future service_handler::create_migration( create_migration_request request, ::rpc::streaming_context&) { @@ -60,6 +62,11 @@ ss::future service_handler::remove_migration( }); } +ss::future service_handler::check_ntp_states( + check_ntp_states_request request, ::rpc::streaming_context&) { + return _irpc_frontend.local().check_ntp_states(std::move(request)); +} + cluster::errc 
service_handler::map_error_code(std::error_code ec) { if (ec.category() == cluster::error_category()) { return cluster::errc(ec.value()); diff --git a/src/v/cluster/data_migration_service_handler.h b/src/v/cluster/data_migration_service_handler.h index 78b821801fb0e..ed56fdbfc5e19 100644 --- a/src/v/cluster/data_migration_service_handler.h +++ b/src/v/cluster/data_migration_service_handler.h @@ -13,6 +13,7 @@ #include "cluster/data_migration_rpc_service.h" #include "cluster/errc.h" #include "cluster/fwd.h" +#include "data_migration_irpc_frontend.h" #include @@ -21,7 +22,10 @@ namespace cluster::data_migrations { class service_handler : public data_migrations_service { public: explicit service_handler( - ss::scheduling_group, ss::smp_service_group, ss::sharded&); + ss::scheduling_group, + ss::smp_service_group, + ss::sharded&, + ss::sharded&); ss::future create_migration(create_migration_request, ::rpc::streaming_context&) final; @@ -32,10 +36,14 @@ class service_handler : public data_migrations_service { ss::future remove_migration(remove_migration_request, ::rpc::streaming_context&) final; + ss::future + check_ntp_states(check_ntp_states_request, ::rpc::streaming_context&) final; + private: static cluster::errc map_error_code(std::error_code); ss::sharded& _frontend; + ss::sharded& _irpc_frontend; }; } // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_table.cc b/src/v/cluster/data_migration_table.cc index 8c6371ae07c70..8f0f63af4fdbc 100644 --- a/src/v/cluster/data_migration_table.cc +++ b/src/v/cluster/data_migration_table.cc @@ -79,8 +79,30 @@ ss::future<> migrations_table::apply_snapshot( model::offset, const controller_snapshot& snapshot) { _next_id = snapshot.data_migrations.next_id; _migrations.reserve(snapshot.data_migrations.migrations.size()); - for (auto& [id, migration] : snapshot.data_migrations.migrations) { - auto [it, _] = _migrations.emplace(id, migration.copy()); + + auto it = _migrations.cbegin(); + while (it != _migrations.cend()) { + auto prev = it++; + if (!snapshot.data_migrations.migrations.contains(prev->first)) { + // take the id and metadata out first: erase() invalidates prev + auto removed_id = prev->first; + auto meta = std::move(prev->second); + _migrations.erase(prev); + _callbacks.notify(removed_id); + co_await _resources.invoke_on_all( + [&meta](migrated_resources& resources) { + resources.remove_migration(meta); + }); + } + } + + for (const auto& [id, migration] : snapshot.data_migrations.migrations) { + auto it = _migrations.find(id); + if (it == _migrations.end()) { + it = _migrations.emplace(id, migration.copy()).first; + } else { + if (it->second.state == migration.state) { + continue; + } + it->second.state = migration.state; + } _callbacks.notify(id); co_await _resources.invoke_on_all( [&meta = it->second](migrated_resources& resources) { @@ -222,6 +244,12 @@ migrations_table::apply(update_data_migration_state_cmd cmd) { requested_state); auto it = _migrations.find(id); if (it == _migrations.end()) { + vlog( + dm_log.warn, + "cannot update migration {} state to {}, migration not " + "found", + id, + requested_state); co_return errc::data_migration_not_exists; } diff --git a/src/v/cluster/data_migration_types.cc b/src/v/cluster/data_migration_types.cc index d66ec2e899358..5f65b1fa79cfe 100644 --- a/src/v/cluster/data_migration_types.cc +++ b/src/v/cluster/data_migration_types.cc @@ -36,12 +36,30 @@ data_migration copy_migration(const data_migration& migration) { migration); } +inbound_migration inbound_migration::copy() const { + return inbound_migration{.topics = topics.copy(), .groups = groups.copy()}; +} + +std::optional 
inbound_migration::next_replica_state(state state) { + if (state == state::preparing) { + return state::prepared; + }; + return std::nullopt; +} + outbound_migration outbound_migration::copy() const { return outbound_migration{ .topics = topics.copy(), .groups = groups.copy(), .copy_to = copy_to}; } -inbound_migration inbound_migration::copy() const { - return inbound_migration{.topics = topics.copy(), .groups = groups.copy()}; + +std::optional outbound_migration::next_replica_state(state state) { + if (state == state::preparing) { + return state::prepared; + }; + if (state == state::executing) { + return state::executed; + }; + return std::nullopt; } std::ostream& operator<<(std::ostream& o, state state) { @@ -65,6 +83,17 @@ std::ostream& operator<<(std::ostream& o, state state) { } } +std::ostream& operator<<(std::ostream& o, migrated_replica_status status) { + switch (status) { + case migrated_replica_status::waiting_for_rpc: + return o << "waiting_for_rpc"; + case migrated_replica_status::can_run: + return o << "can_run"; + case migrated_replica_status::done: + return o << "done"; + } +} + std::ostream& operator<<(std::ostream& o, migrated_resource_state state) { switch (state) { case migrated_resource_state::non_restricted: @@ -114,6 +143,14 @@ std::ostream& operator<<(std::ostream& o, const outbound_migration& dm) { return o; } +std::optional migration_metadata::next_replica_state() const { + return std::visit( + [this](const auto& migration) { + return migration.next_replica_state(state); + }, + migration); +} + std::ostream& operator<<(std::ostream& o, const migration_metadata& m) { fmt::print( o, @@ -124,6 +161,16 @@ std::ostream& operator<<(std::ostream& o, const migration_metadata& m) { return o; } +std::ostream& operator<<(std::ostream& o, const data_migration_ntp_state& r) { + fmt::print( + o, + "{{ntp: {}, migration: {}, sought_state: {}}}", + r.ntp, + r.migration, + r.state); + return o; +} + std::ostream& operator<<(std::ostream& o, const create_migration_cmd_data& d) { fmt::print( o, "{{id: {}, migration: {}}}", d.id, print_migration(d.migration)); @@ -173,4 +220,14 @@ std::ostream& operator<<(std::ostream& o, const remove_migration_reply& r) { return o; } +std::ostream& operator<<(std::ostream& o, const check_ntp_states_request& r) { + fmt::print(o, "{{sought_states: {}}}", r.sought_states); + return o; +} + +std::ostream& operator<<(std::ostream& o, const check_ntp_states_reply& r) { + fmt::print(o, "{{actual_states: {}}}", r.actual_states); + return o; +} + } // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_types.h b/src/v/cluster/data_migration_types.h index 4f6e97397724c..5e8907453c166 100644 --- a/src/v/cluster/data_migration_types.h +++ b/src/v/cluster/data_migration_types.h @@ -19,6 +19,8 @@ #include +#include + namespace cluster::data_migrations { /** * Identifier of data migration, the identifier is guaranteed to be unique @@ -65,6 +67,18 @@ enum class state { }; std::ostream& operator<<(std::ostream& o, state); +/** + * For each migration state transition that requires work on partitions + * a partition replica has the following lifecycle: + * - waiting_for_rpc: work requested by raft0, shard not assigned; + * - can_run: seconded by RPC request, shard may be assigned to work on; + * - done: shard completed work and unassigned, done. + * Unless (or until) the shard is the partition leader, it gets stuck + * in can_run status. 
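+ *
+ * Transition triggers (see backend.cc): check_ntp_states_locally() moves a
+ * replica from waiting_for_rpc to can_run, and on_partition_work_completed()
+ * marks it done once the worker finishes the partition work.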
+ */ +enum class migrated_replica_status { waiting_for_rpc, can_run, done }; +std::ostream& operator<<(std::ostream& o, migrated_replica_status); + /** * State of migrated resource i.e. either topic or consumer group, when resource * is blocked all the writes should be disabled, when it is restricted a @@ -151,11 +165,19 @@ struct inbound_migration inbound_migration copy() const; + static std::optional next_replica_state(state state); + auto serde_fields() { return std::tie(topics, groups); } friend bool operator==(const inbound_migration&, const inbound_migration&) = default; friend std::ostream& operator<<(std::ostream&, const inbound_migration&); + + auto topic_nts() const { + return std::as_const(topics) + | std::views::transform( + [](const inbound_topic& it) { return it.source_topic_name; }); + } }; /** @@ -192,11 +214,15 @@ struct outbound_migration outbound_migration copy() const; + static std::optional next_replica_state(state state); + auto serde_fields() { return std::tie(topics, groups, copy_to); } friend bool operator==(const outbound_migration&, const outbound_migration&) = default; friend std::ostream& operator<<(std::ostream&, const outbound_migration&); + + auto topic_nts() const { return std::as_const(topics) | std::views::all; } }; /** * Variant representing a migration. It can be either inbound or outbound data @@ -227,6 +253,8 @@ struct migration_metadata .id = id, .migration = copy_migration(migration), .state = state}; } + std::optional next_replica_state() const; + auto serde_fields() { return std::tie(id, migration, state); } friend bool operator==(const migration_metadata&, const migration_metadata&) @@ -235,6 +263,24 @@ struct migration_metadata friend std::ostream& operator<<(std::ostream&, const migration_metadata&); }; +struct data_migration_ntp_state + : serde::envelope< + data_migration_ntp_state, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + using self = data_migration_ntp_state; + + model::ntp ntp; + id migration; + state state; + + auto serde_fields() { return std::tie(ntp, migration, state); } + + friend bool operator==(const self&, const self&) = default; + friend std::ostream& operator<<(std::ostream&, const self&); +}; + struct create_migration_cmd_data : serde::envelope< create_migration_cmd_data, @@ -384,4 +430,38 @@ struct remove_migration_reply operator<<(std::ostream&, const remove_migration_reply&); }; +struct check_ntp_states_request + : serde::envelope< + check_ntp_states_request, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + using self = check_ntp_states_request; + + chunked_vector sought_states; + + auto serde_fields() { return std::tie(sought_states); } + + friend bool operator==(const self&, const self&) = default; + + friend std::ostream& operator<<(std::ostream&, const self&); +}; + +struct check_ntp_states_reply + : serde::envelope< + check_ntp_states_reply, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + using self = check_ntp_states_reply; + + chunked_vector actual_states; + + auto serde_fields() { return std::tie(actual_states); } + + friend bool operator==(const self&, const self&) = default; + + friend std::ostream& operator<<(std::ostream&, const self&); +}; + } // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_worker.cc b/src/v/cluster/data_migration_worker.cc new file mode 100644 index 0000000000000..02bbb60eb76e3 --- /dev/null +++ 
b/src/v/cluster/data_migration_worker.cc @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "cluster/data_migration_worker.h" + +#include "cluster/data_migration_types.h" +#include "partition_leaders_table.h" +#include "rpc/connection_cache.h" +#include "ssx/future-util.h" + +#include + +#include + +#include +#include + +namespace cluster::data_migrations { + +// TODO: add configuration property +worker::worker( + model::node_id self, + ss::sharded& leaders, + ss::sharded& as) + : _self(self) + , _leaders_table(leaders) + , _as(as) + , _operation_timeout(5s) {} + +ss::future worker::perform_partition_work( + model::ntp&& ntp, id migration, state sought_state, bool _tmp_wait_forever) { + // todo: subscribe to group_manager, wait until leader, perform actual work + std::ignore = std::tuple(std::move(ntp), migration, sought_state); + auto dur = rand() % 10 * 1s; + if (_tmp_wait_forever) { + dur += 100500s; + } + return ss::sleep(dur).then([]() { return errc::success; }); +} + +void worker::abort_partition_work(model::ntp&& ntp) { + // todo: at least abort it right when we are waiting for leadership + std::ignore = std::move(ntp); +} + +// if (_leaders_table.get_leader(ntp) == _self) { +// // todo: local action? +// // also do it somewhere else +// } + +} // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_worker.h b/src/v/cluster/data_migration_worker.h new file mode 100644 index 0000000000000..f6298996261e0 --- /dev/null +++ b/src/v/cluster/data_migration_worker.h @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once + +#include "base/outcome.h" +#include "cluster/data_migration_types.h" +#include "cluster/fwd.h" +#include "errc.h" +#include "model/fundamental.h" + +#include +#include + +namespace cluster::data_migrations { +/* + * This service performs data migration operations on individual partitions + */ +class worker : public ss::peering_sharded_service { +public: + worker( + model::node_id self, + ss::sharded&, + ss::sharded&); + + /* + * Perform work necessary to transition an ntp to sought_state. Retries on + * most errors. Waits forever if our shard is not the leader for the ntp. + */ + ss::future perform_partition_work( + model::ntp&& ntp, + id migration, + state sought_state, + bool _tmp_wait_forever); + + /* + * Aborts requested work where possible (i.e. 
when we are not the leader or + * where we retry due to any error) + */ + void abort_partition_work(model::ntp&& ntp); + +private: + void handle_leadership_update(const model::ntp& ntp, bool is_leader); + + model::node_id _self; + ss::sharded& _leaders_table; + ss::sharded& _as; + std::chrono::milliseconds _operation_timeout; +}; + +} // namespace cluster::data_migrations diff --git a/src/v/cluster/fwd.h b/src/v/cluster/fwd.h index 7fcf26f279e26..b106b977707c1 100644 --- a/src/v/cluster/fwd.h +++ b/src/v/cluster/fwd.h @@ -83,9 +83,12 @@ class tm_stm; class rm_stm; namespace data_migrations { class migrated_resources; +class migration_frontend; +class worker; +class backend; class migrations_table; class frontend; -class backend; +class irpc_frontend; } // namespace data_migrations namespace tx { diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index ecfdbe0332a60..9bfb3fdeeb23d 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -2869,7 +2869,8 @@ void application::start_runtime_services( std::make_unique( sched_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), - std::ref(controller->get_data_migration_frontend()))); + std::ref(controller->get_data_migration_frontend()), + std::ref(controller->get_data_migration_irpc_frontend()))); s.add_services(std::move(runtime_services)); diff --git a/src/v/ssx/include/ssx/async_algorithm.h b/src/v/ssx/include/ssx/async_algorithm.h index e06fee81c200d..8ab55dde504ff 100644 --- a/src/v/ssx/include/ssx/async_algorithm.h +++ b/src/v/ssx/include/ssx/async_algorithm.h @@ -195,6 +195,37 @@ ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) { detail::internal_counter{}, begin, end, std::move(f)); } +/** + * @brief Call f on every element, yielding occasionally. + * + * This is equivalent to std::for_each, except that the computational + * loop yields every Traits::interval (default 100) iterations in order + * to avoid reactor stalls. The returned future resolves when all elements + * have been processed. + * + * The function is taken by value. + * + * The container must remain live, and its begin and end iterators, as they were + * when called, must remain valid until the returned future resolves. + * + * @param container reference to a container + * @param f the function to call on each element + * @return ss::future<> a future which resolves when all elements have been + * processed + */ +template +requires requires(Container c, Fn fn) { + { ss::futurize_invoke(fn, *std::begin(c)) } -> std::same_as>; + std::end(c); +} +ss::future<> async_for_each(Container& container, Fn f) { + return async_for_each_fast( + detail::internal_counter{}, + container.begin(), + container.end(), + std::move(f)); +} + /** * @brief Call f on every element, yielding occasionally and accepting * an externally provided counter for yield control. diff --git a/src/v/ssx/include/ssx/single_sharded.h b/src/v/ssx/include/ssx/single_sharded.h index 235ff07074c9f..305e379a20bcd 100644 --- a/src/v/ssx/include/ssx/single_sharded.h +++ b/src/v/ssx/include/ssx/single_sharded.h @@ -19,6 +19,7 @@ #include #include +#include #include namespace ssx { @@ -152,11 +153,21 @@ class single_sharded : ss::sharded> { return base::invoke_on( _shard, options, - [func = std::forward(func)]( - maybe_service& maybe_service, Args&&... 
args) { - func(*maybe_service, std::forward(args)...); - }, - std::forward(args)...); + // After smp::sumbit_to stores this lambda in the queue and we return + // to the caller, any references provided may become dangling. So the + // lambda must keep parameters and function values, not references. + [func = std::forward(func), + // This is wild, as it moves even from lvalues. However, that's how + // sharded<>::invoke_on behaves, and we mimic it. + args_tup = std::tuple(std::move(args)...)]( + maybe_service& maybe_service) mutable { + return std::apply( + // move the captured func (and args below), as we use them once + std::move(func), + // caller responsible to make sure the service is still there + std::tuple_cat( + std::forward_as_tuple(*maybe_service), std::move(args_tup))); + }); } /// Invoke a callable on the instance of `Service`. diff --git a/src/v/ssx/tests/CMakeLists.txt b/src/v/ssx/tests/CMakeLists.txt index f233f4e278cd2..ce8d0f6d2829f 100644 --- a/src/v/ssx/tests/CMakeLists.txt +++ b/src/v/ssx/tests/CMakeLists.txt @@ -32,7 +32,7 @@ rp_test( SOURCES abort_source_test.cc sharded_ptr_test.cc - single_sharded.cc + single_sharded_test.cc LIBRARIES v::seastar_testing_main ARGS "-- -c 2" LABELS ssx diff --git a/src/v/ssx/tests/single_sharded.cc b/src/v/ssx/tests/single_sharded.cc deleted file mode 100644 index 60e3a3aa4e9d4..0000000000000 --- a/src/v/ssx/tests/single_sharded.cc +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2024 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "ssx/single_sharded.h" - -#include -#include -#include -#include -#include - -#include - -#include - -struct counter { - int started = 0; - int called_foo = 0; - int called_bar = 0; - int stopped = 0; - ss::future<> stop() { return ss::make_ready_future(); } -}; - -struct single_service { - int member = 22; - counter& cntr; - single_service(counter& cntr, ss::sharded& cntrs, int a) - : cntr(cntr) { - BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local()); - BOOST_REQUIRE_EQUAL(a, 1); - ++cntr.started; - } - void foo(ss::sharded& cntrs, int a, int&& b, int& c) { - BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local()); - BOOST_REQUIRE_EQUAL(a, 1); - BOOST_REQUIRE_EQUAL(b, 2); - BOOST_REQUIRE_EQUAL(c, -3); - c = 3; - ++cntr.called_foo; - member = 23; - } - void bar(std::vector&& v, std::unique_ptr uptr) { - BOOST_REQUIRE_EQUAL(v.size(), 2); - BOOST_REQUIRE_EQUAL(bool(uptr), true); - ++cntr.called_bar; - } - ss::future<> stop() { - ++cntr.stopped; - return ss::make_ready_future(); - } -}; - -struct caller { - ssx::single_sharded& sngl; - ss::sharded& cntrs; - explicit caller( - ssx::single_sharded& sngl, ss::sharded& cntrs) - : sngl(sngl) - , cntrs(cntrs) {} - ss::future<> call_twice() { - co_await sngl.invoke_on_instance([this](single_service& sngl_inst) { - int c = -3; - sngl_inst.foo(cntrs, 1, 2, c); - BOOST_REQUIRE_EQUAL(c, 3); - }); - co_await sngl.invoke_on_instance( - [](single_service& sngl_inst, std::vector&& v) { - sngl_inst.bar(std::move(v), std::make_unique()); - }, - std::vector{0, 0}); - } - ss::future<> stop() { return ss::make_ready_future(); } -}; - -SEASTAR_THREAD_TEST_CASE(single_sharded) { - ss::shard_id the_shard = ss::smp::count - 1; - ss::shard_id wrong_shard = std::max(ss::shard_id{0}, the_shard - 1); - - ss::sharded counters; - 
diff --git a/src/v/ssx/tests/CMakeLists.txt b/src/v/ssx/tests/CMakeLists.txt
index f233f4e278cd2..ce8d0f6d2829f 100644
--- a/src/v/ssx/tests/CMakeLists.txt
+++ b/src/v/ssx/tests/CMakeLists.txt
@@ -32,7 +32,7 @@ rp_test(
   SOURCES
     abort_source_test.cc
     sharded_ptr_test.cc
-    single_sharded.cc
+    single_sharded_test.cc
   LIBRARIES v::seastar_testing_main
   ARGS "-- -c 2"
   LABELS ssx
diff --git a/src/v/ssx/tests/single_sharded.cc b/src/v/ssx/tests/single_sharded.cc
deleted file mode 100644
index 60e3a3aa4e9d4..0000000000000
--- a/src/v/ssx/tests/single_sharded.cc
+++ /dev/null
@@ -1,135 +0,0 @@
-// Copyright 2024 Redpanda Data, Inc.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.md
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0
-
-#include "ssx/single_sharded.h"
-
-#include
-#include
-#include
-#include
-#include
-
-#include
-
-#include
-
-struct counter {
-    int started = 0;
-    int called_foo = 0;
-    int called_bar = 0;
-    int stopped = 0;
-    ss::future<> stop() { return ss::make_ready_future(); }
-};
-
-struct single_service {
-    int member = 22;
-    counter& cntr;
-    single_service(counter& cntr, ss::sharded& cntrs, int a)
-      : cntr(cntr) {
-        BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local());
-        BOOST_REQUIRE_EQUAL(a, 1);
-        ++cntr.started;
-    }
-    void foo(ss::sharded& cntrs, int a, int&& b, int& c) {
-        BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local());
-        BOOST_REQUIRE_EQUAL(a, 1);
-        BOOST_REQUIRE_EQUAL(b, 2);
-        BOOST_REQUIRE_EQUAL(c, -3);
-        c = 3;
-        ++cntr.called_foo;
-        member = 23;
-    }
-    void bar(std::vector&& v, std::unique_ptr uptr) {
-        BOOST_REQUIRE_EQUAL(v.size(), 2);
-        BOOST_REQUIRE_EQUAL(bool(uptr), true);
-        ++cntr.called_bar;
-    }
-    ss::future<> stop() {
-        ++cntr.stopped;
-        return ss::make_ready_future();
-    }
-};
-
-struct caller {
-    ssx::single_sharded& sngl;
-    ss::sharded& cntrs;
-    explicit caller(
-      ssx::single_sharded& sngl, ss::sharded& cntrs)
-      : sngl(sngl)
-      , cntrs(cntrs) {}
-    ss::future<> call_twice() {
-        co_await sngl.invoke_on_instance([this](single_service& sngl_inst) {
-            int c = -3;
-            sngl_inst.foo(cntrs, 1, 2, c);
-            BOOST_REQUIRE_EQUAL(c, 3);
-        });
-        co_await sngl.invoke_on_instance(
-          [](single_service& sngl_inst, std::vector&& v) {
-              sngl_inst.bar(std::move(v), std::make_unique());
-          },
-          std::vector{0, 0});
-    }
-    ss::future<> stop() { return ss::make_ready_future(); }
-};
-
-SEASTAR_THREAD_TEST_CASE(single_sharded) {
-    ss::shard_id the_shard = ss::smp::count - 1;
-    ss::shard_id wrong_shard = std::max(ss::shard_id{0}, the_shard - 1);
-
-    ss::sharded counters;
-    ssx::single_sharded single;
-    ss::sharded callers;
-
-    counters.start().get();
-    single
-      .start_on(
-        the_shard,
-        ss::sharded_parameter(
-          [&counters]() { return std::ref(counters.local()); }),
-        std::ref(counters),
-        1)
-      .get();
-    callers.start(std::ref(single), std::ref(counters)).get();
-
-    callers.invoke_on_all([](caller& cllr) { return cllr.call_twice(); }).get();
-
-    ss::smp::submit_to(the_shard, [&single, &counters]() {
-        int c = -3;
-        single.local().foo(counters, 1, 2, c);
-        BOOST_REQUIRE_EQUAL(c, 3);
-        BOOST_REQUIRE_EQUAL(std::as_const(single).local().member, 23);
-    }).get();
-
-    if (the_shard != wrong_shard) {
-        BOOST_REQUIRE_THROW(
-          ss::smp::submit_to(wrong_shard, [&single]() { single.local(); })
-            .get(),
-          ss::no_sharded_instance_exception);
-        BOOST_REQUIRE_THROW(
-          ss::smp::submit_to(
-            wrong_shard, [&single]() { std::as_const(single).local(); })
-            .get(),
-          ss::no_sharded_instance_exception);
-    }
-
-    callers.stop().get();
-    single.stop().get();
-    counters
-      .invoke_on_all([the_shard](counter cntr) {
-          bool on_the_shard = the_shard == ss::this_shard_id();
-          BOOST_REQUIRE_EQUAL(cntr.started, on_the_shard ? 1 : 0);
-          BOOST_REQUIRE_EQUAL(
-            cntr.called_foo, on_the_shard ? ss::smp::count + 1 : 0);
-          BOOST_REQUIRE_EQUAL(
-            cntr.called_bar, on_the_shard ? ss::smp::count : 0);
-          BOOST_REQUIRE_EQUAL(cntr.stopped, on_the_shard ? 1 : 0);
-      })
-      .get();
-    counters.stop().get();
-}
diff --git a/src/v/ssx/tests/single_sharded_test.cc b/src/v/ssx/tests/single_sharded_test.cc
new file mode 100644
index 0000000000000..c86ca939051da
--- /dev/null
+++ b/src/v/ssx/tests/single_sharded_test.cc
@@ -0,0 +1,204 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+#include "ssx/future-util.h"
+#include "ssx/single_sharded.h"
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+
+struct counter {
+    int started = 0;
+    int called_foo = 0;
+    int stopped = 0;
+    ss::future<> stop() { return ss::make_ready_future(); }
+};
+
+struct single_service {
+    int member = 22;
+    counter& cntr;
+    single_service(counter& cntr, ss::sharded& cntrs, int a)
+      : cntr(cntr) {
+        BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local());
+        BOOST_REQUIRE_EQUAL(a, 1);
+        ++cntr.started;
+    }
+    int foo(
+      ss::sharded& cntrs,
+      int a,
+      std::vector&& v,
+      std::unique_ptr uptr) {
+        BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local());
+        BOOST_REQUIRE_EQUAL(a, 1);
+        // make sure it hasn't accidentally been moved-from
+        BOOST_REQUIRE_EQUAL(v.size(), 2);
+        // same, but for an item that cannot be copied
+        BOOST_REQUIRE_EQUAL(bool(uptr), true);
+        ++cntr.called_foo;
+        member = 23;
+        return 42;
+    }
+    ss::future<> stop() {
+        ++cntr.stopped;
+        return ss::make_ready_future();
+    }
+};
+
+struct caller {
+    ssx::single_sharded& sngl;
+    ss::sharded& cntrs;
+    explicit caller(
+      ssx::single_sharded& sngl, ss::sharded& cntrs)
+      : sngl(sngl)
+      , cntrs(cntrs) {}
+    ss::future<> call_thrice() {
+        std::unique_ptr> vec_uptr;
+        auto fut = ssx::now(1);
+        int ret;
+
+        // pointer to method
+        vec_uptr = std::make_unique>(2, 0);
+        fut = sngl.invoke_on_instance(
+          &single_service::foo,
+          std::ref(cntrs),
+          1,
+          std::move(*vec_uptr),
+          std::make_unique());
+        vec_uptr = nullptr;
+        ret = co_await std::move(fut);
+        BOOST_REQUIRE_EQUAL(ret, 42);
+
+        // rvalue ref to non-copyable lambda
+        auto mvbl_lambda = [ensure_nocopy = std::make_unique>()](
+                             single_service& ss,
+                             ss::sharded& cntrs,
+                             int a,
+                             std::vector&& v,
+                             std::unique_ptr&& ui) {
+            return ss.foo(cntrs, a, std::move(v), std::move(ui));
+        };
+        auto ml_uptr = std::make_unique(
+          std::move(mvbl_lambda));
+        vec_uptr = std::make_unique>(2, 0);
+        fut = sngl.invoke_on_instance(
+          std::move(*ml_uptr),
+          std::ref(cntrs),
+          1,
+          std::move(*vec_uptr),
+          std::make_unique());
+        vec_uptr = nullptr;
+        ml_uptr = nullptr;
+        ret = co_await std::move(fut);
+        BOOST_REQUIRE_EQUAL(ret, 42);
+
+        // lvalue reference to copyable lambda (sadly invoke_on copies)
+        auto cpbl_lambda = [](
+                             single_service& ss,
+                             ss::sharded& cntrs,
+                             int a,
+                             std::vector&& v,
+                             std::unique_ptr&& ui) {
+            return ss.foo(cntrs, a, std::move(v), std::move(ui));
+        };
+        auto cl_uptr = std::make_unique(
+          std::move(cpbl_lambda));
+        vec_uptr = std::make_unique>(2, 0);
+        fut = sngl.invoke_on_instance(
+          *cl_uptr,
+          std::ref(cntrs),
+          1,
+          std::move(*vec_uptr),
+          std::make_unique());
+        vec_uptr = nullptr;
+        cl_uptr = nullptr;
+        ret = co_await std::move(fut);
+        BOOST_REQUIRE_EQUAL(ret, 42);
+    }
+    ss::future<> stop() { return ss::make_ready_future(); }
+};
+
+SEASTAR_THREAD_TEST_CASE(single_sharded) {
+    ss::shard_id the_shard = ss::smp::count - 1;
+
+    ss::sharded counters;
+    ssx::single_sharded single;
+    ss::sharded callers;
+
+    counters.start().get();
+    single
+      .start_on(
+        the_shard,
+        ss::sharded_parameter(
+          [&counters]() { return std::ref(counters.local()); }),
+        std::ref(counters),
+        1)
+      .get();
+    callers.start(std::ref(single), std::ref(counters)).get();
+
+    callers.invoke_on_all(&caller::call_thrice).get();
+
+    ss::smp::submit_to(the_shard, [&single, &counters]() {
+        single.local().foo(
+          counters, 1, std::vector{0, 0}, std::make_unique());
+        BOOST_REQUIRE_EQUAL(std::as_const(single).local().member, 23);
+    }).get();
+
+    callers.stop().get();
+    single.stop().get();
+    counters
+      .invoke_on_all([the_shard](counter cntr) {
+          bool on_the_shard = the_shard == ss::this_shard_id();
+          BOOST_REQUIRE_EQUAL(cntr.started, on_the_shard ? 1 : 0);
+          BOOST_REQUIRE_EQUAL(
+            cntr.called_foo, on_the_shard ? ss::smp::count * 3 + 1 : 0);
+          BOOST_REQUIRE_EQUAL(cntr.stopped, on_the_shard ? 1 : 0);
+      })
+      .get();
+    counters.stop().get();
+}
+
+SEASTAR_THREAD_TEST_CASE(single_sharded_wrong_shard) {
+    BOOST_REQUIRE(ss::smp::count > 1);
+
+    ss::shard_id the_shard = ss::smp::count - 2;
+    ss::shard_id wrong_shard = ss::smp::count - 1;
+
+    ss::sharded counters;
+    ssx::single_sharded single;
+
+    counters.start().get();
+    single
+      .start_on(
+        the_shard,
+        ss::sharded_parameter(
+          [&counters]() { return std::ref(counters.local()); }),
+        std::ref(counters),
+        1)
+      .get();
+
+    BOOST_REQUIRE_THROW(
+      ss::smp::submit_to(wrong_shard, [&single]() { single.local(); }).get(),
+      ss::no_sharded_instance_exception);
+    BOOST_REQUIRE_THROW(
+      ss::smp::submit_to(
+        wrong_shard, [&single]() { std::as_const(single).local(); })
+        .get(),
+      ss::no_sharded_instance_exception);
+
+    single.stop().get();
+    counters.stop().get();
+}
diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py
index a8dc111acc55b..713a14b067736 100644
--- a/tests/rptest/tests/data_migrations_api_test.py
+++ b/tests/rptest/tests/data_migrations_api_test.py
@@ -37,7 +37,7 @@ def migration_in_state():
         wait_until(
             migration_in_state,
-            timeout_sec=30,
+            timeout_sec=90,
             backoff_sec=1,
             err_msg=f"Failed waiting for migration {id} to reach {state} state"
         )
@@ -101,8 +101,20 @@ def test_creating_and_listing_migrations(self):
         admin.execute_data_migration_action(out_migration_id,
                                             MigrationAction.prepare)
+        self.logger.info('waiting for preparing')
         self.wait_for_migration_state(out_migration_id, 'preparing')
-
+        self.logger.info('waiting for prepared')
+        self.wait_for_migration_state(out_migration_id, 'prepared')
+        admin.execute_data_migration_action(out_migration_id,
+                                            MigrationAction.execute)
+        self.logger.info('waiting for executing')
+        self.wait_for_migration_state(out_migration_id, 'executing')
+        self.wait_for_migration_state(out_migration_id, 'executed')
         admin.execute_data_migration_action(out_migration_id,
-                                            MigrationAction.cancel)
-        self.wait_for_migration_state(out_migration_id, 'canceling')
+                                            MigrationAction.finish)
+        self.wait_for_migration_state(out_migration_id, 'finished')
+
+        # TODO: check unhappy scenarios like this
+        # admin.execute_data_migration_action(out_migration_id,
+        #                                     MigrationAction.cancel)
+        # self.wait_for_migration_state(out_migration_id, 'canceling')