diff --git a/src/v/bytes/include/bytes/bytes.h b/src/v/bytes/include/bytes/bytes.h index 52e054d0ab991..3d958a7244b2a 100644 --- a/src/v/bytes/include/bytes/bytes.h +++ b/src/v/bytes/include/bytes/bytes.h @@ -104,6 +104,13 @@ inline iobuf bytes_to_iobuf(const bytes& in) { return out; } +inline iobuf bytes_to_iobuf(bytes_view in) { + iobuf out; + // NOLINTNEXTLINE + out.append(reinterpret_cast(in.data()), in.size()); + return out; +} + // NOLINTNEXTLINE(cert-dcl58-cpp): hash<> specialization namespace std { template<> diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc index 2f1e98cb87419..6bf1e3446dc6f 100644 --- a/src/v/cluster/cluster_utils.cc +++ b/src/v/cluster/cluster_utils.cc @@ -237,12 +237,16 @@ std::optional placement_target_on_node( // expected shard is determined by the resulting assignment // (including cancellation effects). return shard_placement_target{ - log_revision, resulting_shard_on_node.value()}; + replicas_view.assignment.group, + log_revision, + resulting_shard_on_node.value()}; } else { // partition is moved away from this node, but we keep the original // replica until update is finished. return shard_placement_target{ - log_revision, orig_shard_on_node.value()}; + replicas_view.assignment.group, + log_revision, + orig_shard_on_node.value()}; } } else if (replicas_view.update) { // if partition appears on the node as a result of the update, create @@ -252,6 +256,7 @@ std::optional placement_target_on_node( replicas_view.update->get_target_replicas(), node); if (updated_shard_on_node) { return shard_placement_target{ + replicas_view.assignment.group, replicas_view.update->get_update_revision(), updated_shard_on_node.value()}; } diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 89ee02a0909dd..0ee6892687723 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -187,7 +187,10 @@ ss::future<> controller::wire_up() { std::ref(_partition_allocator), std::ref(_node_status_table)); }) - .then([this] { return _shard_placement.start(); }) + .then([this] { + return _shard_placement.start(ss::sharded_parameter( + [this] { return std::ref(_storage.local().kvs()); })); + }) .then([this] { _probe.start(); }); } @@ -438,8 +441,9 @@ ss::future<> controller::start( }) .then([this] { return _shard_balancer.start_single( - std::ref(_tp_state), std::ref(_shard_placement), + std::ref(_feature_table), + std::ref(_tp_state), std::ref(_backend)); }) .then( diff --git a/src/v/cluster/shard_balancer.cc b/src/v/cluster/shard_balancer.cc index e6ae21e68d012..46979fe912b63 100644 --- a/src/v/cluster/shard_balancer.cc +++ b/src/v/cluster/shard_balancer.cc @@ -14,15 +14,18 @@ #include "cluster/cluster_utils.h" #include "cluster/logger.h" #include "config/node_config.h" +#include "ssx/async_algorithm.h" namespace cluster { shard_balancer::shard_balancer( - ss::sharded& topics, ss::sharded& spt, + ss::sharded& features, + ss::sharded& topics, ss::sharded& cb) - : _topics(topics) - , _shard_placement(spt) + : _shard_placement(spt.local()) + , _features(features.local()) + , _topics(topics) , _controller_backend(cb) , _self(*config::node().node_id()) {} @@ -32,19 +35,81 @@ ss::future<> shard_balancer::start() { "method can only be invoked on shard {}", shard_id); + // We expect topic_table to remain unchanged throughout the method + // invocation because it is supposed to be called after local controller + // replay is finished but before we start getting new controller updates + // from the leader. auto tt_version = _topics.local().topics_map_revision(); - co_await _shard_placement.invoke_on_all([this](shard_placement_table& spt) { - return spt.initialize(_topics.local(), _self); - }); + if (_shard_placement.is_persistence_enabled()) { + // 1. collect the set of node-local ntps from topic_table + + chunked_hash_map local_group2ntp; + chunked_hash_map local_ntp2log_revision; + const auto& topics = _topics.local(); + ssx::async_counter counter; + for (const auto& [ns_tp, md_item] : topics.all_topics_metadata()) { + vassert( + tt_version == topics.topics_map_revision(), + "topic_table unexpectedly changed"); + + co_await ssx::async_for_each_counter( + counter, + md_item.get_assignments().begin(), + md_item.get_assignments().end(), + [&](const partition_assignment& p_as) { + vassert( + tt_version == topics.topics_map_revision(), + "topic_table unexpectedly changed"); + + model::ntp ntp{ns_tp.ns, ns_tp.tp, p_as.id}; + auto replicas_view = topics.get_replicas_view( + ntp, md_item, p_as); + auto log_rev = log_revision_on_node(replicas_view, _self); + if (log_rev) { + local_group2ntp.emplace( + replicas_view.assignment.group, ntp); + local_ntp2log_revision.emplace(ntp, *log_rev); + } + }); + } + + // 2. restore shard_placement_table from the kvstore + + co_await _shard_placement.initialize_from_kvstore(local_group2ntp); + + // 3. assign non-assigned ntps that have to be assigned + + co_await ssx::async_for_each_counter( + counter, + local_ntp2log_revision.begin(), + local_ntp2log_revision.end(), + [&](const std::pair kv) { + const auto& [ntp, log_revision] = kv; + auto existing_target = _shard_placement.get_target(ntp); + if ( + !existing_target + || existing_target->log_revision != log_revision) { + _to_assign.insert(ntp); + } + }); + co_await do_assign_ntps(); + } else { + co_await _shard_placement.initialize_from_topic_table(_topics, _self); + + if (_features.is_active( + features::feature::shard_placement_persistence)) { + co_await _shard_placement.enable_persistence(); + } + } - // we shouldn't be receiving any controller updates at this point, so no - // risk of missing a notification between initializing shard_placement_table - // and subscribing. vassert( tt_version == _topics.local().topics_map_revision(), "topic_table unexpectedly changed"); + // we shouldn't be receiving any controller updates at this point, so no + // risk of missing a notification between initializing shard_placement_table + // and subscribing. _topic_table_notify_handle = _topics.local().register_delta_notification( [this](topic_table::delta_range_t deltas_range) { for (const auto& delta : deltas_range) { @@ -88,6 +153,12 @@ ss::future<> shard_balancer::assign_fiber() { co_return; } + if ( + _features.is_active(features::feature::shard_placement_persistence) + && !_shard_placement.is_persistence_enabled()) { + co_await _shard_placement.enable_persistence(); + } + co_await do_assign_ntps(); } } @@ -118,8 +189,7 @@ ss::future<> shard_balancer::assign_ntp(const model::ntp& ntp) { target); try { - co_await _shard_placement.local().set_target( - ntp, target, shard_callback); + co_await _shard_placement.set_target(ntp, target, shard_callback); } catch (...) { auto ex = std::current_exception(); if (!ssx::is_shutdown_exception(ex)) { diff --git a/src/v/cluster/shard_balancer.h b/src/v/cluster/shard_balancer.h index 3bc6feb299685..e5d66712a4dc7 100644 --- a/src/v/cluster/shard_balancer.h +++ b/src/v/cluster/shard_balancer.h @@ -31,8 +31,9 @@ class shard_balancer { static constexpr ss::shard_id shard_id = 0; shard_balancer( - ss::sharded&, ss::sharded&, + ss::sharded&, + ss::sharded&, ss::sharded&); ss::future<> start(); @@ -46,8 +47,9 @@ class shard_balancer { ss::future<> assign_ntp(const model::ntp&); private: + shard_placement_table& _shard_placement; + features::feature_table& _features; ss::sharded& _topics; - ss::sharded& _shard_placement; ss::sharded& _controller_backend; model::node_id _self; diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc index 320b22d0a5535..8517c2d251a0c 100644 --- a/src/v/cluster/shard_placement_table.cc +++ b/src/v/cluster/shard_placement_table.cc @@ -24,7 +24,8 @@ std::ostream& operator<<( std::ostream& o, const shard_placement_table::shard_local_assignment& as) { fmt::print( o, - "{{log_revision: {}, shard_revision: {}}}", + "{{group: {}, log_revision: {}, shard_revision: {}}}", + as.group, as.log_revision, as.shard_revision); return o; @@ -47,7 +48,8 @@ std::ostream& operator<<( std::ostream& o, const shard_placement_table::shard_local_state& ls) { fmt::print( o, - "{{log_revision: {}, status: {}, shard_revision: {}}}", + "{{group: {}, log_revision: {}, status: {}, shard_revision: {}}}", + ls.group, ls.log_revision, ls.status, ls.shard_revision); @@ -94,7 +96,462 @@ operator<<(std::ostream& o, const shard_placement_table::placement_state& ps) { return o; } -ss::future<> shard_placement_table::initialize( +namespace { + +static constexpr auto kvstore_key_space + = storage::kvstore::key_space::shard_placement; + +// enum type is irrelevant, serde will serialize to 32 bit anyway +enum class kvstore_key_type { + persistence_enabled = 0, + assignment = 1, + current_state = 2, +}; + +bytes persistence_enabled_kvstore_key() { + iobuf buf; + serde::write(buf, kvstore_key_type::persistence_enabled); + return iobuf_to_bytes(buf); +} + +struct assignment_marker + : serde:: + envelope, serde::compat_version<0>> { + model::revision_id log_revision; + model::shard_revision_id shard_revision; + + auto serde_fields() { return std::tie(log_revision, shard_revision); } +}; + +bytes assignment_kvstore_key(const raft::group_id group) { + iobuf buf; + serde::write(buf, kvstore_key_type::assignment); + serde::write(buf, group); + return iobuf_to_bytes(buf); +} + +struct current_state_marker + : serde::envelope< + current_state_marker, + serde::version<0>, + serde::compat_version<0>> { + // NOTE: we need ntp in this marker because we want to be able to find and + // clean garbage kvstore state for old groups that have already been deleted + // from topic_table. Some of the partition kvstore state items use keys + // based on group id and some - based on ntp, so we need both. + model::ntp ntp; + model::revision_id log_revision; + model::shard_revision_id shard_revision; + bool is_complete = false; + + auto serde_fields() { + return std::tie(ntp, log_revision, shard_revision, is_complete); + } +}; + +bytes current_state_kvstore_key(const raft::group_id group) { + iobuf buf; + serde::write(buf, kvstore_key_type::current_state); + serde::write(buf, group); + return iobuf_to_bytes(buf); +} + +} // namespace + +shard_placement_table::shard_placement_table(storage::kvstore& kvstore) + : _kvstore(kvstore) {} + +bool shard_placement_table::is_persistence_enabled() const { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + + if (_persistence_enabled) { + return true; + } + + return _kvstore.get(kvstore_key_space, persistence_enabled_kvstore_key()) + .has_value(); +} + +ss::future<> shard_placement_table::enable_persistence() { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + + if (is_persistence_enabled()) { + co_return; + } + + vlog(clusterlog.info, "enabling table persistence..."); + + auto write_locks = container().map([](shard_placement_table& local) { + return local._persistence_lock.hold_write_lock().then( + [](ss::rwlock::holder holder) { + return ss::make_foreign( + std::make_unique(std::move(holder))); + }); + }); + + co_await container().invoke_on_all([](shard_placement_table& local) { + return local.persist_shard_local_state(); + }); + + co_await _kvstore.put( + kvstore_key_space, persistence_enabled_kvstore_key(), iobuf{}); + + co_await container().invoke_on_all( + [](shard_placement_table& local) { local._persistence_enabled = true; }); + + vlog(clusterlog.debug, "persistence enabled"); +} + +ss::future<> shard_placement_table::persist_shard_local_state() { + // 1. delete existing state + + chunked_vector old_keys; + co_await _kvstore.for_each( + kvstore_key_space, + [&](bytes_view key, const iobuf&) { old_keys.emplace_back(key); }); + + co_await ss::max_concurrent_for_each( + old_keys, 512, [this](const bytes& key) { + return _kvstore.remove(kvstore_key_space, key); + }); + + // 2. persist current map + + co_await ss::max_concurrent_for_each( + _states, 512, [this](const decltype(_states)::value_type& kv) { + const auto& [ntp, pstate] = kv; + auto f1 = ss::now(); + if (pstate.assigned) { + auto marker = assignment_marker{ + .log_revision = pstate.assigned->log_revision, + .shard_revision = pstate.assigned->shard_revision, + }; + f1 = _kvstore.put( + kvstore_key_space, + assignment_kvstore_key(pstate.assigned->group), + serde::to_iobuf(marker)); + } + + auto f2 = ss::now(); + if (pstate.current) { + auto marker = current_state_marker{ + .ntp = ntp, + .log_revision = pstate.current->log_revision, + .shard_revision = pstate.current->shard_revision, + .is_complete = pstate.current->status == hosted_status::hosted, + }; + f2 = _kvstore.put( + kvstore_key_space, + current_state_kvstore_key(pstate.current->group), + serde::to_iobuf(marker)); + } + + return ss::when_all(std::move(f1), std::move(f2)).discard_result(); + }); +} + +/// A struct used during initialization to merge kvstore markers recovered from +/// different shards and restore the most up-to-date shard_placement_table +/// state. +struct shard_placement_table::ntp_init_data { + struct versioned_shard { + std::optional shard; + model::shard_revision_id revision; + + void update( + std::optional s, model::shard_revision_id new_rev) { + if (new_rev > revision) { + shard = s; + revision = new_rev; + } + } + }; + + raft::group_id group; + model::revision_id log_revision; + versioned_shard hosted; + versioned_shard receiving; + versioned_shard assigned; + + void update_log_revision(raft::group_id gr, model::revision_id new_rev) { + if (new_rev > log_revision) { + *this = ntp_init_data{}; + group = gr; + log_revision = new_rev; + } + } + + void update(ss::shard_id s, const shard_local_assignment& new_assigned) { + update_log_revision(new_assigned.group, new_assigned.log_revision); + if (new_assigned.log_revision == log_revision) { + assigned.update(s, new_assigned.shard_revision); + } + } + + void update(ss::shard_id s, const shard_local_state& new_current) { + update_log_revision(new_current.group, new_current.log_revision); + if (new_current.log_revision == log_revision) { + switch (new_current.status) { + case hosted_status::hosted: + hosted.update(s, new_current.shard_revision); + receiving.update(std::nullopt, new_current.shard_revision); + break; + case hosted_status::receiving: + receiving.update(s, new_current.shard_revision); + break; + default: + break; + } + } + } +}; + +ss::future<> shard_placement_table::initialize_from_kvstore( + const chunked_hash_map& local_group2ntp) { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + + vassert( + is_persistence_enabled(), + "can't initialize from kvstore, persistence hasn't been enabled yet"); + co_await container().invoke_on_all( + [](shard_placement_table& spt) { spt._persistence_enabled = true; }); + + // 1. gather kvstore markers from all shards + + auto shard2init_states = co_await container().map( + [&local_group2ntp](shard_placement_table& spt) { + return spt.gather_init_states(local_group2ntp); + }); + + // 2. merge into up-to-date shard_placement_table state + + chunked_hash_map ntp2init_data; + model::shard_revision_id max_shard_revision; + ssx::async_counter counter; + for (ss::shard_id s = 0; s < shard2init_states.size(); ++s) { + co_await ssx::async_for_each_counter( + counter, + shard2init_states[s]->begin(), + shard2init_states[s]->end(), + [&](const ntp2state_t::value_type& kv) { + const auto& [ntp, state] = kv; + auto& init_data = ntp2init_data.try_emplace(ntp).first->second; + + if (state.assigned) { + init_data.update(s, state.assigned.value()); + max_shard_revision = std::max( + max_shard_revision, state.assigned->shard_revision); + } + + if (state.current) { + init_data.update(s, state.current.value()); + max_shard_revision = std::max( + max_shard_revision, state.current->shard_revision); + } + }); + } + + // 3. based on merged data, update in-memory state everywhere + + if (max_shard_revision != model::shard_revision_id{}) { + _cur_shard_revision = max_shard_revision + model::shard_revision_id{1}; + } + + co_await container().invoke_on_all( + [&ntp2init_data](shard_placement_table& spt) { + return spt.scatter_init_data(ntp2init_data); + }); + + co_await ssx::async_for_each( + ntp2init_data.begin(), + ntp2init_data.end(), + [this](const decltype(ntp2init_data)::value_type& kv) { + const auto& [ntp, init_data] = kv; + vlog( + clusterlog.trace, + "[{}] init data: group: {}, log_revision: {}, " + "assigned: {}, hosted: {}, receiving: {}", + ntp, + init_data.group, + init_data.log_revision, + init_data.assigned.shard, + init_data.hosted.shard, + init_data.receiving.shard); + + if (init_data.assigned.shard) { + auto entry = std::make_unique(); + entry->target = shard_placement_target( + init_data.group, + init_data.log_revision, + init_data.assigned.shard.value()); + _ntp2entry.emplace(ntp, std::move(entry)); + } + }); +} + +ss::future>> +shard_placement_table::gather_init_states( + const chunked_hash_map& partitions) { + chunked_vector orphan_assignments; + + co_await _kvstore.for_each( + kvstore_key_space, [&](bytes_view key_str, const iobuf& val) { + iobuf_parser key_parser(bytes_to_iobuf(key_str)); + + auto key_type = serde::read_nested(key_parser, 0); + switch (key_type) { + default: + return; + case kvstore_key_type::assignment: { + auto group = serde::read_nested(key_parser, 0); + auto ntp_it = partitions.find(group); + if (ntp_it == partitions.end()) { + vlog( + clusterlog.trace, + "recovered orphan assigned marker, group: {}", + group); + orphan_assignments.push_back(group); + } else { + auto marker = serde::from_iobuf( + val.copy()); + vlog( + clusterlog.trace, + "[{}] recovered assigned marker, lr: {} sr: {}", + ntp_it->second, + marker.log_revision, + marker.shard_revision); + + _states[ntp_it->second].assigned = shard_local_assignment{ + .group = group, + .log_revision = marker.log_revision, + .shard_revision = marker.shard_revision}; + } + break; + } + case kvstore_key_type::current_state: { + auto group = serde::read_nested(key_parser, 0); + auto marker = serde::from_iobuf(val.copy()); + vlog( + clusterlog.trace, + "[{}] recovered cur state marker, lr: {} sr: {} complete: {}", + marker.ntp, + marker.log_revision, + marker.shard_revision, + marker.is_complete); + + auto& state = _states[marker.ntp]; + if (state.current) { + throw std::runtime_error(fmt_with_ctx( + ssx::sformat, + "duplicate ntp {} in kvstore map on shard {}", + marker.ntp, + ss::this_shard_id())); + } + state.current = shard_local_state( + group, + marker.log_revision, + marker.is_complete ? hosted_status::hosted + : hosted_status::receiving, + marker.shard_revision); + break; + } + } + }); + + co_await ss::max_concurrent_for_each( + orphan_assignments.begin(), + orphan_assignments.end(), + 512, + [this](raft::group_id group) { + return _kvstore.remove( + kvstore_key_space, assignment_kvstore_key(group)); + }); + + co_return ss::make_foreign(std::make_unique(_states)); +} + +ss::future<> shard_placement_table::scatter_init_data( + const chunked_hash_map& + ntp2init_data) { + return ss::max_concurrent_for_each( + ntp2init_data.begin(), + ntp2init_data.end(), + 512, + [this](const std::pair& kv) { + const auto& [ntp, init_data] = kv; + auto it = _states.find(ntp); + if (it == _states.end()) { + return ss::now(); + } + auto& state = it->second; + + if (state.current) { + if (ss::this_shard_id() == init_data.hosted.shard) { + if (init_data.receiving.shard) { + state._next = init_data.receiving.shard; + } + } else if (ss::this_shard_id() != init_data.receiving.shard) { + state.current->status = hosted_status::obsolete; + } + } + + ss::future<> fut = ss::now(); + if (state.assigned) { + if (ss::this_shard_id() != init_data.assigned.shard) { + fut = _kvstore.remove( + kvstore_key_space, + assignment_kvstore_key(state.assigned->group)); + state.assigned = std::nullopt; + } else if (!init_data.hosted.shard) { + state._is_initial_for = init_data.log_revision; + } + } + + if (state.is_empty()) { + _states.erase(it); + } else { + vlog( + clusterlog.info, + "[{}] recovered placement state: {}", + ntp, + state); + } + + return fut; + }); +} + +ss::future<> shard_placement_table::initialize_from_topic_table( + ss::sharded& topics, model::node_id self) { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + vassert( + !is_persistence_enabled(), + "can't initialize from topic_table, persistence has already been " + "enabled"); + + co_await container().invoke_on_all( + [&topics, self](shard_placement_table& spt) { + return spt.do_initialize_from_topic_table(topics.local(), self); + }); + + if (!_ntp2entry.empty()) { + _cur_shard_revision += 1; + } +} + +ss::future<> shard_placement_table::do_initialize_from_topic_table( const topic_table& topics, model::node_id self) { // We expect topic_table to remain unchanged throughout the loop because the // method is supposed to be called after local controller replay is finished @@ -152,13 +609,15 @@ ss::future<> shard_placement_table::initialize( auto placement = placement_state(); auto assigned = shard_local_assignment{ + .group = target->group, .log_revision = target->log_revision, .shard_revision = _cur_shard_revision}; if (orig_shard && target->shard != orig_shard) { // cross-shard transfer, orig_shard gets the hosted marker if (ss::this_shard_id() == orig_shard) { - placement.current = shard_local_state::initial(assigned); + placement.current = shard_local_state( + assigned, hosted_status::hosted); _states.emplace(ntp, placement); } else if (ss::this_shard_id() == target->shard) { placement.assigned = assigned; @@ -166,16 +625,13 @@ ss::future<> shard_placement_table::initialize( } } else if (ss::this_shard_id() == target->shard) { // in other cases target shard gets the hosted marker - placement.current = shard_local_state::initial(assigned); + placement.current = shard_local_state( + assigned, hosted_status::hosted); placement.assigned = assigned; _states.emplace(ntp, placement); } }); } - - if (!_ntp2entry.empty()) { - _cur_shard_revision += 1; - } } ss::future<> shard_placement_table::set_target( @@ -187,6 +643,9 @@ ss::future<> shard_placement_table::set_target( "method can only be invoked on shard {}", assignment_shard_id); + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + if (!target && !_ntp2entry.contains(ntp)) { co_return; } @@ -216,8 +675,6 @@ ss::future<> shard_placement_table::set_target( co_return; } - // 1. update node-wide map - const model::shard_revision_id shard_rev = _cur_shard_revision; _cur_shard_revision += 1; @@ -228,82 +685,128 @@ ss::future<> shard_placement_table::set_target( prev_target, target, shard_rev); - entry.target = target; - - // 2. update shard-local state - if (target) { - const bool is_initial - = (!prev_target || prev_target->log_revision != target->log_revision); - co_await container().invoke_on( - target->shard, - [&ntp, target, shard_rev, is_initial, shard_callback]( - shard_placement_table& other) { - return other.set_assigned_on_this_shard( - ntp, - target->log_revision, - shard_rev, - is_initial, - shard_callback); - }); + // 1. Persist the new target in kvstore + + if (_persistence_enabled) { + if (target) { + co_await container().invoke_on( + target->shard, + [&target, shard_rev, &ntp](shard_placement_table& other) { + auto marker_buf = serde::to_iobuf(assignment_marker{ + .log_revision = target->log_revision, + .shard_revision = shard_rev, + }); + vlog( + clusterlog.trace, + "[{}] put assigned marker, lr: {} sr: {}", + ntp, + target->log_revision, + shard_rev); + return other._kvstore.put( + kvstore_key_space, + assignment_kvstore_key(target->group), + std::move(marker_buf)); + }); + } else { + co_await container().invoke_on( + prev_target.value().shard, + [group = prev_target->group, &ntp](shard_placement_table& other) { + vlog(clusterlog.trace, "[{}] remove assigned marker", ntp); + return other._kvstore.remove( + kvstore_key_space, assignment_kvstore_key(group)); + }); + } } + // 2. At this point we've successfully committed the new target to + // persistent storage. Update in-memory state. + + entry.target = target; + if (prev_target && (!target || target->shard != prev_target->shard)) { co_await container().invoke_on( prev_target->shard, [&ntp, shard_callback](shard_placement_table& other) { - return other.remove_assigned_on_this_shard(ntp, shard_callback); + auto it = other._states.find(ntp); + if (it == other._states.end() || !it->second.assigned) { + return; + } + + vlog( + clusterlog.trace, + "[{}] removing assigned on this shard (was: {})", + ntp, + it->second.assigned); + + it->second.assigned = std::nullopt; + if (it->second.is_empty()) { + // We are on a shard that was previously a target, but didn't + // get to starting the transfer. + other._states.erase(it); + } + + // Notify the caller that something has changed on this shard. + shard_callback(ntp); }); } -} -ss::future<> shard_placement_table::set_assigned_on_this_shard( - const model::ntp& ntp, - model::revision_id target_log_rev, - model::shard_revision_id shard_rev, - bool is_initial, - shard_callback_t shard_callback) { - vlog( - clusterlog.trace, - "[{}] setting assigned on this shard, " - "log_revision: {}, shard_revision: {}, is_initial: {}", - ntp, - target_log_rev, - shard_rev, - is_initial); - - auto& state = _states.try_emplace(ntp).first->second; - state.assigned = shard_local_assignment{ - .log_revision = target_log_rev, - .shard_revision = shard_rev, - }; - if (is_initial) { - state._is_initial_for = target_log_rev; - } + if (target) { + const bool is_initial + = (!prev_target || prev_target->log_revision != target->log_revision); + shard_local_assignment as{ + .group = target->group, + .log_revision = target->log_revision, + .shard_revision = shard_rev, + }; + co_await container().invoke_on( + target->shard, + [&ntp, &as, is_initial, shard_callback](shard_placement_table& spt) { + auto& state = spt._states.try_emplace(ntp).first->second; - // Notify the caller that something has changed on this shard. - shard_callback(ntp); - co_return; -} + vlog( + clusterlog.trace, + "[{}] setting assigned on this shard to: {} (was: {}), " + "is_initial: {}", + ntp, + as, + state.assigned, + is_initial); -ss::future<> shard_placement_table::remove_assigned_on_this_shard( - const model::ntp& ntp, shard_callback_t shard_callback) { - vlog(clusterlog.trace, "[{}] removing assigned on this shard", ntp); + state.assigned = as; + if (is_initial) { + state._is_initial_for = as.log_revision; + } - auto it = _states.find(ntp); - if (it == _states.end()) { - co_return; + // Notify the caller that something has changed on this shard. + shard_callback(ntp); + }); } - it->second.assigned = std::nullopt; - if (it->second.is_empty()) { - // We are on a shard that was previously a target, but didn't get to - // starting the transfer. - _states.erase(it); - } + // 3. Lastly, remove obsolete kvstore marker - // Notify the caller that something has changed on this shard. - shard_callback(ntp); + if ( + _persistence_enabled && prev_target + && (!target || target->shard != prev_target->shard)) { + co_await container().invoke_on( + prev_target->shard, + [group = prev_target->group, &ntp](shard_placement_table& other) { + vlog( + clusterlog.trace, "[{}] remove obsolete assigned marker", ntp); + return other._kvstore + .remove(kvstore_key_space, assignment_kvstore_key(group)) + .handle_exception([group](std::exception_ptr ex) { + // Ignore the exception because the update has already been + // committed. Obsolete marker will be deleted after the next + // restart. + vlog( + clusterlog.debug, + "failed to remove assignment marker for group {}: {}", + group, + ex); + }); + }); + } } std::optional @@ -315,27 +818,70 @@ shard_placement_table::state_on_this_shard(const model::ntp& ntp) const { return std::nullopt; } +std::optional +shard_placement_table::get_target(const model::ntp& ntp) const { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + auto it = _ntp2entry.find(ntp); + if (it != _ntp2entry.end()) { + return it->second->target; + } + return std::nullopt; +} + ss::future shard_placement_table::prepare_create( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto state_it = _states.find(ntp); - vassert(state_it != _states.end(), "[{}] expected state", ntp); + if (state_it == _states.end()) { + // assignments got updated while we were waiting for the lock + co_return errc::waiting_for_shard_placement_update; + } auto& state = state_it->second; - vassert( - state.assigned && state.assigned->log_revision == expected_log_rev, - "[{}] unexpected assigned: {} (expected log revision: {})", - ntp, - state.assigned, - expected_log_rev); + + if (state.assigned->log_revision != expected_log_rev) { + // assignments got updated while we were waiting for the lock + co_return errc::waiting_for_shard_placement_update; + } if (state.current && state.current->log_revision != expected_log_rev) { // wait until partition with obsolete log revision is removed co_return errc::waiting_for_reconfiguration_finish; } + // copy assigned as it may change while we are updating kvstore + auto assigned = *state.assigned; + if (!state.current) { if (state._is_initial_for == expected_log_rev) { - state.current = shard_local_state::initial(*state.assigned); - state._is_initial_for = std::nullopt; + if (_persistence_enabled) { + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = assigned.shard_revision, + .is_complete = true, + }); + vlog( + clusterlog.trace, + "[{}] put initial cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + assigned.shard_revision); + co_await _kvstore.put( + kvstore_key_space, + current_state_kvstore_key(assigned.group), + std::move(marker_buf)); + } + + state.current = shard_local_state(assigned, hosted_status::hosted); + if (state._is_initial_for == expected_log_rev) { + // could have changed while we were updating kvstore. + state._is_initial_for = std::nullopt; + } } else { // x-shard transfer hasn't started yet, wait for it. co_return errc::waiting_for_partition_shutdown; @@ -353,18 +899,27 @@ ss::future shard_placement_table::prepare_create( ss::future> shard_placement_table::prepare_transfer( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto state_it = _states.find(ntp); vassert(state_it != _states.end(), "[{}] expected state", ntp); auto& state = state_it->second; if (state.current) { vassert( - state.current->log_revision == expected_log_rev, + state.current->log_revision >= expected_log_rev, "[{}] unexpected current: {} (expected log revision: {})", ntp, state.current, expected_log_rev); + if (state.current->log_revision > expected_log_rev) { + // New log revision transferred from another shard, but we don't + // know about it yet. Wait for the assignment update. + co_return errc::waiting_for_shard_placement_update; + } + if (state.current->status == hosted_status::receiving) { // This shard needs to transfer partition state somewhere else, but // haven't yet received it itself. Wait for it. @@ -373,25 +928,26 @@ ss::future> shard_placement_table::prepare_transfer( if (state.current->status == hosted_status::obsolete) { // Previous finish_transfer_on_source() failed? Retry it. - co_await do_delete(ntp, state); + co_await do_delete(ntp, state, persistence_lock_holder); co_return errc::success; } } else { vassert( - state._is_initial_for == expected_log_rev, + state._is_initial_for >= expected_log_rev, "[{}] unexpected is_initial_for: {} (expected log revision: {})", ntp, state._is_initial_for, expected_log_rev); + + if (state._is_initial_for > expected_log_rev) { + co_return errc::waiting_for_shard_placement_update; + } } if (!state._next) { - vassert( - !state.assigned, - "[{}] unexpected assigned: {} (expected log revision: {})", - ntp, - state.assigned, - expected_log_rev); + if (state.assigned) { + co_return errc::waiting_for_shard_placement_update; + } auto maybe_dest = co_await container().invoke_on( assignment_shard_id, @@ -423,32 +979,60 @@ ss::future> shard_placement_table::prepare_transfer( // We are in the middle of shard_placement_table update, and // the destination shard doesn't yet know that it is the // destination. Wait for the update to finish. - return errc::waiting_for_shard_placement_update; + return ss::make_ready_future( + errc::waiting_for_shard_placement_update); } auto& dest_state = dest_it->second; if (dest_state._next) { // probably still finishing a previous transfer to this // shard and we are already trying to transfer it back. - return errc::waiting_for_partition_shutdown; + return ss::make_ready_future( + errc::waiting_for_partition_shutdown); } else if (dest_state.current) { if (dest_state.current->log_revision != expected_log_rev) { // someone has to delete obsolete log revision first - return errc::waiting_for_reconfiguration_finish; + return ss::make_ready_future( + errc::waiting_for_partition_shutdown); } // probably still finishing a previous transfer to this // shard and we are already trying to transfer it back. - return errc::waiting_for_partition_shutdown; + return ss::make_ready_future( + errc::waiting_for_partition_shutdown); } // at this point we commit to the transfer on the // destination shard - dest_state.current = shard_local_state::receiving( - dest_state.assigned.value()); + dest_state.current = shard_local_state( + dest_state.assigned.value(), hosted_status::receiving); if (dest_state._is_initial_for <= expected_log_rev) { dest_state._is_initial_for = std::nullopt; } - return errc::success; + + // TODO: immediate hosted or _is_initial_for if source is empty. + + if (dest._persistence_enabled) { + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = dest_state.current.value().shard_revision, + .is_complete = false, + }); + vlog( + clusterlog.trace, + "[{}] put receiving cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + dest_state.current->shard_revision); + return dest._kvstore + .put( + kvstore_key_space, + current_state_kvstore_key(dest_state.current->group), + std::move(marker_buf)) + .then([] { return errc::success; }); + } else { + return ss::make_ready_future(errc::success); + } }); if (ec != errc::success) { @@ -465,6 +1049,9 @@ ss::future> shard_placement_table::prepare_transfer( ss::future<> shard_placement_table::finish_transfer_on_destination( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto it = _states.find(ntp); if (it == _states.end()) { co_return; @@ -476,7 +1063,27 @@ ss::future<> shard_placement_table::finish_transfer_on_destination( "[{}] unexpected local status, current: {}", ntp, it->second.current); - it->second.current->status = hosted_status::hosted; + + if (_persistence_enabled) { + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = state.current->shard_revision, + .is_complete = true, + }); + vlog( + clusterlog.trace, + "[{}] put transferred cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + state.current->shard_revision); + co_await _kvstore.put( + kvstore_key_space, + current_state_kvstore_key(state.current->group), + std::move(marker_buf)); + } + + state.current->status = hosted_status::hosted; } vlog( clusterlog.trace, @@ -487,6 +1094,9 @@ ss::future<> shard_placement_table::finish_transfer_on_destination( ss::future<> shard_placement_table::finish_transfer_on_source( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto it = _states.find(ntp); vassert(it != _states.end(), "[{}] expected state", ntp); auto& state = it->second; @@ -502,11 +1112,14 @@ ss::future<> shard_placement_table::finish_transfer_on_source( state._is_initial_for = std::nullopt; } - co_await do_delete(ntp, state); + co_await do_delete(ntp, state, persistence_lock_holder); } ss::future shard_placement_table::prepare_delete( const model::ntp& ntp, model::revision_id cmd_revision) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto it = _states.find(ntp); vassert(it != _states.end(), "[{}] expected state", ntp); auto& state = it->second; @@ -520,12 +1133,11 @@ ss::future shard_placement_table::prepare_delete( } if (state.current) { - vassert( - state.current->log_revision < cmd_revision, - "[{}] unexpected current: {} (cmd revision: {})", - ntp, - state.current, - cmd_revision); + if (state.current->log_revision >= cmd_revision) { + // New log revision transferred from another shard, but we didn't + // expect it. Wait for the update. + co_return errc::waiting_for_shard_placement_update; + } if (state.current->status == hosted_status::receiving) { // If transfer to this shard is still in progress, we'll wait for @@ -541,6 +1153,9 @@ ss::future shard_placement_table::prepare_delete( ss::future<> shard_placement_table::finish_delete( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto it = _states.find(ntp); vassert(it != _states.end(), "[{}] expected state", ntp); auto& state = it->second; @@ -568,13 +1183,25 @@ ss::future<> shard_placement_table::finish_delete( }); } - co_await do_delete(ntp, state); + co_await do_delete(ntp, state, persistence_lock_holder); } ss::future<> shard_placement_table::do_delete( - const model::ntp& ntp, placement_state& state) { + const model::ntp& ntp, + placement_state& state, + ss::rwlock::holder& /*persistence_lock_holder*/) { state._next = std::nullopt; - state.current = std::nullopt; + + if (state.current) { + if (_persistence_enabled) { + vlog(clusterlog.trace, "[{}] remove cur state marker", ntp); + co_await _kvstore.remove( + kvstore_key_space, + current_state_kvstore_key(state.current->group)); + } + state.current = std::nullopt; + } + if (state.is_empty()) { _states.erase(ntp); } diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h index 818377e0be8d9..9ae2f36b4bcac 100644 --- a/src/v/cluster/shard_placement_table.h +++ b/src/v/cluster/shard_placement_table.h @@ -14,7 +14,9 @@ #include "base/seastarx.h" #include "cluster/types.h" #include "container/chunked_hash_map.h" +#include "storage/fwd.h" #include "utils/mutex.h" +#include "utils/rwlock.h" #include @@ -28,10 +30,10 @@ namespace cluster { /// shard_balancer and current shard-local state is supposed to be modified by /// controller_backend as it creates/deletes/moves partitions. /// -/// Currently shard-local and target states are in-memory and target states -/// duplicate shard assignments that are stored in topic_table, but in the -/// future they will be persisted in the kvstore and target states will be set -/// independently. +/// Shard-local assignments and current states are persisted to kvstore on every +/// change. On startup this kvstore state is used to recover in-memory state. +/// In other words, we read kvstore only when initializing, and during normal +/// operation we only write to it. /// /// Note that in contrast to `cluster::shard_table` (that helps other parts of /// the system to find the shard where the `cluster::partition` object for some @@ -46,6 +48,7 @@ class shard_placement_table /// Struct used to express the fact that a partition replica of some ntp is /// expected on this shard. struct shard_local_assignment { + raft::group_id group; model::revision_id log_revision; model::shard_revision_id shard_revision; @@ -65,25 +68,25 @@ class shard_placement_table /// Current state of shard-local partition kvstore data on this shard. struct shard_local_state { + raft::group_id group; model::revision_id log_revision; hosted_status status; model::shard_revision_id shard_revision; - static shard_local_state initial(const shard_local_assignment& as) { - return shard_local_state{ - .log_revision = as.log_revision, - .status = hosted_status::hosted, - .shard_revision = as.shard_revision, - }; - } - - static shard_local_state receiving(const shard_local_assignment& as) { - return shard_local_state{ - .log_revision = as.log_revision, - .status = hosted_status::receiving, - .shard_revision = as.shard_revision, - }; - } + shard_local_state( + raft::group_id g, + model::revision_id lr, + hosted_status s, + model::shard_revision_id sr) + : group(g) + , log_revision(lr) + , status(s) + , shard_revision(sr) {} + + shard_local_state( + const shard_local_assignment& as, hosted_status status) + : shard_local_state( + as.group, as.log_revision, status, as.shard_revision) {} friend std::ostream& operator<<(std::ostream&, const shard_local_state&); @@ -137,8 +140,23 @@ class shard_placement_table std::optional _next; }; - // must be called on each shard - ss::future<> initialize(const topic_table&, model::node_id self); + using ntp2state_t = absl::node_hash_map; + + explicit shard_placement_table(storage::kvstore&); + + /// Must be called on assignment_shard_id. + bool is_persistence_enabled() const; + ss::future<> enable_persistence(); + + /// Must be called on assignment_shard_id. + /// precondition: is_persistence_enabled() == true + ss::future<> initialize_from_kvstore( + const chunked_hash_map& local_group2ntp); + + /// Must be called on assignment_shard_id. + /// precondition: is_persistence_enabled() == false + ss::future<> + initialize_from_topic_table(ss::sharded&, model::node_id self); using shard_callback_t = std::function; @@ -151,12 +169,12 @@ class shard_placement_table // getters + /// Must be called on assignment_shard_id. + std::optional get_target(const model::ntp&) const; + std::optional state_on_this_shard(const model::ntp&) const; - const absl::node_hash_map& - shard_local_states() const { - return _states; - } + const ntp2state_t& shard_local_states() const { return _states; } // partition lifecycle methods @@ -184,17 +202,23 @@ class shard_placement_table finish_delete(const model::ntp&, model::revision_id expected_log_rev); private: - ss::future<> set_assigned_on_this_shard( + ss::future<> do_delete( const model::ntp&, - model::revision_id target_log_revision, - model::shard_revision_id, - bool is_initial, - shard_callback_t); + placement_state&, + ss::rwlock::holder& persistence_lock_holder); + + ss::future<> persist_shard_local_state(); + + ss::future>> + gather_init_states(const chunked_hash_map&); + + struct ntp_init_data; ss::future<> - remove_assigned_on_this_shard(const model::ntp&, shard_callback_t); + scatter_init_data(const chunked_hash_map&); - ss::future<> do_delete(const model::ntp&, placement_state&); + ss::future<> + do_initialize_from_topic_table(const topic_table&, model::node_id self); private: friend class shard_placement_test_fixture; @@ -202,7 +226,12 @@ class shard_placement_table // per-shard state // // node_hash_map for pointer stability - absl::node_hash_map _states; + ntp2state_t _states; + // lock is needed to sync enabling persistence with shard-local + // modifications. + ssx::rwlock _persistence_lock; + bool _persistence_enabled = false; + storage::kvstore& _kvstore; // only on shard 0, _ntp2entry will hold targets for all ntps on this node. struct entry_t { diff --git a/src/v/cluster/tests/cluster_utils_tests.cc b/src/v/cluster/tests/cluster_utils_tests.cc index 89c707f4c9ff7..2ccf6188ac3af 100644 --- a/src/v/cluster/tests/cluster_utils_tests.cc +++ b/src/v/cluster/tests/cluster_utils_tests.cc @@ -39,13 +39,15 @@ SEASTAR_THREAD_TEST_CASE(test_find_shard_on_node) { } SEASTAR_THREAD_TEST_CASE(test_placement_target_on_node) { + raft::group_id group{111}; + cluster::replicas_t orig_replicas{ model::broker_shard{model::node_id(1), 2}, model::broker_shard{model::node_id(2), 1}, model::broker_shard{model::node_id(3), 0}}; cluster::partition_assignment orig_assignment( - raft::group_id(111), model::partition_id(23), orig_replicas); + group, model::partition_id(23), orig_replicas); cluster::topic_table::partition_meta partition_meta{ .replicas_revisions = { @@ -59,13 +61,13 @@ SEASTAR_THREAD_TEST_CASE(test_placement_target_on_node) { // node_id, log_revision, shard_id using expected_list_t = std::vector>; - auto check = []( + auto check = [group]( std::string_view case_id, const cluster::topic_table::partition_replicas_view& rv, expected_list_t expected_list) { for (auto [node_id, log_revision, shard_id] : expected_list) { cluster::shard_placement_target expected{ - model::revision_id(log_revision), shard_id}; + group, model::revision_id(log_revision), shard_id}; auto actual = cluster::placement_target_on_node( rv, model::node_id(node_id)); BOOST_REQUIRE_MESSAGE( diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index 7d20c80115bf1..a69c6f4b42a80 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -18,6 +18,7 @@ #include "storage/storage_resources.h" #include "test_utils/randoms.h" #include "test_utils/test.h" +#include "utils/prefix_logger.h" #include #include @@ -91,7 +92,8 @@ class reconciliation_backend ss::sharded& ntp2shards) : _ntpt(ntpt.local()) , _shard_placement(spt.local()) - , _ntp2shards(ntp2shards) {} + , _ntp2shards(ntp2shards) + , _logger(clusterlog, "RB") {} ss::future<> stop() { for (auto& [_, rs] : _states) { @@ -115,7 +117,7 @@ class reconciliation_backend auto& rs = *rs_it->second; rs.pending_notifies += 1; vlog( - clusterlog.trace, + _logger.trace, "[{}] notify reconciliation, pending_notifies: {}", ntp, rs.pending_notifies); @@ -175,7 +177,7 @@ class reconciliation_backend auto ex = std::current_exception(); if (!ssx::is_shutdown_exception(ex)) { vlog( - clusterlog.error, + _logger.error, "[{}] unexpected exception during reconciliation: {}", ntp, ex); @@ -205,7 +207,7 @@ class reconciliation_backend if (res.has_value()) { if (res.value() == ss::stop_iteration::yes) { vlog( - clusterlog.trace, + _logger.trace, "[{}] reconciled, notify count: {}", ntp, notifies); @@ -217,7 +219,7 @@ class reconciliation_backend continue; } else { vlog( - clusterlog.trace, + _logger.trace, "[{}] reconciliation attempt error: {}", ntp, res.error()); @@ -226,7 +228,7 @@ class reconciliation_backend } catch (ss::abort_requested_exception const&) { } catch (...) { vlog( - clusterlog.warn, + _logger.warn, "[{}] exception occured during reconciliation: {}", ntp, std::current_exception()); @@ -250,7 +252,7 @@ class reconciliation_backend } vlog( - clusterlog.trace, + _logger.trace, "[{}] placement state on this shard: {}, expected_log_revision: {}", ntp, placement, @@ -298,12 +300,17 @@ class reconciliation_backend model::revision_id log_revision, bool state_expected) { auto ec = co_await _shard_placement.prepare_create(ntp, log_revision); - vlog(clusterlog.trace, "[{}] creating partition: {}", ntp, ec); + vlog(_logger.trace, "[{}] creating partition: {}", ntp, ec); if (ec) { co_return ec; } _launched.insert(ntp); + vlog( + _logger.trace, + "[{}] started partition log_revision: {}", + ntp, + log_revision); co_await ss::sleep(1ms * random_generators::get_int(30)); co_await _ntp2shards.invoke_on( @@ -358,7 +365,7 @@ class reconciliation_backend model::revision_id cmd_revision) { auto ec = co_await _shard_placement.prepare_delete(ntp, cmd_revision); vlog( - clusterlog.trace, + _logger.trace, "[{}] deleting partition at cmd_revision: {}, ec: {}", ntp, cmd_revision, @@ -373,6 +380,13 @@ class reconciliation_backend } bool launched_expected = _launched.erase(ntp); + if (launched_expected) { + vlog( + _logger.trace, + "[{}] stopped partition log_revision: {}", + ntp, + placement.current->log_revision); + } co_await ss::sleep(1ms * random_generators::get_int(30)); co_await _ntp2shards.invoke_on( @@ -442,7 +456,7 @@ class reconciliation_backend ntp, log_revision); if (maybe_dest.has_error()) { vlog( - clusterlog.trace, + _logger.trace, "[{}] preparing transfer error: {}", ntp, maybe_dest.error()); @@ -450,13 +464,20 @@ class reconciliation_backend } vlog( - clusterlog.trace, + _logger.trace, "[{}] preparing transfer dest: {}", ntp, maybe_dest.value()); ss::shard_id destination = maybe_dest.value(); bool launched_expected = _launched.erase(ntp); + if (launched_expected) { + vlog( + _logger.trace, + "[{}] stopped partition for transfer, log_revision: {}", + ntp, + log_revision); + } co_await _ntp2shards.invoke_on( 0, @@ -562,7 +583,7 @@ class reconciliation_backend }); co_await _shard_placement.finish_transfer_on_source(ntp, log_revision); - vlog(clusterlog.trace, "[{}] transferred", ntp); + vlog(_logger.trace, "[{}] transferred", ntp); co_return errc::success; } @@ -575,6 +596,7 @@ class reconciliation_backend _states; absl::flat_hash_set _launched; ss::gate _gate; + prefix_logger _logger; }; // Limit concurrency to 4 so that there are more interesting repeats in randomly @@ -598,8 +620,14 @@ class shard_assigner { , _rb(rb) {} ss::future<> start() { - // TODO: bootstrap _shard_placement. We don't need it for now because we - // don't restart in this test yet. + for (const auto& [ntp, meta] : _ntpt.ntp2meta) { + auto maybe_target = _shard_placement.get_target(ntp); + if ( + !maybe_target + || maybe_target->log_revision != meta.log_revision) { + assign_eventually(ntp); + } + } ssx::background = assign_fiber(); co_return; @@ -610,6 +638,13 @@ class shard_assigner { return _gate.close(); } + void enable_persistence_eventually() { + if (!_enable_persistence) { + _enable_persistence = true; + _wakeup_event.set(); + } + } + void assign_eventually(const model::ntp& ntp) { _to_assign.insert(ntp); _wakeup_event.set(); @@ -630,6 +665,10 @@ class shard_assigner { co_return; } + if (_enable_persistence) { + co_await _shard_placement.enable_persistence(); + } + auto to_assign = std::exchange(_to_assign, {}); _in_progress = true; co_await ss::max_concurrent_for_each( @@ -644,6 +683,7 @@ class shard_assigner { std::optional target; if (auto it = _ntpt.ntp2meta.find(ntp); it != _ntpt.ntp2meta.end()) { target = shard_placement_target( + it->second.group, it->second.log_revision, random_generators::get_int(get_max_shard_id())); } @@ -673,12 +713,38 @@ class shard_assigner { ntp2shards_t& _ntp2shards; ss::sharded& _rb; + bool _enable_persistence = false; chunked_hash_set _to_assign; bool _in_progress = false; ssx::event _wakeup_event{"shard_assigner"}; ss::gate _gate; }; +template +void assert_key_sets_equal( + const Left& left, + std::string_view left_str, + const Right& right, + std::string_view right_str) { + std::vector keys1; + for (const auto& kv : left) { + if (!right.contains(kv.first)) { + keys1.push_back(kv.first); + } + } + ASSERT_TRUE(keys1.empty()) << "keys in " << left_str << ", but not in " + << right_str << ": " << keys1; + + std::vector keys2; + for (const auto& kv : right) { + if (!left.contains(kv.first)) { + keys2.push_back(kv.first); + } + } + ASSERT_TRUE(keys2.empty()) << "keys in " << right_str << ", but not in " + << left_str << ": " << keys2; +} + } // namespace class shard_placement_test_fixture : public seastar_test { @@ -686,22 +752,25 @@ class shard_placement_test_fixture : public seastar_test { shard_placement_test_fixture() : test_dir("test.data." + random_generators::gen_alphanum_string(10)) {} - ss::future<> quiescent_state_checks() { - auto shard2states = co_await spt.map( + using ntp2shard2state_t = absl::node_hash_map< + model::ntp, + std::map>; + + ss::future get_ntp2shard2state() const { + auto shard2states = co_await spt->map( [](shard_placement_table& spt) { return spt._states; }); - absl::node_hash_map< - model::ntp, - std::map> - ntp2shard2state; + ntp2shard2state_t ntp2shard2state; for (size_t s = 0; s < shard2states.size(); ++s) { for (const auto& [ntp, state] : shard2states[s]) { ntp2shard2state[ntp].emplace(s, state); } } - ASSERT_EQ_CORO(ntp2shard2state.size(), ntpt.local().ntp2meta.size()); + co_return ntp2shard2state; + } + void clean_ntp2shards() { auto& ntp2shards = _ntp2shards.local(); for (auto it = ntp2shards.begin(); it != ntp2shards.end();) { auto it_copy = it++; @@ -718,8 +787,23 @@ class shard_placement_test_fixture : public seastar_test { ntp2shards.erase(it_copy); } } + } - ASSERT_EQ_CORO(ntp2shards.size(), ntpt.local().ntp2meta.size()); + ss::future<> quiescent_state_checks() { + auto ntp2shard2state = co_await get_ntp2shard2state(); + assert_key_sets_equal( + ntp2shard2state, + "spt placement state map", + ntpt.local().ntp2meta, + "ntp2meta map"); + + clean_ntp2shards(); + const auto& ntp2shards = _ntp2shards.local(); + assert_key_sets_equal( + ntp2shards, + "reference ntp state map", + ntpt.local().ntp2meta, + "ntp2meta map"); for (const auto& [ntp, meta] : ntpt.local().ntp2meta) { auto states_it = ntp2shard2state.find(ntp); @@ -727,8 +811,8 @@ class shard_placement_test_fixture : public seastar_test { << "ntp: " << ntp; const auto& shard2state = states_it->second; - auto entry_it = spt.local()._ntp2entry.find(ntp); - ASSERT_TRUE_CORO(entry_it != spt.local()._ntp2entry.end()) + auto entry_it = spt->local()._ntp2entry.find(ntp); + ASSERT_TRUE_CORO(entry_it != spt->local()._ntp2entry.end()) << "ntp: " << ntp; ASSERT_TRUE_CORO(entry_it->second->target) << "ntp: " << ntp; ASSERT_TRUE_CORO(entry_it->second->mtx.ready()) << "ntp: " << ntp; @@ -779,7 +863,7 @@ class shard_placement_test_fixture : public seastar_test { ASSERT_EQ_CORO(shards.rev2shards.size(), 1) << "ntp: " << ntp; auto p_shards_it = shards.rev2shards.find(meta.log_revision); ASSERT_TRUE_CORO(p_shards_it != shards.rev2shards.end()) - << "ntp: " << ntp; + << "ntp: " << ntp << ", log_revision: " << meta.log_revision; const auto& p_shards = p_shards_it->second; ASSERT_EQ_CORO(p_shards.launched_on, target.shard) << "ntp: " << ntp; @@ -790,18 +874,118 @@ class shard_placement_test_fixture : public seastar_test { } } + ss::future<> check_spt_recovery() { + clean_ntp2shards(); + const auto& ntp2shards = _ntp2shards.local(); + auto ntp2shard2state = co_await get_ntp2shard2state(); + assert_key_sets_equal( + ntp2shards, + "reference ntp state map", + ntp2shard2state, + "spt placement state map"); + + for (const auto& [ntp, expected] : ntp2shards) { + auto states_it = ntp2shard2state.find(ntp); + ASSERT_TRUE_CORO(states_it != ntp2shard2state.end()) + << "ntp: " << ntp; + const auto& shard2state = states_it->second; + + // check main target map + auto entry_it = spt->local()._ntp2entry.find(ntp); + if (expected.target) { + ASSERT_TRUE_CORO(entry_it != spt->local()._ntp2entry.end()) + << "ntp: " << ntp; + ASSERT_EQ_CORO(entry_it->second->target, expected.target) + << "ntp: " << ntp; + ASSERT_TRUE_CORO(entry_it->second->mtx.ready()) + << "ntp: " << ntp; + } else { + ASSERT_TRUE_CORO(entry_it == spt->local()._ntp2entry.end()) + << "ntp: " << ntp; + } + + // check assigned markers + if (expected.target) { + ASSERT_TRUE_CORO(shard2state.contains(expected.target->shard)); + } + for (const auto& [s, placement] : shard2state) { + if (expected.target && s == expected.target->shard) { + ASSERT_TRUE_CORO(placement.assigned) + << "ntp: " << ntp << ", shard: " << s; + ASSERT_EQ_CORO( + placement.assigned->log_revision, + expected.target->log_revision) + << "ntp: " << ntp << ", shard: " << s; + } else { + ASSERT_TRUE_CORO(!placement.assigned) + << "ntp: " << ntp << ", shard: " << s; + } + } + + // check that all shards with state are known in the placement map. + for (ss::shard_id s : expected.shards_with_some_state) { + auto state_it = shard2state.find(s); + ASSERT_TRUE_CORO(state_it != shard2state.end()) + << "ntp: " << ntp << ", shard: " << s; + ASSERT_TRUE_CORO(state_it->second.current) + << "ntp: " << ntp << ", shard: " << s; + } + } + } + ss::future<> start() { co_await ft.start(); co_await ft.invoke_on_all( [](features::feature_table& ft) { ft.testing_activate_all(); }); - co_await ntpt.start(); - co_await _ntp2shards.start_single(); - co_await sr.start(); - co_await kvs.start( + co_await restart_node(true); + } + + ss::future<> stop() { + if (_shard_assigner) { + co_await _shard_assigner->stop(); + } + if (rb) { + co_await rb->stop(); + } + if (spt) { + co_await spt->stop(); + } + if (kvs) { + co_await kvs->stop(); + } + co_await sr.stop(); + co_await _ntp2shards.stop(); + co_await ntpt.stop(); + co_await ft.stop(); + } + + ss::future<> restart_node(bool first_start) { + if (_shard_assigner) { + co_await _shard_assigner->stop(); + } + if (rb) { + co_await rb->stop(); + } + if (spt) { + co_await spt->stop(); + } + if (kvs) { + co_await kvs->stop(); + } + + for (auto& [ntp, shards] : _ntp2shards.local()) { + for (auto& [lr, p_shards] : shards.rev2shards) { + // "stop" mock partitions + p_shards.launched_on = std::nullopt; + } + } + + kvs = std::make_unique(); + co_await kvs->start( storage::kvstore_config( 1_MiB, config::mock_binding(10ms), @@ -809,34 +993,44 @@ class shard_placement_test_fixture : public seastar_test { storage::make_sanitized_file_config()), ss::sharded_parameter([this] { return std::ref(sr.local()); }), std::ref(ft)); - co_await kvs.invoke_on_all( + co_await kvs->invoke_on_all( [](storage::kvstore& kvs) { return kvs.start(); }); - co_await spt.start(); + spt = std::make_unique(); + co_await spt->start( + ss::sharded_parameter([this] { return std::ref(kvs->local()); })); - co_await rb.start(std::ref(ntpt), std::ref(spt), std::ref(_ntp2shards)); + if (!first_start) { + chunked_hash_map local_group2ntp; + for (const auto& [ntp, meta] : ntpt.local().ntp2meta) { + local_group2ntp.emplace(meta.group, ntp); + } + co_await spt->local().initialize_from_kvstore(local_group2ntp); + + for (auto& [ntp, shards] : _ntp2shards.local()) { + if ( + shards.target + && !local_group2ntp.contains(shards.target->group)) { + // clear obsolete targets + shards.target = std::nullopt; + } + } + } + + co_await check_spt_recovery(); + + rb = std::make_unique(); + co_await rb->start( + std::ref(ntpt), std::ref(*spt), std::ref(_ntp2shards)); _shard_assigner = std::make_unique( - ntpt, spt, _ntp2shards, rb); + ntpt, *spt, _ntp2shards, *rb); co_await _shard_assigner->start(); - co_await rb.invoke_on_all( + co_await rb->invoke_on_all( [](reconciliation_backend& rb) { return rb.start(); }); } - ss::future<> stop() { - if (_shard_assigner) { - co_await _shard_assigner->stop(); - } - co_await rb.stop(); - co_await spt.stop(); - co_await kvs.stop(); - co_await sr.stop(); - co_await _ntp2shards.stop(); - co_await ntpt.stop(); - co_await ft.stop(); - } - ss::future<> TearDownAsync() override { co_await stop(); co_await ss::recursive_remove_directory( @@ -848,33 +1042,51 @@ class shard_placement_test_fixture : public seastar_test { ss::sharded ntpt; ss::sharded _ntp2shards; // only on shard 0 ss::sharded sr; - ss::sharded kvs; - ss::sharded spt; - ss::sharded rb; + std::unique_ptr> kvs; + std::unique_ptr> spt; + std::unique_ptr> rb; std::unique_ptr _shard_assigner; }; TEST_F_CORO(shard_placement_test_fixture, StressTest) { model::revision_id cur_revision{1}; raft::group_id cur_group{1}; + prefix_logger logger(clusterlog, "TEST"); co_await start(); + // enable persistence midway through the test + size_t enable_persistence_at = random_generators::get_int(4'000, 6'000); + for (size_t i = 0; i < 10'000; ++i) { + if (i == enable_persistence_at) { + _shard_assigner->enable_persistence_eventually(); + } + if (random_generators::get_int(15) == 0) { - vlog(clusterlog.info, "waiting for reconciliation"); + vlog(logger.info, "waiting for reconciliation"); for (size_t i = 0;; ++i) { ASSERT_TRUE_CORO(i < 50) << "taking too long to reconcile"; if (!(_shard_assigner->is_reconciled() - && co_await rb.local().is_reconciled())) { + && co_await rb->local().is_reconciled())) { co_await ss::sleep(100ms); } else { break; } } - vlog(clusterlog.info, "reconciled"); + vlog(logger.info, "reconciled"); co_await quiescent_state_checks(); + continue; + } + + if ( + spt->local().is_persistence_enabled() + && random_generators::get_int(50) == 0) { + vlog(logger.info, "restarting"); + co_await restart_node(false); + vlog(logger.info, "restarted"); + continue; } // small set of ntps to ensure frequent overlaps @@ -887,7 +1099,7 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { auto group = cur_group++; auto revision = cur_revision++; vlog( - clusterlog.info, + logger.info, "[{}] OP: add, group: {}, log revision: {}", ntp, group, @@ -915,11 +1127,11 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { {op_t::transfer, op_t::remove, op_t::increase_log_rev}); switch (op) { case op_t::transfer: - vlog(clusterlog.info, "[{}] OP: reassign shard", ntp); + vlog(logger.info, "[{}] OP: reassign shard", ntp); _shard_assigner->assign_eventually(ntp); break; case op_t::remove: { - vlog(clusterlog.info, "[{}] OP: remove", ntp); + vlog(logger.info, "[{}] OP: remove", ntp); auto revision = cur_revision++; co_await ntpt.invoke_on_all([&](ntp_table& ntpt) { ntpt.ntp2meta.erase(ntp); @@ -933,7 +1145,7 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { case op_t::increase_log_rev: ntp_meta.log_revision = cur_revision++; vlog( - clusterlog.info, + logger.info, "[{}] OP: increase log revision to: {}", ntp, ntp_meta.log_revision); @@ -949,7 +1161,7 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { } } - vlog(clusterlog.info, "finished"); + vlog(logger.info, "finished"); } } // namespace cluster diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 7c6a84c6f3439..7eb7da4ff80fd 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -438,8 +438,13 @@ std::ostream& operator<<(std::ostream& o, const partition_assignment& p_as) { return o; } -std::ostream& operator<<(std::ostream& o, const shard_placement_target& eg) { - fmt::print(o, "{{log_revision: {}, shard: {}}}", eg.log_revision, eg.shard); +std::ostream& operator<<(std::ostream& o, const shard_placement_target& spt) { + fmt::print( + o, + "{{group: {}, log_revision: {}, shard: {}}}", + spt.group, + spt.log_revision, + spt.shard); return o; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index fbe1ea9d099be..809f889870172 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -2442,10 +2442,13 @@ using replicas_revision_map /// Log revision is needed to distinguish different incarnations of the /// partition. struct shard_placement_target { - shard_placement_target(model::revision_id lr, ss::shard_id s) - : log_revision(lr) + shard_placement_target( + raft::group_id g, model::revision_id lr, ss::shard_id s) + : group(g) + , log_revision(lr) , shard(s) {} + raft::group_id group; model::revision_id log_revision; ss::shard_id shard; diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index e784527d3533b..8dc3f2539b45e 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -101,6 +101,8 @@ std::string_view to_string_view(feature f) { return "role_based_access_control"; case feature::cluster_topic_manifest_format_v2: return "cluster_topic_manifest_format_v2"; + case feature::shard_placement_persistence: + return "shard_placement_persistence"; /* * testing features diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h index fad0212ca2105..67bc8ef1c5df0 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -75,6 +75,7 @@ enum class feature : std::uint64_t { partition_shard_in_health_report = 1ULL << 43U, role_based_access_control = 1ULL << 44U, cluster_topic_manifest_format_v2 = 1ULL << 45U, + shard_placement_persistence = 1ULL << 46U, // Dummy features for testing only test_alpha = 1ULL << 61U, @@ -380,6 +381,12 @@ constexpr static std::array feature_schema{ feature::cluster_topic_manifest_format_v2, feature_spec::available_policy::always, feature_spec::prepare_policy::always}, + feature_spec{ + cluster::cluster_version{13}, + "shard_placement_persistence", + feature::shard_placement_persistence, + feature_spec::available_policy::always, + feature_spec::prepare_policy::always}, }; std::string_view to_string_view(feature); diff --git a/src/v/storage/kvstore.cc b/src/v/storage/kvstore.cc index 8f003ac4b9c99..f913e7629fe85 100644 --- a/src/v/storage/kvstore.cc +++ b/src/v/storage/kvstore.cc @@ -17,6 +17,7 @@ #include "model/namespace.h" #include "prometheus/prometheus_sanitize.h" #include "reflection/adl.h" +#include "ssx/async_algorithm.h" #include "storage/parser.h" #include "storage/record_batch_builder.h" #include "storage/segment.h" @@ -198,6 +199,25 @@ ss::future<> kvstore::put(key_space ks, bytes key, std::optional value) { }); } +ss::future<> kvstore::for_each( + key_space ks, + ss::noncopyable_function visitor) { + vassert(_started, "kvstore has not been started"); + auto gh = _gate.hold(); + auto units = co_await _db_mut.get_units(); + + auto prefix = make_spaced_key(ks, bytes_view{}); + co_await ssx::async_for_each( + _db.begin(), _db.end(), [&](const map_t::value_type& kv) { + auto spaced_key = bytes_view{kv.first}; + if (!spaced_key.starts_with(prefix)) { + return; + } + auto key = spaced_key.substr(prefix.length()); + visitor(key, kv.second); + }); +} + void kvstore::apply_op( bytes key, std::optional value, ssx::semaphore_units const&) { auto it = _db.find(key); diff --git a/src/v/storage/kvstore.h b/src/v/storage/kvstore.h index 8f5a2f0dcddcc..3ec6b89a1042d 100644 --- a/src/v/storage/kvstore.h +++ b/src/v/storage/kvstore.h @@ -101,6 +101,7 @@ class kvstore { offset_translator = 4, usage = 5, stms = 6, + shard_placement = 7, /* your sub-system here */ }; @@ -117,6 +118,12 @@ class kvstore { ss::future<> put(key_space ks, bytes key, iobuf value); ss::future<> remove(key_space ks, bytes key); + /// Iterate over all key-value pairs in a keyspace. + /// NOTE: this will stall all updates, so use with a lot of caution. + ss::future<> for_each( + key_space ks, + ss::noncopyable_function visitor); + bool empty() const { vassert(_started, "kvstore has not been started"); return _db.empty(); diff --git a/src/v/storage/tests/kvstore_test.cc b/src/v/storage/tests/kvstore_test.cc index c122409df9d9e..0be160c70566f 100644 --- a/src/v/storage/tests/kvstore_test.cc +++ b/src/v/storage/tests/kvstore_test.cc @@ -59,6 +59,18 @@ FIXTURE_TEST(key_space, kvstore_test_fixture) { kvs->get(storage::kvstore::key_space::consensus, empty_key).value() == value_d); + std::map testing_kvs; + kvs + ->for_each( + storage::kvstore::key_space::testing, + [&](bytes_view key, const iobuf& val) { + BOOST_REQUIRE(testing_kvs.emplace(key, val.copy()).second); + }) + .get(); + BOOST_REQUIRE_EQUAL(testing_kvs.size(), 2); + BOOST_REQUIRE(testing_kvs.at(key) == value_a); + BOOST_REQUIRE(testing_kvs.at(empty_key) == value_c); + kvs->stop().get(); // still all true after recovery diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 3abdacfd0135a..5fe53bd63b62d 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -91,6 +91,8 @@ def _decode_ks(self, ks): return "usage" elif ks == 6: return "stms" + elif ks == 7: + return "shard_placement" return "unknown" def decode(self): @@ -259,6 +261,23 @@ def decode_storage_key(k): return ret +def decode_shard_placement_key(k): + rdr = Reader(BytesIO(k)) + ret = {} + ret['type'] = rdr.read_int32() + if ret['type'] == 0: + ret['name'] = "persistence_enabled" + elif ret['type'] == 1: + ret['name'] = "assignment" + ret['group'] = rdr.read_int64() + elif ret['type'] == 2: + ret['name'] = "current_state" + ret['group'] = rdr.read_int64() + else: + ret['name'] = "unknown" + return ret + + def decode_key(ks, key): data = key if ks == "consensus": @@ -269,6 +288,8 @@ def decode_key(ks, key): data = decode_offset_translator_key(key) elif ks == "stms": data = decode_stm_snapshot_key(key) + elif ks == "shard_placement": + data = decode_shard_placement_key(key) else: data = key.hex() return {'keyspace': ks, 'data': data}