diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index c5aaa5bb8465..d86df89cf1fc 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -259,6 +259,21 @@ configuration::configuration()
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
       64,
       {.min = 1, .max = 16384})
+  , raft_replica_max_pending_flush_bytes(
+      *this,
+      "raft_replica_max_pending_flush_bytes",
+      "Maximum number of not flushed bytes per partition. If the configured "
+      "threshold is reached the log will automatically be flushed even though "
+      "it wasn't explicitly requested",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      256_KiB)
+  , raft_flush_timer_interval_ms(
+      *this,
+      "raft_flush_timer_interval_ms",
+      "Interval of checking partitions against the "
+      "`raft_replica_max_pending_flush_bytes` threshold",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      100ms)
   , enable_usage(
       *this,
       "enable_usage",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index fa1d8fd1d992..4160725c6088 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -87,6 +87,8 @@ struct configuration final : public config_store {
     bounded_property<size_t> raft_recovery_default_read_size;
     property<bool> raft_enable_lw_heartbeat;
     bounded_property<uint32_t> raft_recovery_concurrency_per_shard;
+    property<std::optional<size_t>> raft_replica_max_pending_flush_bytes;
+    property<std::chrono::milliseconds> raft_flush_timer_interval_ms;
 
     // Kafka
     property<bool> enable_usage;
     bounded_property<int64_t> usage_num_windows;
diff --git a/src/v/raft/append_entries_buffer.cc b/src/v/raft/append_entries_buffer.cc
index 07f4d4536fc8..52e00fcd372d 100644
--- a/src/v/raft/append_entries_buffer.cc
+++ b/src/v/raft/append_entries_buffer.cc
@@ -109,7 +109,7 @@ ss::future<> append_entries_buffer::do_flush(
             }
         }
         if (needs_flush) {
-            f = _consensus.flush_log();
+            f = _consensus.flush_log().discard_result();
         }
     }
 
diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index 79ec81776e6a..449e4eeca3f0 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -1960,7 +1960,7 @@ consensus::do_append_entries(append_entries_request&& r) {
     }
     auto f = ss::now();
     if (r.is_flush_required() && lstats.dirty_offset > _flushed_offset) {
-        f = flush_log();
+        f = flush_log().discard_result();
     }
     auto last_visible = std::min(
       lstats.dirty_offset, request_metadata.last_visible_index);
@@ -2597,36 +2597,36 @@ append_entries_reply consensus::make_append_entries_reply(
     return reply;
 }
 
-ss::future<> consensus::flush_log() {
-    if (!_has_pending_flushes) {
-        return ss::now();
+ss::future<consensus::flushed> consensus::flush_log() {
+    if (!has_pending_flushes()) {
+        co_return flushed::no;
     }
-    _probe->log_flushed();
-    _has_pending_flushes = false;
     auto flushed_up_to = _log->offsets().dirty_offset;
-    return _log->flush().then([this, flushed_up_to] {
-        auto lstats = _log->offsets();
-        /**
-         * log flush may be interleaved with trucation, hence we need to check
-         * if log was truncated, if so we do nothing, flushed offset will be
-         * updated in the truncation path.
-         */
-        if (flushed_up_to > lstats.dirty_offset) {
-            return;
-        }
+    _probe->log_flushed();
+    _not_flushed_bytes = 0;
+    co_await _log->flush();
+    const auto lstats = _log->offsets();
+    /**
+     * log flush may be interleaved with truncation, hence we need to check
+     * if log was truncated, if so we do nothing, flushed offset will be
+     * updated in the truncation path.
+     */
+    if (flushed_up_to > lstats.dirty_offset) {
+        co_return flushed::yes;
+    }
 
-        _flushed_offset = std::max(flushed_up_to, _flushed_offset);
-        vlog(_ctxlog.trace, "flushed offset updated: {}", _flushed_offset);
-        // TODO: remove this assertion when we will remove committed_offset
-        // from storage.
-        vassert(
-          lstats.committed_offset >= _flushed_offset,
-          "Raft incorrectly tracking flushed log offset. Expected offset: {}, "
-          " current log offsets: {}, log: {}",
-          _flushed_offset,
-          lstats,
-          _log);
-    });
+    _flushed_offset = std::max(flushed_up_to, _flushed_offset);
+    vlog(_ctxlog.trace, "flushed offset updated: {}", _flushed_offset);
+    // TODO: remove this assertion when we will remove committed_offset
+    // from storage.
+    vassert(
+      lstats.committed_offset >= _flushed_offset,
+      "Raft incorrectly tracking flushed log offset. Expected offset: {}, "
+      " current log offsets: {}, log: {}",
+      _flushed_offset,
+      lstats,
+      _log);
+    co_return flushed::yes;
 }
 
 ss::future<storage::append_result> consensus::disk_append(
@@ -2678,7 +2678,7 @@ ss::future<storage::append_result> consensus::disk_append(
          */
         _last_quorum_replicated_index = ret.last_offset;
     }
-    _has_pending_flushes = true;
+    _not_flushed_bytes += ret.byte_size;
     // TODO
     // if we rolled a log segment. write current configuration
     // for speedy recovery in the background
@@ -2783,8 +2783,8 @@ ss::future<> consensus::refresh_commit_index() {
     return _op_lock.get_units()
      .then([this](ssx::semaphore_units u) mutable {
          auto f = ss::now();
-          if (_has_pending_flushes) {
-              f = flush_log();
+          if (has_pending_flushes()) {
+              f = flush_log().discard_result();
          }
 
          if (!is_elected_leader()) {
@@ -3832,4 +3832,27 @@ void consensus::upsert_recovery_state(
     }
 }
 
+ss::future<> consensus::maybe_flush_log(size_t threshold_bytes) {
+    // if there is nothing to do, exit without grabbing the op_lock. This check
+    // is sloppy as data can be in flight, but it is ok since the next check
+    // will detect it and flush the log.
+    if (_not_flushed_bytes < threshold_bytes) {
+        co_return;
+    }
+    try {
+        auto holder = _bg.hold();
+        auto u = co_await _op_lock.get_units();
+        auto flushed = co_await flush_log();
+        if (flushed && is_leader()) {
+            for (auto& [id, idx] : _fstats) {
+                // force full heartbeat to move the committed index forward
+                idx.last_sent_protocol_meta.reset();
+            }
+        }
+    } catch (const ss::gate_closed_exception&) {
+    } catch (const ss::broken_semaphore&) {
+        // ignore exception, group is shutting down.
+    }
+}
+
 } // namespace raft
diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h
index b2ce59bda903..f0cc4934a140 100644
--- a/src/v/raft/consensus.h
+++ b/src/v/raft/consensus.h
@@ -355,7 +355,7 @@ class consensus {
         return _received_snapshot_index;
     }
     size_t received_snapshot_bytes() const { return _received_snapshot_bytes; }
-    bool has_pending_flushes() const { return _has_pending_flushes; }
+    bool has_pending_flushes() const { return _not_flushed_bytes > 0; }
 
     model::offset start_offset() const {
         return model::next_offset(_last_snapshot_index);
    }
@@ -503,6 +503,11 @@
     get_follower_recovery_state() const {
         return _follower_recovery_state;
     }
+    /**
+     * Flushes the underlying log only if there are more not flushed bytes
+     * than the requested threshold.
+     */
+    ss::future<> maybe_flush_log(size_t threshold_bytes);
 
 private:
     friend replicate_entries_stm;
@@ -598,7 +603,8 @@
     void trigger_leadership_notification();
 
     /// \brief _does not_ hold the lock.
-    ss::future<> flush_log();
+    using flushed = ss::bool_class<struct flushed_tag>;
+    ss::future<flushed> flush_log();
 
     void maybe_step_down();
 
@@ -794,7 +800,7 @@ class consensus {
     follower_stats _fstats;
 
     replicate_batcher _batcher;
-    bool _has_pending_flushes{false};
+    size_t _not_flushed_bytes{0};
 
     /// used to wait for background ops before shutting down
     ss::gate _bg;
diff --git a/src/v/raft/group_manager.cc b/src/v/raft/group_manager.cc
index 69ea41a7cb0e..5331cf1c0957 100644
--- a/src/v/raft/group_manager.cc
+++ b/src/v/raft/group_manager.cc
@@ -51,17 +51,30 @@ group_manager::group_manager(
       _configuration.recovery_concurrency_per_shard,
      _configuration.heartbeat_interval)
  , _feature_table(feature_table.local())
+  , _flush_timer_jitter(_configuration.flush_timer_interval_ms)
  , _is_ready(false) {
+    _flush_timer.set_callback([this] {
+        ssx::spawn_with_gate(_gate, [this] {
+            return flush_groups().finally([this] {
+                if (_gate.is_closed()) {
+                    return;
+                }
+                _flush_timer.arm(_flush_timer_jitter());
+            });
+        });
+    });
     setup_metrics();
 }
 
 ss::future<> group_manager::start() {
     co_await _heartbeats.start();
     co_await _recovery_scheduler.start();
+    _flush_timer.arm(_flush_timer_jitter());
 }
 
 ss::future<> group_manager::stop() {
     auto f = _gate.close();
+    _flush_timer.cancel();
     f = f.then([this] {
         return _recovery_scheduler.stop();
     });
@@ -119,11 +132,12 @@ ss::future<ss::lw_shared_ptr<consensus>> group_manager::create_group(
       _feature_table,
       _is_ready ? std::nullopt : std::make_optional(min_voter_priority),
       keep_snapshotted_log);
-
-    return ss::with_gate(_gate, [this, raft] {
-        return _heartbeats.register_group(raft).then([this, raft] {
-            _groups.push_back(raft);
-            return raft;
+    return _groups_mutex.with([this, raft = std::move(raft)] {
+        return ss::with_gate(_gate, [this, raft] {
+            return _heartbeats.register_group(raft).then([this, raft] {
+                _groups.push_back(raft);
+                return raft;
+            });
         });
     });
 }
@@ -158,24 +172,30 @@ raft::group_configuration group_manager::create_initial_configuration(
 }
 
 ss::future<> group_manager::remove(ss::lw_shared_ptr<consensus> c) {
-    return c->stop()
-      .then([c] { return c->remove_persistent_state(); })
-      .then(
-        [this, id = c->group()] { return _heartbeats.deregister_group(id); })
-      .finally([this, c] {
-          _groups.erase(
-            std::remove(_groups.begin(), _groups.end(), c), _groups.end());
-      });
+    return _groups_mutex.with([this, c = std::move(c)] {
+        return c->stop()
+          .then([c] { return c->remove_persistent_state(); })
+          .then([this, id = c->group()] {
+              return _heartbeats.deregister_group(id);
+          })
+          .finally([this, c] {
+              _groups.erase(
+                std::remove(_groups.begin(), _groups.end(), c), _groups.end());
+          });
+    });
 }
 
 ss::future<> group_manager::shutdown(ss::lw_shared_ptr<consensus> c) {
-    return c->stop()
-      .then(
-        [this, id = c->group()] { return _heartbeats.deregister_group(id); })
-      .finally([this, c] {
-          _groups.erase(
-            std::remove(_groups.begin(), _groups.end(), c), _groups.end());
-      });
+    return _groups_mutex.with([this, c = std::move(c)] {
+        return c->stop()
+          .then([this, id = c->group()] {
+              return _heartbeats.deregister_group(id);
+          })
+          .finally([this, c] {
+              _groups.erase(
+                std::remove(_groups.begin(), _groups.end(), c), _groups.end());
+          });
+    });
 }
 
 void group_manager::trigger_leadership_notification(
@@ -203,4 +223,22 @@ void group_manager::setup_metrics() {
       sm::description("Number of raft groups"))});
 }
 
+ss::future<> group_manager::flush_groups() {
+    /**
+     * Assumes that gate is already held
+     */
+    auto u = co_await _groups_mutex.get_units();
+
+    co_await ss::max_concurrent_for_each(
+      _groups.begin(),
+      _groups.end(),
+      32,
+      [this](const ss::lw_shared_ptr<consensus>& raft) {
+          if (_configuration.replica_max_not_flushed_bytes()) {
+              return raft->maybe_flush_log(
+                _configuration.replica_max_not_flushed_bytes().value());
+          }
+          return ss::now();
+      });
+}
 } // namespace raft
diff --git a/src/v/raft/group_manager.h b/src/v/raft/group_manager.h
index 33e5e634e1f2..d4fafc386a57 100644
--- a/src/v/raft/group_manager.h
+++ b/src/v/raft/group_manager.h
@@ -16,6 +16,7 @@
 #include "raft/heartbeat_manager.h"
 #include "raft/recovery_memory_quota.h"
 #include "raft/recovery_scheduler.h"
+#include "raft/timeout_jitter.h"
 #include "raft/types.h"
 #include "rpc/fwd.h"
 #include "ssx/metrics.h"
@@ -46,6 +47,8 @@ class group_manager {
         config::binding<bool> enable_lw_heartbeat;
         config::binding<uint32_t> recovery_concurrency_per_shard;
         config::binding<std::chrono::milliseconds> election_timeout_ms;
+        config::binding<std::optional<size_t>> replica_max_not_flushed_bytes;
+        config::binding<std::chrono::milliseconds> flush_timer_interval_ms;
     };
     using config_provider_fn = ss::noncopyable_function<configuration()>;
 
@@ -96,9 +99,11 @@ class group_manager {
     void trigger_leadership_notification(raft::leadership_status);
     void setup_metrics();
 
+    ss::future<> flush_groups();
+
     raft::group_configuration create_initial_configuration(
       std::vector<model::broker>, model::revision_id) const;
-
+    mutex _groups_mutex;
     model::node_id _self;
     ss::scheduling_group _raft_sg;
     raft::consensus_client_protocol _client;
@@ -115,6 +120,9 @@ class group_manager {
     recovery_memory_quota _recovery_mem_quota;
     recovery_scheduler _recovery_scheduler;
     features::feature_table& _feature_table;
+    ss::timer<model::timeout_clock> _flush_timer;
+    timeout_jitter _flush_timer_jitter;
+
     bool _is_ready;
 };
diff --git a/src/v/raft/replicate_entries_stm.cc b/src/v/raft/replicate_entries_stm.cc
index ab76a03180f2..8cb5e1ee9528 100644
--- a/src/v/raft/replicate_entries_stm.cc
+++ b/src/v/raft/replicate_entries_stm.cc
@@ -54,7 +54,7 @@ replicate_entries_stm::flush_log() {
     using ret_t = result;
     auto flush_f = ss::now();
     if (_is_flush_required) {
-        flush_f = _ptr->flush_log();
+        flush_f = _ptr->flush_log().discard_result();
     }
 
     auto f = flush_f
diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc
index ce9fac4f077c..42a1143df1d9 100644
--- a/src/v/raft/tests/basic_raft_fixture_test.cc
+++ b/src/v/raft/tests/basic_raft_fixture_test.cc
@@ -19,6 +19,8 @@
 #include "test_utils/async.h"
 #include "test_utils/test.h"
 
+#include <algorithm>
+
 using namespace raft;
 
 /**
@@ -136,3 +138,37 @@
 
     co_await assert_logs_equal();
 }
+
+TEST_F_CORO(
+  raft_fixture, validate_committed_offset_advancement_after_log_flush) {
+    co_await create_simple_group(3);
+    // wait for leader
+    auto leader = co_await wait_for_leader(10s);
+    auto& leader_node = node(leader);
+
+    // replicate batches with acks=1 and validate that committed offset did not
+    // advance
+    auto committed_offset_before = leader_node.raft()->committed_offset();
+    auto result = co_await leader_node.raft()->replicate(
+      make_batches(10, 10, 128),
+      replicate_options(consistency_level::leader_ack));
+
+    ASSERT_TRUE_CORO(result.has_value());
+    // wait for batches to be replicated on all of the nodes
+    co_await tests::cooperative_spin_wait_with_timeout(
+      10s, [this, expected = result.value().last_offset] {
+          return std::all_of(
+            nodes().begin(), nodes().end(), [expected](const auto& p) {
+                return p.second->raft()->last_visible_index() == expected;
+            });
+      });
+    ASSERT_EQ_CORO(
+      committed_offset_before, leader_node.raft()->committed_offset());
+
+    co_await assert_logs_equal();
+
+    // flush log on all of the nodes
+    co_await parallel_for_each_node(
+      [](auto& n) { return n.raft()->maybe_flush_log(0); });
+    co_await wait_for_committed_offset(result.value().last_offset, 10s);
+}
diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h
index d8ed8f779ff1..a1fc09d61439 100644
--- a/src/v/raft/tests/raft_fixture.h
+++ b/src/v/raft/tests/raft_fixture.h
@@ -24,6 +24,8 @@
 #include "raft/recovery_memory_quota.h"
 #include "raft/state_machine_manager.h"
 #include "raft/types.h"
+#include "random/generators.h"
+#include "ssx/sformat.h"
 #include "storage/api.h"
 #include "test_utils/test.h"
 #include "utils/prefix_logger.h"
@@ -33,6 +35,7 @@
 #include
 #include
+#include <boost/range/irange.hpp>
 #include
 
@@ -256,6 +259,26 @@ class raft_fixture
         return model::make_memory_record_batch_reader(std::move(batches));
     }
 
+    model::record_batch_reader make_batches(
+      size_t batch_count,
+      size_t batch_record_count,
+      size_t record_payload_size) {
+        ss::circular_buffer<model::record_batch> batches;
+        batches.reserve(batch_count);
+        for (auto b_idx : boost::irange(batch_count)) {
+            storage::record_batch_builder builder(
+              model::record_batch_type::raft_data, model::offset(0));
+            for (int r_idx : boost::irange(batch_record_count)) {
+                builder.add_raw_kv(
+                  serde::to_iobuf(ssx::sformat("r-{}-{}", b_idx, r_idx)),
+                  serde::to_iobuf(
+                    random_generators::get_bytes(record_payload_size)));
+            }
+            batches.push_back(std::move(builder).build());
+        }
+
+        return model::make_memory_record_batch_reader(std::move(batches));
+    }
     ss::future<> assert_logs_equal() {
         std::vector<std::vector<model::record_batch>> node_batches;
diff --git a/src/v/raft/tests/simple_raft_fixture.h b/src/v/raft/tests/simple_raft_fixture.h
index 1b47feaaeb2f..543981cef4d5 100644
--- a/src/v/raft/tests/simple_raft_fixture.h
+++ b/src/v/raft/tests/simple_raft_fixture.h
@@ -83,7 +83,12 @@ struct simple_raft_fixture {
              .enable_lw_heartbeat = config::mock_binding(true),
              .recovery_concurrency_per_shard = config::mock_binding(64),
-              .election_timeout_ms = config::mock_binding(10ms)};
+              .election_timeout_ms = config::mock_binding(10ms),
+              .replica_max_not_flushed_bytes
+              = config::mock_binding<std::optional<size_t>>(std::nullopt),
+              .flush_timer_interval_ms = config::mock_binding(100ms),
+
+            };
         },
         [] {
             return raft::recovery_memory_quota::configuration{
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index fcbf6af29eac..44642e31257a 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -1118,6 +1118,11 @@ void application::wire_up_redpanda_services(
              .raft_recovery_concurrency_per_shard.bind(),
            .election_timeout_ms
            = config::shard_local_cfg().raft_election_timeout_ms.bind(),
+            .replica_max_not_flushed_bytes
+            = config::shard_local_cfg()
+                .raft_replica_max_pending_flush_bytes.bind(),
+            .flush_timer_interval_ms
+            = config::shard_local_cfg().raft_flush_timer_interval_ms.bind(),
          };
      },
      [] {
diff --git a/tests/rptest/tests/raft_periodic_flush_test.py b/tests/rptest/tests/raft_periodic_flush_test.py
new file mode 100644
index 000000000000..01261dfe1d79
--- /dev/null
+++ b/tests/rptest/tests/raft_periodic_flush_test.py
@@ -0,0 +1,74 @@
+# Copyright 2022 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from rptest.services.admin import Admin
+from rptest.services.cluster import cluster
+from rptest.clients.types import TopicSpec
+from rptest.tests.end_to_end import EndToEndTest
+from rptest.util import wait_until
+
+
+class PeriodicFlushWithRelaxedConsistencyTest(EndToEndTest):
+    @cluster(num_nodes=5)
+    def test_changing_periodic_flush_threshold(self):
+
+        self.start_redpanda(num_nodes=3)
+        # set raft_replica_max_pending_flush_bytes to a high value
+        self.redpanda.set_cluster_config(
+            {"raft_replica_max_pending_flush_bytes": 10 * (1024 * 1024)})
+
+        # create topic with single partition
+        spec = TopicSpec(partition_count=1, replication_factor=3)
+        self.client().create_topic(spec)
+
+        self.topic = spec.name
+
+        self.start_producer(1, throughput=1000, acks=1)
+        self.start_consumer()
+        self.consumer.start()
+        msg_count = 10000
+        wait_until(
+            lambda: len(self.producer.acked_values) >= msg_count,
+            timeout_sec=60,
+            backoff_sec=1,
+            err_msg=f"Producer didn't finish producing {msg_count} messages")
+        # wait for all produced records to be consumed during validation
+
+        self.producer.stop()
+        self.run_validation(min_records=msg_count,
+                            producer_timeout_sec=300,
+                            consumer_timeout_sec=300)
+
+        admin = Admin(self.redpanda)
+        p_state = admin.get_partition_state(namespace='kafka',
+                                            topic=self.topic,
+                                            partition=0)
+        self.logger.info(f"initial partition state: {p_state}")
+        assert all([
+            r['committed_offset'] < r['dirty_offset']
+            for r in p_state['replicas']
+        ]), "With ACKS=1, committed offset should not be advanced immediately"
+
+        self.redpanda.set_cluster_config(
+            {"raft_replica_max_pending_flush_bytes": 1})
+
+        def committed_offset_advanced():
+            p_state = admin.get_partition_state(namespace='kafka',
+                                                topic=self.topic,
+                                                partition=0)
+
+            return all([
+                r['committed_offset'] == r['dirty_offset']
+                for r in p_state['replicas']
+            ])
+
+        wait_until(
+            committed_offset_advanced, 30, 1,
+            "committed offset did not advance after the change of max flushed bytes"
+        )
diff --git a/tests/rptest/tests/storage_resources_test.py b/tests/rptest/tests/storage_resources_test.py
index 1e42fa84039b..a82e5d10854f 100644
--- a/tests/rptest/tests/storage_resources_test.py
+++ b/tests/rptest/tests/storage_resources_test.py
@@ -16,7 +16,7 @@
 from rptest.services.rpk_consumer import RpkConsumer
 from rptest.clients.rpk import RpkTool
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
+from ducktape.mark import matrix
 from ducktape.cluster.cluster_spec import ClusterSpec
 
 
@@ -146,13 +146,13 @@ def ready():
 
         wait_until(ready, timeout_sec=30, backoff_sec=5)
 
-    def _write(self, msg_size, msg_count):
+    def _write(self, msg_size, msg_count, acks):
         producer = RpkProducer(self.test_context,
                                self.redpanda,
                                self.topic,
                                msg_size,
                                msg_count=msg_count,
-                               acks=-1)
+                               acks=acks)
         producer.start()
         producer.wait()
         producer.free()
@@ -200,9 +200,8 @@ def _get_partition_segments(self, partition_idx):
         return segments
 
     @cluster(num_nodes=2)
-    @parametrize(clean_shutdown=False)
-    @parametrize(clean_shutdown=True)
-    def test_recovery_reads(self, clean_shutdown):
+    @matrix(acks=[1, -1], clean_shutdown=[True, False])
+    def test_recovery_reads(self, clean_shutdown, acks):
         """
         Verify the amount of disk IO that occurs on both clean and
         unclean restarts.
@@ -237,7 +236,7 @@ def test_recovery_reads(self, clean_shutdown):
         assert (msg_count * msg_size /
                 self.PARTITION_COUNT) < per_partition_checkpoint_threshold
 
-        self._write(msg_size, msg_count)
+        self._write(msg_size, msg_count, acks)
 
         total_bytes = msg_size * msg_count
 
         # Use low level seastar metrics, because storage log metrics do
@@ -296,4 +295,6 @@ def test_recovery_reads(self, clean_shutdown):
         # the byte count: we have already read >= the total size of data
         # on disk.
         self._read_tips()
-        self._read_all(msg_count)
+        # do not read all messages for acks=1 as they may be lost
+        if acks == -1:
+            self._read_all(msg_count)
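Read as a whole, the patch implements one small policy: every disk_append adds ret.byte_size to a per-partition counter of unflushed data, and a jittered per-shard timer in group_manager periodically calls maybe_flush_log() so that the log is flushed once the counter crosses raft_replica_max_pending_flush_bytes. The sketch below is a minimal, single-threaded model of that accounting, assuming nothing beyond the C++ standard library; the partition struct, the main() driver and the 64 KiB append size are illustrative stand-ins, not the Seastar-based code above.

// Simplified model of the periodic flush policy introduced by this patch.
// The real implementation is raft::consensus::maybe_flush_log() driven by a
// jittered timer in raft::group_manager; all names here are hypothetical.
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

using namespace std::chrono_literals;

struct partition {
    size_t not_flushed_bytes{0}; // mirrors consensus::_not_flushed_bytes
    uint64_t flushes{0};

    void append(size_t bytes) { not_flushed_bytes += bytes; }

    // mirrors maybe_flush_log(threshold_bytes): flush only when the
    // accumulated backlog crosses the configured threshold
    void maybe_flush(size_t threshold_bytes) {
        if (not_flushed_bytes < threshold_bytes) {
            return;
        }
        not_flushed_bytes = 0; // a real flush would fsync the log here
        ++flushes;
    }
};

int main() {
    // stand-ins for raft_replica_max_pending_flush_bytes and
    // raft_flush_timer_interval_ms
    std::optional<size_t> max_pending_flush_bytes = 256 * 1024;
    auto flush_timer_interval = 100ms;

    std::vector<partition> partitions(4);

    // simulate one second of acks=1 traffic: appends accumulate bytes and the
    // "timer" sweeps all partitions every flush_timer_interval
    for (auto elapsed = 0ms; elapsed < 1s; elapsed += flush_timer_interval) {
        for (auto& p : partitions) {
            p.append(64 * 1024); // unflushed writes from relaxed-ack producers
        }
        if (max_pending_flush_bytes) { // the property may be unset (nullopt)
            for (auto& p : partitions) {
                p.maybe_flush(*max_pending_flush_bytes);
            }
        }
    }

    for (size_t i = 0; i < partitions.size(); ++i) {
        std::cout << "partition " << i << ": flushes=" << partitions[i].flushes
                  << " pending=" << partitions[i].not_flushed_bytes << "\n";
    }
}

The std::optional threshold mirrors the new cluster property: when it is left unset, as simple_raft_fixture.h does with std::nullopt, the periodic sweep skips flushing entirely, while the explicit flush path used for quorum-ack replication keeps working as before.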