diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index efb44bfcd452d..ea577bd6894f2 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -66,6 +67,164 @@ std::ostream& operator<<(std::ostream& o, cache_element_status s) { static constexpr std::string_view tmp_extension{".part"}; +namespace { + +using namespace std::chrono_literals; + +[[maybe_unused]] bool has_label( + const ss::metrics::impl::labels_type& labels, + std::string_view k, + std::string_view v) { + return std::find_if( + labels.cbegin(), + labels.cend(), + [&](const auto& pair) { + return pair.first == k && pair.second == v; + }) + != labels.cend(); +} + +struct cache_control { + static constexpr ss::lowres_clock::duration limit = 10s; + +public: + explicit cache_control() + : _updated(ss::lowres_clock::now()) + , _duration(limit) {} + + bool should_skip_cache() { + const auto now = ss::lowres_clock::now(); + const auto delta = now - _updated; + + if (delta > _duration) { + _updated = now; + if (_throttled) { + _duration = 3s; + vlog( + cst_log.info, + "un-throttling cache access for {}ms", + _duration); + _throttled = false; + } else { + auto curr_delta = current_consumption_delta(); + _throttled = curr_delta <= 0.0; + _duration = limit; + vlog( + cst_log.info, + "calculated consumption delta: {}, throttling: {}", + curr_delta, + _throttled); + } + } + + return _throttled; + } + +private: + double queue_adjusted_consumption(std::string_view class_name) { + auto all_metrics = *ss::metrics::impl::get_values(); + auto metric_values_v = all_metrics.values; + auto metadata_v = all_metrics.metadata; + + auto values_it = metric_values_v.cbegin(); + + for (auto it = metadata_v->cbegin(); + it != metadata_v->cend() && values_it != metric_values_v.cend(); + ++it, ++values_it) { + if (it->mf.name != "io_queue_adjusted_consumption") { + continue; + } 
+ + auto values = values_it->begin(); + for (const auto& metric_info : it->metrics) { + const auto& labels = metric_info.id.labels(); + if ( + has_label(labels, "mountpoint", "/var/lib/redpanda/data") + && has_label(labels, "class", class_name)) { + return values->d(); + } + values++; + } + } + + return 0; + } + + double current_consumption_delta() { + const auto raft_cons = queue_adjusted_consumption("raft"); + const auto si_cons = queue_adjusted_consumption("shadow-indexing"); + const auto delta = raft_cons - si_cons; + vlog( + cst_log.info, + "raft cons: {} SI cons: {} delta: {}", + raft_cons, + si_cons, + delta); + return delta; + } + + std::optional current_p95_latency() { + auto all_metrics = *ss::metrics::impl::get_values(); + auto metric_values_v = all_metrics.values; + auto metadata_v = all_metrics.metadata; + + std::vector> counts{}; + bool found_family = false; + auto values_it = metric_values_v.cbegin(); + for (auto it = metadata_v->cbegin(); + it != metadata_v->cend() && values_it != metric_values_v.cend() + && !found_family; + ++it, ++values_it) { + if (it->mf.name != "kafka_latency_produce_latency_us") { + continue; + } + + found_family = true; + for (const auto& bucket : + values_it->begin()->get_histogram().buckets) { + counts.emplace_back(bucket.upper_bound, bucket.count); + } + } + + if (counts.empty()) { + vlog(cst_log.error, "metrics no count!!"); + return std::nullopt; + } + + auto result = counts.front().first; + const auto p95_count = counts.back().second * 0.95; + for (const auto& [u, c] : counts) { + if (c < p95_count) { + result = u; + } else { + break; + } + } + return result; + } + + ss::lowres_clock::time_point _updated; + uint64_t _current_p95_latency{0}; + ss::lowres_clock::duration _duration; + bool _throttled{false}; +}; + +static thread_local std::unique_ptr cache_controller; + +void init_cc() { cache_controller = std::make_unique(); } + +bool control_skip_cache() { + if (unlikely(!cache_controller)) { + init_cc(); + } + + return 
cache_controller->should_skip_cache(); +} + +} // namespace + +bool cache::should_skip_cache() const { return control_skip_cache(); } + cache::cache( std::filesystem::path cache_dir, size_t disk_size, diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index 2f6f6670e30fa..4f41dabac9aa9 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -99,6 +99,8 @@ class space_reservation_guard { class cache : public ss::peering_sharded_service { public: + bool should_skip_cache() const; + /// C-tor. /// /// \param cache_dir is a directory where cached data is stored diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index ff47ded1bf1af..97fcf2bf05444 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -25,6 +25,7 @@ #include "model/record.h" #include "model/record_batch_types.h" #include "raft/consensus.h" +#include "random/generators.h" #include "resource_mgmt/io_priority.h" #include "ssx/future-util.h" #include "ssx/sformat.h" @@ -39,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -305,25 +307,49 @@ remote_segment::offset_data_stream( kafka::offset start, kafka::offset end, std::optional first_timestamp, - ss::io_priority_class io_priority, - bool skip_cache) { + ss::io_priority_class io_priority) { vlog(_ctxlog.debug, "remote segment file input stream at offset {}", start); ss::gate::holder g(_gate); - co_await hydrate(); - std::optional indexed_pos; - std::optional prefetch_override = std::nullopt; + // skip_cache = random_generators::get_int(100) + // >= config::shard_local_cfg().skip_cache_chance(); + + auto skip_cache = _cache.should_skip_cache(); + + bool is_materialized = is_state_materialized(); + if (skip_cache && !is_legacy_mode_engaged() && _coarse_index.has_value()) { + is_materialized = cache_element_status::available + == co_await _cache.is_cached(get_path_to_chunk( + 
get_chunk_start_for_kafka_offset(start))); + if (is_state_materialized() && !is_materialized) { + vlog( + _ctxlog.info, + "index present but chunk for kafka offset {} absent, cache will " + "be skipped", + start); + } + } - if (skip_cache) { - vlog(_ctxlog.info, "using direct stream to serve request"); + if (skip_cache && !is_materialized) { + vlog( + _ctxlog.info, + "using direct stream to serve request {{{}-{}}}", + start, + end); auto stream = std::make_unique( co_await build_stream()); co_return input_stream_with_offsets{ .stream = ss::input_stream{ss::data_source{std::move(stream)}}, .rp_offset = _base_rp_offset, - .kafka_offset = _base_rp_offset - _base_offset_delta}; + .kafka_offset = _base_rp_offset - _base_offset_delta, + .persistent = false}; } + co_await hydrate(); + + std::optional indexed_pos; + std::optional prefetch_override = std::nullopt; + // Perform index lookup by timestamp or offset. This reduces the number // of hydrated chunks required to serve the request. if (first_timestamp) { @@ -506,7 +532,14 @@ ss::future<> remote_segment::put_chunk_in_cache( ss::input_stream stream, chunk_start_offset_t chunk_start) { try { - co_await _cache.put(get_path_to_chunk(chunk_start), stream, reservation) + co_await _cache + .put( + get_path_to_chunk(chunk_start), + stream, + reservation, + priority_manager::local().shadow_indexing_priority(), + 128_KiB, + 1) .finally([&stream] { return stream.close(); }); } catch (...) 
{ auto put_exception = std::current_exception(); @@ -1366,6 +1399,11 @@ remote_segment_batch_reader::read_some( co_return ss::circular_buffer{}; } _bytes_consumed = new_bytes_consumed.value(); + + // if (_config.over_budget && should_reset_parser) { + // co_await _parser->close(); + // _parser.reset(); + // } } _total_size = 0; co_return std::move(_ringbuf); @@ -1383,8 +1421,10 @@ remote_segment_batch_reader::init_parser() { model::offset_cast(_config.start_offset), model::offset_cast(_config.max_offset), _config.first_timestamp, - priority_manager::local().shadow_indexing_priority(), - true); + priority_manager::local().shadow_indexing_priority()); + + // Reset parser if overbudget when stream is not disk-based + should_reset_parser = !stream_off.persistent; auto parser = std::make_unique( std::make_unique( diff --git a/src/v/cloud_storage/remote_segment.h b/src/v/cloud_storage/remote_segment.h index fbb2ce439bac9..3c9a2b8ad4694 100644 --- a/src/v/cloud_storage/remote_segment.h +++ b/src/v/cloud_storage/remote_segment.h @@ -68,7 +68,16 @@ struct stream_with_leased_client : public ss::data_source_impl { co_return co_await stream.read(); } - ss::future<> close() override { co_return co_await stream.close(); } + ss::future<> close() override { + auto is_eof = stream.eof(); + co_await stream.close(); + if (!is_eof) { + vlog( + cst_log.info, + "stopping leased client because it did not consume stream"); + lease->client->shutdown(); + } + } lease_t lease; ss::input_stream stream; @@ -123,6 +132,7 @@ class remote_segment final { ss::input_stream stream; model::offset rp_offset; kafka::offset kafka_offset; + bool persistent{true}; }; /// create an input stream _sharing_ the underlying file handle /// starting at position @pos @@ -130,8 +140,7 @@ class remote_segment final { kafka::offset start, kafka::offset end, std::optional, - ss::io_priority_class, - bool skip_cache = false); + ss::io_priority_class); /// Hydrate the segment for segment meta version v2 or lower. 
For v3 or /// higher, only hydrate the index. If the index hydration fails, fall back @@ -346,6 +355,8 @@ class remote_segment_batch_reader final { friend class remote_segment_batch_consumer; public: + bool should_reset_parser{false}; + remote_segment_batch_reader( ss::lw_shared_ptr, const storage::log_reader_config& config,