diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc index d3f2f8db975ae..547e15849445f 100644 --- a/src/v/cluster/cluster_utils.cc +++ b/src/v/cluster/cluster_utils.cc @@ -17,6 +17,7 @@ #include "cluster/simple_batch_builder.h" #include "cluster/types.h" #include "config/configuration.h" +#include "raft/consensus_utils.h" #include "raft/errc.h" #include "rpc/backoff_policy.h" #include "rpc/types.h" @@ -400,18 +401,7 @@ std::optional<ss::sstring> check_result_configuration( new_configuration.id()); } auto& current_configuration = it->second.broker; - /** - * do no allow to decrease node core count - */ - if ( - current_configuration.properties().cores - > new_configuration.properties().cores) { - return fmt::format( - "core count must not decrease on any broker, currently configured " - "core count: {}, requested core count: {}", - current_configuration.properties().cores, - new_configuration.properties().cores); - } + /** * When cluster member configuration changes Redpanda by default doesn't * allow the change if a new cluster configuration would have two @@ -477,4 +467,34 @@ std::optional<ss::sstring> check_result_configuration( } return {}; } + +ss::future<> copy_persistent_state( + const model::ntp& ntp, + raft::group_id group, + storage::kvstore& source_kvs, + ss::shard_id target_shard, + ss::sharded<storage::api>& storage) { + return ss::when_all_succeed( + storage::disk_log_impl::copy_kvstore_state( + ntp, source_kvs, target_shard, storage), + raft::details::copy_persistent_state( + group, source_kvs, target_shard, storage), + storage::offset_translator::copy_persistent_state( + group, source_kvs, target_shard, storage), + raft::copy_persistent_stm_state( + ntp, source_kvs, target_shard, storage)) + .discard_result(); +} + +ss::future<> remove_persistent_state( + const model::ntp& ntp, raft::group_id group, storage::kvstore& source_kvs) { + return ss::when_all_succeed( + storage::disk_log_impl::remove_kvstore_state(ntp, source_kvs), + raft::details::remove_persistent_state(group, source_kvs), + storage::offset_translator::remove_persistent_state( + group, source_kvs), + raft::remove_persistent_stm_state(ntp, source_kvs)) + .discard_result(); +} + } // namespace cluster diff --git a/src/v/cluster/cluster_utils.h b/src/v/cluster/cluster_utils.h index da882a7608df4..35d18450f17b4 100644 --- a/src/v/cluster/cluster_utils.h +++ b/src/v/cluster/cluster_utils.h @@ -354,4 +354,17 @@ std::optional<ss::sstring> check_result_configuration( const members_table::cache_t& current_brokers, const model::broker& to_update); +/// Copies all bits of partition kvstore state from source kvstore to kvstore on +/// target shard. +ss::future<> copy_persistent_state( + const model::ntp&, + raft::group_id, + storage::kvstore& source_kvs, + ss::shard_id target_shard, + ss::sharded<storage::api>&); + +/// Removes all bits of partition kvstore state in source kvstore.
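/// A minimal usage sketch of these two helpers (hypothetical caller; assumes
/// the source partition is already shut down and `storage` is the node-wide
/// ss::sharded<storage::api>; error handling omitted):
///
///   co_await cluster::copy_persistent_state(
///     ntp, group, source_kvs, target_shard, storage);
///   co_await cluster::remove_persistent_state(ntp, group, source_kvs);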
+ss::future<> remove_persistent_state( + const model::ntp&, raft::group_id, storage::kvstore& source_kvs); + } // namespace cluster diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index f8da70a0dcd88..b8fda5cb6a56a 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -204,8 +204,10 @@ ss::future<> controller::wire_up() { std::ref(_node_status_table)); }) .then([this] { - return _shard_placement.start(ss::sharded_parameter( - [this] { return std::ref(_storage.local().kvs()); })); + return _shard_placement.start( + ss::sharded_parameter([] { return ss::this_shard_id(); }), + ss::sharded_parameter( + [this] { return std::ref(_storage.local().kvs()); })); }) .then([this] { _probe.start(); }); } @@ -229,571 +231,524 @@ ss::future<> controller::start( std::back_inserter(seed_nodes), [](const model::broker& b) { return b.id(); }); - return validate_configuration_invariants() - .then([this, initial_raft0_brokers]() mutable { - return create_raft0( - _partition_manager, - _shard_table, - config::node().data_directory().as_sstring(), - std::move(initial_raft0_brokers)); - }) - .then([this](consensus_ptr c) { _raft0 = c; }) - .then([this] { return _partition_leaders.start(std::ref(_tp_state)); }) - .then( - [this] { return _drain_manager.start(std::ref(_partition_manager)); }) - .then([this, application_start_time] { - return _members_manager.start_single( - _raft0, - std::ref(_stm), - std::ref(_feature_table), - std::ref(_members_table), - std::ref(_connections), - std::ref(_partition_allocator), - std::ref(_storage), - std::ref(_drain_manager), - std::ref(_partition_balancer_state), - std::ref(_as), - application_start_time); - }) - .then([this] { - return _feature_backend.start_single( - std::ref(_feature_table), std::ref(_storage)); - }) - .then([this] { - return _bootstrap_backend.start_single( - std::ref(_credentials), - std::ref(_storage), - std::ref(_members_manager), - std::ref(_feature_table), - std::ref(_feature_backend), - std::ref(_recovery_table)); - }) - .then([this] { return _recovery_table.start(); }) - .then([this] { - return _recovery_manager.start_single( - std::ref(_as), - std::ref(_stm), - std::ref(_feature_table), - std::ref(_cloud_storage_api), - std::ref(_recovery_table), - std::ref(_storage), - _raft0); - }) - .then([this] { return _plugin_table.start(); }) - .then([this] { return _plugin_backend.start_single(&_plugin_table); }) - .then([this] { return _quota_store.start(); }) - .then( - [this] { return _quota_backend.start_single(std::ref(_quota_store)); }) - .then([this] { - return _config_frontend.start( - std::ref(_stm), - std::ref(_connections), - std::ref(_partition_leaders), - std::ref(_feature_table), - std::ref(_as)); - }) - .then([this] { - return _config_manager.start_single( - std::ref(_config_preload), - std::ref(_config_frontend), - std::ref(_connections), - std::ref(_partition_leaders), - std::ref(_feature_table), - std::ref(_members_table), - std::ref(_as)); - }) - .then([this] { - return _data_migration_frontend.start( - _raft0->self().id(), - std::ref(*_data_migration_table), - std::ref(_feature_table), - std::ref(_stm), - std::ref(_partition_leaders), - std::ref(_connections), - std::ref(_as)); - }) - .then([this] { - limiter_configuration limiter_conf{ - config::shard_local_cfg() - .enable_controller_log_rate_limiting.bind(), - config::shard_local_cfg().rps_limit_topic_operations.bind(), - config::shard_local_cfg() - .controller_log_accummulation_rps_capacity_topic_operations - .bind(), - 
config::shard_local_cfg() - .rps_limit_acls_and_users_operations.bind(), - config::shard_local_cfg() - .controller_log_accummulation_rps_capacity_acls_and_users_operations - .bind(), - config::shard_local_cfg() - .rps_limit_node_management_operations.bind(), - config::shard_local_cfg() - .controller_log_accummulation_rps_capacity_node_management_operations - .bind(), - config::shard_local_cfg().rps_limit_move_operations.bind(), - config::shard_local_cfg() - .controller_log_accummulation_rps_capacity_move_operations.bind(), - config::shard_local_cfg().rps_limit_configuration_operations.bind(), - config::shard_local_cfg() - .controller_log_accummulation_rps_capacity_configuration_operations - .bind(), - }; - return _stm.start_single( - std::move(limiter_conf), - std::ref(_feature_table), - config::shard_local_cfg().controller_snapshot_max_age_sec.bind(), - std::ref(clusterlog), - _raft0.get(), - raft::persistent_last_applied::yes, - absl::flat_hash_set{ - model::record_batch_type::checkpoint, - model::record_batch_type::raft_configuration, - model::record_batch_type::data_policy_management_cmd}, - std::ref(_tp_updates_dispatcher), - std::ref(_security_manager), - std::ref(_members_manager), - std::ref(_config_manager), - std::ref(_feature_backend), - std::ref(_bootstrap_backend), - std::ref(_plugin_backend), - std::ref(_recovery_manager), - std::ref(_quota_backend), - std::ref(*_data_migration_table)); - }) - .then([this] { - return _members_frontend.start( - std::ref(_stm), - std::ref(_connections), - std::ref(_partition_leaders), - std::ref(_feature_table), - std::ref(_as)); - }) - .then([this] { - return _security_frontend.start( - _raft0->self().id(), - this, - std::ref(_stm), - std::ref(_connections), - std::ref(_partition_leaders), - std::ref(_feature_table), - std::ref(_as), - std::ref(_authorizer)); - }) - .then([this] { - return _ephemeral_credential_frontend.start( - self(), - std::ref(_credentials), - std::ref(_ephemeral_credentials), - std::ref(_feature_table), - std::ref(_connections)); - }) - .then([this] { - return _tp_frontend.start( - _raft0->self().id(), - std::ref(_stm), - std::ref(_connections), - std::ref(_partition_allocator), - std::ref(_partition_leaders), - std::ref(_tp_state), - std::ref(_hm_frontend), - std::ref(_as), - std::ref(_cloud_storage_api), - std::ref(_feature_table), - std::ref(_members_table), - std::ref(_partition_manager), - std::ref(_shard_table), - std::ref(_shard_balancer), - ss::sharded_parameter( - [this] { return std::ref(_data_migrated_resources.local()); }), - ss::sharded_parameter( - [this] { return std::ref(_plugin_table.local()); }), - ss::sharded_parameter( - [this] { return std::ref(_metadata_cache.local()); }), - ss::sharded_parameter([] { - return config::shard_local_cfg() - .storage_space_alert_free_threshold_percent.bind(); - }), - ss::sharded_parameter([] { - return config::shard_local_cfg() - .minimum_topic_replication.bind(); - }), - ss::sharded_parameter([] { - return config::shard_local_cfg() - .partition_autobalancing_topic_aware.bind(); - })); - }) - .then([this] { - return _plugin_frontend.start( - _raft0->self().id(), - ss::sharded_parameter( - [this] { return &_partition_leaders.local(); }), - ss::sharded_parameter([this] { return &_plugin_table.local(); }), - ss::sharded_parameter([this] { return &_tp_state.local(); }), - ss::sharded_parameter([this] { - return _stm.local_is_initialized() ? 
&_stm.local() : nullptr; - }), - ss::sharded_parameter([this] { return &_connections.local(); }), - ss::sharded_parameter([this] { return &_as.local(); })); - }) - .then( - [this] { return _quota_frontend.start(std::ref(_stm), std::ref(_as)); }) - .then([this] { - return _members_backend.start_single( - std::ref(_tp_frontend), - std::ref(_tp_state), - std::ref(_partition_allocator), - std::ref(_members_table), - std::ref(_api), - std::ref(_members_manager), - std::ref(_members_frontend), - std::ref(_feature_table), - _raft0, - std::ref(_as)); - }) - .then([this] { - return _backend.start( - std::ref(_tp_state), - std::ref(_shard_placement), - std::ref(_shard_table), - std::ref(_partition_manager), - std::ref(_members_table), - std::ref(_partition_leaders), - std::ref(_tp_frontend), - std::ref(_storage), - std::ref(_feature_table), - ss::sharded_parameter([] { - return config::shard_local_cfg() - .controller_backend_housekeeping_interval_ms.bind(); - }), - ss::sharded_parameter([] { - return config::shard_local_cfg() - .initial_retention_local_target_bytes_default.bind(); - }), - ss::sharded_parameter([] { - return config::shard_local_cfg() - .initial_retention_local_target_ms_default.bind(); - }), - ss::sharded_parameter([] { - return config::shard_local_cfg() - .retention_local_target_bytes_default.bind(); - }), - ss::sharded_parameter([] { - return config::shard_local_cfg() - .retention_local_target_ms_default.bind(); - }), - ss::sharded_parameter([] { - return config::shard_local_cfg().retention_local_strict.bind(); - }), - std::ref(_as)); - }) - .then([this] { - return _shard_balancer.start_single( - std::ref(_shard_placement), - std::ref(_feature_table), - std::ref(_storage), - std::ref(_tp_state), - std::ref(_backend), - config::shard_local_cfg() - .core_balancing_on_core_count_change.bind(), - config::shard_local_cfg().core_balancing_continuous.bind(), - config::shard_local_cfg().core_balancing_debounce_timeout.bind()); - }) - .then( - [this] { return _drain_manager.invoke_on_all(&drain_manager::start); }) - .then([this, initial_raft0_brokers]() mutable { - return _members_manager.invoke_on( - members_manager::shard, - [initial_raft0_brokers = std::move(initial_raft0_brokers)]( - members_manager& manager) mutable { - return manager.start(std::move(initial_raft0_brokers)); - }); - }) - .then([this] { - /** - * Controller state machine MUST be started after all entities that - * receives `apply_update` notifications - */ - return _stm.invoke_on(controller_stm_shard, &controller_stm::start); - }) - .then([this, &as = shard0_as] { - auto disk_dirty_offset = _raft0->log()->offsets().dirty_offset; - - return _stm - .invoke_on( - controller_stm_shard, - [disk_dirty_offset, &as = as](controller_stm& stm) { - // we do not have to use timeout in here as all the batches to - // apply have to be accesssible - auto last_applied = stm.bootstrap_last_applied(); - - // Consistency check: on a bug-free system, the last_applied - // value in the kvstore always points to data on disk. - // However, if there is a bug, or someone has removed some log - // segments out of band, we will hang trying to read up to - // last_applied. Mitigate this by clamping it to the top of - // the log on disk. - if (last_applied > disk_dirty_offset) { - vlog( - clusterlog.error, - "Inconsistency detected between KVStore last_applied " - "({}) and actual log size ({}). 
If the storage " - "directory was not modified intentionally, this is a " - "bug.", - last_applied, - disk_dirty_offset); - - // Try waiting for replay to reach the disk_dirty_offset, - // ignore last_applied. - return stm - .wait(disk_dirty_offset, model::time_from_now(5s)) - .handle_exception_type([](const ss::timed_out_error&) { - // Ignore timeout: it just means the controller - // log replay is done without hitting the disk - // log hwm (truncation happened), or that we were - // slow and controller replay will continue in - // the background while the rest of redpanda - // starts up. - }); - } + auto conf_invariants = co_await validate_configuration_invariants(); - vlog( - clusterlog.info, - "Controller log replay starting (to offset {})", - last_applied); - - if (last_applied == model::offset{}) { - return ss::now(); - } else { - // The abort source we use here is specific to our startup - // phase, where we can't yet safely use our member abort - // source. - return stm.wait(last_applied, model::no_timeout, as); - } - }) - .then([this] { - vlog(clusterlog.info, "Controller log replay complete."); - /// Once the controller log is replayed and topics are recovered - /// print the RF minimum warning - _tp_frontend.local().print_rf_warning_message(); - }); - }) - .then([this, &discovery] { return cluster_creation_hook(discovery); }) - .then([this] { - // start shard_balancer before controller_backend so that it boostraps - // shard_placement_table and controller_backend can start with already - // initialized table. - return _shard_balancer.invoke_on( - shard_balancer::shard_id, &shard_balancer::start); - }) - .then( - [this] { return _backend.invoke_on_all(&controller_backend::start); }) - .then([this] { - return _api.start( - _raft0->self().id(), - std::ref(_backend), - std::ref(_tp_state), - std::ref(_shard_table), - std::ref(_connections), - std::ref(_hm_frontend), - std::ref(_members_table), - std::ref(_partition_balancer), - std::ref(_as)); - }) - .then([this] { - return _members_backend.invoke_on( - members_manager::shard, &members_backend::start); - }) - .then([this] { - return _config_manager.invoke_on( - config_manager::shard, &config_manager::start); - }) - .then([this] { - return _feature_manager.start_single( - std::ref(_stm), - std::ref(_as), - std::ref(_members_table), - std::ref(_raft_manager), - std::ref(_hm_frontend), - std::ref(_hm_backend), - std::ref(_feature_table), - std::ref(_connections), - std::ref(_roles), - _raft0->group()); - }) - .then([this] { - return _health_manager.start_single( - _raft0->self().id(), - config::shard_local_cfg().internal_topic_replication_factor(), - config::shard_local_cfg().health_manager_tick_interval(), - config::shard_local_cfg() - .partition_autobalancing_concurrent_moves.bind(), - std::ref(_tp_state), - std::ref(_tp_frontend), - std::ref(_partition_allocator), - std::ref(_partition_leaders), - std::ref(_members_table), - std::ref(_as)); - }) - .then([this] { - return _health_manager.invoke_on( - health_manager::shard, &health_manager::start); - }) - .then([this] { - return _hm_backend.start_single( - _raft0, - std::ref(_members_table), - std::ref(_connections), - std::ref(_partition_manager), - std::ref(_raft_manager), - std::ref(_as), - std::ref(_local_monitor), - std::ref(_drain_manager), - std::ref(_feature_table), - std::ref(_partition_leaders), - std::ref(_tp_state)); - }) - .then([this] { - _leader_balancer = std::make_unique( - _tp_state.local(), - _partition_leaders.local(), - _members_table.local(), - 
_hm_backend.local(), - _feature_table.local(), - std::ref(_connections), - std::ref(_shard_table), - std::ref(_partition_manager), - std::ref(_as), - config::shard_local_cfg().enable_leader_balancer.bind(), - config::shard_local_cfg().leader_balancer_idle_timeout.bind(), - config::shard_local_cfg().leader_balancer_mute_timeout.bind(), - config::shard_local_cfg().leader_balancer_node_mute_timeout.bind(), - config::shard_local_cfg() - .leader_balancer_transfer_limit_per_shard.bind(), - _raft0); - return _leader_balancer->start(); - }) - .then([this] { - return _hm_frontend.start( - std::ref(_hm_backend), - std::ref(_node_status_table), - ss::sharded_parameter([]() { - return config::shard_local_cfg().alive_timeout_ms.bind(); - })); - }) - .then([this] { - return _hm_frontend.invoke_on_all(&health_monitor_frontend::start); - }) - .then([this] { - return _oidc_service.invoke_on_all(&security::oidc::service::start); - }) - .then([this, seed_nodes = std::move(seed_nodes)]() mutable { - return _feature_manager.invoke_on( - feature_manager::backend_shard, - &feature_manager::start, - std::move(seed_nodes)); - }) - .then([this] { - return _metrics_reporter.start_single( - _raft0, - std::ref(_stm), - std::ref(_members_table), - std::ref(_tp_state), - std::ref(_hm_frontend), - std::ref(_config_frontend), - std::ref(_feature_table), - std::ref(_roles), - std::addressof(_plugin_table), - std::ref(_as)); - }) - .then([this] { - return _metrics_reporter.invoke_on(0, &metrics_reporter::start); - }) - .then([this] { - return _partition_balancer.start_single( - _raft0, - std::ref(_stm), - std::ref(_partition_balancer_state), - std::ref(_hm_backend), - std::ref(_partition_allocator), - std::ref(_tp_frontend), - std::ref(_members_frontend), - config::shard_local_cfg().partition_autobalancing_mode.bind(), - config::shard_local_cfg() - .partition_autobalancing_node_availability_timeout_sec.bind(), - config::shard_local_cfg() - .partition_autobalancing_max_disk_usage_percent.bind(), - config::shard_local_cfg() - .storage_space_alert_free_threshold_percent.bind(), - config::shard_local_cfg() - .partition_autobalancing_tick_interval_ms.bind(), - config::shard_local_cfg() - .partition_autobalancing_concurrent_moves.bind(), - config::shard_local_cfg() - .partition_autobalancing_tick_moves_drop_threshold.bind(), - config::shard_local_cfg().segment_fallocation_step.bind(), - config::shard_local_cfg() - .partition_autobalancing_min_size_threshold.bind(), - config::shard_local_cfg().node_status_interval.bind(), - config::shard_local_cfg().raft_learner_recovery_rate.bind(), - config::shard_local_cfg() - .partition_autobalancing_topic_aware.bind()); - }) - .then([this] { - return _partition_balancer.invoke_on( - partition_balancer_backend::shard, - &partition_balancer_backend::start); - }) - .then([this, offsets_uploader, producer_id_recovery, offsets_recovery] { - if (config::node().recovery_mode_enabled()) { - return; - } - auto bucket_opt = get_configured_bucket(); - if (!bucket_opt.has_value()) { - return; + _raft0 = co_await create_raft0( + _partition_manager, + _shard_table, + config::node().data_directory().as_sstring(), + initial_raft0_brokers); + + co_await _partition_leaders.start(std::ref(_tp_state)); + co_await _drain_manager.start(std::ref(_partition_manager)); + co_await _members_manager.start_single( + _raft0, + std::ref(_stm), + std::ref(_feature_table), + std::ref(_members_table), + std::ref(_connections), + std::ref(_partition_allocator), + std::ref(_storage), + std::ref(_drain_manager), + 
std::ref(_partition_balancer_state), + std::ref(_as), + application_start_time); + co_await _feature_backend.start_single( + std::ref(_feature_table), std::ref(_storage)); + co_await _bootstrap_backend.start_single( + std::ref(_credentials), + std::ref(_storage), + std::ref(_members_manager), + std::ref(_feature_table), + std::ref(_feature_backend), + std::ref(_recovery_table)); + + co_await _recovery_table.start(); + co_await _recovery_manager.start_single( + std::ref(_as), + std::ref(_stm), + std::ref(_feature_table), + std::ref(_cloud_storage_api), + std::ref(_recovery_table), + std::ref(_storage), + _raft0); + + co_await _plugin_table.start(); + co_await _plugin_backend.start_single(&_plugin_table); + + co_await _quota_store.start(); + co_await _quota_backend.start_single(std::ref(_quota_store)); + + co_await _config_frontend.start( + std::ref(_stm), + std::ref(_connections), + std::ref(_partition_leaders), + std::ref(_feature_table), + std::ref(_as)); + co_await _config_manager.start_single( + std::ref(_config_preload), + std::ref(_config_frontend), + std::ref(_connections), + std::ref(_partition_leaders), + std::ref(_feature_table), + std::ref(_members_table), + std::ref(_as)); + + co_await _data_migration_frontend.start( + _raft0->self().id(), + std::ref(*_data_migration_table), + std::ref(_feature_table), + std::ref(_stm), + std::ref(_partition_leaders), + std::ref(_connections), + std::ref(_as)); + + { + limiter_configuration limiter_conf{ + config::shard_local_cfg().enable_controller_log_rate_limiting.bind(), + config::shard_local_cfg().rps_limit_topic_operations.bind(), + config::shard_local_cfg() + .controller_log_accummulation_rps_capacity_topic_operations.bind(), + config::shard_local_cfg().rps_limit_acls_and_users_operations.bind(), + config::shard_local_cfg() + .controller_log_accummulation_rps_capacity_acls_and_users_operations + .bind(), + config::shard_local_cfg().rps_limit_node_management_operations.bind(), + config::shard_local_cfg() + .controller_log_accummulation_rps_capacity_node_management_operations + .bind(), + config::shard_local_cfg().rps_limit_move_operations.bind(), + config::shard_local_cfg() + .controller_log_accummulation_rps_capacity_move_operations.bind(), + config::shard_local_cfg().rps_limit_configuration_operations.bind(), + config::shard_local_cfg() + .controller_log_accummulation_rps_capacity_configuration_operations + .bind(), + }; + co_await _stm.start_single( + std::move(limiter_conf), + std::ref(_feature_table), + config::shard_local_cfg().controller_snapshot_max_age_sec.bind(), + std::ref(clusterlog), + _raft0.get(), + raft::persistent_last_applied::yes, + absl::flat_hash_set{ + model::record_batch_type::checkpoint, + model::record_batch_type::raft_configuration, + model::record_batch_type::data_policy_management_cmd}, + std::ref(_tp_updates_dispatcher), + std::ref(_security_manager), + std::ref(_members_manager), + std::ref(_config_manager), + std::ref(_feature_backend), + std::ref(_bootstrap_backend), + std::ref(_plugin_backend), + std::ref(_recovery_manager), + std::ref(_quota_backend), + std::ref(*_data_migration_table)); + } + + co_await _members_frontend.start( + std::ref(_stm), + std::ref(_connections), + std::ref(_partition_leaders), + std::ref(_feature_table), + std::ref(_as)); + + co_await _security_frontend.start( + _raft0->self().id(), + this, + std::ref(_stm), + std::ref(_connections), + std::ref(_partition_leaders), + std::ref(_feature_table), + std::ref(_as), + std::ref(_authorizer)); + co_await 
_ephemeral_credential_frontend.start( + self(), + std::ref(_credentials), + std::ref(_ephemeral_credentials), + std::ref(_feature_table), + std::ref(_connections)); + + co_await _tp_frontend.start( + _raft0->self().id(), + std::ref(_stm), + std::ref(_connections), + std::ref(_partition_allocator), + std::ref(_partition_leaders), + std::ref(_tp_state), + std::ref(_hm_frontend), + std::ref(_as), + std::ref(_cloud_storage_api), + std::ref(_feature_table), + std::ref(_members_table), + std::ref(_partition_manager), + std::ref(_shard_table), + std::ref(_shard_balancer), + ss::sharded_parameter( + [this] { return std::ref(_data_migrated_resources.local()); }), + ss::sharded_parameter([this] { return std::ref(_plugin_table.local()); }), + ss::sharded_parameter( + [this] { return std::ref(_metadata_cache.local()); }), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .storage_space_alert_free_threshold_percent.bind(); + }), + ss::sharded_parameter([] { + return config::shard_local_cfg().minimum_topic_replication.bind(); + }), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .partition_autobalancing_topic_aware.bind(); + })); + + co_await _plugin_frontend.start( + _raft0->self().id(), + ss::sharded_parameter([this] { return &_partition_leaders.local(); }), + ss::sharded_parameter([this] { return &_plugin_table.local(); }), + ss::sharded_parameter([this] { return &_tp_state.local(); }), + ss::sharded_parameter([this] { + return _stm.local_is_initialized() ? &_stm.local() : nullptr; + }), + ss::sharded_parameter([this] { return &_connections.local(); }), + ss::sharded_parameter([this] { return &_as.local(); })); + + co_await _quota_frontend.start(std::ref(_stm), std::ref(_as)); + + co_await _members_backend.start_single( + std::ref(_tp_frontend), + std::ref(_tp_state), + std::ref(_partition_allocator), + std::ref(_members_table), + std::ref(_api), + std::ref(_members_manager), + std::ref(_members_frontend), + std::ref(_feature_table), + _raft0, + std::ref(_as)); + + co_await _backend.start( + std::ref(_tp_state), + std::ref(_shard_placement), + std::ref(_shard_table), + std::ref(_partition_manager), + std::ref(_members_table), + std::ref(_partition_leaders), + std::ref(_tp_frontend), + std::ref(_storage), + std::ref(_feature_table), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .controller_backend_housekeeping_interval_ms.bind(); + }), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .initial_retention_local_target_bytes_default.bind(); + }), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .initial_retention_local_target_ms_default.bind(); + }), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .retention_local_target_bytes_default.bind(); + }), + ss::sharded_parameter([] { + return config::shard_local_cfg() + .retention_local_target_ms_default.bind(); + }), + ss::sharded_parameter( + [] { return config::shard_local_cfg().retention_local_strict.bind(); }), + std::ref(_as)); + + co_await _shard_balancer.start_single( + std::ref(_shard_placement), + std::ref(_feature_table), + std::ref(_storage), + std::ref(_tp_state), + std::ref(_backend), + config::shard_local_cfg().core_balancing_on_core_count_change.bind(), + config::shard_local_cfg().core_balancing_continuous.bind(), + config::shard_local_cfg().core_balancing_debounce_timeout.bind(), + config::shard_local_cfg().topic_partitions_per_shard.bind(), + config::shard_local_cfg().topic_partitions_reserve_shard0.bind()); + + co_await 
_drain_manager.invoke_on_all(&drain_manager::start); + + co_await _members_manager.invoke_on( + members_manager::shard, + [initial_raft0_brokers](members_manager& manager) mutable { + return manager.start(std::move(initial_raft0_brokers)); + }); + + /** + * Controller state machine MUST be started after all entities that + * receive `apply_update` notifications + */ + co_await _stm.invoke_on(controller_stm_shard, &controller_stm::start); + + auto disk_dirty_offset = _raft0->log()->offsets().dirty_offset; + co_await _stm.invoke_on( + controller_stm_shard, + [disk_dirty_offset, &as = shard0_as](controller_stm& stm) { + // we do not have to use a timeout here as all the batches to + // apply have to be accessible + auto last_applied = stm.bootstrap_last_applied(); + + // Consistency check: on a bug-free system, the last_applied + // value in the kvstore always points to data on disk. + // However, if there is a bug, or someone has removed some log + // segments out of band, we will hang trying to read up to + // last_applied. Mitigate this by clamping it to the top of + // the log on disk. + if (last_applied > disk_dirty_offset) { + vlog( + clusterlog.error, + "Inconsistency detected between KVStore last_applied " + "({}) and actual log size ({}). If the storage " + "directory was not modified intentionally, this is a " + "bug.", + last_applied, + disk_dirty_offset); + + // Try waiting for replay to reach the disk_dirty_offset, + // ignore last_applied. + return stm.wait(disk_dirty_offset, model::time_from_now(5s)) + .handle_exception_type([](const ss::timed_out_error&) { + // Ignore timeout: it just means the controller + // log replay is done without hitting the disk + // log hwm (truncation happened), or that we were + // slow and controller replay will continue in + // the background while the rest of redpanda + // starts up. + }); } - cloud_storage_clients::bucket_name bucket = bucket_opt.value(); - _metadata_uploader = std::make_unique( - _raft_manager.local(), - _storage.local(), - bucket, - _cloud_storage_api.local(), - _raft0, - _tp_state.local(), - offsets_uploader); - if (config::shard_local_cfg().enable_cluster_metadata_upload_loop()) { - _metadata_uploader->start(); + + vlog( + clusterlog.info, + "Controller log replay starting (to offset {})", + last_applied); + + if (last_applied == model::offset{}) { + return ss::now(); + } else { + // The abort source we use here is specific to our startup + // phase, where we can't yet safely use our member abort + // source. + return stm.wait(last_applied, model::no_timeout, as); } - _recovery_backend - = std::make_unique( - _recovery_manager.local(), + }); + vlog(clusterlog.info, "Controller log replay complete."); + + /// Once the controller log is replayed and topics are recovered + /// print the RF minimum warning + _tp_frontend.local().print_rf_warning_message(); + + co_await cluster_creation_hook(discovery); + + // start shard_balancer before controller_backend so that it bootstraps + // shard_placement_table and controller_backend can start with already + // initialized table. + co_await _shard_balancer.invoke_on( + shard_balancer::shard_id, + &shard_balancer::start, + conf_invariants.core_count); + + if (conf_invariants.core_count > ss::smp::count) { + // Successfully starting shard_balancer with reduced core count means + // that all partition info from extra kvstores has been copied and we + // can finally update the configuration invariants.
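// For example (illustrative numbers, not from the patch): a node that
// previously ran 8 cores and restarts with 4 keeps core_count=8 in the
// persisted invariants until shard_balancer::start has drained kvstore
// shards 4..7; only then is core_count=4 persisted below.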
+ auto new_invariants = configuration_invariants( + *config::node().node_id(), ss::smp::count); + co_await _storage.local().kvs().put( + storage::kvstore::key_space::controller, + invariants_key, + reflection::to_iobuf(configuration_invariants{new_invariants})); + conf_invariants = new_invariants; + vlog( + clusterlog.info, + "successfully decreased core count, " + "updated configuration invariants: {}", + conf_invariants); + } + + co_await _backend.invoke_on_all(&controller_backend::start); + + co_await _api.start( + _raft0->self().id(), + std::ref(_backend), + std::ref(_tp_state), + std::ref(_shard_table), + std::ref(_connections), + std::ref(_hm_frontend), + std::ref(_members_table), + std::ref(_partition_balancer), + std::ref(_as)); + + co_await _members_backend.invoke_on( + members_manager::shard, &members_backend::start); + co_await _config_manager.invoke_on( + config_manager::shard, &config_manager::start); + co_await _feature_manager.start_single( + std::ref(_stm), + std::ref(_as), + std::ref(_members_table), + std::ref(_raft_manager), + std::ref(_hm_frontend), + std::ref(_hm_backend), + std::ref(_feature_table), + std::ref(_connections), + std::ref(_roles), + _raft0->group()); + + co_await _health_manager.start_single( + _raft0->self().id(), + config::shard_local_cfg().internal_topic_replication_factor(), + config::shard_local_cfg().health_manager_tick_interval(), + config::shard_local_cfg().partition_autobalancing_concurrent_moves.bind(), + std::ref(_tp_state), + std::ref(_tp_frontend), + std::ref(_partition_allocator), + std::ref(_partition_leaders), + std::ref(_members_table), + std::ref(_as)); + co_await _health_manager.invoke_on( + health_manager::shard, &health_manager::start); + + co_await _hm_backend.start_single( + _raft0, + std::ref(_members_table), + std::ref(_connections), + std::ref(_partition_manager), + std::ref(_raft_manager), + std::ref(_as), + std::ref(_local_monitor), + std::ref(_drain_manager), + std::ref(_feature_table), + std::ref(_partition_leaders), + std::ref(_tp_state)); + + _leader_balancer = std::make_unique( + _tp_state.local(), + _partition_leaders.local(), + _members_table.local(), + _hm_backend.local(), + _feature_table.local(), + std::ref(_connections), + std::ref(_shard_table), + std::ref(_partition_manager), + std::ref(_as), + config::shard_local_cfg().enable_leader_balancer.bind(), + config::shard_local_cfg().leader_balancer_idle_timeout.bind(), + config::shard_local_cfg().leader_balancer_mute_timeout.bind(), + config::shard_local_cfg().leader_balancer_node_mute_timeout.bind(), + config::shard_local_cfg().leader_balancer_transfer_limit_per_shard.bind(), + _raft0); + co_await _leader_balancer->start(); + + co_await _hm_frontend.start( + std::ref(_hm_backend), + std::ref(_node_status_table), + ss::sharded_parameter( + []() { return config::shard_local_cfg().alive_timeout_ms.bind(); })); + co_await _hm_frontend.invoke_on_all(&health_monitor_frontend::start); + + co_await _oidc_service.invoke_on_all(&security::oidc::service::start); + + co_await _feature_manager.invoke_on( + feature_manager::backend_shard, + &feature_manager::start, + std::vector{seed_nodes}); + + co_await _metrics_reporter.start_single( + _raft0, + std::ref(_stm), + std::ref(_members_table), + std::ref(_tp_state), + std::ref(_hm_frontend), + std::ref(_config_frontend), + std::ref(_feature_table), + std::ref(_roles), + std::addressof(_plugin_table), + std::ref(_as)); + co_await _metrics_reporter.invoke_on(0, &metrics_reporter::start); + + co_await _partition_balancer.start_single( + 
_raft0, + std::ref(_stm), + std::ref(_partition_balancer_state), + std::ref(_hm_backend), + std::ref(_partition_allocator), + std::ref(_tp_frontend), + std::ref(_members_frontend), + config::shard_local_cfg().partition_autobalancing_mode.bind(), + config::shard_local_cfg() + .partition_autobalancing_node_availability_timeout_sec.bind(), + config::shard_local_cfg() + .partition_autobalancing_max_disk_usage_percent.bind(), + config::shard_local_cfg() + .storage_space_alert_free_threshold_percent.bind(), + config::shard_local_cfg().partition_autobalancing_tick_interval_ms.bind(), + config::shard_local_cfg().partition_autobalancing_concurrent_moves.bind(), + config::shard_local_cfg() + .partition_autobalancing_tick_moves_drop_threshold.bind(), + config::shard_local_cfg().segment_fallocation_step.bind(), + config::shard_local_cfg() + .partition_autobalancing_min_size_threshold.bind(), + config::shard_local_cfg().node_status_interval.bind(), + config::shard_local_cfg().raft_learner_recovery_rate.bind(), + config::shard_local_cfg().partition_autobalancing_topic_aware.bind()); + co_await _partition_balancer.invoke_on( + partition_balancer_backend::shard, &partition_balancer_backend::start); + + if (!config::node().recovery_mode_enabled()) { + auto bucket_opt = get_configured_bucket(); + if (bucket_opt.has_value()) { + cloud_storage_clients::bucket_name bucket = bucket_opt.value(); + _metadata_uploader = std::make_unique( _raft_manager.local(), + _storage.local(), + bucket, _cloud_storage_api.local(), - _cloud_cache.local(), - _members_table.local(), - _feature_table.local(), - _credentials.local(), + _raft0, _tp_state.local(), - _api.local(), - _feature_manager.local(), - _config_frontend.local(), - _security_frontend.local(), - _tp_frontend.local(), - producer_id_recovery, - offsets_recovery, - std::ref(_recovery_table), - _raft0); - if (!config::shard_local_cfg() - .disable_cluster_recovery_loop_for_tests()) { - _recovery_backend->start(); - } - }) - .then([this] { - _data_migration_backend = std::make_unique( - std::ref(*_data_migration_table), - std::ref(_data_migration_frontend.local()), - std::ref(_as.local())); - return _data_migration_backend->start(); - }); + offsets_uploader); + if (config::shard_local_cfg() + .enable_cluster_metadata_upload_loop()) { + _metadata_uploader->start(); + } + _recovery_backend + = std::make_unique( + _recovery_manager.local(), + _raft_manager.local(), + _cloud_storage_api.local(), + _cloud_cache.local(), + _members_table.local(), + _feature_table.local(), + _credentials.local(), + _tp_state.local(), + _api.local(), + _feature_manager.local(), + _config_frontend.local(), + _security_frontend.local(), + _tp_frontend.local(), + producer_id_recovery, + offsets_recovery, + std::ref(_recovery_table), + _raft0); + if (!config::shard_local_cfg() + .disable_cluster_recovery_loop_for_tests()) { + _recovery_backend->start(); + } + } + } + + _data_migration_backend = std::make_unique( + std::ref(*_data_migration_table), + std::ref(_data_migration_frontend.local()), + std::ref(_as.local())); + co_await _data_migration_backend->start(); } ss::future<> controller::set_ready() { @@ -1156,7 +1111,8 @@ controller::do_get_controller_partition_state(model::node_id target_node) { * from startup up far enough to disrupt the rest of the cluster. 
* @return */ -ss::future<> controller::validate_configuration_invariants() { +ss::future<configuration_invariants> +controller::validate_configuration_invariants() { auto invariants_buf = _storage.local().kvs().get( storage::kvstore::key_space::controller, invariants_key); vassert( @@ -1168,10 +1124,15 @@ ss::future<> controller::validate_configuration_invariants() { if (!invariants_buf) { // store configuration invariants - return _storage.local().kvs().put( + co_await _storage.local().kvs().put( storage::kvstore::key_space::controller, invariants_key, - reflection::to_iobuf(std::move(current))); + reflection::to_iobuf(configuration_invariants{current})); + vlog( + clusterlog.info, + "persisted initial configuration invariants: {}", + current); + co_return current; } auto invariants = reflection::from_iobuf<configuration_invariants>( std::move(*invariants_buf)); @@ -1184,30 +1145,25 @@ ss::future<> controller::validate_configuration_invariants() { "supported", invariants.node_id, current.node_id); - return ss::make_exception_future<>( - configuration_invariants_changed(invariants, current)); + throw configuration_invariants_changed(invariants, current); } - if (invariants.core_count > current.core_count) { - vlog( - clusterlog.error, - "Detected change in number of cores dedicated to run redpanda." - "Decreasing redpanda core count is not allowed. Expected core " - "count " - "{}, currently have {} cores.", - invariants.core_count, - ss::smp::count); - return ss::make_exception_future<>( - configuration_invariants_changed(invariants, current)); - } else if (invariants.core_count != current.core_count) { + if (current.core_count > invariants.core_count) { // Update the persistent invariants to reflect increased core // count -- this tracks the high water mark of core count, to - // reject subsequent decreases. - return _storage.local().kvs().put( + // track the number of extra kvstore shards that we need to process if + // the core count later decreases. + co_await _storage.local().kvs().put( storage::kvstore::key_space::controller, invariants_key, - reflection::to_iobuf(std::move(current))); + reflection::to_iobuf(configuration_invariants{current})); + invariants = current; + vlog(clusterlog.info, "updated configuration invariants: {}", current); + } else if (current.core_count < invariants.core_count) { + // If core count decreased, do nothing for now. shard_balancer will + // check if decreasing is possible and we will update + // configuration_invariants in kvstore later.
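// (As a worked consequence: after a decrease the returned invariants
// still carry the old, larger core count, and controller::start passes
// that value to shard_balancer::start as kvstore_shard_count so the
// extra kvstore shards can be drained.)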
} - return ss::now(); + co_return invariants; } } // namespace cluster diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index a71eb7502039f..9261d65404daa 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -256,7 +256,7 @@ class controller { std::optional<cloud_storage_clients::bucket_name> get_configured_bucket(); // Checks configuration invariants stored in kvstore - ss::future<> validate_configuration_invariants(); + ss::future<configuration_invariants> validate_configuration_invariants(); config_manager::preload_result _config_preload; diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index e5bdd9845f339..fd422f2d7cad9 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -1057,7 +1057,7 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step( = log_revision_on_node(replicas_view, _self); switch (placement.get_reconciliation_action(expected_log_revision)) { - case shard_placement_table::reconciliation_action::remove: { + case shard_placement_table::reconciliation_action::remove_partition: { // Cleanup obsolete revisions that should not exist on this node. This // is typically done after the replicas update is finished. rs.set_cur_operation( @@ -1074,6 +1074,17 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step( } co_return ss::stop_iteration::no; } + case shard_placement_table::reconciliation_action::remove_kvstore_state: + // Cleanup obsolete kvstore state (without removing the partition data + // itself). This can be required after a cross-shard transfer is + // retried. + rs.set_cur_operation( + replicas_view.last_update_finished_revision(), + partition_operation_type::finish_update, + replicas_view.assignment); + co_await remove_partition_kvstore_state( + ntp, placement.current.value().group, expected_log_revision.value()); + co_return ss::stop_iteration::no; case shard_placement_table::reconciliation_action::wait_for_target_update: co_return errc::waiting_for_shard_placement_update; case shard_placement_table::reconciliation_action::transfer: { @@ -1724,42 +1735,143 @@ ss::future<std::error_code> controller_backend::transfer_partition( ntp, log_revision); - auto maybe_dest = co_await _shard_placement.prepare_transfer( - ntp, log_revision); - if (maybe_dest.has_error()) { - co_return maybe_dest.error(); + auto transfer_info = co_await _shard_placement.prepare_transfer( + ntp, log_revision, _shard_placement.container()); + if (transfer_info.source_error != errc::success) { + co_return transfer_info.source_error; + } else if (transfer_info.dest_error != errc::success) { + co_return transfer_info.dest_error; + } else if (transfer_info.is_finished) { + co_return errc::success; } - ss::shard_id destination = maybe_dest.value(); + ss::shard_id destination = transfer_info.destination.value(); auto partition = _partition_manager.local().get(ntp); if (partition) { co_await shutdown_partition(std::move(partition)); } - // TODO: copy, not move - co_await raft::details::move_persistent_state( - group, ss::this_shard_id(), destination, _storage); - co_await storage::offset_translator::move_persistent_state( - group, ss::this_shard_id(), destination, _storage); - co_await raft::move_persistent_stm_state( - ntp, ss::this_shard_id(), destination, _storage); - - co_await container().invoke_on( - destination, [&ntp, log_revision](controller_backend& dest) { - return dest._shard_placement - .finish_transfer_on_destination(ntp, log_revision) - .then([&] { - auto it = dest._states.find(ntp); - if (it != dest._states.end()) { - it->second->wakeup_event.set();
- } - }); - }); + co_await copy_persistent_state( + ntp, group, _storage.local().kvs(), destination, _storage); - co_await _shard_placement.finish_transfer_on_source(ntp, log_revision); + auto shard_callback = [this](const model::ntp& ntp) { + auto& dest = container().local(); + auto it = dest._states.find(ntp); + if (it != dest._states.end()) { + it->second->wakeup_event.set(); + } + }; + + co_await _shard_placement.finish_transfer( + ntp, log_revision, _shard_placement.container(), shard_callback); co_return errc::success; } +ss::future<> controller_backend::transfer_partitions_from_extra_shard( + storage::kvstore& extra_kvs, shard_placement_table& extra_spt) { + vassert( + _topic_table_notify_handle == notification_id_type_invalid, + "method is expected to be called before controller_backend is started"); + + co_await ss::max_concurrent_for_each( + extra_spt.shard_local_states(), + 256, + [&](const shard_placement_table::ntp2state_t::value_type& kv) { + const auto& [ntp, state] = kv; + return transfer_partition_from_extra_shard( + ntp, state, extra_kvs, extra_spt); + }); +} + +ss::future<> controller_backend::transfer_partition_from_extra_shard( + const model::ntp& ntp, + shard_placement_table::placement_state placement, + storage::kvstore& extra_kvs, + shard_placement_table& extra_spt) { + vlog( + clusterlog.debug, + "[{}] transferring partition from extra shard, placement: {}", + ntp, + placement); + + auto target = _shard_placement.get_target(ntp); + if (!target) { + co_return; + } + model::revision_id log_rev = target->log_revision; + + using reconciliation_action = shard_placement_table::reconciliation_action; + + if ( + placement.get_reconciliation_action(log_rev) + != reconciliation_action::transfer) { + // this can happen if the partition is already superseded by a partition + // with a greater log revision. + co_return; + } + + auto transfer_info = co_await extra_spt.prepare_transfer( + ntp, log_rev, _shard_placement.container()); + if (transfer_info.source_error != errc::success) { + // This can happen if this extra shard was the destination of an + // unfinished x-shard transfer. We can ignore this partition as we + // already have a valid copy of kvstore data on one of the valid shards.
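// (prepare_transfer reports three outcomes that this function handles:
// source_error -- this extra-shard copy is not authoritative, skip it;
// dest_error -- the destination still holds obsolete kvstore state, wipe
// it and retry below; is_finished -- a previous transfer already
// completed and there is nothing left to copy.)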
+ co_return; + } + + if (transfer_info.dest_error != errc::success) { + // clear kvstore state on destination + co_await container().invoke_on( + transfer_info.destination.value(), + [&ntp, log_rev](controller_backend& dest) { + auto dest_placement = dest._shard_placement.state_on_this_shard( + ntp); + vassert(dest_placement, "[{}] expected placement", ntp); + switch (dest_placement->get_reconciliation_action(log_rev)) { + case reconciliation_action::create: + case reconciliation_action::transfer: + case reconciliation_action::wait_for_target_update: + vassert( + false, + "[{}] unexpected reconciliation action, placement: {}", + ntp, + *dest_placement); + case reconciliation_action::remove_partition: + // TODO: remove obsolete log directory + case reconciliation_action::remove_kvstore_state: + break; + } + return remove_persistent_state( + ntp, + dest_placement->current.value().group, + dest._storage.local().kvs()) + .then([&dest, &ntp, log_rev] { + return dest._shard_placement.finish_delete(ntp, log_rev); + }); + }); + + transfer_info = co_await extra_spt.prepare_transfer( + ntp, log_rev, _shard_placement.container()); + } + + vassert( + transfer_info.destination && transfer_info.dest_error == errc::success, + "[{}] expected successful prepare_transfer, destination error: {}", + ntp, + transfer_info.dest_error); + + if (transfer_info.is_finished) { + co_return; + } + + ss::shard_id destination = transfer_info.destination.value(); + co_await copy_persistent_state( + ntp, target->group, extra_kvs, destination, _storage); + + co_await extra_spt.finish_transfer( + ntp, log_rev, _shard_placement.container(), [](const model::ntp&) {}); +} + ss::future<> controller_backend::shutdown_partition(ss::lw_shared_ptr partition) { vlog( @@ -1831,14 +1943,29 @@ ss::future controller_backend::delete_partition( if (part) { co_await remove_from_shard_table(ntp, part->group(), log_revision); co_await _partition_manager.local().remove(ntp, mode); - } + } else { + // TODO: delete log directory even when there is no partition object - // TODO: delete kvstore state even when there is no partition object + co_await remove_persistent_state( + ntp, placement->current->group, _storage.local().kvs()); + } co_await _shard_placement.finish_delete(ntp, log_revision); co_return errc::success; } +ss::future<> controller_backend::remove_partition_kvstore_state( + model::ntp ntp, raft::group_id group, model::revision_id log_revision) { + vlog( + clusterlog.debug, + "[{}] removing obsolete partition kvstore state, log_revision: {}", + ntp, + log_revision); + + co_await remove_persistent_state(ntp, group, _storage.local().kvs()); + co_await _shard_placement.finish_delete(ntp, log_revision); +} + bool controller_backend::should_skip(const model::ntp& ntp) const { return config::node().recovery_mode_enabled() && model::is_user_topic(ntp); } diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h index c434c5ee09026..8fab87e8725fb 100644 --- a/src/v/cluster/controller_backend.h +++ b/src/v/cluster/controller_backend.h @@ -238,6 +238,12 @@ class controller_backend void notify_reconciliation(const model::ntp&); + /// Copy partition kvstore data from an extra shard (i.e. kvstore shard that + /// is >= ss::smp::count). This method is expected to be called *before* + /// start(). 
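/// (Rationale: before start() no topic_table delta notification is
/// subscribed and no reconciliation fibers are running, so extra-shard
/// state can be moved without racing with normal reconciliation; the
/// implementation asserts this via _topic_table_notify_handle.)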
+ ss::future<> transfer_partitions_from_extra_shard( + storage::kvstore&, shard_placement_table&); + private: struct ntp_reconciliation_state; @@ -306,6 +312,15 @@ class controller_backend model::revision_id cmd_revision, partition_removal_mode mode); + ss::future<> remove_partition_kvstore_state( + model::ntp, raft::group_id, model::revision_id log_revision); + + ss::future<> transfer_partition_from_extra_shard( + const model::ntp&, + shard_placement_table::placement_state, + storage::kvstore&, + shard_placement_table&); + ss::future> reconcile_partition_reconfiguration( ntp_reconciliation_state&, ss::lw_shared_ptr, @@ -391,7 +406,8 @@ class controller_backend absl::btree_map> _states; - cluster::notification_id_type _topic_table_notify_handle; + cluster::notification_id_type _topic_table_notify_handle + = notification_id_type_invalid; // Limits the number of concurrently executing reconciliation fibers. // Initially reconciliation is blocked and we deposit a non-zero amount of // units when we are ready to start reconciling. diff --git a/src/v/cluster/scheduling/allocation_node.cc b/src/v/cluster/scheduling/allocation_node.cc index 8b2ec285b77b3..4f3436fc0427d 100644 --- a/src/v/cluster/scheduling/allocation_node.cc +++ b/src/v/cluster/scheduling/allocation_node.cc @@ -142,15 +142,13 @@ void allocation_node::remove_final_count(partition_allocation_domain domain) { } void allocation_node::update_core_count(uint32_t core_count) { - vassert( - core_count >= cpus(), - "decreasing node core count is not supported, current core count {} > " - "requested core count {}", - cpus(), - core_count); - auto current_cpus = cpus(); - for (auto i = current_cpus; i < core_count; ++i) { - _weights.push_back(0); + auto old_count = _weights.size(); + if (core_count < old_count) { + _weights.resize(core_count); + } else { + for (auto i = old_count; i < core_count; ++i) { + _weights.push_back(0); + } } _max_capacity = allocation_capacity( (core_count * _partitions_per_shard()) - _partitions_reserve_shard0()); diff --git a/src/v/cluster/shard_balancer.cc b/src/v/cluster/shard_balancer.cc index bc8cd8a6a86c3..1c152a97b61d9 100644 --- a/src/v/cluster/shard_balancer.cc +++ b/src/v/cluster/shard_balancer.cc @@ -49,10 +49,12 @@ shard_balancer::shard_balancer( ss::sharded& cb, config::binding balancing_on_core_count_change, config::binding balancing_continuous, - config::binding debounce_timeout) + config::binding debounce_timeout, + config::binding partitions_per_shard, + config::binding partitions_reserve_shard0) : _shard_placement(spt.local()) , _features(features.local()) - , _kvstore(storage.local().kvs()) + , _storage(storage.local()) , _topics(topics) , _controller_backend(cb) , _self(*config::node().node_id()) @@ -60,6 +62,8 @@ shard_balancer::shard_balancer( , _balancing_continuous(std::move(balancing_continuous)) , _debounce_timeout(std::move(debounce_timeout)) , _debounce_jitter(_debounce_timeout()) + , _partitions_per_shard(std::move(partitions_per_shard)) + , _partitions_reserve_shard0(std::move(partitions_reserve_shard0)) , _balance_timer([this] { balance_timer_callback(); }) , _total_counts(ss::smp::count, 0) { _total_counts.at(0) += 1; // controller partition @@ -70,7 +74,7 @@ shard_balancer::shard_balancer( }); } -ss::future<> shard_balancer::start() { +ss::future<> shard_balancer::start(size_t kvstore_shard_count) { vassert( ss::this_shard_id() == shard_id, "method can only be invoked on shard {}", @@ -79,14 +83,14 @@ ss::future<> shard_balancer::start() { auto gate_holder = _gate.hold(); auto lock 
= co_await _mtx.get_units(); + // Collect the set of node-local ntps from topic_table + // We expect topic_table to remain unchanged throughout the method // invocation because it is supposed to be called after local controller // replay is finished but before we start getting new controller updates // from the leader. auto tt_version = _topics.local().topics_map_revision(); - // 1. collect the set of node-local ntps from topic_table - chunked_hash_map local_group2ntp; chunked_hash_map local_ntp2log_revision; const auto& topics = _topics.local(); @@ -115,14 +119,112 @@ ss::future<> shard_balancer::start() { }); } - // 2. restore shard_placement_table from the kvstore or from topic_table. + if (kvstore_shard_count > ss::smp::count) { + // Check that we can decrease shard count - if (_shard_placement.is_persistence_enabled()) { - co_await _shard_placement.initialize_from_kvstore(local_group2ntp); - } else if (_features.is_active( - features::feature::node_local_core_assignment)) { - // joiner node? enable persistence without initializing + ss::sstring reject_reason; + if (!_features.is_active( + features::feature::node_local_core_assignment)) { + reject_reason + = "node_local_core_assignment feature flag is not yet active"; + } + if (!_balancing_on_core_count_change()) { + reject_reason = "balancing on core count change is disabled"; + } + size_t max_capacity = ss::smp::count * _partitions_per_shard(); + max_capacity -= std::min( + max_capacity, static_cast(_partitions_reserve_shard0())); + if (local_group2ntp.size() > max_capacity) { + reject_reason = ssx::sformat( + "the number of partition replicas on this node ({}) is greater " + "than max capacity with this core count ({})", + local_group2ntp.size(), + max_capacity); + } + + if (!reject_reason.empty()) { + throw std::runtime_error(fmt_with_ctx( + fmt::format, + "Detected decrease in number of cores dedicated to run Redpanda " + "from {} to {}, but it is impossible because {}.", + kvstore_shard_count, + ss::smp::count, + reject_reason)); + } + } + + std::vector> extra_kvstores; + for (ss::shard_id s = ss::smp::count; s < kvstore_shard_count; ++s) { + extra_kvstores.push_back(co_await _storage.make_extra_kvstore(s)); + } + + co_await init_shard_placement( + lock, local_group2ntp, local_ntp2log_revision, extra_kvstores) + .finally([&] { + return ss::parallel_for_each( + extra_kvstores, [](auto& kvs) { return kvs->stop(); }); + }); + + if (kvstore_shard_count > ss::smp::count) { + // Now that all partition info is copied from extra kvstores, we can + // remove them. + co_await _storage.log_mgr().remove_orphan_files( + config::node().data_directory().as_sstring(), + {model::redpanda_ns}, + [](model::ntp ntp, storage::partition_path::metadata) { + return ntp.tp.topic == model::kvstore_topic + && ntp.tp.partition() >= static_cast(ss::smp::count); + }); + } + + // we shouldn't be receiving any controller updates at this point, so no + // risk of missing a notification between initializing shard_placement_table + // and subscribing. + _topic_table_notify_handle = _topics.local().register_delta_notification( + [this](topic_table::delta_range_t deltas_range) { + for (const auto& delta : deltas_range) { + // Filter out only deltas that might change the set of partition + // replicas on this node. 
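// (e.g. disabled_flag_updated and properties_updated cannot move replicas
// between shards, so they are skipped; any other delta re-queues the ntp
// for assignment.)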
+ switch (delta.type) { + case topic_table_delta_type::disabled_flag_updated: + case topic_table_delta_type::properties_updated: + continue; + default: + _to_assign.insert(delta.ntp); + _wakeup_event.set(); + break; + } + } + }); + + vassert( + tt_version == _topics.local().topics_map_revision(), + "topic_table unexpectedly changed"); + + ssx::background = assign_fiber(); +} + +ss::future<> shard_balancer::init_shard_placement( + mutex::units& lock, + const chunked_hash_map& local_group2ntp, + const chunked_hash_map& + local_ntp2log_revision, + const std::vector>& extra_kvstores) { + // 1. restore shard_placement_table from the kvstore or from topic_table. + + if ( + _features.is_active(features::feature::node_local_core_assignment) + && !_shard_placement.is_persistence_enabled()) { + // Joiner node joining a cluster that has already enabled the feature? + // Enable persistence before initializing. co_await _shard_placement.enable_persistence(); + } + + std::vector> extra_spts; + + if (_shard_placement.is_persistence_enabled()) { + extra_spts = co_await _shard_placement.initialize_from_kvstore( + local_group2ntp, extra_kvstores); } else { // topic_table is still the source of truth co_await _shard_placement.initialize_from_topic_table(_topics, _self); @@ -136,13 +238,12 @@ ss::future<> shard_balancer::start() { } } - // 3. Initialize shard partition counts and assign non-assigned local ntps. + // 2. Initialize shard partition counts and assign non-assigned local ntps. // // Note: old assignments for ntps not in local_group2ntp have already been // removed during shard_placement_table initialization. - co_await ssx::async_for_each_counter( - counter, + co_await ssx::async_for_each( local_ntp2log_revision.begin(), local_ntp2log_revision.end(), [&](const std::pair kv) { @@ -165,37 +266,23 @@ ss::future<> shard_balancer::start() { co_await do_assign_ntps(lock); + // 3. Do balancing on startup if needed + if ( _balancing_on_core_count_change() && _features.is_active(features::feature::node_local_core_assignment)) { - co_await balance_on_core_count_change(lock); + co_await balance_on_core_count_change( + lock, ss::smp::count + extra_kvstores.size()); } - vassert( - tt_version == _topics.local().topics_map_revision(), - "topic_table unexpectedly changed"); - - // we shouldn't be receiving any controller updates at this point, so no - // risk of missing a notification between initializing shard_placement_table - // and subscribing. - _topic_table_notify_handle = _topics.local().register_delta_notification( - [this](topic_table::delta_range_t deltas_range) { - for (const auto& delta : deltas_range) { - // Filter out only deltas that might change the set of partition - // replicas on this node. - switch (delta.type) { - case topic_table_delta_type::disabled_flag_updated: - case topic_table_delta_type::properties_updated: - continue; - default: - _to_assign.insert(delta.ntp); - _wakeup_event.set(); - break; - } - } - }); + // 4. 
Move partition info from extra kvstores - ssx::background = assign_fiber(); + for (size_t i = 0; i < extra_kvstores.size(); ++i) { + auto& extra_kvs = *extra_kvstores.at(i); + auto& extra_spt = *extra_spts.at(i); + co_await _controller_backend.local() + .transfer_partitions_from_extra_shard(extra_kvs, extra_spt); + } } ss::future<> shard_balancer::stop() { @@ -398,9 +485,10 @@ void shard_balancer::maybe_assign( new_targets.emplace(ntp, target); } -ss::future<> shard_balancer::balance_on_core_count_change(mutex::units& lock) { +ss::future<> shard_balancer::balance_on_core_count_change( + mutex::units& lock, size_t kvstore_shard_count) { uint32_t last_rebalance_core_count = 0; - auto state_buf = _kvstore.get( + auto state_buf = _storage.kvs().get( storage::kvstore::key_space::shard_placement, state_kvstore_key()); if (state_buf) { last_rebalance_core_count = serde::from_iobuf( @@ -408,11 +496,13 @@ ss::future<> shard_balancer::balance_on_core_count_change(mutex::units& lock) { .last_rebalance_core_count; } - // If there is no state in kvstore, this means that we are restarting with - // shard balancing enabled for the first time, and this is a good time to - // rebalance as well. + // If there is no state in kvstore (and therefore last_rebalance_core_count + // is 0), this means that we are restarting with shard balancing enabled for + // the first time, and this is a good time to rebalance as well. - if (last_rebalance_core_count == ss::smp::count) { + if ( + last_rebalance_core_count == ss::smp::count + && kvstore_shard_count == ss::smp::count) { co_return; } @@ -476,7 +566,7 @@ ss::future<> shard_balancer::do_balance(mutex::units& lock) { return set_target(ntp, target, lock); }); - co_await _kvstore.put( + co_await _storage.kvs().put( storage::kvstore::key_space::shard_placement, state_kvstore_key(), serde::to_iobuf(persisted_state{ @@ -581,14 +671,18 @@ void shard_balancer::update_counts( topic_data_t& topic_data, const std::optional& prev, const std::optional& next) { - if (prev) { + // Shard values that are >= ss::smp::count are possible when initializing + // shard placement after a core count decrease. We ignore them because + // partition counts on extra shards are not needed for balancing. + + if (prev && prev->shard < ss::smp::count) { topic_data.shard2count.at(prev->shard) -= 1; topic_data.total_count -= 1; // TODO: check negative values _total_counts.at(prev->shard) -= 1; } - if (next) { + if (next && next->shard < ss::smp::count) { topic_data.shard2count.at(next->shard) += 1; topic_data.total_count += 1; _total_counts.at(next->shard) += 1; diff --git a/src/v/cluster/shard_balancer.h b/src/v/cluster/shard_balancer.h index 80b6a8d0c5af7..8420a31aff529 100644 --- a/src/v/cluster/shard_balancer.h +++ b/src/v/cluster/shard_balancer.h @@ -42,9 +42,11 @@ class shard_balancer { ss::sharded&, config::binding balancing_on_core_count_change, config::binding balancing_continuous, - config::binding debounce_timeout); + config::binding debounce_timeout, + config::binding partitions_per_shard, + config::binding partitions_reserve_shard0); - ss::future<> start(); + ss::future<> start(size_t kvstore_shard_count); ss::future<> stop(); /// Persist current shard_placement_table contents to kvstore. 
Executed once @@ -62,10 +64,18 @@ class shard_balancer { private: void process_delta(const topic_table::delta&); + ss::future<> init_shard_placement( + mutex::units& lock, + const chunked_hash_map& local_group2ntp, + const chunked_hash_map& + local_ntp2log_revision, + const std::vector>& extra_kvstores); + ss::future<> assign_fiber(); ss::future<> do_assign_ntps(mutex::units& lock); - ss::future<> balance_on_core_count_change(mutex::units& lock); + ss::future<> balance_on_core_count_change( + mutex::units& lock, size_t kvstore_shard_count); void balance_timer_callback(); ss::future<> do_balance(mutex::units& lock); @@ -102,7 +112,7 @@ class shard_balancer { private: shard_placement_table& _shard_placement; features::feature_table& _features; - storage::kvstore& _kvstore; + storage::api& _storage; ss::sharded& _topics; ss::sharded& _controller_backend; model::node_id _self; @@ -111,6 +121,8 @@ class shard_balancer { config::binding _balancing_continuous; config::binding _debounce_timeout; simple_time_jitter _debounce_jitter; + config::binding _partitions_per_shard; + config::binding _partitions_reserve_shard0; cluster::notification_id_type _topic_table_notify_handle; ss::timer _balance_timer; diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc index bddc210e99170..15cfac47a9a54 100644 --- a/src/v/cluster/shard_placement_table.cc +++ b/src/v/cluster/shard_placement_table.cc @@ -63,13 +63,22 @@ shard_placement_table::placement_state::get_reconciliation_action( if (assigned) { return reconciliation_action::wait_for_target_update; } - return reconciliation_action::remove; + return reconciliation_action::remove_partition; } - if (current && current->log_revision < expected_log_revision) { - return reconciliation_action::remove; - } - if (_is_initial_for && _is_initial_for < expected_log_revision) { - return reconciliation_action::remove; + if (current) { + if (current->log_revision < expected_log_revision) { + return reconciliation_action::remove_partition; + } else if (current->log_revision > expected_log_revision) { + return reconciliation_action::wait_for_target_update; + } else if (current->status == hosted_status::obsolete) { + return reconciliation_action::remove_kvstore_state; + } + } else if (_is_initial_for) { + if (_is_initial_for < expected_log_revision) { + return reconciliation_action::remove_partition; + } else if (_is_initial_for > expected_log_revision) { + return reconciliation_action::wait_for_target_update; + } } if (assigned) { if (assigned->log_revision != expected_log_revision) { @@ -88,11 +97,16 @@ std::ostream& operator<<(std::ostream& o, const shard_placement_table::placement_state& ps) { fmt::print( o, - "{{current: {}, assigned: {}, is_initial_for: {}, next: {}}}", + "{{current: {}, assigned: {}, is_initial_for: {}, next: ", ps.current, ps.assigned, - ps._is_initial_for, - ps._next); + ps._is_initial_for); + if (ps._next) { + fmt::print( + o, "{{s: {}, r: {}}}}}", ps._next->shard, ps._next->revision); + } else { + fmt::print(o, "{{nullopt}}}}"); + } return o; } @@ -153,14 +167,13 @@ bytes current_state_kvstore_key(const raft::group_id group) { } // namespace -shard_placement_table::shard_placement_table(storage::kvstore& kvstore) - : _kvstore(kvstore) {} +shard_placement_table::shard_placement_table( + ss::shard_id shard, storage::kvstore& kvstore) + : _shard(shard) + , _kvstore(kvstore) {} bool shard_placement_table::is_persistence_enabled() const { - vassert( - ss::this_shard_id() == assignment_shard_id, - "method can only be 
invoked on shard {}", - assignment_shard_id); + assert_is_assignment_shard(); if (_persistence_enabled) { return true; @@ -171,10 +184,7 @@ bool shard_placement_table::is_persistence_enabled() const { } ss::future<> shard_placement_table::enable_persistence() { - vassert( - ss::this_shard_id() == assignment_shard_id, - "method can only be invoked on shard {}", - assignment_shard_id); + assert_is_assignment_shard(); if (is_persistence_enabled()) { co_return; @@ -307,12 +317,11 @@ struct shard_placement_table::ntp_init_data { } }; -ss::future<> shard_placement_table::initialize_from_kvstore( - const chunked_hash_map& local_group2ntp) { - vassert( - ss::this_shard_id() == assignment_shard_id, - "method can only be invoked on shard {}", - assignment_shard_id); +ss::future>> +shard_placement_table::initialize_from_kvstore( + const chunked_hash_map& local_group2ntp, + const std::vector>& extra_kvstores) { + assert_is_assignment_shard(); vassert( is_persistence_enabled(), @@ -320,12 +329,22 @@ ss::future<> shard_placement_table::initialize_from_kvstore( co_await container().invoke_on_all( [](shard_placement_table& spt) { spt._persistence_enabled = true; }); + std::vector> extra_spts; + for (size_t i = 0; i < extra_kvstores.size(); ++i) { + extra_spts.push_back(std::make_unique( + ss::smp::count + i, *extra_kvstores[i])); + } + // 1. gather kvstore markers from all shards auto shard2init_states = co_await container().map( [&local_group2ntp](shard_placement_table& spt) { return spt.gather_init_states(local_group2ntp); }); + for (const auto& spt : extra_spts) { + shard2init_states.push_back( + co_await spt->gather_init_states(local_group2ntp)); + } // 2. merge into up-to-date shard_placement_table state @@ -365,6 +384,9 @@ ss::future<> shard_placement_table::initialize_from_kvstore( [&ntp2init_data](shard_placement_table& spt) { return spt.scatter_init_data(ntp2init_data); }); + for (auto& spt : extra_spts) { + co_await spt->scatter_init_data(ntp2init_data); + } co_await ssx::async_for_each( ntp2init_data.begin(), @@ -391,6 +413,8 @@ ss::future<> shard_placement_table::initialize_from_kvstore( _ntp2entry.emplace(ntp, std::move(entry)); } }); + + co_return extra_spts; } ss::future>> @@ -412,7 +436,8 @@ shard_placement_table::gather_init_states( if (ntp_it == partitions.end()) { vlog( clusterlog.trace, - "recovered orphan assigned marker, group: {}", + "shard {}: recovered orphan assigned marker, group: {}", + _shard, group); orphan_assignments.push_back(group); } else { @@ -420,8 +445,9 @@ shard_placement_table::gather_init_states( val.copy()); vlog( clusterlog.trace, - "[{}] recovered assigned marker, lr: {} sr: {}", + "[{}] shard {}: recovered assigned marker, lr: {} sr: {}", ntp_it->second, + _shard, marker.log_revision, marker.shard_revision); @@ -437,8 +463,10 @@ shard_placement_table::gather_init_states( auto marker = serde::from_iobuf(val.copy()); vlog( clusterlog.trace, - "[{}] recovered cur state marker, lr: {} sr: {} complete: {}", + "[{}] shard {}: recovered cur state marker, lr: {} sr: {} " + "complete: {}", marker.ntp, + _shard, marker.log_revision, marker.shard_revision, marker.is_complete); @@ -449,7 +477,7 @@ shard_placement_table::gather_init_states( ssx::sformat, "duplicate ntp {} in kvstore map on shard {}", marker.ntp, - ss::this_shard_id())); + _shard)); } state.current = shard_local_state( group, @@ -490,18 +518,24 @@ ss::future<> shard_placement_table::scatter_init_data( auto& state = it->second; if (state.current) { - if (ss::this_shard_id() == init_data.hosted.shard) { - if 
(init_data.receiving.shard) { - state._next = init_data.receiving.shard; + if (_shard == init_data.hosted.shard) { + if ( + init_data.receiving.shard + && init_data.receiving.shard < ss::smp::count) { + state._next = placement_state::versioned_shard{ + .shard = init_data.receiving.shard.value(), + .revision = init_data.receiving.revision}; } - } else if (ss::this_shard_id() != init_data.receiving.shard) { + } else if ( + _shard != init_data.receiving.shard || !init_data.hosted.shard + || _shard >= ss::smp::count) { state.current->status = hosted_status::obsolete; } } ss::future<> fut = ss::now(); if (state.assigned) { - if (ss::this_shard_id() != init_data.assigned.shard) { + if (_shard != init_data.assigned.shard) { fut = _kvstore.remove( kvstore_key_space, assignment_kvstore_key(state.assigned->group)); @@ -509,6 +543,11 @@ ss::future<> shard_placement_table::scatter_init_data( } else if (!init_data.hosted.shard) { state._is_initial_for = init_data.log_revision; } + + if (_shard >= ss::smp::count) { + // mark states on extra shards as ready to transfer + state.assigned = std::nullopt; + } } if (state.is_empty()) { @@ -516,8 +555,9 @@ ss::future<> shard_placement_table::scatter_init_data( } else { vlog( clusterlog.info, - "[{}] recovered placement state: {}", + "[{}] shard {}: recovered placement state: {}", ntp, + _shard, state); } @@ -527,10 +567,8 @@ ss::future<> shard_placement_table::scatter_init_data( ss::future<> shard_placement_table::initialize_from_topic_table( ss::sharded& topics, model::node_id self) { - vassert( - ss::this_shard_id() == assignment_shard_id, - "method can only be invoked on shard {}", - assignment_shard_id); + assert_is_assignment_shard(); + vassert( !is_persistence_enabled(), "can't initialize from topic_table, persistence has already been " @@ -548,6 +586,11 @@ ss::future<> shard_placement_table::initialize_from_topic_table( ss::future<> shard_placement_table::do_initialize_from_topic_table( const topic_table& topics, model::node_id self) { + vassert( + ss::this_shard_id() == _shard, + "trying to init table for extra shard {} from topic_table", + _shard); + // We expect topic_table to remain unchanged throughout the loop because the // method is supposed to be called after local controller replay is finished // but before we start getting new controller updates from the leader. 
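
Taken together, the changes above implement recovery after a core count decrease: each kvstore that belonged to a removed shard gets its own extra shard_placement_table, initialized alongside the tables of the live shards, and partitions found there are marked obsolete or ready to transfer. A condensed sketch of the flow, not the exact shard_balancer code: names are from this diff, locking, feature checks and error handling are elided, and `old_core_count`, `storage`, `shard_placement` and `local_group2ntp` are assumed to be in scope:

```cpp
// Sketch: recovering partition state after a core count decrease.
std::vector<std::unique_ptr<storage::kvstore>> extra_kvstores;
for (ss::shard_id s = ss::smp::count; s < old_core_count; ++s) {
    // open the kvstore that belonged to the removed shard s
    extra_kvstores.push_back(co_await storage.local().make_extra_kvstore(s));
}

// one extra shard_placement_table per extra kvstore, initialized together
// with the tables of the live shards
auto extra_spts = co_await shard_placement.local().initialize_from_kvstore(
  local_group2ntp, extra_kvstores);

// ... reconciliation then drains each extra table into a live shard
// (controller_backend::transfer_partitions_from_extra_shard) ...

for (auto& kvs : extra_kvstores) {
    co_await kvs->stop();
}
```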
@@ -633,10 +676,12 @@ ss::future<> shard_placement_table::set_target( const model::ntp& ntp, std::optional target, shard_callback_t shard_callback) { - vassert( - ss::this_shard_id() == assignment_shard_id, - "method can only be invoked on shard {}", - assignment_shard_id); + assert_is_assignment_shard(); + + if (target) { + vassert( + target->shard < ss::smp::count, "[{}] bad target: {}", ntp, target); + } // ensure that there is no concurrent enable_persistence() call auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); @@ -703,7 +748,7 @@ ss::future<> shard_placement_table::set_target( assignment_kvstore_key(target->group), std::move(marker_buf)); }); - } else { + } else if (prev_target.value().shard < ss::smp::count) { co_await container().invoke_on( prev_target.value().shard, [group = prev_target->group, &ntp](shard_placement_table& other) { @@ -719,7 +764,9 @@ ss::future<> shard_placement_table::set_target( entry.target = target; - if (prev_target && (!target || target->shard != prev_target->shard)) { + if ( + prev_target && prev_target->shard < ss::smp::count + && (!target || target->shard != prev_target->shard)) { co_await container().invoke_on( prev_target->shard, [&ntp, shard_callback](shard_placement_table& other) { @@ -781,7 +828,7 @@ ss::future<> shard_placement_table::set_target( // 3. Lastly, remove obsolete kvstore marker if ( - _persistence_enabled && prev_target + _persistence_enabled && prev_target && prev_target->shard < ss::smp::count && (!target || target->shard != prev_target->shard)) { co_await container().invoke_on( prev_target->shard, @@ -815,10 +862,7 @@ shard_placement_table::state_on_this_shard(const model::ntp& ntp) const { std::optional shard_placement_table::get_target(const model::ntp& ntp) const { - vassert( - ss::this_shard_id() == assignment_shard_id, - "method can only be invoked on shard {}", - assignment_shard_id); + assert_is_assignment_shard(); auto it = _ntp2entry.find(ntp); if (it != _ntp2entry.end()) { return it->second->target; @@ -829,10 +873,7 @@ shard_placement_table::get_target(const model::ntp& ntp) const { ss::future<> shard_placement_table::for_each_ntp( ss::noncopyable_function< void(const model::ntp&, const shard_placement_target&)> func) const { - vassert( - ss::this_shard_id() == assignment_shard_id, - "method can only be invoked on shard {}", - assignment_shard_id); + assert_is_assignment_shard(); return ssx::async_for_each( _ntp2entry.begin(), _ntp2entry.end(), @@ -912,8 +953,11 @@ ss::future shard_placement_table::prepare_create( co_return errc::success; } -ss::future> shard_placement_table::prepare_transfer( - const model::ntp& ntp, model::revision_id expected_log_rev) { +ss::future +shard_placement_table::prepare_transfer( + const model::ntp& ntp, + model::revision_id expected_log_rev, + ss::sharded& sharded_spt) { // ensure that there is no concurrent enable_persistence() call auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); @@ -921,6 +965,8 @@ ss::future> shard_placement_table::prepare_transfer( vassert(state_it != _states.end(), "[{}] expected state", ntp); auto& state = state_it->second; + prepare_transfer_info ret; + if (state.current) { vassert( state.current->log_revision >= expected_log_rev, @@ -932,20 +978,22 @@ ss::future> shard_placement_table::prepare_transfer( if (state.current->log_revision > expected_log_rev) { // New log revision transferred from another shard, but we don't // know about it yet. Wait for the assignment update. 
-            co_return errc::waiting_for_shard_placement_update;
+            ret.source_error = errc::waiting_for_shard_placement_update;
+            co_return ret;
         }

         if (state.current->status == hosted_status::receiving) {
             // This shard needs to transfer partition state somewhere else,
             // but hasn't yet received it itself. Wait for it.
-            co_return errc::waiting_for_partition_shutdown;
+            ret.source_error = errc::waiting_for_partition_shutdown;
+            co_return ret;
         }

-        if (state.current->status == hosted_status::obsolete) {
-            // Previous finish_transfer_on_source() failed? Retry it.
-            co_await do_delete(ntp, state, persistence_lock_holder);
-            co_return errc::success;
-        }
+        vassert(
+          state.current->status == hosted_status::hosted,
+          "[{}] unexpected current: {} (expected hosted status)",
+          ntp,
+          state.current);
     } else {
         vassert(
           state._is_initial_for >= expected_log_rev,
@@ -955,16 +1003,23 @@ ss::future<result<ss::shard_id>> shard_placement_table::prepare_transfer(
           expected_log_rev);

         if (state._is_initial_for > expected_log_rev) {
-            co_return errc::waiting_for_shard_placement_update;
+            ret.source_error = errc::waiting_for_shard_placement_update;
+            co_return ret;
         }
     }

-    if (!state._next) {
+    const bool is_initial = !state.current;
+
+    if (state._next) {
+        ret.destination = state._next->shard;
+        // TODO: check that _next is still waiting for our transfer
+    } else {
         if (state.assigned) {
-            co_return errc::waiting_for_shard_placement_update;
+            ret.source_error = errc::waiting_for_shard_placement_update;
+            co_return ret;
         }

-        auto maybe_dest = co_await container().invoke_on(
+        auto maybe_dest = co_await sharded_spt.invoke_on(
          assignment_shard_id,
          [&ntp, expected_log_rev](shard_placement_table& spt) {
              auto it = spt._ntp2entry.find(ntp);
@@ -977,16 +1032,20 @@ ss::future<result<ss::shard_id>> shard_placement_table::prepare_transfer(
              }
              return std::optional<ss::shard_id>{};
          });

-        if (!maybe_dest || maybe_dest == ss::this_shard_id()) {
+        if (!maybe_dest || maybe_dest == _shard) {
             // Inconsistent state, likely because we are in the middle of
             // shard_placement_table update, wait for it to finish.
-            co_return errc::waiting_for_shard_placement_update;
+            ret.source_error = errc::waiting_for_shard_placement_update;
+            co_return ret;
         }
-        ss::shard_id destination = maybe_dest.value();
+        ret.destination = maybe_dest.value();

         // check if destination is ready
-        auto ec = co_await container().invoke_on(
-          destination, [&ntp, expected_log_rev](shard_placement_table& dest) {
+        model::shard_revision_id shard_rev;
+        co_await sharded_spt.invoke_on(
+          ret.destination.value(),
+          [&ntp, &shard_rev, &ret, expected_log_rev, is_initial](
+            shard_placement_table& dest) {
              auto dest_it = dest._states.find(ntp);
              if (
                dest_it == dest._states.end() || !dest_it->second.assigned
@@ -994,37 +1053,27 @@ ss::future<result<ss::shard_id>> shard_placement_table::prepare_transfer(
                  // We are in the middle of shard_placement_table update, and
                  // the destination shard doesn't yet know that it is the
                  // destination. Wait for the update to finish.
-                 return ss::make_ready_future<errc>(
-                   errc::waiting_for_shard_placement_update);
+                 ret.dest_error = errc::waiting_for_shard_placement_update;
+                 return ss::now();
              }
              auto& dest_state = dest_it->second;

-             if (dest_state._next) {
-                 // probably still finishing a previous transfer to this
-                 // shard and we are already trying to transfer it back.
- return ss::make_ready_future( - errc::waiting_for_partition_shutdown); - } else if (dest_state.current) { - if (dest_state.current->log_revision != expected_log_rev) { - // someone has to delete obsolete log revision first - return ss::make_ready_future( - errc::waiting_for_partition_shutdown); - } - // probably still finishing a previous transfer to this - // shard and we are already trying to transfer it back. - return ss::make_ready_future( - errc::waiting_for_partition_shutdown); + if (dest_state.current || dest_state._is_initial_for) { + // Wait for the destination to clear obsolete data + ret.dest_error = errc::waiting_for_partition_shutdown; + return ss::now(); + } + + if (is_initial) { + dest_state._is_initial_for = expected_log_rev; + return ss::now(); } // at this point we commit to the transfer on the // destination shard + shard_rev = dest_state.assigned.value().shard_revision; dest_state.current = shard_local_state( dest_state.assigned.value(), hosted_status::receiving); - if (dest_state._is_initial_for <= expected_log_rev) { - dest_state._is_initial_for = std::nullopt; - } - - // TODO: immediate hosted or _is_initial_for if source is empty. if (dest._persistence_enabled) { auto marker_buf = serde::to_iobuf(current_state_marker{ @@ -1044,90 +1093,113 @@ ss::future> shard_placement_table::prepare_transfer( kvstore_key_space, current_state_kvstore_key(dest_state.current->group), std::move(marker_buf)) - .then([] { return errc::success; }); + .handle_exception([&dest_state](std::exception_ptr ex) { + // "unlock" destination in case of kvstore errors so + // that we can retry later. + dest_state.current = std::nullopt; + return ss::make_exception_future(std::move(ex)); + }); } else { - return ss::make_ready_future(errc::success); + return ss::now(); } }); - if (ec != errc::success) { - co_return ec; + if (ret.dest_error != errc::success) { + co_return ret; + } + + if (is_initial) { + state._is_initial_for = std::nullopt; + if (state.is_empty()) { + _states.erase(ntp); + } + ret.is_finished = true; + co_return ret; } // at this point we commit to the transfer on the source shard - state._next = destination; + state._next = placement_state::versioned_shard{ + .shard = ret.destination.value(), + .revision = shard_rev, + }; } - // TODO: check that _next is still waiting for our transfer - co_return state._next.value(); + co_return ret; } -ss::future<> shard_placement_table::finish_transfer_on_destination( - const model::ntp& ntp, model::revision_id expected_log_rev) { +ss::future<> shard_placement_table::finish_transfer( + const model::ntp& ntp, + model::revision_id expected_log_rev, + ss::sharded& sharded_spt, + shard_callback_t shard_callback) { // ensure that there is no concurrent enable_persistence() call auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); auto it = _states.find(ntp); - if (it == _states.end()) { - co_return; - } + vassert(it != _states.end(), "[{}] expected state", ntp); auto& state = it->second; - if (state.current && state.current->log_revision == expected_log_rev) { - vassert( - state.current->status == hosted_status::receiving, - "[{}] unexpected local status, current: {}", - ntp, - it->second.current); - if (_persistence_enabled) { - auto marker_buf = serde::to_iobuf(current_state_marker{ - .ntp = ntp, - .log_revision = expected_log_rev, - .shard_revision = state.current->shard_revision, - .is_complete = true, - }); - vlog( - clusterlog.trace, - "[{}] put transferred cur state marker, lr: {} sr: {}", - ntp, - expected_log_rev, - 
state.current->shard_revision); - co_await _kvstore.put( - kvstore_key_space, - current_state_kvstore_key(state.current->group), - std::move(marker_buf)); - } + vassert(state._next, "[{}] expected _next, state: {}", ntp, state); + co_await sharded_spt.invoke_on( + state._next->shard, + [&ntp, expected_shard_rev = state._next->revision, shard_callback]( + shard_placement_table& dest) { + auto dest_it = dest._states.find(ntp); + vassert(dest_it != dest._states.end(), "[{}] expected state", ntp); + auto& dest_state = dest_it->second; - state.current->status = hosted_status::hosted; - } - vlog( - clusterlog.trace, - "[{}] finished transfer on destination, placement: {}", - ntp, - state); -} + vassert( + dest_state.current + && dest_state.current->shard_revision == expected_shard_rev + && dest_state.current->status == hosted_status::receiving, + "[{}] unexpected current: {} (expected shard revision: {})", + ntp, + dest_state.current, + expected_shard_rev); -ss::future<> shard_placement_table::finish_transfer_on_source( - const model::ntp& ntp, model::revision_id expected_log_rev) { - // ensure that there is no concurrent enable_persistence() call - auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + ss::future<> fut = ss::now(); + if (dest._persistence_enabled) { + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = dest_state.current->log_revision, + .shard_revision = dest_state.current->shard_revision, + .is_complete = true, + }); + vlog( + clusterlog.trace, + "[{}] put transferred cur state marker, lr: {} sr: {}", + ntp, + dest_state.current->log_revision, + dest_state.current->shard_revision); + fut = dest._kvstore.put( + kvstore_key_space, + current_state_kvstore_key(dest_state.current->group), + std::move(marker_buf)); + } - auto it = _states.find(ntp); - vassert(it != _states.end(), "[{}] expected state", ntp); - auto& state = it->second; + return std::move(fut).then([&ntp, &dest_state, shard_callback] { + dest_state.current->status = hosted_status::hosted; + vlog( + clusterlog.trace, + "[{}] finished transfer on destination, placement: {}", + ntp, + dest_state); + shard_callback(ntp); + }); + }); + state._next = std::nullopt; - if (state.current) { - vassert( - state.current->log_revision == expected_log_rev, - "[{}] unexpected current: {} (expected log revision: {})", - ntp, - state.current, - expected_log_rev); - } else if (state._is_initial_for == expected_log_rev) { + if (state.current && state.current->log_revision == expected_log_rev) { + state.current->status = hosted_status::obsolete; + } + + if (state._is_initial_for == expected_log_rev) { state._is_initial_for = std::nullopt; } - co_await do_delete(ntp, state, persistence_lock_holder); + if (state.is_empty()) { + _states.erase(ntp); + } } ss::future shard_placement_table::prepare_delete( @@ -1139,14 +1211,6 @@ ss::future shard_placement_table::prepare_delete( vassert(it != _states.end(), "[{}] expected state", ntp); auto& state = it->second; - if (state._is_initial_for && state._is_initial_for < cmd_revision) { - state._is_initial_for = std::nullopt; - if (state.is_empty()) { - _states.erase(it); - co_return errc::success; - } - } - if (state.current) { if (state.current->log_revision >= cmd_revision) { // New log revision transferred from another shard, but we didn't @@ -1163,6 +1227,33 @@ ss::future shard_placement_table::prepare_delete( state.current->status = hosted_status::obsolete; } + if (state._next) { + // notify destination shard that the transfer won't 
finish + co_await container().invoke_on( + state._next->shard, + [&ntp, expected_shard_rev = state._next->revision]( + shard_placement_table& dest) { + auto it = dest._states.find(ntp); + if ( + it != dest._states.end() && it->second.current + && it->second.current->shard_revision == expected_shard_rev + && it->second.current->status == hosted_status::receiving) { + it->second.current->status = hosted_status::obsolete; + } + + // TODO: notify reconciliation fiber + }); + + state._next = std::nullopt; + } + + if (state._is_initial_for && state._is_initial_for < cmd_revision) { + state._is_initial_for = std::nullopt; + if (state.is_empty()) { + _states.erase(it); + } + } + co_return errc::success; } @@ -1181,23 +1272,6 @@ ss::future<> shard_placement_table::finish_delete( state.current, expected_log_rev); - if (state._next) { - // notify destination shard that the transfer won't finish - co_await container().invoke_on( - state._next.value(), - [&ntp, expected_log_rev](shard_placement_table& dest) { - auto it = dest._states.find(ntp); - if ( - it != dest._states.end() && it->second.current - && it->second.current->log_revision == expected_log_rev - && it->second.current->status == hosted_status::receiving) { - it->second.current->status = hosted_status::obsolete; - } - - // TODO: notify reconciliation fiber - }); - } - co_await do_delete(ntp, state, persistence_lock_holder); } @@ -1223,4 +1297,13 @@ ss::future<> shard_placement_table::do_delete( co_return; } +void shard_placement_table::assert_is_assignment_shard() const { + vassert( + ss::this_shard_id() == _shard + && ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {} (table for shard: {})", + assignment_shard_id, + _shard); +} + } // namespace cluster diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h index 10b21d7f4bde4..498f2ab6bacde 100644 --- a/src/v/cluster/shard_placement_table.h +++ b/src/v/cluster/shard_placement_table.h @@ -94,7 +94,9 @@ class shard_placement_table enum class reconciliation_action { /// Partition must be removed from this node - remove, + remove_partition, + /// Partition kvstore state must be removed from this shard + remove_kvstore_state, /// Partition must be transferred to other shard transfer, /// Wait until target catches up with topic_table @@ -129,6 +131,11 @@ class shard_placement_table return !current && !_is_initial_for && !assigned; } + struct versioned_shard { + ss::shard_id shard; + model::shard_revision_id revision; + }; + /// If this shard is the initial shard for some incarnation of this /// partition on this node, this field will contain the corresponding /// log revision. Invariant: if both _is_initial_for and current @@ -137,12 +144,12 @@ class shard_placement_table /// If x-shard transfer is in progress, will hold the destination. Note /// that it is initialized from target but in contrast to target, it /// can't change mid-transfer. - std::optional _next; + std::optional _next; }; using ntp2state_t = absl::node_hash_map; - explicit shard_placement_table(storage::kvstore&); + explicit shard_placement_table(ss::shard_id, storage::kvstore&); /// Must be called on assignment_shard_id. bool is_persistence_enabled() const; @@ -150,8 +157,10 @@ class shard_placement_table /// Must be called on assignment_shard_id. 
    /// precondition: is_persistence_enabled() == true
-    ss::future<> initialize_from_kvstore(
-      const chunked_hash_map<raft::group_id, model::ntp>& local_group2ntp);
+    ss::future<std::vector<std::unique_ptr<shard_placement_table>>>
+    initialize_from_kvstore(
+      const chunked_hash_map<raft::group_id, model::ntp>& local_group2ntp,
+      const std::vector<std::unique_ptr<storage::kvstore>>& extra_kvstores);

     /// Must be called on assignment_shard_id.
     /// precondition: is_persistence_enabled() == false
@@ -187,19 +196,30 @@ class shard_placement_table
     ss::future<errc>
     prepare_create(const model::ntp&, model::revision_id expected_log_rev);

-    // return value is a tri-state:
-    // * if it returns a shard_id value, a transfer to that shard must be
-    //   performed
-    // * if it returns errc::success, transfer has already been performed
-    // * else, we must wait before we begin the transfer.
-    ss::future<result<ss::shard_id>>
-    prepare_transfer(const model::ntp&, model::revision_id expected_log_rev);
+    struct prepare_transfer_info {
+        // holds a non-success value if the source shard is not yet ready for
+        // the transfer.
+        errc source_error = errc::success;
+        // holds the destination shard if source_error == success.
+        std::optional<ss::shard_id> destination;
+        // holds a non-success value if the destination shard is not yet ready
+        // for the transfer.
+        errc dest_error = errc::success;
+        // true if there is nothing left for the caller to do: the transfer is
+        // already finished.
+        bool is_finished = false;
+    };

-    ss::future<> finish_transfer_on_destination(
-      const model::ntp&, model::revision_id expected_log_rev);
+    ss::future<prepare_transfer_info> prepare_transfer(
+      const model::ntp&,
+      model::revision_id expected_log_rev,
+      ss::sharded<shard_placement_table>&);

-    ss::future<> finish_transfer_on_source(
-      const model::ntp&, model::revision_id expected_log_rev);
+    ss::future<> finish_transfer(
+      const model::ntp&,
+      model::revision_id expected_log_rev,
+      ss::sharded<shard_placement_table>&,
+      shard_callback_t);

     ss::future<errc>
     prepare_delete(const model::ntp&, model::revision_id cmd_revision);
@@ -208,6 +228,8 @@ class shard_placement_table
     finish_delete(const model::ntp&, model::revision_id expected_log_rev);

 private:
+    void assert_is_assignment_shard() const;
+
     ss::future<> do_delete(
       const model::ntp&,
       placement_state&,
@@ -237,6 +259,7 @@ class shard_placement_table
     // modifications.
     ssx::rwlock _persistence_lock;
     bool _persistence_enabled = false;
+    ss::shard_id _shard;
     storage::kvstore& _kvstore;

     // only on shard 0, _ntp2entry will hold targets for all ntps on this node.
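
For orientation, this is roughly how a reconciliation fiber is expected to consume the new prepare_transfer/finish_transfer contract; it mirrors transfer_partition in the test backend below. transfer_step and move_all_kvstore_state are hypothetical names introduced for the sketch; everything else is declared above:

```cpp
ss::future<errc> transfer_step(
  shard_placement_table& local_spt,
  ss::sharded<shard_placement_table>& spt,
  const model::ntp& ntp,
  model::revision_id log_rev) {
    auto info = co_await local_spt.prepare_transfer(ntp, log_rev, spt);
    if (info.source_error != errc::success) {
        co_return info.source_error; // source not ready, retry later
    }
    if (info.dest_error != errc::success) {
        co_return info.dest_error; // destination not ready, retry later
    }
    if (info.is_finished) {
        co_return errc::success; // nothing to move, e.g. initial placement
    }
    // actually move the partition's kvstore state to the chosen shard
    co_await move_all_kvstore_state(ntp, info.destination.value());
    // commit the transfer on both shards and wake up the destination
    co_await local_spt.finish_transfer(
      ntp, log_rev, spt, [](const model::ntp&) {});
    co_return errc::success;
}
```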
diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index 504de91dcbc9b..b383b1625f24b 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -259,7 +259,7 @@ class reconciliation_backend expected_log_revision); switch (placement.get_reconciliation_action(expected_log_revision)) { - case shard_placement_table::reconciliation_action::remove: { + case shard_placement_table::reconciliation_action::remove_partition: { auto cmd_revision = expected_log_revision.value_or(_ntpt.revision); auto ec = co_await delete_partition(ntp, placement, cmd_revision); if (ec) { @@ -267,6 +267,10 @@ class reconciliation_backend } co_return ss::stop_iteration::no; } + case shard_placement_table::reconciliation_action::remove_kvstore_state: + co_await remove_partition_kvstore_state( + ntp, placement.current.value().log_revision); + co_return ss::stop_iteration::no; case shard_placement_table::reconciliation_action:: wait_for_target_update: co_return errc::waiting_for_shard_placement_update; @@ -379,26 +383,32 @@ class reconciliation_backend co_return errc::success; } - bool launched_expected = _launched.erase(ntp); - if (launched_expected) { + bool was_launched = _launched.erase(ntp); + if (was_launched) { vlog( _logger.trace, "[{}] stopped partition log_revision: {}", ntp, placement.current->log_revision); } + if ( + placement.current->status + != shard_placement_table::hosted_status::hosted) { + vassert(!was_launched, "[{}] unexpected launched", ntp); + } + co_await ss::sleep(1ms * random_generators::get_int(30)); co_await _ntp2shards.invoke_on( 0, [ntp, - log_revision = placement.current.value().log_revision, + current = placement.current.value(), shard = ss::this_shard_id(), - launched_expected](ntp2shards_t& ntp2shards) { + was_launched](ntp2shards_t& ntp2shards) { auto& shards = ntp2shards[ntp]; bool erased = shards.shards_with_some_state.erase(shard); - if (launched_expected) { + if (was_launched) { vassert( erased, "[{}] unexpected set contents (deleting on: {})", @@ -406,19 +416,25 @@ class reconciliation_backend shard); } - auto& p_shards = shards.rev2shards[log_revision]; + auto& p_shards = shards.rev2shards[current.log_revision]; - vassert( - (launched_expected && p_shards.launched_on == shard) - || (!launched_expected && !p_shards.launched_on), - "[{}] unexpected launched: {} (shard: {}, expected: {})", - ntp, - p_shards.launched_on, - shard, - launched_expected); - p_shards.launched_on = std::nullopt; + if ( + current.status + != shard_placement_table::hosted_status::obsolete) { + vassert( + (was_launched && p_shards.launched_on == shard) + || (!was_launched && !p_shards.launched_on), + "[{}] unexpected launched_on: {} (shard: {}, expected: {})", + ntp, + p_shards.launched_on, + shard, + was_launched); + } + if (was_launched) { + p_shards.launched_on = std::nullopt; + } - if (launched_expected) { + if (was_launched) { vassert( p_shards.current_state_on == shard, "[{}] unexpected current: {} (deleting on: {})", @@ -430,7 +446,7 @@ class reconciliation_backend p_shards.current_state_on = std::nullopt; } - if (launched_expected) { + if (was_launched) { vassert( !p_shards.next_state_on, "[{}] unexpected next: {} (deleting on: {})", @@ -448,27 +464,58 @@ class reconciliation_backend co_return ec; } + ss::future<> remove_partition_kvstore_state( + const model::ntp& ntp, model::revision_id log_revision) { + vlog( + _logger.trace, + "[{}] removing partition kvstore state, 
log_revision: {}", + ntp, + log_revision); + + vassert(!_launched.contains(ntp), "[{}] unexpected launched", ntp); + + co_await _ntp2shards.invoke_on( + 0, [ntp, shard = ss::this_shard_id()](ntp2shards_t& ntp2shards) { + auto& shards = ntp2shards[ntp]; + + vassert( + shards.shards_with_some_state.erase(shard), + "[{}] unexpected set contents, shard: {}", + ntp, + shard); + }); + + co_await _shard_placement.finish_delete(ntp, log_revision); + } + ss::future transfer_partition( const model::ntp& ntp, model::revision_id log_revision, bool state_expected) { - auto maybe_dest = co_await _shard_placement.prepare_transfer( - ntp, log_revision); - if (maybe_dest.has_error()) { + auto transfer_info = co_await _shard_placement.prepare_transfer( + ntp, log_revision, _shard_placement.container()); + if (transfer_info.source_error != errc::success) { vlog( _logger.trace, - "[{}] preparing transfer error: {}", + "[{}] preparing transfer source error: {}", ntp, - maybe_dest.error()); - co_return maybe_dest.error(); + transfer_info.source_error); + co_return transfer_info.source_error; } + ss::shard_id destination = transfer_info.destination.value(); vlog( _logger.trace, - "[{}] preparing transfer dest: {}", + "[{}] preparing transfer dest: {} (error: {})", ntp, - maybe_dest.value()); - ss::shard_id destination = maybe_dest.value(); + destination, + transfer_info.dest_error); + if (transfer_info.dest_error != errc::success) { + co_return transfer_info.dest_error; + } + if (transfer_info.is_finished) { + co_return errc::success; + } bool launched_expected = _launched.erase(ntp); if (launched_expected) { @@ -519,22 +566,36 @@ class reconciliation_backend source); } - vassert( - !p_shards.next_state_on, - "[{}] unexpected next: {} (transferring from: {})", - ntp, - p_shards.next_state_on, - source); + if (p_shards.next_state_on == destination) { + // transfer was retried + vassert( + shards.shards_with_some_state.contains(destination), + "[{}] unexpected set contents, destination: {}", + ntp, + destination); + } else { + vassert( + !p_shards.next_state_on, + "[{}] unexpected next: {} (transferring from: {})", + ntp, + p_shards.next_state_on, + source); - vassert( - shards.shards_with_some_state.insert(destination).second, - "[{}] unexpected set contents, destination: {}", - ntp, - destination); - p_shards.next_state_on = destination; + vassert( + shards.shards_with_some_state.insert(destination).second, + "[{}] unexpected set contents, destination: {}", + ntp, + destination); + p_shards.next_state_on = destination; + } }); co_await ss::sleep(1ms * random_generators::get_int(30)); + if (random_generators::get_int(5) == 0) { + // simulate partial failure of the transfer. 
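+                // The throw fires after prepare_transfer has committed the
+                // destination to `receiving` but before finish_transfer, so
+                // a later retry must tolerate next_state_on already pointing
+                // at the destination (see the retried-transfer branch above).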
+ throw std::runtime_error{ + fmt_with_ctx(fmt::format, "[{}] transfer failed!", ntp)}; + } co_await _ntp2shards.invoke_on( 0, [ntp, log_revision, destination](ntp2shards_t& ntp2shards) { @@ -555,34 +616,16 @@ class reconciliation_backend shards.next_state_on = std::nullopt; }); - co_await container().invoke_on( - destination, [&ntp, log_revision](reconciliation_backend& dest) { - return dest._shard_placement - .finish_transfer_on_destination(ntp, log_revision) - .then([&] { - auto it = dest._states.find(ntp); - if (it != dest._states.end()) { - it->second->wakeup_event.set(); - } - }); - }); - - co_await _ntp2shards.invoke_on( - 0, - [ntp, shard = ss::this_shard_id(), state_expected]( - ntp2shards_t& ntp2shards) { - auto& ntp_shards = ntp2shards[ntp]; - bool erased = ntp_shards.shards_with_some_state.erase(shard); - if (!state_expected) { - vassert( - !erased, - "[{}] unexpected set contents, source: {}", - ntp, - shard); - } - }); + auto shard_callback = [this](const model::ntp& ntp) { + auto& dest = container().local(); + auto it = dest._states.find(ntp); + if (it != dest._states.end()) { + it->second->wakeup_event.set(); + } + }; - co_await _shard_placement.finish_transfer_on_source(ntp, log_revision); + co_await _shard_placement.finish_transfer( + ntp, log_revision, _shard_placement.container(), shard_callback); vlog(_logger.trace, "[{}] transferred", ntp); co_return errc::success; } @@ -996,6 +1039,7 @@ class shard_placement_test_fixture : public seastar_test { config::mock_binding(10ms), test_dir, storage::make_sanitized_file_config()), + ss::sharded_parameter([] { return ss::this_shard_id(); }), ss::sharded_parameter([this] { return std::ref(sr.local()); }), std::ref(ft)); co_await kvs->invoke_on_all( @@ -1003,6 +1047,7 @@ class shard_placement_test_fixture : public seastar_test { spt = std::make_unique(); co_await spt->start( + ss::sharded_parameter([] { return ss::this_shard_id(); }), ss::sharded_parameter([this] { return std::ref(kvs->local()); })); if (!first_start) { @@ -1010,7 +1055,7 @@ class shard_placement_test_fixture : public seastar_test { for (const auto& [ntp, meta] : ntpt.local().ntp2meta) { local_group2ntp.emplace(meta.group, ntp); } - co_await spt->local().initialize_from_kvstore(local_group2ntp); + co_await spt->local().initialize_from_kvstore(local_group2ntp, {}); for (auto& [ntp, shards] : _ntp2shards.local()) { if ( diff --git a/src/v/raft/consensus_utils.cc b/src/v/raft/consensus_utils.cc index a9482e30e8c8f..92dd4014669fd 100644 --- a/src/v/raft/consensus_utils.cc +++ b/src/v/raft/consensus_utils.cc @@ -236,9 +236,9 @@ bytes serialize_group_key(raft::group_id group, metadata_key key_type) { return iobuf_to_bytes(buf); } -ss::future<> move_persistent_state( +ss::future<> copy_persistent_state( raft::group_id group, - ss::shard_id source_shard, + storage::kvstore& source_kvs, ss::shard_id target_shard, ss::sharded& api) { struct persistent_state { @@ -249,98 +249,87 @@ ss::future<> move_persistent_state( std::optional highest_known_offset; std::optional next_cfg_idx; }; - using state_ptr = std::unique_ptr; - using state_fptr = ss::foreign_ptr>; - - state_fptr state = co_await api.invoke_on( - source_shard, [gr = group](storage::api& api) { - const auto ks = storage::kvstore::key_space::consensus; - persistent_state state{ - .voted_for = api.kvs().get( - ks, serialize_group_key(gr, metadata_key::voted_for)), - .last_applied = api.kvs().get( - ks, serialize_group_key(gr, metadata_key::last_applied_offset)), - .unique_run_id = api.kvs().get( - ks, 
serialize_group_key(gr, metadata_key::unique_local_id)), - .configuration_map = api.kvs().get( - ks, serialize_group_key(gr, metadata_key::config_map)), - .highest_known_offset = api.kvs().get( - ks, - serialize_group_key( - gr, metadata_key::config_latest_known_offset)), - .next_cfg_idx = api.kvs().get( - ks, serialize_group_key(gr, metadata_key::config_next_cfg_idx))}; - return ss::make_foreign( - std::make_unique(std::move(state))); - }); + const auto ks = storage::kvstore::key_space::consensus; + const persistent_state state{ + .voted_for = source_kvs.get( + ks, serialize_group_key(group, metadata_key::voted_for)), + .last_applied = source_kvs.get( + ks, serialize_group_key(group, metadata_key::last_applied_offset)), + .unique_run_id = source_kvs.get( + ks, serialize_group_key(group, metadata_key::unique_local_id)), + .configuration_map = source_kvs.get( + ks, serialize_group_key(group, metadata_key::config_map)), + .highest_known_offset = source_kvs.get( + ks, + serialize_group_key(group, metadata_key::config_latest_known_offset)), + .next_cfg_idx = source_kvs.get( + ks, serialize_group_key(group, metadata_key::config_next_cfg_idx))}; co_await api.invoke_on( - target_shard, [gr = group, state = std::move(state)](storage::api& api) { + target_shard, [gr = group, &state](storage::api& api) { const auto ks = storage::kvstore::key_space::consensus; std::vector> write_futures; write_futures.reserve(6); - if (state->voted_for) { + if (state.voted_for) { write_futures.push_back(api.kvs().put( ks, serialize_group_key(gr, metadata_key::voted_for), - state->voted_for->copy())); + state.voted_for->copy())); } - if (state->last_applied) { + if (state.last_applied) { write_futures.push_back(api.kvs().put( ks, serialize_group_key(gr, metadata_key::last_applied_offset), - state->last_applied->copy())); + state.last_applied->copy())); } - if (state->unique_run_id) { + if (state.unique_run_id) { write_futures.push_back(api.kvs().put( ks, serialize_group_key(gr, metadata_key::unique_local_id), - state->unique_run_id->copy())); + state.unique_run_id->copy())); } - if (state->configuration_map) { + if (state.configuration_map) { write_futures.push_back(api.kvs().put( ks, serialize_group_key(gr, metadata_key::config_map), - state->configuration_map->copy())); + state.configuration_map->copy())); } - if (state->highest_known_offset) { + if (state.highest_known_offset) { write_futures.push_back(api.kvs().put( ks, serialize_group_key( gr, metadata_key::config_latest_known_offset), - state->highest_known_offset->copy())); + state.highest_known_offset->copy())); } - if (state->next_cfg_idx) { + if (state.next_cfg_idx) { write_futures.push_back(api.kvs().put( ks, serialize_group_key(gr, metadata_key::config_next_cfg_idx), - state->next_cfg_idx->copy())); + state.next_cfg_idx->copy())); } - return ss::when_all_succeed( - write_futures.begin(), write_futures.end()); + return ss::when_all_succeed(std::move(write_futures)); }); +} - // remove on source shard - co_await api.invoke_on(source_shard, [gr = group](storage::api& api) { - const auto ks = storage::kvstore::key_space::consensus; - std::vector> remove_futures; - remove_futures.reserve(6); - remove_futures.push_back(api.kvs().remove( - ks, serialize_group_key(gr, metadata_key::voted_for))); - remove_futures.push_back(api.kvs().remove( - ks, serialize_group_key(gr, metadata_key::last_applied_offset))); - remove_futures.push_back(api.kvs().remove( - ks, serialize_group_key(gr, metadata_key::unique_local_id))); - remove_futures.push_back(api.kvs().remove( - ks, 
        serialize_group_key(gr, metadata_key::config_map)));
-        remove_futures.push_back(api.kvs().remove(
-          ks,
-          serialize_group_key(gr, metadata_key::config_latest_known_offset)));
-        remove_futures.push_back(api.kvs().remove(
-          ks, serialize_group_key(gr, metadata_key::config_next_cfg_idx)));
-        return ss::when_all_succeed(
-          remove_futures.begin(), remove_futures.end());
-    });
+ss::future<>
+remove_persistent_state(raft::group_id group, storage::kvstore& kvs) {
+    const auto ks = storage::kvstore::key_space::consensus;
+    std::vector<ss::future<>> remove_futures;
+    remove_futures.reserve(6);
+    remove_futures.push_back(
+      kvs.remove(ks, serialize_group_key(group, metadata_key::voted_for)));
+    remove_futures.push_back(kvs.remove(
+      ks, serialize_group_key(group, metadata_key::last_applied_offset)));
+    remove_futures.push_back(kvs.remove(
+      ks, serialize_group_key(group, metadata_key::unique_local_id)));
+    remove_futures.push_back(
+      kvs.remove(ks, serialize_group_key(group, metadata_key::config_map)));
+    remove_futures.push_back(kvs.remove(
+      ks,
+      serialize_group_key(group, metadata_key::config_latest_known_offset)));
+    remove_futures.push_back(kvs.remove(
+      ks, serialize_group_key(group, metadata_key::config_next_cfg_idx)));
+    co_await ss::when_all_succeed(std::move(remove_futures));
 }

 // Return previous offset. This is different from
diff --git a/src/v/raft/consensus_utils.h b/src/v/raft/consensus_utils.h
index 5f60ac7d7b2b7..f02a5a2687ba9 100644
--- a/src/v/raft/consensus_utils.h
+++ b/src/v/raft/consensus_utils.h
@@ -182,15 +182,20 @@ auto for_each_ref_extract_configuration(
 bytes serialize_group_key(raft::group_id, metadata_key);

 /**
- * moves raft persistent state from KV store on source shard to the one on
+ * copies raft persistent state from KV store on source shard to the one on
  * target shard.
  */
-ss::future<> move_persistent_state(
+ss::future<> copy_persistent_state(
   raft::group_id,
-  ss::shard_id source_shard,
+  storage::kvstore& source_kvs,
   ss::shard_id target_shard,
   ss::sharded<storage::api>&);

+/**
+ * removes raft persistent state from a kvstore.
+ */
+ss::future<> remove_persistent_state(raft::group_id, storage::kvstore&);
+
 /// Creates persistent state for pre-existing partition (stored in S3 bucket).
 ///
 /// The function is supposed to be called before creating a raft group with the
diff --git a/src/v/raft/persisted_stm.cc b/src/v/raft/persisted_stm.cc
index 1a42dbb53f2ed..f7a62f9ceeeae 100644
--- a/src/v/raft/persisted_stm.cc
+++ b/src/v/raft/persisted_stm.cc
@@ -57,45 +57,46 @@ stm_snapshot_key(const ss::sstring& snapshot_name, const model::ntp& ntp) {
     return ssx::sformat("{}/{}", snapshot_name, ntp);
 }

-ss::future<> do_move_persistent_stm_state(
+const std::vector<ss::sstring>& stm_snapshot_names() {
+    static const std::vector<ss::sstring> names{
+      cluster::archival_stm_snapshot,
+      cluster::tm_stm_snapshot,
+      cluster::id_allocator_snapshot,
+      cluster::rm_stm_snapshot};
+
+    return names;
+}
+
+ss::future<> do_copy_persistent_stm_state(
   ss::sstring snapshot_name,
   model::ntp ntp,
-  ss::shard_id source_shard,
+  storage::kvstore& source_kvs,
   ss::shard_id target_shard,
   ss::sharded<storage::api>& api) {
-    using state_ptr = std::unique_ptr<iobuf>;
-    using state_fptr = ss::foreign_ptr<state_ptr>;
-
     const auto key_as_str = stm_snapshot_key(snapshot_name, ntp);
     bytes key;
     key.append(
      reinterpret_cast<const uint8_t*>(key_as_str.begin()), key_as_str.size());

-    state_fptr snapshot = co_await api.invoke_on(
-      source_shard, [key](storage::api& api) {
-          const auto ks = storage::kvstore::key_space::stms;
-          auto snapshot_data = api.kvs().get(ks, key);
-          auto snapshot_ptr = !snapshot_data ? 
nullptr - : std::make_unique( - std::move(*snapshot_data)); - return ss::make_foreign(std::move(snapshot_ptr)); - }); - + auto snapshot = source_kvs.get(storage::kvstore::key_space::stms, key); if (snapshot) { co_await api.invoke_on( - target_shard, - [key, snapshot = std::move(snapshot)](storage::api& api) { - const auto ks = storage::kvstore::key_space::stms; - return api.kvs().put(ks, key, snapshot->copy()); + target_shard, [key, &snapshot](storage::api& api) { + return api.kvs().put( + storage::kvstore::key_space::stms, key, snapshot->copy()); }); - - co_await api.invoke_on(source_shard, [key](storage::api& api) { - const auto ks = storage::kvstore::key_space::stms; - return api.kvs().remove(ks, key); - }); } } +ss::future<> do_remove_persistent_stm_state( + ss::sstring snapshot_name, model::ntp ntp, storage::kvstore& kvs) { + const auto key_as_str = stm_snapshot_key(snapshot_name, ntp); + bytes key; + key.append( + reinterpret_cast(key_as_str.begin()), key_as_str.size()); + co_await kvs.remove(storage::kvstore::key_space::stms, key); +} + } // namespace template @@ -623,23 +624,26 @@ template persisted_stm::persisted_stm( template persisted_stm::persisted_stm( ss::sstring, seastar::logger&, raft::consensus*, storage::kvstore&); -ss::future<> move_persistent_stm_state( +ss::future<> copy_persistent_stm_state( model::ntp ntp, - ss::shard_id source_shard, + storage::kvstore& source_kvs, ss::shard_id target_shard, ss::sharded& api) { - static const std::vector stm_snapshot_names{ - cluster::archival_stm_snapshot, - cluster::tm_stm_snapshot, - cluster::id_allocator_snapshot, - cluster::rm_stm_snapshot}; - return ss::parallel_for_each( - stm_snapshot_names, - [ntp = std::move(ntp), source_shard, target_shard, &api]( + stm_snapshot_names(), + [ntp = std::move(ntp), &source_kvs, target_shard, &api]( const ss::sstring& snapshot_name) { - return do_move_persistent_stm_state( - snapshot_name, ntp, source_shard, target_shard, api); + return do_copy_persistent_stm_state( + snapshot_name, ntp, source_kvs, target_shard, api); + }); +} + +ss::future<> +remove_persistent_stm_state(model::ntp ntp, storage::kvstore& kvs) { + return ss::parallel_for_each( + stm_snapshot_names(), + [ntp = std::move(ntp), &kvs](const ss::sstring& snapshot_name) { + return do_remove_persistent_stm_state(snapshot_name, ntp, kvs); }); } diff --git a/src/v/raft/persisted_stm.h b/src/v/raft/persisted_stm.h index a854118448261..5ce78d723757f 100644 --- a/src/v/raft/persisted_stm.h +++ b/src/v/raft/persisted_stm.h @@ -119,12 +119,14 @@ class kvstore_backed_stm_snapshot { storage::kvstore& _kvstore; }; -ss::future<> move_persistent_stm_state( +ss::future<> copy_persistent_stm_state( model::ntp ntp, - ss::shard_id source_shard, + storage::kvstore& source_kvs, ss::shard_id target_shard, ss::sharded&); +ss::future<> remove_persistent_stm_state(model::ntp ntp, storage::kvstore&); + template concept supported_stm_snapshot = requires(T s, stm_snapshot&& snapshot) { { s.perform_initial_cleanup() } -> std::same_as>; diff --git a/src/v/storage/api.h b/src/v/storage/api.h index 3525f859c2b8b..ff09162beb081 100644 --- a/src/v/storage/api.h +++ b/src/v/storage/api.h @@ -39,7 +39,7 @@ class api : public ss::peering_sharded_service { ss::future<> start() { _kvstore = std::make_unique( - _kv_conf_cb(), _resources, _feature_table); + _kv_conf_cb(), ss::this_shard_id(), _resources, _feature_table); return _kvstore->start().then([this] { _log_mgr = std::make_unique( _log_conf_cb(), kvs(), _resources, _feature_table); @@ -47,6 +47,18 @@ class api : 
public ss::peering_sharded_service { }); } + ss::future> + make_extra_kvstore(ss::shard_id s) { + vassert( + s >= ss::smp::count, + "can't make extra kvstore for existing shard {}", + s); + auto kvs = std::make_unique( + _kv_conf_cb(), s, _resources, _feature_table); + co_await kvs->start(); + co_return kvs; + } + void stop_cluster_uuid_waiters() { _has_cluster_uuid_cond.broken(); } ss::future<> stop() { diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index a821325f2b0ed..65b136955319a 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -20,6 +20,7 @@ #include "model/timestamp.h" #include "reflection/adl.h" #include "ssx/future-util.h" +#include "storage/api.h" #include "storage/chunk_cache.h" #include "storage/compacted_offset_list.h" #include "storage/compaction_reducers.h" @@ -184,14 +185,7 @@ ss::future<> disk_log_impl::remove() { vlog(stlog.info, "Finished removing all segments:{}", config()); }) .then([this] { - return _kvstore.remove( - kvstore::key_space::storage, - internal::start_offset_key(config().ntp())); - }) - .then([this] { - return _kvstore.remove( - kvstore::key_space::storage, - internal::clean_segment_key(config().ntp())); + return remove_kvstore_state(config().ntp(), _kvstore); }); }) .finally([this] { _probe->clear_metrics(); }); @@ -3687,4 +3681,40 @@ size_t disk_log_impl::reclaimable_size_bytes() const { return _reclaimable_size_bytes; } +ss::future<> disk_log_impl::copy_kvstore_state( + model::ntp ntp, + storage::kvstore& source_kvs, + ss::shard_id target_shard, + ss::sharded& storage) { + const auto ks = kvstore::key_space::storage; + std::optional start_offset = source_kvs.get( + ks, internal::start_offset_key(ntp)); + std::optional clean_segment = source_kvs.get( + ks, internal::clean_segment_key(ntp)); + + co_await storage.invoke_on(target_shard, [&](storage::api& api) { + const auto ks = kvstore::key_space::storage; + std::vector> write_futures; + write_futures.reserve(2); + if (start_offset) { + write_futures.push_back(api.kvs().put( + ks, internal::start_offset_key(ntp), start_offset->copy())); + } + if (clean_segment) { + write_futures.push_back(api.kvs().put( + ks, internal::clean_segment_key(ntp), clean_segment->copy())); + } + return ss::when_all_succeed(std::move(write_futures)); + }); +} + +ss::future<> disk_log_impl::remove_kvstore_state( + const model::ntp& ntp, storage::kvstore& kvs) { + const auto ks = kvstore::key_space::storage; + return ss::when_all_succeed( + kvs.remove(ks, internal::start_offset_key(ntp)), + kvs.remove(ks, internal::clean_segment_key(ntp))) + .discard_result(); +} + } // namespace storage diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index dfdc72bca881e..d544eb10160b7 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -208,6 +208,15 @@ class disk_log_impl final : public log { const auto& compaction_ratio() const { return _compaction_ratio; } + static ss::future<> copy_kvstore_state( + model::ntp, + storage::kvstore& source_kvs, + ss::shard_id target_shard, + ss::sharded&); + + static ss::future<> + remove_kvstore_state(const model::ntp&, storage::kvstore&); + private: friend class disk_log_appender; // for multi-term appends friend class disk_log_builder; // for tests diff --git a/src/v/storage/kvstore.cc b/src/v/storage/kvstore.cc index cb70ec4aba528..f9c40b973bad5 100644 --- a/src/v/storage/kvstore.cc +++ b/src/v/storage/kvstore.cc @@ -37,12 +37,13 @@ namespace storage { kvstore::kvstore( kvstore_config 
kv_conf, + ss::shard_id shard, storage_resources& resources, ss::sharded& feature_table) : _conf(kv_conf) , _resources(resources) , _feature_table(feature_table) - , _ntpc(model::kvstore_ntp(ss::this_shard_id()), _conf.base_dir) + , _ntpc(model::kvstore_ntp(shard), _conf.base_dir) , _snap( std::filesystem::path(_ntpc.work_directory()), simple_snapshot_manager::default_snapshot_filename, @@ -59,7 +60,9 @@ kvstore::~kvstore() noexcept = default; ss::future<> kvstore::start() { vlog(lg.debug, "Starting kvstore: dir {}", _ntpc.work_directory()); - if (!config::shard_local_cfg().disable_metrics()) { + bool is_main_instance = static_cast(ss::this_shard_id()) + == _ntpc.ntp().tp.partition(); + if (is_main_instance && !config::shard_local_cfg().disable_metrics()) { _probe.metrics.add_group( prometheus_sanitize::metrics_name("storage:kvstore"), { diff --git a/src/v/storage/kvstore.h b/src/v/storage/kvstore.h index 3ec6b89a1042d..c60ba14f01a9b 100644 --- a/src/v/storage/kvstore.h +++ b/src/v/storage/kvstore.h @@ -107,6 +107,7 @@ class kvstore { explicit kvstore( kvstore_config kv_conf, + ss::shard_id shard, storage_resources&, ss::sharded& feature_table); ~kvstore() noexcept; diff --git a/src/v/storage/offset_translator.cc b/src/v/storage/offset_translator.cc index d0702dec895b1..dd2a4aff622ee 100644 --- a/src/v/storage/offset_translator.cc +++ b/src/v/storage/offset_translator.cc @@ -377,73 +377,62 @@ ss::future<> offset_translator::do_checkpoint() { _checkpoint_hint = false; } -ss::future<> offset_translator::move_persistent_state( +ss::future<> offset_translator::copy_persistent_state( raft::group_id group, - ss::shard_id source_shard, + storage::kvstore& source_kvs, ss::shard_id target_shard, ss::sharded& api) { struct ot_state { std::optional highest_known_offset; std::optional offset_map; }; - using state_ptr = std::unique_ptr; vlog( storage::stlog.debug, - "moving group {} offset translator state from {} to {}", + "moving group {} offset translator state to {}", group, - source_shard, target_shard); static constexpr auto ks = storage::kvstore::key_space::offset_translator; - auto state = co_await api.invoke_on( - source_shard, [gr = group](storage::api& api) { - ot_state st{ - .highest_known_offset = api.kvs().get( - ks, - serialize_kvstore_key( - gr, kvstore_key_type::highest_known_offset)), - .offset_map = api.kvs().get( - ks, serialize_kvstore_key(gr, kvstore_key_type::offsets_map)), - }; - return ss::make_foreign( - std::make_unique(std::move(st))); - }); + ot_state state{ + .highest_known_offset = source_kvs.get( + ks, + serialize_kvstore_key(group, kvstore_key_type::highest_known_offset)), + .offset_map = source_kvs.get( + ks, serialize_kvstore_key(group, kvstore_key_type::offsets_map)), + }; co_await api.invoke_on( - target_shard, - [gr = group, - state = std::move(state)](storage::api& api) -> ss::future<> { + target_shard, [gr = group, &state](storage::api& api) -> ss::future<> { std::vector> write_futures; write_futures.reserve(2); - if (state->offset_map) { + if (state.offset_map) { write_futures.push_back(api.kvs().put( ks, serialize_kvstore_key(gr, kvstore_key_type::offsets_map), - state->offset_map->copy())); + state.offset_map->copy())); } - if (state->highest_known_offset) { + if (state.highest_known_offset) { write_futures.push_back(api.kvs().put( ks, serialize_kvstore_key( gr, kvstore_key_type::highest_known_offset), - state->highest_known_offset->copy())); + state.highest_known_offset->copy())); } - return ss::when_all_succeed( - write_futures.begin(), write_futures.end()); 
diff --git a/src/v/storage/offset_translator.cc b/src/v/storage/offset_translator.cc
index d0702dec895b1..dd2a4aff622ee 100644
--- a/src/v/storage/offset_translator.cc
+++ b/src/v/storage/offset_translator.cc
@@ -377,73 +377,62 @@ ss::future<> offset_translator::do_checkpoint() {
     _checkpoint_hint = false;
 }
 
-ss::future<> offset_translator::move_persistent_state(
+ss::future<> offset_translator::copy_persistent_state(
   raft::group_id group,
-  ss::shard_id source_shard,
+  storage::kvstore& source_kvs,
   ss::shard_id target_shard,
   ss::sharded<storage::api>& api) {
     struct ot_state {
         std::optional<iobuf> highest_known_offset;
         std::optional<iobuf> offset_map;
     };
-    using state_ptr = std::unique_ptr<ot_state>;
 
     vlog(
       storage::stlog.debug,
-      "moving group {} offset translator state from {} to {}",
+      "moving group {} offset translator state to {}",
       group,
-      source_shard,
       target_shard);
 
     static constexpr auto ks = storage::kvstore::key_space::offset_translator;
-    auto state = co_await api.invoke_on(
-      source_shard, [gr = group](storage::api& api) {
-          ot_state st{
-            .highest_known_offset = api.kvs().get(
-              ks,
-              serialize_kvstore_key(
-                gr, kvstore_key_type::highest_known_offset)),
-            .offset_map = api.kvs().get(
-              ks, serialize_kvstore_key(gr, kvstore_key_type::offsets_map)),
-          };
-          return ss::make_foreign(
-            std::make_unique<ot_state>(std::move(st)));
-      });
+    ot_state state{
+      .highest_known_offset = source_kvs.get(
+        ks,
+        serialize_kvstore_key(group, kvstore_key_type::highest_known_offset)),
+      .offset_map = source_kvs.get(
+        ks, serialize_kvstore_key(group, kvstore_key_type::offsets_map)),
+    };
 
     co_await api.invoke_on(
-      target_shard,
-      [gr = group,
-       state = std::move(state)](storage::api& api) -> ss::future<> {
+      target_shard, [gr = group, &state](storage::api& api) -> ss::future<> {
          std::vector<ss::future<>> write_futures;
          write_futures.reserve(2);
-         if (state->offset_map) {
+         if (state.offset_map) {
              write_futures.push_back(api.kvs().put(
                ks,
                serialize_kvstore_key(gr, kvstore_key_type::offsets_map),
-               state->offset_map->copy()));
+               state.offset_map->copy()));
          }
-         if (state->highest_known_offset) {
+         if (state.highest_known_offset) {
              write_futures.push_back(api.kvs().put(
                ks,
                serialize_kvstore_key(
                  gr, kvstore_key_type::highest_known_offset),
-               state->highest_known_offset->copy()));
+               state.highest_known_offset->copy()));
          }
-         return ss::when_all_succeed(
-           write_futures.begin(), write_futures.end());
+         return ss::when_all_succeed(std::move(write_futures));
      });
+}
 
-    // remove on source shard
-    co_await api.invoke_on(source_shard, [gr = group](storage::api& api) {
-        std::vector<ss::future<>> remove_futures;
-        remove_futures.reserve(2);
-        remove_futures.push_back(api.kvs().remove(
-          ks,
-          serialize_kvstore_key(gr, kvstore_key_type::highest_known_offset)));
-        remove_futures.push_back(api.kvs().remove(
-          ks, serialize_kvstore_key(gr, kvstore_key_type::offsets_map)));
-        return ss::when_all_succeed(
-          remove_futures.begin(), remove_futures.end());
-    });
+ss::future<> offset_translator::remove_persistent_state(
+  raft::group_id group, storage::kvstore& kvs) {
+    static constexpr auto ks = storage::kvstore::key_space::offset_translator;
+    std::vector<ss::future<>> remove_futures;
+    remove_futures.reserve(2);
+    remove_futures.push_back(kvs.remove(
+      ks,
+      serialize_kvstore_key(group, kvstore_key_type::highest_known_offset)));
+    remove_futures.push_back(kvs.remove(
+      ks, serialize_kvstore_key(group, kvstore_key_type::offsets_map)));
+    co_await ss::when_all_succeed(std::move(remove_futures));
 }
 
 } // namespace storage
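With the move helper split in two, a caller that still needs full move semantics is expected to compose the new helpers, copying into the target shard's kvstore before dropping the source entries, so the state exists on at least one shard at every point. A minimal sketch under that assumption (the wrapper name move_ot_state is hypothetical; the signatures are the ones declared below):

    // hypothetical composition of the two new helpers
    ss::future<> move_ot_state(
      raft::group_id group,
      storage::kvstore& source_kvs,
      ss::shard_id target_shard,
      ss::sharded<storage::api>& api) {
        // write both keys into the target shard's kvstore first
        co_await storage::offset_translator::copy_persistent_state(
          group, source_kvs, target_shard, api);
        // drop the source copy only once the target copy is written
        co_await storage::offset_translator::remove_persistent_state(
          group, source_kvs);
    }

The updated test in offset_translator_tests.cc further down exercises exactly this copy-then-remove order.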
diff --git a/src/v/storage/offset_translator.h b/src/v/storage/offset_translator.h
index 97453c1e92330..7f85d6ca98474 100644
--- a/src/v/storage/offset_translator.h
+++ b/src/v/storage/offset_translator.h
@@ -103,14 +103,17 @@ class offset_translator {
 
     ss::future<> remove_persistent_state();
 
-    /// Moves offset translator persistent state from source to target shard.
-    /// Note when move state on the source shard is deleted
-    static ss::future<> move_persistent_state(
+    /// Copies offset translator persistent state from source to target shard.
+    static ss::future<> copy_persistent_state(
       raft::group_id,
-      ss::shard_id source_shard,
+      storage::kvstore& source_kvs,
       ss::shard_id target_shard,
       ss::sharded<storage::api>&);
 
+    /// Removes offset translator persistent state from a kvstore.
+    static ss::future<>
+    remove_persistent_state(raft::group_id, storage::kvstore&);
+
     /// Get offset translator storage::kvstore keys. Used only for testing.
     bytes offsets_map_key() const;
     bytes highest_known_offset_key() const;
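For reference, the state these helpers shuttle around is just two entries in the kvstore's offset_translator key space. A sketch of a presence check, assuming the serialize_kvstore_key helper and kvstore_key_type enum used in offset_translator.cc are visible at the call site (the function name has_ot_state is hypothetical):

    // does `kvs` hold offset translator state for `group`?
    bool has_ot_state(storage::kvstore& kvs, raft::group_id group) {
        static constexpr auto ks
          = storage::kvstore::key_space::offset_translator;
        // offsets_map is the authoritative entry; highest_known_offset is
        // the companion checkpoint key written alongside it
        return kvs
          .get(ks, serialize_kvstore_key(group, kvstore_key_type::offsets_map))
          .has_value();
    }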
""" INITIAL_NUM_CPUS = 2 @@ -37,6 +35,7 @@ def __init__(self, *args, **kwargs): super().__init__( *args, resource_settings=ResourceSettings(num_cpus=self.INITIAL_NUM_CPUS), + extra_rp_conf={"core_balancing_on_core_count_change": False}, **kwargs) def _restart_with_num_cpus(self, node: ClusterNode, num_cpus: int, diff --git a/tests/rptest/tests/shard_placement_test.py b/tests/rptest/tests/shard_placement_test.py index 687d465ea979b..20baaa86f4fbd 100644 --- a/tests/rptest/tests/shard_placement_test.py +++ b/tests/rptest/tests/shard_placement_test.py @@ -345,7 +345,10 @@ def test_manual_rebalance(self): @cluster(num_nodes=6) def test_core_count_change(self): - self.redpanda.set_resource_settings(ResourceSettings(num_cpus=1)) + initial_core_count = self.redpanda.get_node_cpu_count() + + self.redpanda.set_resource_settings( + ResourceSettings(num_cpus=initial_core_count - 1)) self.redpanda.start() admin = Admin(self.redpanda) @@ -353,7 +356,8 @@ def test_core_count_change(self): n_partitions = 10 - for topic in ["foo", "bar"]: + topics = ["foo", "bar"] + for topic in topics: # create topics with rf=5 for ease of accounting rpk.create_topic(topic, partitions=n_partitions, replicas=5) @@ -365,18 +369,41 @@ def test_core_count_change(self): node = self.redpanda.nodes[0] node_id = self.redpanda.node_id(node) - self.redpanda.stop_node(node) - self.redpanda.set_resource_settings(ResourceSettings(num_cpus=2)) - self.redpanda.start_node(node) - self.redpanda.wait_for_membership(first_start=False) + + def restart_node(num_cpus): + self.redpanda.stop_node(node) + self.redpanda.set_resource_settings( + ResourceSettings(num_cpus=num_cpus)) + self.redpanda.start_node(node) + self.redpanda.wait_for_membership(first_start=False) + + def configuration_updated(): + for n in self.redpanda.nodes: + broker = [ + b for b in admin.get_brokers(node=n) + if b["node_id"] == node_id + ][0] + if broker["num_cores"] != num_cpus: + return False + return True + + wait_until(configuration_updated, timeout_sec=15, backoff_sec=2) + + restart_node(num_cpus=initial_core_count) # check that the node moved partitions to the new core + def check_balanced_shard_map(shard_map, num_cpus): + self.print_shard_stats(shard_map) + counts_by_topic = self.get_shard_counts_by_topic( + shard_map, node_id) + for topic in topics: + shard_counts = counts_by_topic[topic] + assert len(shard_counts) == num_cpus + assert sum(shard_counts) == n_partitions + assert max(shard_counts) - min(shard_counts) <= 1 + shard_map = self.get_replica_shard_map([node], admin) - self.print_shard_stats(shard_map) - counts_by_topic = self.get_shard_counts_by_topic(shard_map, node_id) - assert len(counts_by_topic) > 0 - for topic, shard_counts in counts_by_topic.items(): - assert max(shard_counts) - min(shard_counts) <= 1 + check_balanced_shard_map(shard_map, initial_core_count) # do some manual moves and check that their effects remain # if the core count doesn't change. 
diff --git a/tests/rptest/tests/shard_placement_test.py b/tests/rptest/tests/shard_placement_test.py
index 687d465ea979b..20baaa86f4fbd 100644
--- a/tests/rptest/tests/shard_placement_test.py
+++ b/tests/rptest/tests/shard_placement_test.py
@@ -345,7 +345,10 @@ def test_manual_rebalance(self):
 
     @cluster(num_nodes=6)
     def test_core_count_change(self):
-        self.redpanda.set_resource_settings(ResourceSettings(num_cpus=1))
+        initial_core_count = self.redpanda.get_node_cpu_count()
+
+        self.redpanda.set_resource_settings(
+            ResourceSettings(num_cpus=initial_core_count - 1))
         self.redpanda.start()
 
         admin = Admin(self.redpanda)
@@ -353,7 +356,8 @@ def test_core_count_change(self):
 
         n_partitions = 10
 
-        for topic in ["foo", "bar"]:
+        topics = ["foo", "bar"]
+        for topic in topics:
             # create topics with rf=5 for ease of accounting
             rpk.create_topic(topic, partitions=n_partitions, replicas=5)
 
@@ -365,18 +369,41 @@ def test_core_count_change(self):
         node = self.redpanda.nodes[0]
         node_id = self.redpanda.node_id(node)
-        self.redpanda.stop_node(node)
-        self.redpanda.set_resource_settings(ResourceSettings(num_cpus=2))
-        self.redpanda.start_node(node)
-        self.redpanda.wait_for_membership(first_start=False)
+
+        def restart_node(num_cpus):
+            self.redpanda.stop_node(node)
+            self.redpanda.set_resource_settings(
+                ResourceSettings(num_cpus=num_cpus))
+            self.redpanda.start_node(node)
+            self.redpanda.wait_for_membership(first_start=False)
+
+            def configuration_updated():
+                for n in self.redpanda.nodes:
+                    broker = [
+                        b for b in admin.get_brokers(node=n)
+                        if b["node_id"] == node_id
+                    ][0]
+                    if broker["num_cores"] != num_cpus:
+                        return False
+                return True
+
+            wait_until(configuration_updated, timeout_sec=15, backoff_sec=2)
+
+        restart_node(num_cpus=initial_core_count)
 
         # check that the node moved partitions to the new core
+        def check_balanced_shard_map(shard_map, num_cpus):
+            self.print_shard_stats(shard_map)
+            counts_by_topic = self.get_shard_counts_by_topic(
+                shard_map, node_id)
+            for topic in topics:
+                shard_counts = counts_by_topic[topic]
+                assert len(shard_counts) == num_cpus
+                assert sum(shard_counts) == n_partitions
+                assert max(shard_counts) - min(shard_counts) <= 1
+
         shard_map = self.get_replica_shard_map([node], admin)
-        self.print_shard_stats(shard_map)
-        counts_by_topic = self.get_shard_counts_by_topic(shard_map, node_id)
-        assert len(counts_by_topic) > 0
-        for topic, shard_counts in counts_by_topic.items():
-            assert max(shard_counts) - min(shard_counts) <= 1
+        check_balanced_shard_map(shard_map, initial_core_count)
 
         # do some manual moves and check that their effects remain
         # if the core count doesn't change.
@@ -404,9 +431,26 @@ def test_core_count_change(self):
             self.print_shard_stats(map_after_restart)
             assert map_after_restart == shard_map
 
-        self.stop_client_load()
+        self.logger.info("decreasing core count...")
+
+        restart_node(num_cpus=initial_core_count - 1)
+        shard_map = self.get_replica_shard_map([node], admin)
+        check_balanced_shard_map(shard_map, initial_core_count - 1)
+
+        self.logger.info("creating another topic...")
+        rpk.create_topic("quux", partitions=n_partitions, replicas=5)
+        topics.append("quux")
 
-        # TODO: core count decrease (not supported yet)
+        shard_map = self.wait_shard_map_stationary([node], admin)
+        check_balanced_shard_map(shard_map, initial_core_count - 1)
+
+        self.logger.info("increasing core count back...")
+
+        restart_node(num_cpus=initial_core_count)
+        shard_map = self.get_replica_shard_map([node], admin)
+        check_balanced_shard_map(shard_map, initial_core_count)
+
+        self.stop_client_load()
 
     @cluster(num_nodes=6)
     def test_node_join(self):