diff --git a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
index e96a4c5d0090..ddf19d79d2b2 100644
--- a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
+++ b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
@@ -11,8 +11,10 @@
 #include "archival/ntp_archiver_service.h"
 #include "cloud_storage/remote.h"
 #include "cloud_storage/spillover_manifest.h"
+#include "cloud_storage/tests/manual_fixture.h"
 #include "cloud_storage/tests/produce_utils.h"
 #include "cloud_storage/tests/s3_imposter.h"
+#include "cluster/health_monitor_frontend.h"
 #include "config/configuration.h"
 #include "kafka/server/tests/produce_consume_utils.h"
 #include "model/fundamental.h"
@@ -464,3 +466,119 @@ FIXTURE_TEST(test_consume_during_spillover, cloud_storage_manual_e2e_test) {
     }
     cleanup.cancel();
 }
+
+FIXTURE_TEST(
+  reclaimable_reported_in_health_report,
+  cloud_storage_manual_multinode_test_base) {
+    config::shard_local_cfg().retention_local_trim_interval.set_value(
+      std::chrono::milliseconds(2000));
+
+    // start a second fixture and wait for stable setup
+    auto fx2 = start_second_fixture();
+    tests::cooperative_spin_wait_with_timeout(3s, [this] {
+        return app.controller->get_members_table().local().node_ids().size()
+               == 2;
+    }).get();
+
+    // test topic
+    const model::topic topic_name("tapioca");
+    model::ntp ntp(model::kafka_namespace, topic_name, 0);
+    cluster::topic_properties props;
+    props.shadow_indexing = model::shadow_indexing_mode::full;
+    props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion;
+    props.segment_size = 64_KiB;
+    props.retention_local_target_bytes = tristate<size_t>(1);
+    add_topic({model::kafka_namespace, topic_name}, 1, props, 2).get();
+
+    // figuring out the leader is useful for constructing the producer. the
+    // follower is just the "other" node.
+    redpanda_thread_fixture* fx_l = nullptr;
+    boost_require_eventually(10s, [&] {
+        cluster::partition* prt_a
+          = app.partition_manager.local().get(ntp).get();
+        cluster::partition* prt_b
+          = fx2->app.partition_manager.local().get(ntp).get();
+        if (!prt_a || !prt_b) {
+            return false;
+        }
+        if (prt_a->is_leader()) {
+            fx_l = this;
+            return true;
+        }
+        if (prt_b->is_leader()) {
+            fx_l = fx2.get();
+            return true;
+        }
+        return false;
+    });
+
+    auto prt_l = fx_l->app.partition_manager.local().get(ntp);
+
+    kafka_produce_transport producer(fx_l->make_kafka_client().get());
+    producer.start().get();
+
+    auto get_reclaimable = [&]() -> std::optional<std::vector<size_t>> {
+        auto report = app.controller->get_health_monitor()
+                        .local()
+                        .get_cluster_health(
+                          cluster::cluster_report_filter{},
+                          cluster::force_refresh::yes,
+                          model::timeout_clock::now() + std::chrono::seconds(2))
+                        .get();
+        if (report.has_value()) {
+            std::vector<size_t> sizes;
+            for (auto& node_report : report.value().node_reports) {
+                for (auto& topic : node_report.topics) {
+                    if (
+                      topic.tp_ns
+                      != model::topic_namespace_view(
+                        model::kafka_namespace, topic_name)) {
+                        continue;
+                    }
+                    for (auto partition : topic.partitions) {
+                        sizes.push_back(
+                          partition.reclaimable_size_bytes.value_or(0));
+                    }
+                }
+            }
+            if (!sizes.empty()) {
+                return sizes;
+            }
+        }
+        return std::nullopt;
+    };
+
+    for (int j = 0; j < 20; j++) {
+        for (int i = 0; i < 200; i++) {
+            producer
+              .produce_to_partition(
+                topic_name,
+                model::partition_id(0),
+                tests::kv_t::sequence(0, 200))
+              .get();
+        }
+
+        // drive the uploading
+        auto& archiver = prt_l->archiver()->get();
+        archiver.sync_for_tests().get();
+        archiver.upload_next_candidates().get();
+
+        // not for synchronization... just to give the system time to propagate
+        // all the state changes that are happening so that this overall loop
+        // doesn't spin to completion too fast.
+        ss::sleep(std::chrono::seconds(2)).get();
+
+        auto sizes = get_reclaimable();
+        if (sizes.has_value()) {
+            BOOST_REQUIRE(!sizes->empty());
+            if (std::all_of(sizes->begin(), sizes->end(), [](size_t s) {
+                    return s > 0;
+                })) {
+                return; // test success
+            }
+        }
+    }
+
+    // health report never reported non-zero reclaimable sizes. bummer!
+    BOOST_REQUIRE(false);
+}
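The leader poll above is an eventually-style retry: keep evaluating a predicate until it holds or a deadline expires. A stand-alone sketch of that pattern (hypothetical helper, using plain threads for brevity where the fixture code runs on seastar futures):

    #include <chrono>
    #include <thread>

    // retry `pred` until it returns true or `timeout` elapses, sleeping
    // briefly between attempts so the loop doesn't busy-spin.
    template<typename Pred>
    bool eventually(std::chrono::milliseconds timeout, Pred&& pred) {
        auto deadline = std::chrono::steady_clock::now() + timeout;
        while (std::chrono::steady_clock::now() < deadline) {
            if (pred()) {
                return true;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        return pred(); // one last attempt at the deadline
    }
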
diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc
index 139886082ad8..d5c2ceb65f91 100644
--- a/src/v/cluster/health_monitor_backend.cc
+++ b/src/v/cluster/health_monitor_backend.cc
@@ -671,6 +671,7 @@ struct ntp_report {
     ntp_leader leader;
     size_t size_bytes;
     std::optional<size_t> under_replicated_replicas;
+    size_t reclaimable_size_bytes;
 };
 
 partition_status to_partition_status(const ntp_report& ntpr) {
@@ -680,7 +681,8 @@ partition_status to_partition_status(const ntp_report& ntpr) {
       .leader_id = ntpr.leader.leader_id,
       .revision_id = ntpr.leader.revision_id,
       .size_bytes = ntpr.size_bytes,
-      .under_replicated_replicas = ntpr.under_replicated_replicas};
+      .under_replicated_replicas = ntpr.under_replicated_replicas,
+      .reclaimable_size_bytes = ntpr.reclaimable_size_bytes};
 }
 
 ss::chunked_fifo<ntp_report> collect_shard_local_reports(
@@ -703,6 +705,7 @@ ss::chunked_fifo<ntp_report> collect_shard_local_reports(
           },
           .size_bytes = p.second->size_bytes() + p.second->non_log_disk_size_bytes(),
           .under_replicated_replicas = p.second->get_under_replicated(),
+          .reclaimable_size_bytes = p.second->reclaimable_local_size_bytes(),
         };
     });
 } else {
@@ -715,8 +718,9 @@
             .leader_id = partition->get_leader_id(),
             .revision_id = partition->get_revision_id(),
           },
-          .size_bytes = partition->size_bytes(),
+          .size_bytes = partition->size_bytes() + partition->non_log_disk_size_bytes(),
           .under_replicated_replicas = partition->get_under_replicated(),
+          .reclaimable_size_bytes = partition->reclaimable_local_size_bytes(),
         });
     }
 }
diff --git a/src/v/cluster/health_monitor_types.cc b/src/v/cluster/health_monitor_types.cc
index 63be4eae0038..67cd66c7e25d 100644
--- a/src/v/cluster/health_monitor_types.cc
+++ b/src/v/cluster/health_monitor_types.cc
@@ -140,12 +140,13 @@ std::ostream& operator<<(std::ostream& o, const partition_status& ps) {
     fmt::print(
       o,
       "{{id: {}, term: {}, leader_id: {}, revision_id: {}, size_bytes: {}, "
-      "under_replicated: {}}}",
+      "reclaimable_size_bytes: {}, under_replicated: {}}}",
       ps.id,
       ps.term,
       ps.leader_id,
       ps.revision_id,
       ps.size_bytes,
+      ps.reclaimable_size_bytes,
       ps.under_replicated_replicas);
     return o;
 }
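With reclaimable_size_bytes now carried per partition in the report, a consumer can aggregate a node-level figure. A minimal sketch, assuming simplified stand-ins for the cluster types (the real cluster::partition_status carries more fields):

    #include <cstddef>
    #include <numeric>
    #include <optional>
    #include <vector>

    struct partition_status {
        size_t size_bytes{0};
        std::optional<size_t> reclaimable_size_bytes;
    };

    // sum a node's reclaimable bytes across all of its partition statuses.
    size_t total_reclaimable(const std::vector<partition_status>& statuses) {
        return std::accumulate(
          statuses.begin(),
          statuses.end(),
          size_t{0},
          [](size_t acc, const partition_status& ps) {
              // absent values (e.g. reports from older brokers) count as zero
              return acc + ps.reclaimable_size_bytes.value_or(0);
          });
    }
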
diff --git a/src/v/cluster/health_monitor_types.h b/src/v/cluster/health_monitor_types.h
index d252a215dbd4..bd3ff634b4f4 100644
--- a/src/v/cluster/health_monitor_types.h
+++ b/src/v/cluster/health_monitor_types.h
@@ -57,23 +57,7 @@ struct node_state
 
 struct partition_status
   : serde::
-      envelope<partition_status, serde::version<0>, serde::compat_version<0>> {
-    /**
-     * We increase a version here 'backward' since incorrect assertion would
-     * cause older redpanda versions to crash.
-     *
-     * Version: -1: added revision_id field
-     * Version: -2: added size_bytes field
-     *
-     * Same versioning should also be supported in get_node_health_request
-     */
-
-    static constexpr int8_t initial_version = 0;
-    static constexpr int8_t revision_id_version = -1;
-    static constexpr int8_t size_bytes_version = -2;
-
-    static constexpr int8_t current_version = size_bytes_version;
-
+      envelope<partition_status, serde::version<1>, serde::compat_version<0>> {
     static constexpr size_t invalid_size_bytes = size_t(-1);
 
     model::partition_id id;
@@ -83,6 +67,21 @@ struct partition_status
     size_t size_bytes;
     std::optional<size_t> under_replicated_replicas;
 
+    /*
+     * estimated amount of data above local retention that is subject to
+     * reclaim under disk pressure. this is useful for the partition balancer
+     * which is interested in free space on a node. a node may have very little
+     * physical free space, but have effective free space represented by
+     * reclaimable size bytes.
+     *
+     * an intuitive relationship between size_bytes and reclaimable_size_bytes
+     * would have the former being >= the latter. however, due to the way
+     * that data is collected it is conceivable that this inequality doesn't
+     * hold. callers should check for this condition and normalize the values
+     * or ignore the update.
+     */
+    std::optional<size_t> reclaimable_size_bytes;
+
     auto serde_fields() {
         return std::tie(
           id,
@@ -90,7 +89,8 @@ struct partition_status
           leader_id,
           revision_id,
           size_bytes,
-          under_replicated_replicas);
+          under_replicated_replicas,
+          reclaimable_size_bytes);
     }
 
     friend std::ostream& operator<<(std::ostream&, const partition_status&);
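The normalization the comment above asks callers to perform can be as simple as clamping. A sketch with simplified types (not the balancer's actual code):

    #include <algorithm>
    #include <cstddef>
    #include <optional>

    struct sizes {
        size_t size_bytes;
        std::optional<size_t> reclaimable_size_bytes;
    };

    // restore the invariant size_bytes >= reclaimable_size_bytes before use.
    sizes normalize(sizes s) {
        if (s.reclaimable_size_bytes.has_value()) {
            // the two values are collected at slightly different times, so
            // the estimate can transiently exceed the measured log size;
            // clamp it rather than propagating a nonsensical pair.
            s.reclaimable_size_bytes
              = std::min(*s.reclaimable_size_bytes, s.size_bytes);
        }
        return s;
    }
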
diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h
index 56b499f720d2..2d46b157d088 100644
--- a/src/v/cluster/partition.h
+++ b/src/v/cluster/partition.h
@@ -275,6 +275,10 @@ class partition {
 
     size_t size_bytes() const { return _raft->log()->size_bytes(); }
 
+    size_t reclaimable_local_size_bytes() const {
+        return _raft->log()->reclaimable_local_size_bytes();
+    }
+
     uint64_t non_log_disk_size_bytes() const;
 
     ss::future<> update_configuration(topic_properties);
diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index f081f6c2f9a5..5dc63007f40c 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -1529,7 +1529,18 @@ ss::future<> disk_log_impl::truncate_prefix(truncate_prefix_config cfg) {
     vassert(!_closed, "truncate_prefix() on closed log - {}", *this);
     return _failure_probes.truncate_prefix().then([this, cfg]() mutable {
         // dispatch the actual truncation
-        return do_truncate_prefix(cfg);
+        return do_truncate_prefix(cfg)
+          .then([this] {
+              /*
+               * after truncation do a quick refresh of cached variables that
+               * are computed during disk usage calculation. this is useful for
+               * providing more timely updates of reclaimable space through the
+               * health report.
+               */
+              return disk_usage_and_reclaimable_space(
+                _manager.default_gc_config());
+          })
+          .discard_result();
     });
 }
@@ -2113,6 +2124,11 @@ disk_log_impl::disk_usage_and_reclaimable_space(gc_config input_cfg) {
       .local_retention = lcl.total(),
     };
 
+    /*
+     * cache this for access by the health report
+     */
+    _reclaimable_local_size_bytes = reclaim.local_retention;
+
     co_return std::make_pair(usage, reclaim);
 }
@@ -2587,4 +2603,23 @@ disk_log_impl::get_reclaimable_offsets(gc_config cfg) {
     co_return res;
 }
 
+size_t disk_log_impl::reclaimable_local_size_bytes() const {
+    /*
+     * circumstances/configuration under which this log will be trimming back
+     * to local retention may change. catch these cases before reporting
+     * potentially stale information.
+     */
+    if (!is_cloud_retention_active()) {
+        return 0;
+    }
+    if (config().is_read_replica_mode_enabled()) {
+        // https://github.com/redpanda-data/redpanda/issues/11936
+        return 0;
+    }
+    if (deletion_exempt(config().ntp())) {
+        return 0;
+    }
+    return _reclaimable_local_size_bytes;
+}
+
 } // namespace storage
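The shape of the disk_log_impl change is a classic cached-estimate split: an expensive scan (disk_usage_and_reclaimable_space) refreshes a member variable during housekeeping and after prefix truncation, and a cheap accessor serves it, returning zero whenever the estimate is known not to apply. A reduced sketch of that pattern (hypothetical class, not the real disk_log_impl interface):

    #include <cstddef>

    class log_stats {
    public:
        // expensive path: full recomputation, run during housekeeping and
        // after prefix truncation so the cached value never stays stale long.
        void recompute(size_t above_local_retention) {
            _reclaimable = above_local_retention;
        }

        // cheap path: consulted by the health report on every refresh; the
        // guard mirrors the is_cloud_retention_active() check above.
        size_t reclaimable(bool cloud_retention_active) const {
            return cloud_retention_active ? _reclaimable : 0;
        }

    private:
        size_t _reclaimable{0};
    };
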
+ */ + if (!is_cloud_retention_active()) { + return 0; + } + if (config().is_read_replica_mode_enabled()) { + // https://github.com/redpanda-data/redpanda/issues/11936 + return 0; + } + if (deletion_exempt(config().ntp())) { + return 0; + } + return _reclaimable_local_size_bytes; +} + } // namespace storage diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 700ad04da1dd..547c34881807 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -140,6 +140,8 @@ class disk_log_impl final : public log { return _segments_rolling_lock.get_units(); } + size_t reclaimable_local_size_bytes() const override; + private: friend class disk_log_appender; // for multi-term appends friend class disk_log_builder; // for tests @@ -289,6 +291,7 @@ class disk_log_impl final : public log { mutex _segments_rolling_lock; std::optional _cloud_gc_offset; + size_t _reclaimable_local_size_bytes{0}; }; } // namespace storage diff --git a/src/v/storage/log.h b/src/v/storage/log.h index 126b2486613e..a181ce32018f 100644 --- a/src/v/storage/log.h +++ b/src/v/storage/log.h @@ -143,6 +143,12 @@ class log { virtual probe& get_probe() = 0; + /* + * estimate amount of data beyond local retention. zero will be returned in + * cases where this is not applicable, such as the log not being TS-enabled. + */ + virtual size_t reclaimable_local_size_bytes() const = 0; + private: ntp_config _config; diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index c46a7873df6b..3e708c0f58bf 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -749,14 +749,7 @@ ss::future log_manager::disk_usage() { * TODO: this will be factored out to make the sharing of settings easier to * maintain. */ - model::timestamp collection_threshold; - if (!_config.delete_retention()) { - collection_threshold = model::timestamp(0); - } else { - collection_threshold = model::timestamp( - model::timestamp::now().value() - - _config.delete_retention()->count()); - } + auto cfg = default_gc_config(); fragmented_vector> logs; for (auto& it : _logs) { @@ -766,10 +759,7 @@ ss::future log_manager::disk_usage() { co_return co_await ss::map_reduce( logs.begin(), logs.end(), - [this, collection_threshold](ss::shared_ptr log) { - return log->disk_usage( - gc_config(collection_threshold, _config.retention_bytes())); - }, + [cfg](ss::shared_ptr log) { return log->disk_usage(cfg); }, usage_report{}, [](usage_report acc, usage_report update) { return acc + update; }); } @@ -797,4 +787,16 @@ void log_manager::trigger_gc() { _housekeeping_sem.signal(); } +gc_config log_manager::default_gc_config() const { + model::timestamp collection_threshold; + if (!_config.delete_retention()) { + collection_threshold = model::timestamp(0); + } else { + collection_threshold = model::timestamp( + model::timestamp::now().value() + - _config.delete_retention()->count()); + } + return {collection_threshold, _config.retention_bytes()}; +} + } // namespace storage diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index 697ca3f0fd8a..7d057ce7ea26 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -236,6 +236,8 @@ class log_manager { */ void trigger_gc(); + gc_config default_gc_config() const; + private: using logs_type = absl::flat_hash_map>;