diff --git a/src/v/cluster/archival_metadata_stm.cc b/src/v/cluster/archival_metadata_stm.cc index eda579b9f396b..1aad51d86d335 100644 --- a/src/v/cluster/archival_metadata_stm.cc +++ b/src/v/cluster/archival_metadata_stm.cc @@ -9,6 +9,7 @@ #include "cluster/archival_metadata_stm.h" +#include "bytes/iobuf.h" #include "bytes/iostream.h" #include "cloud_storage/partition_manifest.h" #include "cloud_storage/remote.h" @@ -708,7 +709,7 @@ ss::future archival_metadata_stm::do_add_segments( co_return errc::success; } -ss::future<> archival_metadata_stm::apply(model::record_batch b) { +ss::future<> archival_metadata_stm::apply(const model::record_batch& b) { if (b.header().type == model::record_batch_type::prefix_truncate) { // Special case handling for prefix_truncate batches: these originate // in log_eviction_stm, but affect the entire partition, local and @@ -801,7 +802,7 @@ ss::future<> archival_metadata_stm::apply(model::record_batch b) { _manifest->advance_insync_offset(b.last_offset()); } -ss::future<> archival_metadata_stm::handle_raft_snapshot() { +ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) { cloud_storage::partition_manifest new_manifest{ _manifest->get_ntp(), _manifest->get_revision_id()}; @@ -1202,7 +1203,7 @@ archival_metadata_stm::get_segments_to_cleanup() const { ss::future<> archival_metadata_stm::stop() { _download_as.request_abort(); - co_await raft::state_machine::stop(); + co_await persisted_stm<>::stop(); } const cloud_storage::partition_manifest& diff --git a/src/v/cluster/archival_metadata_stm.h b/src/v/cluster/archival_metadata_stm.h index 44a2179e74e64..3fa3aff8cc127 100644 --- a/src/v/cluster/archival_metadata_stm.h +++ b/src/v/cluster/archival_metadata_stm.h @@ -225,6 +225,9 @@ class archival_metadata_stm final : public persisted_stm<> { model::offset max_collectible_offset() override; + const char* get_name() const final { return "archival_metadata_stm"; } + ss::future take_snapshot(model::offset) final { co_return iobuf{}; } + private: ss::future do_add_segments( std::vector, @@ -235,8 +238,8 @@ class archival_metadata_stm final : public persisted_stm<> { ss::future do_replicate_commands(model::record_batch, ss::abort_source&); - ss::future<> apply(model::record_batch batch) override; - ss::future<> handle_raft_snapshot() override; + ss::future<> apply(const model::record_batch& batch) override; + ss::future<> apply_raft_snapshot(const iobuf&) override; ss::future<> apply_local_snapshot(stm_snapshot_header, iobuf&&) override; ss::future take_local_snapshot() override; diff --git a/src/v/cluster/id_allocator_stm.cc b/src/v/cluster/id_allocator_stm.cc index b1bca850b3227..9f34dbdd7ab06 100644 --- a/src/v/cluster/id_allocator_stm.cc +++ b/src/v/cluster/id_allocator_stm.cc @@ -9,6 +9,7 @@ #include "cluster/id_allocator_stm.h" +#include "bytes/iobuf.h" #include "cluster/logger.h" #include "cluster/types.h" #include "config/configuration.h" @@ -118,7 +119,7 @@ id_allocator_stm::do_allocate_id(model::timeout_clock::duration timeout) { co_return stm_allocation_result{id, raft::errc::success}; } -ss::future<> id_allocator_stm::apply(model::record_batch b) { +ss::future<> id_allocator_stm::apply(const model::record_batch& b) { if (b.header().type != model::record_batch_type::id_allocator) { return ss::now(); } @@ -202,7 +203,7 @@ ss::future id_allocator_stm::take_local_snapshot() { std::logic_error("id_allocator_stm doesn't support snapshots")); } -ss::future<> id_allocator_stm::handle_raft_snapshot() { +ss::future<> 
id_allocator_stm::apply_raft_snapshot(const iobuf&) { _next_snapshot = _raft->start_offset(); _processed = 0; set_next(_next_snapshot); diff --git a/src/v/cluster/id_allocator_stm.h b/src/v/cluster/id_allocator_stm.h index b55e92eb086db..3776049b28afa 100644 --- a/src/v/cluster/id_allocator_stm.h +++ b/src/v/cluster/id_allocator_stm.h @@ -46,6 +46,9 @@ class id_allocator_stm final : public persisted_stm<> { ss::future allocate_id(model::timeout_clock::duration timeout); + const char* get_name() const final { return "id_allocator_stm"; } + ss::future take_snapshot(model::offset) final { co_return iobuf{}; } + private: // legacy structs left for backward compatibility with the "old" // on-disk log format @@ -95,12 +98,12 @@ class id_allocator_stm final : public persisted_stm<> { do_allocate_id(model::timeout_clock::duration); ss::future set_state(int64_t, model::timeout_clock::duration); - ss::future<> apply(model::record_batch) override; + ss::future<> apply(const model::record_batch&) final; ss::future<> write_snapshot(); ss::future<> apply_local_snapshot(stm_snapshot_header, iobuf&&) override; ss::future take_local_snapshot() override; - ss::future<> handle_raft_snapshot() override; + ss::future<> apply_raft_snapshot(const iobuf&) final; ss::future sync(model::timeout_clock::duration); mutex _lock; diff --git a/src/v/cluster/log_eviction_stm.cc b/src/v/cluster/log_eviction_stm.cc index 53a38af136cb4..1436b8972ebe4 100644 --- a/src/v/cluster/log_eviction_stm.cc +++ b/src/v/cluster/log_eviction_stm.cc @@ -11,6 +11,7 @@ #include "bytes/iostream.h" #include "cluster/prefix_truncate_record.h" +#include "model/fundamental.h" #include "raft/consensus.h" #include "raft/types.h" #include "serde/serde.h" @@ -36,12 +37,8 @@ struct snapshot_data }; log_eviction_stm::log_eviction_stm( - raft::consensus* raft, - ss::logger& logger, - ss::abort_source& as, - storage::kvstore& kvstore) - : persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore) - , _as(as) {} + raft::consensus* raft, ss::logger& logger, storage::kvstore& kvstore) + : persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore) {} ss::future<> log_eviction_stm::start() { ssx::spawn_with_gate(_gate, [this] { return monitor_log_eviction(); }); @@ -51,8 +48,12 @@ ss::future<> log_eviction_stm::start() { } ss::future<> log_eviction_stm::stop() { + vlog(_log.info, "DBG: log_eviction_stm::stop()"); + _as.request_abort(); _has_pending_truncation.broken(); + vlog(_log.info, "DBG: log_eviction_stm::persisted_stm::stop()"); co_await persisted_stm::stop(); + vlog(_log.info, "DBG: log_eviction_stm::after_stop()"); } ss::future<> log_eviction_stm::handle_log_eviction_events() { @@ -192,8 +193,10 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) { _log.debug, "Requesting raft snapshot with final offset: {}", truncation_point); + auto snapshot_data = co_await _raft->stm_manager()->take_snapshot( + truncation_point); co_await _raft->write_snapshot( - raft::write_snapshot_cfg(truncation_point, iobuf())); + raft::write_snapshot_cfg(truncation_point, std::move(snapshot_data))); } ss::future> @@ -317,7 +320,7 @@ ss::future log_eviction_stm::replicate_command( co_return result.value().last_offset; } -ss::future<> log_eviction_stm::apply(model::record_batch batch) { +ss::future<> log_eviction_stm::apply(const model::record_batch& batch) { if (likely( batch.header().type != model::record_batch_type::prefix_truncate)) { co_return; @@ -366,23 +369,8 @@ ss::future<> log_eviction_stm::apply(model::record_batch batch) { } } 
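The archival_metadata_stm, id_allocator_stm and log_eviction_stm hunks above all migrate onto the same state-machine contract: apply() now takes the batch by const reference, handle_raft_snapshot() becomes apply_raft_snapshot(const iobuf&), and each STM names itself and contributes a (currently empty) slice to the raft snapshot. Below is a minimal sketch of that contract as it can be inferred from the overrides in this diff; the class name and everything not shown in the hunks above is illustrative, and the real raft::state_machine_base header may differ.

#include "bytes/iobuf.h"
#include "model/fundamental.h"
#include "model/record.h"

#include <seastar/core/future.hh>

namespace ss = seastar;

// Sketch only: the surface the migrated STMs override in this diff.
class state_machine_contract_sketch {
public:
    virtual ~state_machine_contract_sketch() = default;
    // Stable name used by the state machine manager for registration/logging.
    virtual const char* get_name() const = 0;
    // Committed batches are applied by const reference so the manager can hand
    // the same batch to every STM registered on the partition (rm_stm/tm_stm
    // call b.copy() where they still need an owned batch).
    virtual ss::future<> apply(const model::record_batch&) = 0;
    // Replaces handle_raft_snapshot(): the payload is this STM's slice of the
    // raft snapshot (empty for the STMs changed here).
    virtual ss::future<> apply_raft_snapshot(const iobuf&) = 0;
    // Contribute this STM's slice when a raft snapshot is taken up to the
    // given offset; the STMs in this diff currently co_return an empty iobuf.
    virtual ss::future<iobuf> take_snapshot(model::offset) = 0;
};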
-ss::future<> log_eviction_stm::handle_raft_snapshot() { - /// In the case there is a gap detected in the log, the only path - /// forward is to read the raft snapshot and begin processing from the - /// raft last_snapshot_index - auto raft_snapshot = co_await _raft->open_snapshot(); - if (!raft_snapshot) { - throw std::runtime_error{fmt_with_ctx( - fmt::format, - "encountered a gap in the raft log (last_applied: {}, log start " - "offset: {}), but can't find the snapshot - ntp: {}", - last_applied_offset(), - _raft->start_offset(), - _raft->ntp())}; - } - - auto last_snapshot_index = raft_snapshot->metadata.last_included_index; - co_await raft_snapshot->close(); +ss::future<> log_eviction_stm::apply_raft_snapshot(const iobuf&) { + auto last_snapshot_index = model::prev_offset(_raft->start_offset()); _delete_records_eviction_offset = model::offset{}; _storage_eviction_offset = last_snapshot_index; set_next(model::next_offset(last_snapshot_index)); @@ -390,6 +378,7 @@ ss::future<> log_eviction_stm::handle_raft_snapshot() { _log.info, "Handled log eviction new effective start offset: {}", effective_start_offset()); + co_return; } ss::future<> log_eviction_stm::apply_local_snapshot( diff --git a/src/v/cluster/log_eviction_stm.h b/src/v/cluster/log_eviction_stm.h index c4728b03df41c..c4921f4d13829 100644 --- a/src/v/cluster/log_eviction_stm.h +++ b/src/v/cluster/log_eviction_stm.h @@ -46,8 +46,7 @@ class consensus; class log_eviction_stm : public persisted_stm { public: using offset_result = result; - log_eviction_stm( - raft::consensus*, ss::logger&, ss::abort_source&, storage::kvstore&); + log_eviction_stm(raft::consensus*, ss::logger&, storage::kvstore&); ss::future<> start() override; @@ -103,6 +102,9 @@ class log_eviction_stm : public persisted_stm { return model::next_offset(_delete_records_eviction_offset); } + const char* get_name() const final { return "log_eviction_stm"; } + ss::future take_snapshot(model::offset) final { co_return iobuf{}; } + protected: ss::future<> apply_local_snapshot(stm_snapshot_header, iobuf&&) override; @@ -117,8 +119,8 @@ class log_eviction_stm : public persisted_stm { ss::future<> monitor_log_eviction(); ss::future<> do_write_raft_snapshot(model::offset); ss::future<> handle_log_eviction_events(); - ss::future<> apply(model::record_batch) override; - ss::future<> handle_raft_snapshot() override; + ss::future<> apply(const model::record_batch&) final; + ss::future<> apply_raft_snapshot(const iobuf&) final; ss::future replicate_command( model::record_batch batch, @@ -126,7 +128,7 @@ class log_eviction_stm : public persisted_stm { std::optional> as); private: - ss::abort_source& _as; + ss::abort_source _as; // Offset we are able to truncate based on local retention policy, as // signaled by the storage layer. 
This value is not maintained via the diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index e5fce4f9e9b71..1fab966496ef5 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -23,6 +23,7 @@ #include "model/metadata.h" #include "model/namespace.h" #include "prometheus/prometheus_sanitize.h" +#include "raft/state_machine_manager.h" #include "raft/types.h" #include @@ -69,110 +70,35 @@ partition::partition( , _cloud_storage_probe( ss::make_shared(_raft->ntp())) , _upload_housekeeping(upload_hks) - , _kvstore(kvstore) { - auto stm_manager = _raft->log()->stm_manager(); - - if (is_id_allocator_topic(_raft->ntp())) { - _id_allocator_stm = ss::make_shared( - clusterlog, _raft.get()); - } else if (is_tx_manager_topic(_raft->ntp())) { - if ( - _raft->log_config().is_collectable() - && !storage::deletion_exempt(_raft->ntp())) { - _log_eviction_stm = ss::make_shared( - _raft.get(), clusterlog, _as, _kvstore); - stm_manager->add_stm(_log_eviction_stm); - } - - if (_is_tx_enabled) { - auto tm_stm_cache = _tm_stm_cache_manager.local().get( - _raft->ntp().tp.partition); - _tm_stm = ss::make_shared( - clusterlog, _raft.get(), feature_table, tm_stm_cache); - stm_manager->add_stm(_tm_stm); - } - } else { - if ( - _raft->log_config().is_collectable() - && !storage::deletion_exempt(_raft->ntp())) { - _log_eviction_stm = ss::make_shared( - _raft.get(), clusterlog, _as, _kvstore); - stm_manager->add_stm(_log_eviction_stm); - } - const model::topic_namespace tp_ns( - _raft->ntp().ns, _raft->ntp().tp.topic); - bool is_group_ntp = tp_ns == model::kafka_consumer_offsets_nt; - - bool has_rm_stm = (_is_tx_enabled || _is_idempotence_enabled) - && model::controller_ntp != _raft->ntp() - && !is_group_ntp; - - if (has_rm_stm) { - _rm_stm = ss::make_shared( - clusterlog, - _raft.get(), - _tx_gateway_frontend, - _feature_table, - max_concurrent_producer_ids); - stm_manager->add_stm(_rm_stm); - } - - // Construct cloud_storage read path (remote_partition) - if ( - config::shard_local_cfg().cloud_storage_enabled() - && _cloud_storage_api.local_is_initialized() - && _raft->ntp().ns == model::kafka_namespace) { - _archival_meta_stm - = ss::make_shared( - _raft.get(), - _cloud_storage_api.local(), - _feature_table.local(), - clusterlog, - _partition_mem_tracker); - stm_manager->add_stm(_archival_meta_stm); - - if (cloud_storage_cache.local_is_initialized()) { - const auto& bucket_config - = cloud_storage::configuration::get_bucket_config(); - auto bucket = bucket_config.value(); - if ( - read_replica_bucket - && _raft->log_config().is_read_replica_mode_enabled()) { - vlog( - clusterlog.info, - "{} Remote topic bucket is {}", - _raft->ntp(), - read_replica_bucket); - // Override the bucket for read replicas - _read_replica_bucket = read_replica_bucket; - bucket = read_replica_bucket; - } - if (!bucket) { - throw std::runtime_error{fmt::format( - "configuration property {} is not set", - bucket_config.name())}; - } - - _cloud_storage_manifest_view - = ss::make_shared( - _cloud_storage_api, - cloud_storage_cache, - _archival_meta_stm->manifest(), - cloud_storage_clients::bucket_name{*bucket}, - *_cloud_storage_probe); - - _cloud_storage_partition - = ss::make_shared( - _cloud_storage_manifest_view, - _cloud_storage_api.local(), - cloud_storage_cache.local(), - cloud_storage_clients::bucket_name{*bucket}, - *_cloud_storage_probe); + , _kvstore(kvstore) + , _max_concurrent_producer_ids(std::move(max_concurrent_producer_ids)) { + // Construct cloud_storage read path (remote_partition) + if ( 
+ config::shard_local_cfg().cloud_storage_enabled() + && _cloud_storage_api.local_is_initialized() + && _raft->ntp().ns == model::kafka_namespace) { + if (_cloud_storage_cache.local_is_initialized()) { + const auto& bucket_config + = cloud_storage::configuration::get_bucket_config(); + auto bucket = bucket_config.value(); + if ( + read_replica_bucket + && _raft->log_config().is_read_replica_mode_enabled()) { + vlog( + clusterlog.info, + "{} Remote topic bucket is {}", + _raft->ntp(), + read_replica_bucket); + // Override the bucket for read replicas + _read_replica_bucket = read_replica_bucket; + bucket = read_replica_bucket; + } + if (!bucket) { + throw std::runtime_error{fmt::format( + "configuration property {} is not set", + bucket_config.name())}; } } - - // Construct cloud_storage write path (ntp_archiver) - maybe_construct_archiver(); } } @@ -489,43 +415,117 @@ kafka_stages partition::replicate_in_stages( } ss::future<> partition::start() { - auto ntp = _raft->ntp(); - + const auto& ntp = _raft->ntp(); _probe.setup_metrics(ntp); - - auto f = _raft->start(); - + raft::state_machine_manager_builder builder; + // special cases for id_allocator and transaction coordinator partitions if (is_id_allocator_topic(ntp)) { - return f.then([this] { return _id_allocator_stm->start(); }); - } else if (_log_eviction_stm) { - f = f.then([this] { return _log_eviction_stm->start(); }); - } - - if (_rm_stm) { - f = f.then([this] { return _rm_stm->start(); }); - } + _id_allocator_stm = builder.create_stm( + clusterlog, _raft.get()); + co_return co_await _raft->start(std::move(builder)); + } + + if (is_tx_manager_topic(_raft->ntp()) && _is_tx_enabled) { + _tm_stm = builder.create_stm( + clusterlog, + _raft.get(), + _feature_table, + _tm_stm_cache_manager.local().get(_raft->ntp().tp.partition)); + _raft->log()->stm_manager()->add_stm(_tm_stm); + co_return co_await _raft->start(std::move(builder)); + } + /** + * Data partitions + */ + const bool enable_log_eviction = _raft->log_config().is_collectable() + && !storage::deletion_exempt(_raft->ntp()); + if (enable_log_eviction) { + _log_eviction_stm = builder.create_stm( + _raft.get(), clusterlog, _kvstore); + _raft->log()->stm_manager()->add_stm(_log_eviction_stm); + } + const model::topic_namespace_view tp_ns(_raft->ntp()); + const bool is_group_ntp = tp_ns == model::kafka_consumer_offsets_nt; + const bool has_rm_stm = (_is_tx_enabled || _is_idempotence_enabled) + && model::controller_ntp != _raft->ntp() + && !is_group_ntp; + + if (has_rm_stm) { + _rm_stm = builder.create_stm( + clusterlog, + _raft.get(), + _tx_gateway_frontend, + _feature_table, + _max_concurrent_producer_ids); + _raft->log()->stm_manager()->add_stm(_rm_stm); + } + + // Construct cloud_storage read path (remote_partition) + if ( + config::shard_local_cfg().cloud_storage_enabled() + && _cloud_storage_api.local_is_initialized() + && _raft->ntp().ns == model::kafka_namespace) { + _archival_meta_stm = builder.create_stm( + _raft.get(), + _cloud_storage_api.local(), + _feature_table.local(), + clusterlog, + _partition_mem_tracker); + _raft->log()->stm_manager()->add_stm(_archival_meta_stm); + + if (_cloud_storage_cache.local_is_initialized()) { + const auto& bucket_config + = cloud_storage::configuration::get_bucket_config(); + auto bucket = bucket_config.value(); + if ( + _read_replica_bucket + && _raft->log_config().is_read_replica_mode_enabled()) { + vlog( + clusterlog.info, + "{} Remote topic bucket is {}", + _raft->ntp(), + _read_replica_bucket); + // Override the bucket for read replicas 
+ bucket = _read_replica_bucket; + } + if (!bucket) { + throw std::runtime_error{fmt::format( + "configuration property {} is not set", + bucket_config.name())}; + } - if (_tm_stm) { - f = f.then([this] { return _tm_stm->start(); }); + _cloud_storage_manifest_view + = ss::make_shared( + _cloud_storage_api, + _cloud_storage_cache, + _archival_meta_stm->manifest(), + cloud_storage_clients::bucket_name{*bucket}, + *_cloud_storage_probe); + + _cloud_storage_partition + = ss::make_shared( + _cloud_storage_manifest_view, + _cloud_storage_api.local(), + _cloud_storage_cache.local(), + cloud_storage_clients::bucket_name{*bucket}, + *_cloud_storage_probe); + } } - if (_archival_meta_stm) { - f = f.then([this] { return _archival_meta_stm->start(); }); - } + maybe_construct_archiver(); if (_cloud_storage_manifest_view) { - f = f.then([this] { return _cloud_storage_manifest_view->start(); }); + co_await _cloud_storage_manifest_view->start(); } if (_cloud_storage_partition) { - f = f.then([this] { return _cloud_storage_partition->start(); }); + co_await _cloud_storage_partition->start(); } if (_archiver) { - f = f.then([this] { return _archiver->start(); }); + co_await _archiver->start(); } - - return f; + co_return co_await _raft->start(std::move(builder)); } ss::future<> partition::stop() { @@ -543,14 +543,6 @@ ss::future<> partition::stop() { co_await _archiver->stop(); } - if (_archival_meta_stm) { - vlog( - clusterlog.debug, - "Stopping archival_meta_stm on partition: {}", - partition_ntp); - co_await _archival_meta_stm->stop(); - } - if (_cloud_storage_partition) { vlog( clusterlog.debug, @@ -567,33 +559,6 @@ ss::future<> partition::stop() { co_await _cloud_storage_manifest_view->stop(); } - if (_id_allocator_stm) { - vlog( - clusterlog.debug, - "Stopping id_allocator_stm on partition: {}", - partition_ntp); - co_await _id_allocator_stm->stop(); - } - - if (_log_eviction_stm) { - vlog( - clusterlog.debug, - "Stopping log_eviction_stm on partition: {}", - partition_ntp); - co_await _log_eviction_stm->stop(); - } - - if (_rm_stm) { - vlog( - clusterlog.debug, "Stopping rm_stm on partition: {}", partition_ntp); - co_await _rm_stm->stop(); - } - - if (_tm_stm) { - vlog( - clusterlog.debug, "Stopping tm_stm on partition: {}", partition_ntp); - co_await _tm_stm->stop(); - } vlog(clusterlog.debug, "Stopped partition {}", partition_ntp); } @@ -605,9 +570,10 @@ partition::timequery(storage::timequery_config cfg) { } if (_raft->log()->start_timestamp() <= cfg.time) { - // The query is ahead of the local data's start_timestamp: this means - // it _might_ hit on local data: start_timestamp is not precise, so - // once we query we might still fall back to cloud storage + // The query is ahead of the local data's start_timestamp: this + // means it _might_ hit on local data: start_timestamp is not + // precise, so once we query we might still fall back to cloud + // storage auto result = co_await local_timequery(cfg); if (!result.has_value()) { // The local storage hit a case where it needs to fall back @@ -638,10 +604,10 @@ bool partition::may_read_from_cloud() const { ss::future> partition::cloud_storage_timequery(storage::timequery_config cfg) { if (may_read_from_cloud()) { - // We have data in the remote partition, and all the data in the raft - // log is ahead of the query timestamp or the topic is a read replica, - // so proceed to query the remote partition to try and find the earliest - // data that has timestamp >= the query time. 
+ // We have data in the remote partition, and all the data in the + // raft log is ahead of the query timestamp or the topic is a read + // replica, so proceed to query the remote partition to try and + // find the earliest data that has timestamp >= the query time. vlog( clusterlog.debug, "timequery (cloud) {} t={} max_offset(k)={}", @@ -704,16 +670,18 @@ partition::local_timequery(storage::timequery_config cfg) { if ( _raft->log()->start_timestamp() <= cfg.time && result->time > cfg.time && may_answer_from_cloud) { - // start_timestamp() points to the beginning of the oldest segment, - // but start_offset points to somewhere within a segment. If our - // timequery hits the range between the start of segment and - // the start_offset, consensus::timequery may answer with - // the start offset rather than the pre-start-offset location - // where the timestamp is actually found. - // Ref https://github.com/redpanda-data/redpanda/issues/9669 + // start_timestamp() points to the beginning of the oldest + // segment, but start_offset points to somewhere within a + // segment. If our timequery hits the range between the start + // of segment and the start_offset, consensus::timequery may + // answer with the start offset rather than the + // pre-start-offset location where the timestamp is actually + // found. Ref + // https://github.com/redpanda-data/redpanda/issues/9669 vlog( clusterlog.debug, - "Timequery (raft) {} ts={} miss on local log (start_timestamp " + "Timequery (raft) {} ts={} miss on local log " + "(start_timestamp " "{}, result {})", _raft->ntp(), cfg.time, @@ -739,10 +707,10 @@ partition::local_timequery(storage::timequery_config cfg) { _cloud_storage_partition && _cloud_storage_partition->is_data_available() && may_answer_from_cloud) { - // Even though we hit data with the desired timestamp, we cannot - // be certain that this is the _first_ batch with the desired - // timestamp: return null so that the caller will fall back - // to cloud storage. + // Even though we hit data with the desired timestamp, we + // cannot be certain that this is the _first_ batch with + // the desired timestamp: return null so that the caller + // will fall back to cloud storage. co_return std::nullopt; } } @@ -973,8 +941,8 @@ static ss::future should_finalize( ss::future<> partition::finalize_remote_partition(ss::abort_source& as) { if (!_feature_table.local().is_active( features::feature::cloud_storage_manifest_format_v2)) { - // this is meant to prevent uploading manifests with new format while - // the cluster is in a mixed state + // this is meant to prevent uploading manifests with new format + // while the cluster is in a mixed state vlog( clusterlog.info, "skipping finalize of remote partition {}", ntp()); co_return; @@ -1051,11 +1019,11 @@ partition::transfer_leadership(transfer_leadership_request req) { bool archiver_clean = co_await _archiver->prepare_transfer_leadership( archival_timeout.value()); if (!archiver_clean) { - // This is legal: if we are very tight on bandwidth to S3, then it - // can take longer than the available timeout for an upload of - // a large segment to complete. If this happens, we will leak - // an object, but retain a consistent+correct manifest when - // the new leader writes it. + // This is legal: if we are very tight on bandwidth to S3, + // then it can take longer than the available timeout for an + // upload of a large segment to complete. 
If this happens, we + // will leak an object, but retain a consistent+correct + // manifest when the new leader writes it. vlog( clusterlog.warn, "Timed out waiting for {} uploads to complete before " diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 2d46b157d088b..9fe4b1de487e4 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -478,6 +478,7 @@ class partition { ss::sharded& _upload_housekeeping; storage::kvstore& _kvstore; + config::binding _max_concurrent_producer_ids; friend std::ostream& operator<<(std::ostream& o, const partition& x); }; diff --git a/src/v/cluster/persisted_stm.cc b/src/v/cluster/persisted_stm.cc index 17b5b4661218f..44bd2d363b133 100644 --- a/src/v/cluster/persisted_stm.cc +++ b/src/v/cluster/persisted_stm.cc @@ -15,6 +15,7 @@ #include "raft/consensus.h" #include "raft/errc.h" #include "raft/offset_monitor.h" +#include "raft/state_machine_base.h" #include "raft/types.h" #include "ssx/sformat.h" #include "storage/kvstore.h" @@ -61,7 +62,7 @@ persisted_stm::persisted_stm( ss::logger& logger, raft::consensus* c, Args&&... args) - : raft::state_machine(c, logger, ss::default_priority_class()) + : _raft(c) , _log(logger, ssx::sformat("[{} ({})]", _raft->ntp(), snapshot_mgr_name)) , _snapshot_backend(snapshot_mgr_name, _log, c, std::forward(args)...) { } @@ -71,6 +72,11 @@ ss::future> persisted_stm::load_local_snapshot() { return _snapshot_backend.load_snapshot(); } +template +ss::future<> persisted_stm::stop() { + co_await raft::state_machine_base::stop(); + co_await _gate.close(); +} template ss::future<> persisted_stm::remove_persistent_state() { @@ -83,7 +89,7 @@ file_backed_stm_snapshot::file_backed_stm_snapshot( , _log(log) , _snapshot_mgr( std::filesystem::path(c->log_config().work_directory()), - snapshot_name, + std::move(snapshot_name), ss::default_priority_class()) {} ss::future<> file_backed_stm_snapshot::remove_persistent_state() { @@ -559,7 +565,6 @@ ss::future<> persisted_stm::start() { } _on_snapshot_hydrated.broadcast(); _snapshot_hydrated = true; - co_await state_machine::start(); } template class persisted_stm; diff --git a/src/v/cluster/persisted_stm.h b/src/v/cluster/persisted_stm.h index a56e49473b551..ae11005086ff3 100644 --- a/src/v/cluster/persisted_stm.h +++ b/src/v/cluster/persisted_stm.h @@ -19,6 +19,7 @@ #include "raft/errc.h" #include "raft/logger.h" #include "raft/state_machine.h" +#include "raft/state_machine_base.h" #include "raft/types.h" #include "storage/snapshot.h" #include "storage/types.h" @@ -152,7 +153,7 @@ concept supported_stm_snapshot = requires(T s, stm_snapshot&& snapshot) { template class persisted_stm - : public raft::state_machine + : public raft::state_machine_base , public storage::snapshotable_stm { public: template @@ -202,8 +203,10 @@ class persisted_stm */ ss::future<> start() override; + ss::future<> stop() override; + model::offset last_applied() const final { - return raft::state_machine::last_applied_offset(); + return raft::state_machine_base::last_applied_offset(); } ss::future wait_no_throw( @@ -236,7 +239,9 @@ class persisted_stm bool _is_catching_up{false}; model::term_id _insync_term; model::offset _insync_offset; + raft::consensus* _raft; prefix_logger _log; + ss::gate _gate; private: ss::future<> wait_offset_committed( diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 28514a7852057..4e3dbf900470d 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -11,6 +11,7 @@ #include "bytes/iostream.h" #include "cluster/logger.h" 
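On the partition side, STM construction moves out of the partition constructor and into partition::start(): each STM is created through a raft::state_machine_manager_builder and the builder is handed to raft::consensus::start(), which is also why partition::stop() no longer stops the STMs one by one. A rough usage sketch follows, using only the calls visible in this diff (error handling and the remaining cloud-storage wiring omitted; template arguments to create_stm are reconstructed where the rendering dropped the angle brackets).

// Sketch of the new wiring in partition::start(); see the full hunks above.
raft::state_machine_manager_builder builder;

_log_eviction_stm = builder.create_stm<cluster::log_eviction_stm>(
  _raft.get(), clusterlog, _kvstore);
_raft->log()->stm_manager()->add_stm(_log_eviction_stm);

if (has_rm_stm) {
    _rm_stm = builder.create_stm<cluster::rm_stm>(
      clusterlog,
      _raft.get(),
      _tx_gateway_frontend,
      _feature_table,
      _max_concurrent_producer_ids);
    _raft->log()->stm_manager()->add_stm(_rm_stm);
}

// consensus::start() consumes the builder; the resulting state machine
// manager starts, feeds and stops every registered STM, which is why the
// explicit per-STM stop() calls were removed from partition::stop().
co_return co_await _raft->start(std::move(builder));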
+#include "cluster/persisted_stm.h" #include "cluster/tx_gateway_frontend.h" #include "cluster/tx_snapshot_adl_utils.h" #include "kafka/protocol/wire.h" @@ -20,6 +21,7 @@ #include "prometheus/prometheus_sanitize.h" #include "raft/consensus_utils.h" #include "raft/errc.h" +#include "raft/state_machine_base.h" #include "raft/types.h" #include "ssx/future-util.h" #include "ssx/metrics.h" @@ -323,6 +325,17 @@ rm_stm::rm_stm( }); } +ss::future rm_stm::bootstrap_committed_offset() { + /// It is useful for some STMs to know what the committed offset is so they + /// may do things like block until they have consumed all known committed + /// records. To achieve this, this method waits on offset 0, so on the first + /// call to `event_manager::notify_commit_index`, it is known that the + /// committed offset is in an initialized state. + return _raft->events() + .wait(model::offset(0), model::no_timeout, _as) + .then([this] { return _raft->committed_offset(); }); +} + ss::future> rm_stm::begin_tx( model::producer_identity pid, model::tx_seq tx_seq, @@ -952,9 +965,10 @@ ss::future> rm_stm::do_replicate( } ss::future<> rm_stm::stop() { + _as.request_abort(); auto_abort_timer.cancel(); _log_stats_timer.cancel(); - return raft::state_machine::stop(); + return persisted_stm<>::stop(); } ss::future<> rm_stm::start() { return persisted_stm::start(); } @@ -2112,15 +2126,15 @@ void rm_stm::apply_fence(model::record_batch&& b) { } } -ss::future<> rm_stm::apply(model::record_batch b) { +ss::future<> rm_stm::apply(const model::record_batch& b) { auto last_offset = b.last_offset(); const auto& hdr = b.header(); if (hdr.type == model::record_batch_type::tx_fence) { - apply_fence(std::move(b)); + apply_fence(b.copy()); } else if (hdr.type == model::record_batch_type::tx_prepare) { - apply_prepare(parse_prepare_batch(std::move(b))); + apply_prepare(parse_prepare_batch(b.copy())); } else if (hdr.type == model::record_batch_type::raft_data) { auto bid = model::batch_identity::from(hdr); if (hdr.attrs.is_control()) { @@ -2712,7 +2726,7 @@ ss::future<> rm_stm::do_remove_persistent_state() { co_return co_await persisted_stm::remove_persistent_state(); } -ss::future<> rm_stm::handle_raft_snapshot() { +ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) { return _state_lock.hold_write_lock().then( [this]([[maybe_unused]] ss::basic_rwlock<>::holder unit) { vlog(_ctx_log.debug, "Resetting all state, reason: log eviction"); diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index ad98f9e03aee9..de5e851a9f152 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -11,6 +11,7 @@ #pragma once +#include "bytes/iobuf.h" #include "cluster/persisted_stm.h" #include "cluster/tx_utils.h" #include "cluster/types.h" @@ -33,6 +34,7 @@ #include "utils/prefix_logger.h" #include "utils/tracking_allocator.h" +#include #include #include @@ -364,8 +366,11 @@ class rm_stm final : public persisted_stm<> { uint64_t get_local_snapshot_size() const override; + const char* get_name() const final { return "rm_stm"; } + ss::future take_snapshot(model::offset) final { co_return iobuf{}; } + protected: - ss::future<> handle_raft_snapshot() override; + ss::future<> apply_raft_snapshot(const iobuf&) final; private: void setup_metrics(); @@ -445,7 +450,7 @@ class rm_stm final : public persisted_stm<> { abort_origin get_abort_origin(const model::producer_identity&, model::tx_seq) const; - ss::future<> apply(model::record_batch) override; + ss::future<> apply(const model::record_batch&) override; void 
apply_fence(model::record_batch&&); void apply_prepare(rm_stm::prepare_marker); ss::future<> @@ -465,6 +470,11 @@ class rm_stm final : public persisted_stm<> { return std::nullopt; } + /** + * Return when the committed offset has been established when STM starts. + */ + ss::future bootstrap_committed_offset(); + struct seq_entry_wrapper { seq_entry entry; model::term_id term; @@ -883,6 +893,7 @@ class rm_stm final : public persisted_stm<> { mutex _clean_old_pids_mtx; ssx::metrics::metric_groups _metrics = ssx::metrics::metric_groups::make_internal(); + ss::abort_source _as; }; struct fence_batch_data { diff --git a/src/v/cluster/tests/eviction_stm_test.cc b/src/v/cluster/tests/eviction_stm_test.cc index 749c591bf5e8d..8cb3b3805282b 100644 --- a/src/v/cluster/tests/eviction_stm_test.cc +++ b/src/v/cluster/tests/eviction_stm_test.cc @@ -20,11 +20,8 @@ ss::logger logger("eviction_stm_test"); class test_log_eviction_stm : public cluster::log_eviction_stm { public: test_log_eviction_stm( - raft::consensus* c, - ss::logger& logger, - ss::abort_source& as, - storage::kvstore& kvs) - : cluster::log_eviction_stm(c, logger, as, kvs) {} + raft::consensus* c, ss::logger& logger, storage::kvstore& kvs) + : cluster::log_eviction_stm(c, logger, kvs) {} /** * The two following methods can be used to drive the eviction stms @@ -80,7 +77,6 @@ FIXTURE_TEST(test_eviction_stm_deadlock, raft_test_fixture) { test_log_eviction_stm eviction_stm( leader_raft.get(), logger, - as, gr.get_member(leader_id).storage.local().kvs()); eviction_stm.start().get(); auto cleanup = ss::defer([&] { diff --git a/src/v/cluster/tests/id_allocator_stm_test.cc b/src/v/cluster/tests/id_allocator_stm_test.cc index 97eb066406562..6022fb20a037a 100644 --- a/src/v/cluster/tests/id_allocator_stm_test.cc +++ b/src/v/cluster/tests/id_allocator_stm_test.cc @@ -37,24 +37,32 @@ using namespace std::chrono_literals; ss::logger idstmlog{"idstm-test"}; -FIXTURE_TEST(stm_monotonicity_test, simple_raft_fixture) { - start_raft(); +struct id_allocator_stm_fixture : simple_raft_fixture { + void create_stm_and_start_raft() { + cfg.id_allocator_batch_size.set_value(int16_t(1)); + cfg.id_allocator_log_capacity.set_value(int16_t(2)); + create_raft(); + raft::state_machine_manager_builder stm_m_builder; + + _stm = stm_m_builder.create_stm( + idstmlog, _raft.get(), cfg); + + _raft->start(std::move(stm_m_builder)).get(); + _started = true; + } + ss::shared_ptr _stm; config::configuration cfg; - cfg.id_allocator_batch_size.set_value(int16_t(1)); - cfg.id_allocator_log_capacity.set_value(int16_t(2)); - - cluster::id_allocator_stm stm(idstmlog, _raft.get(), cfg); - - stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); +}; +FIXTURE_TEST(stm_monotonicity_test, id_allocator_stm_fixture) { + create_stm_and_start_raft(); wait_for_confirmed_leader(); int64_t last_id = -1; for (int i = 0; i < 5; i++) { - auto result = stm.allocate_id(1s).get0(); + auto result = _stm->allocate_id(1s).get0(); BOOST_REQUIRE_EQUAL(raft::errc::success, result.raft_status); BOOST_REQUIRE_LT(last_id, result.id); @@ -63,38 +71,30 @@ FIXTURE_TEST(stm_monotonicity_test, simple_raft_fixture) { } } -FIXTURE_TEST(stm_restart_test, simple_raft_fixture) { - start_raft(); - - config::configuration cfg; - cfg.id_allocator_batch_size.set_value(int16_t(1)); - cfg.id_allocator_log_capacity.set_value(int16_t(2)); - - cluster::id_allocator_stm stm1(idstmlog, _raft.get(), cfg); - stm1.start().get0(); +FIXTURE_TEST(stm_restart_test, id_allocator_stm_fixture) { + 
create_stm_and_start_raft(); wait_for_confirmed_leader(); int64_t last_id = -1; for (int i = 0; i < 5; i++) { - auto result = stm1.allocate_id(1s).get0(); + auto result = _stm->allocate_id(1s).get0(); BOOST_REQUIRE_EQUAL(raft::errc::success, result.raft_status); BOOST_REQUIRE_LT(last_id, result.id); last_id = result.id; } - stm1.stop().get0(); + stop_all(); + create_stm_and_start_raft(); + wait_for_confirmed_leader(); - cluster::id_allocator_stm stm2(idstmlog, _raft.get(), cfg); - stm2.start().get0(); for (int i = 0; i < 5; i++) { - auto result = stm2.allocate_id(1s).get0(); + auto result = _stm->allocate_id(1s).get0(); BOOST_REQUIRE_EQUAL(raft::errc::success, result.raft_status); BOOST_REQUIRE_LT(last_id, result.id); last_id = result.id; } - stm2.stop().get0(); } diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index a7fc77cc9ad9e..a16f3a8f651e0 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -9,6 +9,7 @@ #include "cluster/errc.h" #include "cluster/rm_stm.h" +#include "cluster/tests/rm_stm_test_fixture.h" #include "finjector/hbadger.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -17,7 +18,6 @@ #include "model/timestamp.h" #include "raft/consensus_utils.h" #include "raft/tests/raft_group_fixture.h" -#include "raft/tests/simple_raft_fixture.h" #include "raft/types.h" #include "random/generators.h" #include "storage/record_batch_builder.h" @@ -28,38 +28,14 @@ #include -static ss::logger logger{"append-test"}; - -static config::binding get_config_bound() { - static config::config_store store; - static config::bounded_property max_saved_pids_count( - store, - "max_saved_pids_count", - "Max pids count inside rm_stm states", - {.needs_restart = config::needs_restart::no, - .visibility = config::visibility::user}, - std::numeric_limits::max(), - {.min = 1}); - - return max_saved_pids_count.bind(); -} - FIXTURE_TEST( test_rm_stm_doesnt_interfere_with_out_of_session_messages, - simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); + rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -103,20 +79,12 @@ FIXTURE_TEST( } FIXTURE_TEST( - test_rm_stm_passes_monotonic_in_session_messages, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); + test_rm_stm_passes_monotonic_in_session_messages, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -161,20 +129,12 @@ FIXTURE_TEST( BOOST_REQUIRE(r1.value().last_offset < r2.value().last_offset); } -FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); 
stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -231,20 +191,12 @@ FIXTURE_TEST(test_rm_stm_caches_last_5_offsets, simple_raft_fixture) { } } -FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -296,20 +248,12 @@ FIXTURE_TEST(test_rm_stm_doesnt_cache_6th_offset, simple_raft_fixture) { } } -FIXTURE_TEST(test_rm_stm_prevents_gaps, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_rm_stm_prevents_gaps, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -353,20 +297,12 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, simple_raft_fixture) { r2 == failure_type(cluster::errc::sequence_out_of_order)); } -FIXTURE_TEST(test_rm_stm_prevents_odd_session_start_off, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_rm_stm_prevents_odd_session_start_off, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -394,20 +330,12 @@ FIXTURE_TEST(test_rm_stm_prevents_odd_session_start_off, simple_raft_fixture) { r == failure_type(cluster::errc::sequence_out_of_order)); } -FIXTURE_TEST(test_rm_stm_passes_immediate_retry, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_rm_stm_passes_immediate_retry, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); diff --git a/src/v/cluster/tests/manual_log_deletion_test.cc b/src/v/cluster/tests/manual_log_deletion_test.cc index af1f4a5bad62d..117a0adb95484 100644 --- a/src/v/cluster/tests/manual_log_deletion_test.cc +++ b/src/v/cluster/tests/manual_log_deletion_test.cc @@ -60,7 +60,7 @@ struct manual_deletion_fixture : public raft_test_fixture { if (member.log->config().is_collectable()) { auto& kvstore = member.storage.local().kvs(); auto eviction_stm = std::make_unique( - member.consensus.get(), tstlog, member._as, kvstore); + member.consensus.get(), tstlog, kvstore); eviction_stm->start().get0(); eviction_stms.emplace(id, std::move(eviction_stm)); member.kill_eviction_stm_cb diff --git a/src/v/cluster/tests/rm_stm_test_fixture.h 
b/src/v/cluster/tests/rm_stm_test_fixture.h new file mode 100644 index 0000000000000..487d3c5dea350 --- /dev/null +++ b/src/v/cluster/tests/rm_stm_test_fixture.h @@ -0,0 +1,36 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once +#include "cluster/rm_stm.h" +#include "config/property.h" +#include "raft/tests/simple_raft_fixture.h" + +static ss::logger logger{"rm_stm-test"}; + +struct rm_stm_test_fixture : simple_raft_fixture { + void create_stm_and_start_raft( + storage::ntp_config::default_overrides overrides = {}) { + create_raft(overrides); + raft::state_machine_manager_builder stm_m_builder; + + _stm = stm_m_builder.create_stm( + logger, + _raft.get(), + tx_gateway_frontend, + _feature_table, + config::mock_binding(std::numeric_limits::max())); + + _raft->start(std::move(stm_m_builder)).get(); + _started = true; + } + + ss::sharded tx_gateway_frontend; + ss::shared_ptr _stm; +}; diff --git a/src/v/cluster/tests/rm_stm_tests.cc b/src/v/cluster/tests/rm_stm_tests.cc index 47de2129cbeb2..9ff298e4fd095 100644 --- a/src/v/cluster/tests/rm_stm_tests.cc +++ b/src/v/cluster/tests/rm_stm_tests.cc @@ -10,6 +10,7 @@ #include "cluster/errc.h" #include "cluster/rm_stm.h" #include "cluster/tests/randoms.h" +#include "cluster/tests/rm_stm_test_fixture.h" #include "cluster/tx_snapshot_adl_utils.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -20,7 +21,6 @@ #include "model/timestamp.h" #include "raft/consensus_utils.h" #include "raft/tests/raft_group_fixture.h" -#include "raft/tests/simple_raft_fixture.h" #include "raft/types.h" #include "random/generators.h" #include "storage/record_batch_builder.h" @@ -38,27 +38,11 @@ using namespace std::chrono_literals; static const failure_type invalid_producer_epoch(cluster::errc::invalid_producer_epoch); -static ss::logger logger{"rm_stm-test"}; - struct rich_reader { model::batch_identity id; model::record_batch_reader reader; }; -static config::binding get_config_bound() { - static config::config_store store; - static config::bounded_property max_saved_pids_count( - store, - "max_saved_pids_count", - "Max pids count inside rm_stm states", - {.needs_restart = config::needs_restart::no, - .visibility = config::visibility::user}, - std::numeric_limits::max(), - {.min = 1}); - - return max_saved_pids_count.bind(); -} - static rich_reader make_rreader( model::producer_identity pid, int first_seq, @@ -109,20 +93,11 @@ void check_snapshot_sizes(cluster::rm_stm& stm, raft::consensus* c) { // tests: // - a simple tx execution succeeds // - last_stable_offset doesn't advance past an ongoing transaction -FIXTURE_TEST(test_tx_happy_tx, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_tx_happy_tx, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); - stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); auto tx_seq = model::tx_seq(0); wait_for_confirmed_leader(); @@ -190,20 +165,12 @@ FIXTURE_TEST(test_tx_happy_tx, simple_raft_fixture) { // tests: // - a simple tx aborting before prepare succeeds // - an aborted tx is 
reflected in aborted_transactions -FIXTURE_TEST(test_tx_aborted_tx_1, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_tx_aborted_tx_1, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); auto tx_seq = model::tx_seq(0); wait_for_confirmed_leader(); @@ -280,20 +247,13 @@ FIXTURE_TEST(test_tx_aborted_tx_1, simple_raft_fixture) { // tests: // - a simple tx aborting after prepare succeeds // - an aborted tx is reflected in aborted_transactions -FIXTURE_TEST(test_tx_aborted_tx_2, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_tx_aborted_tx_2, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); + auto tx_seq = model::tx_seq(0); wait_for_confirmed_leader(); @@ -369,20 +329,12 @@ FIXTURE_TEST(test_tx_aborted_tx_2, simple_raft_fixture) { } // transactional writes of an unknown tx are rejected -FIXTURE_TEST(test_tx_unknown_produce, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_tx_unknown_produce, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -410,20 +362,13 @@ FIXTURE_TEST(test_tx_unknown_produce, simple_raft_fixture) { } // begin fences off old transactions -FIXTURE_TEST(test_tx_begin_fences_produce, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_tx_begin_fences_produce, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); + auto tx_seq = model::tx_seq(0); wait_for_confirmed_leader(); @@ -475,20 +420,13 @@ FIXTURE_TEST(test_tx_begin_fences_produce, simple_raft_fixture) { } // transactional writes of an aborted tx are rejected -FIXTURE_TEST(test_tx_post_aborted_produce, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_tx_post_aborted_produce, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); + auto tx_seq = model::tx_seq(0); wait_for_confirmed_leader(); @@ -544,21 +482,13 @@ FIXTURE_TEST(test_tx_post_aborted_produce, simple_raft_fixture) { // transactions. Multiple subsystems that interact with transactions rely on // aborted transactions for correctness. These serve as regression tests so that // we do not break the semantics. 
-FIXTURE_TEST(test_aborted_transactions, simple_raft_fixture) { - start_raft(); - - ss::sharded tx_gateway_frontend; - cluster::rm_stm stm( - logger, - _raft.get(), - tx_gateway_frontend, - _feature_table, - get_config_bound()); +FIXTURE_TEST(test_aborted_transactions, rm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; stm.testing_only_disable_auto_abort(); stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); wait_for_confirmed_leader(); wait_for_meta_initialized(); diff --git a/src/v/cluster/tests/tm_stm_tests.cc b/src/v/cluster/tests/tm_stm_tests.cc index f6f4faf03d7b2..a9c706c736c0e 100644 --- a/src/v/cluster/tests/tm_stm_tests.cc +++ b/src/v/cluster/tests/tm_stm_tests.cc @@ -48,20 +48,28 @@ static tm_transaction expect_tx(checked maybe_tx) { BOOST_REQUIRE(maybe_tx.has_value()); return maybe_tx.value(); } +struct tm_stm_test_fixture : simple_raft_fixture { + void create_stm_and_start_raft() { + create_raft(); + raft::state_machine_manager_builder stm_m_builder; + + _stm = stm_m_builder.create_stm( + tm_logger, + _raft.get(), + std::ref(_feature_table), + std::ref(tm_cache.cache)); + + _raft->start(std::move(stm_m_builder)).get(); + _started = true; + } -FIXTURE_TEST(test_tm_stm_new_tx, simple_raft_fixture) { - start_raft(); + ss::shared_ptr _stm; tm_cache_struct tm_cache; +}; - cluster::tm_stm stm( - tm_logger, - _raft.get(), - std::ref(_feature_table), - std::ref(tm_cache.cache)); - auto c = _raft.get(); - - stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); +FIXTURE_TEST(test_tm_stm_new_tx, tm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -71,7 +79,7 @@ FIXTURE_TEST(test_tm_stm_new_tx, simple_raft_fixture) { auto op_code = stm .register_new_producer( - c->term(), tx_id, std::chrono::milliseconds(0), pid) + _raft->term(), tx_id, std::chrono::milliseconds(0), pid) .get0(); BOOST_REQUIRE_EQUAL(op_code, op_status::success); auto tx1 = expect_tx(stm.get_tx(tx_id).get0()); @@ -79,14 +87,14 @@ FIXTURE_TEST(test_tm_stm_new_tx, simple_raft_fixture) { BOOST_REQUIRE_EQUAL(tx1.pid, pid); BOOST_REQUIRE_EQUAL(tx1.status, tx_status::ready); BOOST_REQUIRE_EQUAL(tx1.partitions.size(), 0); - expect_tx(stm.mark_tx_ongoing(c->term(), tx_id).get0()); + expect_tx(stm.mark_tx_ongoing(_raft->term(), tx_id).get0()); std::vector partitions = { tm_transaction::tx_partition{ .ntp = model::ntp("kafka", "topic", 0), .etag = model::term_id(0)}, tm_transaction::tx_partition{ .ntp = model::ntp("kafka", "topic", 1), .etag = model::term_id(0)}}; BOOST_REQUIRE_EQUAL( - stm.add_partitions(c->term(), tx_id, partitions).get0(), + stm.add_partitions(_raft->term(), tx_id, partitions).get0(), cluster::tm_stm::op_status::success); BOOST_REQUIRE_EQUAL(tx1.partitions.size(), 0); auto tx2 = expect_tx(stm.get_tx(tx_id).get0()); @@ -95,13 +103,13 @@ FIXTURE_TEST(test_tm_stm_new_tx, simple_raft_fixture) { BOOST_REQUIRE_EQUAL(tx2.status, tx_status::ongoing); BOOST_REQUIRE_GT(tx2.tx_seq, tx1.tx_seq); BOOST_REQUIRE_EQUAL(tx2.partitions.size(), 2); - auto tx4 = expect_tx(stm.mark_tx_prepared(c->term(), tx_id).get()); + auto tx4 = expect_tx(stm.mark_tx_prepared(_raft->term(), tx_id).get()); BOOST_REQUIRE_EQUAL(tx4.id, tx_id); BOOST_REQUIRE_EQUAL(tx4.pid, pid); BOOST_REQUIRE_EQUAL(tx4.status, tx_status::prepared); BOOST_REQUIRE_EQUAL(tx4.tx_seq, tx2.tx_seq); BOOST_REQUIRE_EQUAL(tx4.partitions.size(), 2); - auto tx5 = expect_tx(stm.mark_tx_ongoing(c->term(), tx_id).get0()); + auto 
tx5 = expect_tx(stm.mark_tx_ongoing(_raft->term(), tx_id).get0()); BOOST_REQUIRE_EQUAL(tx5.id, tx_id); BOOST_REQUIRE_EQUAL(tx5.pid, pid); BOOST_REQUIRE_EQUAL(tx5.status, tx_status::ongoing); @@ -109,19 +117,9 @@ FIXTURE_TEST(test_tm_stm_new_tx, simple_raft_fixture) { BOOST_REQUIRE_EQUAL(tx5.partitions.size(), 0); } -FIXTURE_TEST(test_tm_stm_seq_tx, simple_raft_fixture) { - start_raft(); - tm_cache_struct tm_cache; - - cluster::tm_stm stm( - tm_logger, - _raft.get(), - std::ref(_feature_table), - std::ref(tm_cache.cache)); - auto c = _raft.get(); - - stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); +FIXTURE_TEST(test_tm_stm_seq_tx, tm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -131,22 +129,22 @@ FIXTURE_TEST(test_tm_stm_seq_tx, simple_raft_fixture) { auto op_code = stm .register_new_producer( - c->term(), tx_id, std::chrono::milliseconds(0), pid) + _raft->term(), tx_id, std::chrono::milliseconds(0), pid) .get0(); BOOST_REQUIRE_EQUAL(op_code, op_status::success); auto tx1 = expect_tx(stm.get_tx(tx_id).get0()); - auto tx2 = stm.mark_tx_ongoing(c->term(), tx_id).get0(); + auto tx2 = stm.mark_tx_ongoing(_raft->term(), tx_id).get0(); std::vector partitions = { tm_transaction::tx_partition{ .ntp = model::ntp("kafka", "topic", 0), .etag = model::term_id(0)}, tm_transaction::tx_partition{ .ntp = model::ntp("kafka", "topic", 1), .etag = model::term_id(0)}}; BOOST_REQUIRE_EQUAL( - stm.add_partitions(c->term(), tx_id, partitions).get0(), + stm.add_partitions(_raft->term(), tx_id, partitions).get0(), cluster::tm_stm::op_status::success); auto tx3 = expect_tx(stm.get_tx(tx_id).get0()); - auto tx5 = expect_tx(stm.mark_tx_prepared(c->term(), tx_id).get()); - auto tx6 = expect_tx(stm.mark_tx_ongoing(c->term(), tx_id).get0()); + auto tx5 = expect_tx(stm.mark_tx_prepared(_raft->term(), tx_id).get()); + auto tx6 = expect_tx(stm.mark_tx_ongoing(_raft->term(), tx_id).get0()); BOOST_REQUIRE_EQUAL(tx6.id, tx_id); BOOST_REQUIRE_EQUAL(tx6.pid, pid); BOOST_REQUIRE_EQUAL(tx6.status, tx_status::ongoing); @@ -154,19 +152,9 @@ FIXTURE_TEST(test_tm_stm_seq_tx, simple_raft_fixture) { BOOST_REQUIRE_NE(tx6.tx_seq, tx1.tx_seq); } -FIXTURE_TEST(test_tm_stm_re_tx, simple_raft_fixture) { - start_raft(); - tm_cache_struct tm_cache; - - cluster::tm_stm stm( - tm_logger, - _raft.get(), - std::ref(_feature_table), - std::ref(tm_cache.cache)); - auto c = _raft.get(); - - stm.start().get0(); - auto stop = ss::defer([&stm] { stm.stop().get0(); }); +FIXTURE_TEST(test_tm_stm_re_tx, tm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -176,7 +164,7 @@ FIXTURE_TEST(test_tm_stm_re_tx, simple_raft_fixture) { auto op_code = stm .register_new_producer( - c->term(), tx_id, std::chrono::milliseconds(0), pid1) + _raft->term(), tx_id, std::chrono::milliseconds(0), pid1) .get0(); BOOST_REQUIRE(op_code == op_status::success); auto tx1 = expect_tx(stm.get_tx(tx_id).get0()); @@ -185,21 +173,24 @@ FIXTURE_TEST(test_tm_stm_re_tx, simple_raft_fixture) { .ntp = model::ntp("kafka", "topic", 0), .etag = model::term_id(0)}, tm_transaction::tx_partition{ .ntp = model::ntp("kafka", "topic", 1), .etag = model::term_id(0)}}; - auto tx2 = stm.mark_tx_ongoing(c->term(), tx_id).get0(); + auto tx2 = stm.mark_tx_ongoing(_raft->term(), tx_id).get0(); BOOST_REQUIRE_EQUAL( - stm.add_partitions(c->term(), tx_id, partitions).get0(), + stm.add_partitions(_raft->term(), tx_id, 
partitions).get0(), cluster::tm_stm::op_status::success); auto tx3 = expect_tx(stm.get_tx(tx_id).get0()); - auto tx5 = expect_tx(stm.mark_tx_prepared(c->term(), tx_id).get()); - auto tx6 = expect_tx(stm.mark_tx_ongoing(c->term(), tx_id).get0()); + auto tx5 = expect_tx(stm.mark_tx_prepared(_raft->term(), tx_id).get()); + auto tx6 = expect_tx(stm.mark_tx_ongoing(_raft->term(), tx_id).get0()); auto pid2 = model::producer_identity{1, 1}; auto expected_pid = model::producer_identity(3, 5); - op_code - = stm - .re_register_producer( - c->term(), tx_id, std::chrono::milliseconds(0), pid2, expected_pid) - .get0(); + op_code = stm + .re_register_producer( + _raft->term(), + tx_id, + std::chrono::milliseconds(0), + pid2, + expected_pid) + .get0(); BOOST_REQUIRE_EQUAL(op_code, op_status::success); auto tx7 = expect_tx(stm.get_tx(tx_id).get0()); BOOST_REQUIRE_EQUAL(tx7.id, tx_id); @@ -285,17 +276,9 @@ void test_tm_hosts_tx_include_exclude_saved_in_snapshot( } } -FIXTURE_TEST(test_tm_stm_hosted_hash_1_partition, simple_raft_fixture) { - start_raft(); - tm_cache_struct tm_cache; - - cluster::tm_stm stm( - tm_logger, - _raft.get(), - std::ref(_feature_table), - std::ref(tm_cache.cache)); - auto c = _raft.get(); - stm.start().get0(); +FIXTURE_TEST(test_tm_stm_hosted_hash_1_partition, tm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -304,10 +287,10 @@ FIXTURE_TEST(test_tm_stm_hosted_hash_1_partition, simple_raft_fixture) { BOOST_ASSERT(!stm.hosts(tx_id)); cluster::tm_stm::op_status init_hash_res - = stm.try_init_hosted_transactions(c->term(), 1).get0(); + = stm.try_init_hosted_transactions(_raft->term(), 1).get0(); BOOST_REQUIRE_EQUAL(init_hash_res, cluster::tm_stm::op_status::success); test_tm_hosts_tx(stm, 1); - test_tm_hosts_tx_include_exclude(stm, 1, c); + test_tm_hosts_tx_include_exclude(stm, 1, _raft.get()); for (size_t i = 0; i < 10; ++i) { try { @@ -319,38 +302,22 @@ FIXTURE_TEST(test_tm_stm_hosted_hash_1_partition, simple_raft_fixture) { } } } - stm.stop().get0(); + auto old_stm = _stm; stop_all(); + tm_cache = tm_cache_struct{}; + create_stm_and_start_raft(); + auto& new_stm = *_stm; - // Test load from snapshot - start_raft(); - tm_cache_struct tm_cache_new; - cluster::tm_stm new_stm( - tm_logger, - _raft.get(), - std::ref(_feature_table), - std::ref(tm_cache_new.cache)); - new_stm.start().get0(); - auto stop = ss::defer([&new_stm] { new_stm.stop().get0(); }); - c = _raft.get(); wait_for_confirmed_leader(); wait_for_meta_initialized(); - test_tm_hosts_tx(stm, 1); - test_tm_hosts_tx_include_exclude_saved_in_snapshot(new_stm, 1, c); + test_tm_hosts_tx(*old_stm, 1); + test_tm_hosts_tx_include_exclude_saved_in_snapshot(new_stm, 1, _raft.get()); } -FIXTURE_TEST(test_tm_stm_hosted_hash_16_partition, simple_raft_fixture) { - start_raft(); - tm_cache_struct tm_cache; - - cluster::tm_stm stm( - tm_logger, - _raft.get(), - std::ref(_feature_table), - std::ref(tm_cache.cache)); - auto c = _raft.get(); - stm.start().get0(); +FIXTURE_TEST(test_tm_stm_hosted_hash_16_partition, tm_stm_test_fixture) { + create_stm_and_start_raft(); + auto& stm = *_stm; wait_for_confirmed_leader(); wait_for_meta_initialized(); @@ -359,10 +326,10 @@ FIXTURE_TEST(test_tm_stm_hosted_hash_16_partition, simple_raft_fixture) { BOOST_ASSERT(!stm.hosts(tx_id)); cluster::tm_stm::op_status init_hash_res - = stm.try_init_hosted_transactions(c->term(), 16).get0(); + = stm.try_init_hosted_transactions(_raft->term(), 16).get0(); 
BOOST_REQUIRE_EQUAL(init_hash_res, cluster::tm_stm::op_status::success); test_tm_hosts_tx(stm, 16); - test_tm_hosts_tx_include_exclude(stm, 16, c); + test_tm_hosts_tx_include_exclude(stm, 16, _raft.get()); for (size_t i = 0; i < 10; ++i) { try { @@ -374,23 +341,15 @@ FIXTURE_TEST(test_tm_stm_hosted_hash_16_partition, simple_raft_fixture) { } } } - stm.stop().get0(); + auto old_stm = _stm; stop_all(); + tm_cache = tm_cache_struct{}; + create_stm_and_start_raft(); + auto& new_stm = *_stm; - // Test load from snapshot - start_raft(); - tm_cache_struct tm_cache_new; - cluster::tm_stm new_stm( - tm_logger, - _raft.get(), - std::ref(_feature_table), - std::ref(tm_cache_new.cache)); - new_stm.start().get0(); - auto stop = ss::defer([&new_stm] { new_stm.stop().get0(); }); - c = _raft.get(); wait_for_confirmed_leader(); wait_for_meta_initialized(); - test_tm_hosts_tx(stm, 16); - test_tm_hosts_tx_include_exclude_saved_in_snapshot(new_stm, 16, c); + test_tm_hosts_tx_include_exclude_saved_in_snapshot( + new_stm, 16, _raft.get()); } diff --git a/src/v/cluster/tests/tx_compaction_tests.cc b/src/v/cluster/tests/tx_compaction_tests.cc index c69410a65b9ee..abc7fb85b58d7 100644 --- a/src/v/cluster/tests/tx_compaction_tests.cc +++ b/src/v/cluster/tests/tx_compaction_tests.cc @@ -1,7 +1,7 @@ #include "cluster/rm_stm.h" +#include "cluster/tests/rm_stm_test_fixture.h" #include "config/config_store.h" #include "raft/tests/raft_group_fixture.h" -#include "raft/tests/simple_raft_fixture.h" #include "storage/tests/utils/disk_log_builder.h" #include "tx_compaction_utils.h" @@ -14,29 +14,14 @@ using cluster::random_tx_generator; #define STM_BOOTSTRAP() \ storage::ntp_config::default_overrides o; \ o.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; \ - start_raft(o); \ - ss::sharded tx_gateway_frontend; \ - config::config_store store; \ - config::bounded_property max_saved_pids_count( \ - store, \ - "max_saved_pids_count", \ - "Max pids count inside rm_stm states", \ - {.needs_restart = config::needs_restart::no, \ - .visibility = config::visibility::user}, \ - std::numeric_limits::max(), \ - {.min = 1}); \ - auto stm = ss::make_shared( \ - test_logger, \ - _raft.get(), \ - tx_gateway_frontend, \ - _feature_table, \ - max_saved_pids_count.bind()); \ + \ + create_stm_and_start_raft(o); \ + auto stm = _stm; \ stm->testing_only_disable_auto_abort(); \ - stm->start().get0(); \ auto stop = ss::defer([&] { \ _data_dir = "test_dir_" + random_generators::gen_alphanum_string(6); \ - stm->stop().get0(); \ stop_all(); \ + _stm = nullptr; \ }); \ wait_for_confirmed_leader(); \ wait_for_meta_initialized(); \ @@ -44,7 +29,7 @@ using cluster::random_tx_generator; log->stm_manager()->add_stm(stm); \ BOOST_REQUIRE(log); -FIXTURE_TEST(test_tx_compaction_combinations, simple_raft_fixture) { +FIXTURE_TEST(test_tx_compaction_combinations, rm_stm_test_fixture) { // This generates very interesting interleaved and non interleaved // transaction scopes with single and multi segment transactions. 
We // Validate that the resulting output segment file has all the aborted diff --git a/src/v/cluster/tm_stm.cc b/src/v/cluster/tm_stm.cc index 9307e3a66b276..c1e64c38d96e9 100644 --- a/src/v/cluster/tm_stm.cc +++ b/src/v/cluster/tm_stm.cc @@ -253,6 +253,31 @@ ss::future> tm_stm::do_barrier() { }); } +model::record_batch_reader make_checkpoint() { + storage::record_batch_builder builder( + model::record_batch_type::checkpoint, model::offset(0)); + builder.add_raw_kv(iobuf(), iobuf()); + return model::make_memory_record_batch_reader(std::move(builder).build()); +} + +ss::future> +tm_stm::quorum_write_empty_batch(model::timeout_clock::time_point timeout) { + using ret_t = result; + // replicate checkpoint batch + return _raft + ->replicate( + make_checkpoint(), + raft::replicate_options(raft::consistency_level::quorum_ack)) + .then([this, timeout](ret_t r) { + if (!r) { + return ss::make_ready_future(r); + } + return wait(r.value().last_offset, timeout).then([r]() mutable { + return r; + }); + }); +} + ss::future<> tm_stm::checkpoint_ongoing_txs() { if (!use_new_tx_version()) { co_return; @@ -999,16 +1024,16 @@ ss::future<> tm_stm::apply_hosted_transactions(model::record_batch b) { return ss::now(); } -ss::future<> tm_stm::apply(model::record_batch b) { +ss::future<> tm_stm::apply(const model::record_batch& b) { const auto& hdr = b.header(); _insync_offset = b.last_offset(); if (hdr.type == model::record_batch_type::tm_update) { - return apply_tm_update(std::move(hdr), std::move(b)); + return apply_tm_update(hdr, b.copy()); } if (hdr.type == model::record_batch_type::tx_tm_hosted_trasactions) { - return apply_hosted_transactions(std::move(b)); + return apply_hosted_transactions(b.copy()); } return ss::now(); @@ -1100,7 +1125,7 @@ tm_stm::expire_tx(model::term_id term, kafka::transactional_id tx_id) { co_return r0.error(); } -ss::future<> tm_stm::handle_raft_snapshot() { +ss::future<> tm_stm::apply_raft_snapshot(const iobuf&) { return _cache->write_lock().then( [this]([[maybe_unused]] ss::basic_rwlock<>::holder unit) { _cache->clear_log(); diff --git a/src/v/cluster/tm_stm.h b/src/v/cluster/tm_stm.h index 891d6561403ce..03638581eb573 100644 --- a/src/v/cluster/tm_stm.h +++ b/src/v/cluster/tm_stm.h @@ -11,6 +11,7 @@ #pragma once +#include "bytes/iobuf.h" #include "cluster/persisted_stm.h" #include "cluster/tm_stm_cache.h" #include "cluster/tm_tx_hash_ranges.h" @@ -202,8 +203,11 @@ class tm_stm final : public persisted_stm<> { return _raft->ntp().tp.partition; } + const char* get_name() const final { return "tm_stm"; } + ss::future take_snapshot(model::offset) final { co_return iobuf{}; } + protected: - ss::future<> handle_raft_snapshot() override; + ss::future<> apply_raft_snapshot(const iobuf&) final; private: std::optional find_tx(kafka::transactional_id); @@ -220,7 +224,7 @@ class tm_stm final : public persisted_stm<> { ss::lw_shared_ptr _cache; tm_tx_hosted_transactions _hosted_txes; - ss::future<> apply(model::record_batch b) override; + ss::future<> apply(const model::record_batch& b) final; ss::future<> apply_hosted_transactions(model::record_batch b); ss::future<> apply_tm_update(model::record_batch_header hdr, model::record_batch b); @@ -240,6 +244,8 @@ class tm_stm final : public persisted_stm<> { std::chrono::milliseconds, model::producer_identity); ss::future do_take_snapshot(); + ss::future> + quorum_write_empty_batch(model::timeout_clock::time_point); ss::future> replicate_quorum_ack(model::term_id term, model::record_batch&& batch) { diff --git a/src/v/raft/consensus.cc 
b/src/v/raft/consensus.cc index ea07f392b4850..f39f70875ab8d 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -23,12 +23,14 @@ #include "raft/consensus_client_protocol.h" #include "raft/consensus_utils.h" #include "raft/errc.h" +#include "raft/fwd.h" #include "raft/group_configuration.h" #include "raft/logger.h" #include "raft/prevote_stm.h" #include "raft/recovery_stm.h" #include "raft/replicate_entries_stm.h" #include "raft/rpc_client_protocol.h" +#include "raft/state_machine_manager.h" #include "raft/types.h" #include "raft/vote_stm.h" #include "reflection/adl.h" @@ -266,6 +268,9 @@ ss::future<> consensus::stop() { idx.second.follower_state_change.broken(); } co_await _event_manager.stop(); + if (_stm_manager) { + co_await _stm_manager->stop(); + } co_await _append_requests_buffer.stop(); co_await _batcher.stop(); @@ -1281,7 +1286,11 @@ ss::future consensus::force_replace_configuration_locally( co_return errc::success; } -ss::future<> consensus::start() { +ss::future<> consensus::start( + std::optional stm_manager_builder) { + if (stm_manager_builder) { + _stm_manager = std::move(stm_manager_builder.value()).build(this); + } return ss::try_with_gate(_bg, [this] { return do_start(); }); } @@ -1478,6 +1487,9 @@ ss::future<> consensus::do_start() { co_await _event_manager.start(); _append_requests_buffer.start(); + if (_stm_manager) { + co_await _stm_manager->start(); + } vlog( _ctxlog.info, diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h index 34bd1534099cb..5464c8d66532f 100644 --- a/src/v/raft/consensus.h +++ b/src/v/raft/consensus.h @@ -32,6 +32,7 @@ #include "raft/probe.h" #include "raft/recovery_memory_quota.h" #include "raft/replicate_batcher.h" +#include "raft/state_machine_manager.h" #include "raft/timeout_jitter.h" #include "seastarx.h" #include "ssx/metrics.h" @@ -109,7 +110,8 @@ class consensus { keep_snapshotted_log = keep_snapshotted_log::no); /// Initial call. Allow for internal state recovery - ss::future<> start(); + ss::future<> + start(std::optional = std::nullopt); /// Stop all communications. ss::future<> stop(); @@ -238,6 +240,8 @@ class consensus { replicate_in_stages(model::record_batch_reader&&, replicate_options); uint64_t get_snapshot_size() const { return _snapshot_size; } + std::optional& stm_manager() { return _stm_manager; } + /** * Replication happens only when expected_term matches the current _term * otherwise consensus returns not_leader. 
This feature is needed to keep @@ -805,6 +809,7 @@ class consensus { offset_monitor _consumable_offset_monitor; ss::condition_variable _follower_reply; append_entries_buffer _append_requests_buffer; + std::optional _stm_manager; friend std::ostream& operator<<(std::ostream&, const consensus&); }; diff --git a/src/v/raft/state_machine_base.cc b/src/v/raft/state_machine_base.cc index 46437020e06c5..de8b649183ae1 100644 --- a/src/v/raft/state_machine_base.cc +++ b/src/v/raft/state_machine_base.cc @@ -17,6 +17,11 @@ namespace raft { void state_machine_base::set_next(model::offset offset) { + vassert( + offset >= _next, + "can not move next offset backward, current: {}, requested: {}", + _next, + offset); _next = offset; _waiters.notify(model::prev_offset(offset)); } diff --git a/src/v/raft/state_machine_base.h b/src/v/raft/state_machine_base.h index a882a7338a931..c0de9526b44c3 100644 --- a/src/v/raft/state_machine_base.h +++ b/src/v/raft/state_machine_base.h @@ -74,9 +74,9 @@ class state_machine_base { virtual const char* get_name() const = 0; /** - * Returns a snapshot of an STM state + * Returns a snapshot of an STM state with last included offset */ - virtual ss::future take_snapshot() = 0; + virtual ss::future take_snapshot(model::offset) = 0; /** * Last successfully applied offset diff --git a/src/v/raft/state_machine_manager.cc b/src/v/raft/state_machine_manager.cc index 0e5156d7b6c30..ecfcb520e55fb 100644 --- a/src/v/raft/state_machine_manager.cc +++ b/src/v/raft/state_machine_manager.cc @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -44,9 +45,7 @@ class batch_applicator { const char* ctx, const std::vector& machines, ss::abort_source& as, - ctx_log& log, - std::chrono::milliseconds apply_timeout, - ss::gate& gate); + ctx_log& log); ss::future operator()(model::record_batch); @@ -66,22 +65,16 @@ class batch_applicator { model::offset _last_applied; ss::abort_source& _as; ctx_log& _log; - std::chrono::milliseconds _apply_timeout; - ss::gate& _gate; }; batch_applicator::batch_applicator( const char* ctx, const std::vector& entries, ss::abort_source& as, - ctx_log& log, - std::chrono::milliseconds apply_timeout, - ss::gate& gate) + ctx_log& log) : _ctx(ctx) , _as(as) - , _log(log) - , _apply_timeout(apply_timeout) - , _gate(gate) { + , _log(log) { for (auto& m : entries) { _machines.push_back(apply_state{.stm_entry = m}); } @@ -135,13 +128,10 @@ ss::future batch_applicator::apply_to_stm( if (state.stm_entry->stm->next() > batch.base_offset()) { co_return false; } - auto holder = _gate.hold(); - auto f = state.stm_entry->stm->apply(batch).finally( - [holder = std::move(holder)] {}); - co_await ss::with_timeout( - clock_type::now() + _apply_timeout, std::move(f)); + co_await state.stm_entry->stm->apply(batch); state.stm_entry->stm->set_next(model::next_offset(last_offset)); + co_return true; } catch (...) 
{ vlog( _log.warn, @@ -157,30 +147,39 @@ ss::future batch_applicator::apply_to_stm( } state_machine_manager::state_machine_manager( - consensus* raft, - ctx_log logger, - config::binding apply_timeout, - std::vector stms, - ss::scheduling_group apply_sg) + consensus* raft, std::vector stms, ss::scheduling_group apply_sg) : _raft(raft) - , _log(std::move(logger)) - , _apply_timeout(std::move(apply_timeout)) + , _log(ctx_log(_raft->group(), _raft->ntp())) , _apply_sg(apply_sg) { - for (auto& managed_stm : stms) { - if (managed_stm.is_snapshotable_stm) { - register_in_log_stm_manger( - ss::dynamic_pointer_cast( - managed_stm.stm)); - } + for (auto& stm : stms) { + const char* name = stm->get_name(); _machines.try_emplace( - managed_stm.stm->get_name(), - ss::make_lw_shared(std::move(managed_stm.stm))); + name, ss::make_lw_shared(std::move(stm))); } } ss::future<> state_machine_manager::start() { vlog(_log.debug, "starting state machine manager"); - _started = true; + if (_machines.empty()) { + co_return; + } + co_await ss::coroutine::parallel_for_each(_machines, [this](auto& pair) { + vlog(_log.trace, "starting {} state machine", pair.first); + return pair.second->stm->start(); + }); + std::vector offsets; + for (const auto& [name, stm_meta] : _machines) { + vlog( + _log.trace, + "DBG: stm {} last_applied {} offset", + name, + stm_meta->stm->last_applied_offset()); + offsets.push_back(stm_meta->stm->last_applied_offset()); + } + std::sort(offsets.begin(), offsets.end()); + _next = model::next_offset(offsets.front()); + vlog(_log.trace, "DBG: initial next: {}", _next); + ssx::spawn_with_gate(_gate, [this] { return ss::do_until( [this] { return _as.abort_requested(); }, [this] { return apply(); }); @@ -189,15 +188,20 @@ ss::future<> state_machine_manager::start() { } ss::future<> state_machine_manager::stop() { - vlog(_log.debug, "stopping state machine manager"); + vlog( + _log.debug, + "stopping state machine manager with {} state machines", + _machines.size()); _apply_mutex.broken(); _as.request_abort(); - return _gate.close(); -} -void state_machine_manager::register_in_log_stm_manger( - const ss::shared_ptr& stm) { - _raft->log()->stm_manager()->add_stm(stm); + co_await ss::coroutine::parallel_for_each(_machines, [this](auto p) { + vlog(_log.info, "DBG: stopping {} state machine", p.first); + return p.second->stm->stop().then([p, this] { + vlog(_log.info, "DBG: stopped {} state machine", p.first); + }); + }); + co_await _gate.close(); } ss::future<> state_machine_manager::apply_raft_snapshot() { @@ -211,19 +215,49 @@ ss::future<> state_machine_manager::apply_raft_snapshot() { snapshot->metadata.last_included_index); const auto snapshot_sz = co_await snapshot->reader.get_snapshot_size(); - iobuf_parser parser(co_await read_iobuf_exactly( - snapshot->reader.input(), snapshot_sz)); - auto snap = co_await serde::read_async(parser); const auto last_offset = snapshot->metadata.last_included_index; - for (auto& [key, buffer] : snap.snapshot_map) { - auto it = _machines.find(key); - if (it != _machines.end()) { - co_await it->second->stm->apply_raft_snapshot(buffer); - it->second->stm->set_next(model::next_offset(last_offset)); + + /** + * Previously all the STMs in Redpanda (excluding controller) were + * using empty Raft snapshots. If snapshot is empty we still apply + * it to maintain backward compatibility. 
+ */ + if (snapshot_sz == 0) { + vlog( + _log.debug, + "applying empty snapshot at offset: {} for backward " + "compatibility", + snapshot->metadata.last_included_index); + for (auto& [_, stm_meta] : _machines) { + if ( + stm_meta->stm->last_applied_offset() + < snapshot->metadata.last_included_index) { + co_await stm_meta->stm->apply_raft_snapshot(iobuf{}); + stm_meta->stm->set_next(std::max( + model::next_offset(last_offset), + stm_meta->stm->next())); + } + } + } else { + iobuf_parser parser(co_await read_iobuf_exactly( + snapshot->reader.input(), snapshot_sz)); + auto snap = co_await serde::read_async( + parser); + + for (auto& [key, buffer] : snap.snapshot_map) { + auto it = _machines.find(key); + if ( + it != _machines.end() + && it->second->stm->last_applied_offset() + < snapshot->metadata.last_included_index) { + co_await it->second->stm->apply_raft_snapshot(buffer); + it->second->stm->set_next(std::max( + model::next_offset(last_offset), + it->second->stm->next())); + } } } _next = model::next_offset(snapshot->metadata.last_included_index); - } catch (...) { vlog( _log.error, @@ -278,8 +312,7 @@ ss::future<> state_machine_manager::apply() { } } auto last_applied = co_await std::move(reader).consume( - batch_applicator( - default_ctx, machines, _as, _log, _apply_timeout(), _gate), + batch_applicator(default_ctx, machines, _as, _log), model::no_timeout); _next = std::max(model::next_offset(last_applied), _next); @@ -326,7 +359,7 @@ void state_machine_manager::maybe_start_background_apply( } ss::future<> state_machine_manager::background_apply_fiber(entry_ptr entry) { - while (entry->stm->next() != _next) { + while (entry->stm->next() < _next) { storage::log_reader_config config( entry->stm->next(), _next, ss::default_priority_class()); @@ -334,15 +367,14 @@ ss::future<> state_machine_manager::background_apply_fiber(entry_ptr entry) { _log.debug, "reading batches in range [{}, {}] for '{}' stm background apply", entry->stm->next(), - last_applied(), + _next, entry->stm->get_name()); bool error = false; try { model::record_batch_reader reader = co_await _raft->make_reader( config); co_await std::move(reader).consume( - batch_applicator( - background_ctx, {entry}, _as, _log, _apply_timeout(), _gate), + batch_applicator(background_ctx, {entry}, _as, _log), model::no_timeout); } catch (...) 
{ @@ -363,28 +395,27 @@ ss::future<> state_machine_manager::background_apply_fiber(entry_ptr entry) { entry->stm->get_name()); } -ss::future<> state_machine_manager::take_snapshot() { +ss::future +state_machine_manager::take_snapshot(model::offset last_included_offset) { auto u = co_await _apply_mutex.get_units(); // snapshot can only be taken after all background applies finished auto units = co_await acquire_background_apply_mutexes(); - auto const last_applied = model::prev_offset(_next); vlog( _log.debug, "taking snapshot with last included offset: {}", - last_applied); + last_included_offset); // wait for all STMs to be on the same page - co_await wait(last_applied, model::no_timeout, _as); + co_await wait(last_included_offset, model::no_timeout, _as); // TODO: do it in parallel managed_snapshot snapshot; for (auto& [key, entry] : _machines) { snapshot.snapshot_map.try_emplace( - key, co_await entry->stm->take_snapshot()); + key, co_await entry->stm->take_snapshot(last_included_offset)); } - co_await _raft->write_snapshot( - write_snapshot_cfg(last_applied, serde::to_iobuf(std::move(snapshot)))); + co_return serde::to_iobuf(std::move(snapshot)); } ss::future<> state_machine_manager::wait( diff --git a/src/v/raft/state_machine_manager.h b/src/v/raft/state_machine_manager.h index 722d94de1c1ff..9695e5a9bbc4b 100644 --- a/src/v/raft/state_machine_manager.h +++ b/src/v/raft/state_machine_manager.h @@ -12,8 +12,8 @@ #pragma once #include "config/property.h" +#include "model/fundamental.h" #include "model/record.h" -#include "raft/consensus.h" #include "raft/fwd.h" #include "raft/logger.h" #include "raft/state_machine_base.h" @@ -35,8 +35,6 @@ namespace raft { template concept ManagableStateMachine = std::derived_from; -class consensus; - /** * State machine manager is an entry point for registering state machines * built on top of replicated log. State machine managers uses a single @@ -61,7 +59,7 @@ class state_machine_manager final { std::optional> as = std::nullopt); - ss::future<> take_snapshot(); + ss::future take_snapshot(model::offset); ss::future<> start(); ss::future<> stop(); @@ -69,16 +67,11 @@ class state_machine_manager final { model::offset last_applied() const { return model::prev_offset(_next); } private: - struct managed_stm { - bool is_snapshotable_stm; - ss::shared_ptr stm; - }; + using stm_ptr = ss::shared_ptr; state_machine_manager( consensus* raft, - ctx_log logger, - config::binding apply_timeout, - std::vector stms_to_manage, + std::vector stms_to_manage, ss::scheduling_group apply_sg); friend class batch_applicator; @@ -134,11 +127,9 @@ class state_machine_manager final { ctx_log _log; mutex _apply_mutex; state_machines_t _machines; - bool _started{false}; model::offset _next{0}; ss::gate _gate; ss::abort_source _as; - config::binding _apply_timeout; ss::scheduling_group _apply_sg; }; @@ -147,42 +138,22 @@ class state_machine_manager final { */ class state_machine_manager_builder { public: - state_machine_manager_builder( - consensus* raft, - ctx_log logger, - config::binding apply_timeout) - : _raft(raft) - , _log(std::move(logger)) - , _apply_timeout(std::move(apply_timeout)){}; - template ss::shared_ptr create_stm(Args&&... 
args) { auto machine = ss::make_shared(std::forward(args)...); - - _stms.push_back(state_machine_manager::managed_stm{ - .is_snapshotable_stm - = std::is_base_of_v, - .stm = machine}); + _stms.push_back(machine); return machine; } void with_scheduing_group(ss::scheduling_group sg) { _sg = sg; } - state_machine_manager build() && { - return { - _raft, - std::move(_log), - std::move(_apply_timeout), - std::move(_stms), - _sg}; + state_machine_manager build(raft::consensus* raft) && { + return {raft, std::move(_stms), _sg}; } private: - consensus* _raft; - ctx_log _log; - std::vector _stms; - config::binding _apply_timeout; + std::vector _stms; ss::scheduling_group _sg = ss::default_scheduling_group(); }; diff --git a/src/v/raft/tests/raft_group_fixture.h b/src/v/raft/tests/raft_group_fixture.h index b4def30136c9b..b8378eac105b1 100644 --- a/src/v/raft/tests/raft_group_fixture.h +++ b/src/v/raft/tests/raft_group_fixture.h @@ -24,6 +24,7 @@ #include "raft/heartbeat_manager.h" #include "raft/rpc_client_protocol.h" #include "raft/service.h" +#include "raft/state_machine_manager.h" #include "random/generators.h" #include "rpc/backoff_policy.h" #include "rpc/connection_cache.h" @@ -221,7 +222,7 @@ struct raft_node { hbeats->start().get0(); hbeats->register_group(consensus).get(); started = true; - consensus->start().get0(); + consensus->start(raft::state_machine_manager_builder{}).get0(); } ss::future<> stop_node() { diff --git a/src/v/raft/tests/simple_raft_fixture.h b/src/v/raft/tests/simple_raft_fixture.h index f1ca33e7eff8d..20745a273b055 100644 --- a/src/v/raft/tests/simple_raft_fixture.h +++ b/src/v/raft/tests/simple_raft_fixture.h @@ -39,7 +39,7 @@ struct simple_raft_fixture { : _self{0} , _data_dir("test_dir_" + random_generators::gen_alphanum_string(6)) {} - void start_raft(storage::ntp_config::default_overrides overrides = {}) { + void create_raft(storage::ntp_config::default_overrides overrides = {}) { ss::smp::invoke_on_all([]() { // We want immediate elections, to avoid a sleep at the start of // every instantiation of a test setup. 
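Taken together, the STM-side hunks above converge on a single interface for anything managed by state_machine_manager: apply(const model::record_batch&), apply_raft_snapshot(const iobuf&), take_snapshot(model::offset) returning an iobuf, and get_name(). A minimal sketch of a custom STM written against that interface (the counter type and its serde payload are illustrative only; any additional virtual members of state_machine_base not visible in these hunks are omitted):

#include "bytes/iobuf.h"
#include "model/record.h"
#include "raft/state_machine_base.h"
#include "serde/serde.h"

class counter_stm final : public raft::state_machine_base {
public:
    // Batches are now passed by const reference; copy explicitly if the
    // data must survive a scheduling point.
    ss::future<> apply(const model::record_batch& b) override {
        _value += b.record_count();
        co_return;
    }

    // Snapshot of the STM state; the manager supplies the last included
    // offset, which this trivial state does not depend on.
    ss::future<iobuf> take_snapshot(model::offset) override {
        co_return serde::to_iobuf(_value);
    }

    ss::future<> apply_raft_snapshot(const iobuf& buf) override {
        // Empty buffers are delivered for legacy, empty raft snapshots.
        if (!buf.empty()) {
            _value = serde::from_iobuf<int64_t>(buf.copy());
        }
        co_return;
    }

    const char* get_name() const override { return "counter_stm"; }

private:
    int64_t _value{0};
};

Registration then goes through the builder exactly as in the fixture change above: builder.create_stm<counter_stm>() followed by consensus->start(std::move(builder)), with the manager started from do_start() and stopped from consensus::stop().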
@@ -113,20 +113,21 @@ struct simple_raft_fixture { overrides))) .then([this](ss::shared_ptr log) mutable { auto group = raft::group_id(0); - return _group_mgr.local() - .create_group( - group, - {self_broker()}, - log, - raft::with_learner_recovery_throttle::yes) - .then([log](ss::lw_shared_ptr c) { - return c->start().then([c] { return c; }); - }); + return _group_mgr.local().create_group( + group, + {self_broker()}, + log, + raft::with_learner_recovery_throttle::yes); }) .get0(); + } + void start_raft(storage::ntp_config::default_overrides overrides = {}) { + create_raft(overrides); + _raft->start().get(); _started = true; } + ~simple_raft_fixture() { stop_all(); } void stop_all() { diff --git a/src/v/raft/tests/state_machine_manager_test.cc b/src/v/raft/tests/state_machine_manager_test.cc index d540c39ce7d16..f8628049b5173 100644 --- a/src/v/raft/tests/state_machine_manager_test.cc +++ b/src/v/raft/tests/state_machine_manager_test.cc @@ -168,7 +168,7 @@ struct simple_kv : public raft::state_machine_base { const char* get_name() const override { return "simple_kv"; }; - ss::future take_snapshot() override { + ss::future take_snapshot(model::offset) override { co_return serde::to_iobuf(state); }; @@ -193,7 +193,7 @@ struct counting_stm : public raft::state_machine_base { const char* get_name() const override { return "counter"; }; - ss::future take_snapshot() override { + ss::future take_snapshot(model::offset) override { co_return serde::to_iobuf(state); }; @@ -222,48 +222,13 @@ struct throwing_kv : public simple_kv { } }; -struct holding_kv : public simple_kv { - const char* get_name() const override { return "holding_kv"; }; - - ss::future<> apply(const model::record_batch& b) override { - // MUST COPY A BATCH AS IT IT GOING TO BE USED AFTER THE SCHEDULING - // POINT - auto batch = b.copy(); - co_await sig.wait([this] { return signalled; }); - if (batch.base_offset() < next()) { - co_return; - } - vassert( - batch.base_offset() == next(), - "batch {} base offset is not the next to apply, expected base " - "offset: {}", - batch.header(), - next()); - - apply_to_state(batch, state); - co_return; - } - - void signal() { - signalled = true; - sig.broadcast(); - } - - bool signalled = false; - ss::condition_variable sig; -}; - FIXTURE_TEST(basic_state_machine_apply_test, state_machine_manager_fixture) { start_raft(); - raft::state_machine_manager_builder builder( - _raft.get(), - raft::ctx_log(_raft->group(), _raft->ntp()), - config::mock_binding(1s)); - + raft::state_machine_manager_builder builder; auto kv_stm = builder.create_stm(); auto kv_2_stm = builder.create_stm(); auto counting = builder.create_stm(); - auto manager = std::move(builder).build(); + auto manager = std::move(builder).build(_raft.get()); wait_for_becoming_leader(); auto deferred_stop = ss::defer([&manager] { manager.stop().get(); }); @@ -280,16 +245,13 @@ FIXTURE_TEST(apply_with_exceptions, state_machine_manager_fixture) { start_raft(); wait_for_becoming_leader(); auto expected = build_state(10000); - raft::state_machine_manager_builder builder( - _raft.get(), - raft::ctx_log(_raft->group(), _raft->ntp()), - config::mock_binding(1s)); + raft::state_machine_manager_builder builder; auto kv_stm = builder.create_stm(); auto throwing_kv_stm = builder.create_stm(); auto counting = builder.create_stm(); - auto manager = std::move(builder).build(); + auto manager = std::move(builder).build(_raft.get()); auto deferred_stop = ss::defer([&manager] { manager.stop().get(); }); manager.start().get(); 
manager.wait(_raft->dirty_offset(), raft::clock_type::now() + 10s).get(); @@ -298,53 +260,17 @@ FIXTURE_TEST(apply_with_exceptions, state_machine_manager_fixture) { BOOST_REQUIRE_EQUAL(kv_stm->state, throwing_kv_stm->state); } -FIXTURE_TEST(test_independent_progress, state_machine_manager_fixture) { - start_raft(); - raft::state_machine_manager_builder builder( - _raft.get(), - raft::ctx_log(_raft->group(), _raft->ntp()), - config::mock_binding(1s)); - - auto kv_stm = builder.create_stm(); - - auto throwing_kv_stm = builder.create_stm(); - auto holding_stm = builder.create_stm(); - auto manager = std::move(builder).build(); - wait_for_becoming_leader(); - auto deferred_stop = ss::defer([&manager] { manager.stop().get(); }); - manager.start().get(); - - auto expected = build_state(10000); - // wait only for kv_stm - kv_stm->wait(_raft->dirty_offset(), raft::clock_type::now() + 10s).get(); - - BOOST_REQUIRE_EQUAL(kv_stm->state, expected); - - // since holding stm hasn't yet applied any of the batches, assert that last - // applied offset didn't advance - BOOST_REQUIRE_EQUAL(holding_stm->last_applied_offset(), model::offset{}); - - holding_stm->signal(); - // wait for all the stms - manager.wait(_raft->dirty_offset(), raft::clock_type::now() + 10s).get(); - // check holding stm state - BOOST_REQUIRE_EQUAL(holding_stm->state, expected); -} - FIXTURE_TEST(snapshot_test, state_machine_manager_fixture) { start_raft(); wait_for_becoming_leader(); auto expected = build_state(1000); { - raft::state_machine_manager_builder builder( - _raft.get(), - raft::ctx_log(_raft->group(), _raft->ntp()), - config::mock_binding(1s)); + raft::state_machine_manager_builder builder; auto kv_stm = builder.create_stm(); auto kv_2_stm = builder.create_stm(); auto counting = builder.create_stm(); - auto manager = std::move(builder).build(); + auto manager = std::move(builder).build(_raft.get()); auto deferred_stop = ss::defer([&manager] { manager.stop().get(); }); @@ -355,19 +281,15 @@ FIXTURE_TEST(snapshot_test, state_machine_manager_fixture) { BOOST_REQUIRE_EQUAL(kv_stm->state, expected); BOOST_REQUIRE_EQUAL(kv_stm->state, kv_2_stm->state); - manager.take_snapshot().get(); + manager.take_snapshot(kv_stm->last_applied_offset()).get(); } { // start manager once again to trigger recovery from snapshot - raft::state_machine_manager_builder builder( - _raft.get(), - raft::ctx_log(_raft->group(), _raft->ntp()), - config::mock_binding(1s)); - + raft::state_machine_manager_builder builder; auto kv_stm = builder.create_stm(); auto kv_2_stm = builder.create_stm(); auto counting = builder.create_stm(); - auto manager = std::move(builder).build(); + auto manager = std::move(builder).build(_raft.get()); wait_for_becoming_leader(); auto deferred_stop = ss::defer([&manager] { manager.stop().get(); }); @@ -387,36 +309,29 @@ FIXTURE_TEST(waiting_for_background_fiber, state_machine_manager_fixture) { auto expected = build_state(1000); { - raft::state_machine_manager_builder builder( - _raft.get(), - raft::ctx_log(_raft->group(), _raft->ntp()), - config::mock_binding(1s)); - + raft::state_machine_manager_builder builder; auto kv_stm = builder.create_stm(); auto throwing = builder.create_stm(); - auto manager = std::move(builder).build(); + auto manager = std::move(builder).build(_raft.get()); auto deferred_stop = ss::defer([&manager] { manager.stop().get(); }); manager.start().get(); ss::sleep(std::chrono::milliseconds(20)).get(); - manager.take_snapshot().get(); + manager.take_snapshot(kv_stm->last_applied_offset()).get(); 
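The snapshot tests above also show the other side of the API change: state_machine_manager::take_snapshot(last_included_offset) no longer writes the Raft snapshot itself. It waits until every managed STM has applied the given offset, collects each STM's take_snapshot(offset) result into a serde-encoded map keyed by get_name(), and returns the encoded buffer; persisting it is the caller's job. A hedged sketch of how a caller can combine it with the existing write_snapshot API (the checkpoint_at helper name is illustrative; error handling omitted):

// Snapshot every managed STM at 'offset' and persist the combined payload
// as the Raft snapshot for this group.
ss::future<> checkpoint_at(raft::consensus* raft, model::offset offset) {
    auto& manager = raft->stm_manager(); // std::optional<state_machine_manager>&
    if (!manager) {
        co_return; // no STMs attached to this consensus instance
    }
    // serde-encoded map of stm name -> snapshot iobuf, produced only after
    // all managed STMs have applied 'offset'.
    iobuf data = co_await manager->take_snapshot(offset);
    co_await raft->write_snapshot(
      raft::write_snapshot_cfg(offset, std::move(data)));
}

On recovery, state_machine_manager::apply_raft_snapshot decodes that map and hands each registered STM its own buffer, falling back to an empty iobuf when the on-disk snapshot predates this change.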
BOOST_REQUIRE_EQUAL(kv_stm->state, expected); BOOST_REQUIRE_EQUAL(kv_stm->state, throwing->state); } { // start manager once again to trigger recovery from snapshot - raft::state_machine_manager_builder builder( - _raft.get(), - raft::ctx_log(_raft->group(), _raft->ntp()), - config::mock_binding(1s)); + raft::state_machine_manager_builder builder; auto kv_stm = builder.create_stm(); auto throwing = builder.create_stm(); - auto manager = std::move(builder).build(); + auto manager = std::move(builder).build(_raft.get()); wait_for_becoming_leader(); auto deferred_stop = ss::defer([&manager] { manager.stop().get(); }); diff --git a/tests/rptest/tests/scaling_up_test.py b/tests/rptest/tests/scaling_up_test.py index 0344ee0931762..84a7a2f96783b 100644 --- a/tests/rptest/tests/scaling_up_test.py +++ b/tests/rptest/tests/scaling_up_test.py @@ -43,7 +43,7 @@ def setup(self): Adding nodes to the cluster should result in partition reallocations to new nodes """ - rebalance_timeout = 120 + rebalance_timeout = 240 group_topic_partitions = 16 def _replicas_per_node(self):