From fc9546927b3a9900bd3a3592b52a648e54dba3a8 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Mon, 5 Jun 2023 13:24:05 +0530 Subject: [PATCH] cloud_storage: Make stream consumer into friend class, use better name --- src/v/cloud_storage/remote_segment.cc | 65 ++++++++++++++++----------- src/v/cloud_storage/remote_segment.h | 16 +------ 2 files changed, 40 insertions(+), 41 deletions(-) diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index bae4330fa02b..c446c5531d41 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -73,6 +73,41 @@ class bounded_stream final : public ss::data_source_impl { namespace cloud_storage { +class split_segment_into_chunk_range_consumer { +public: + split_segment_into_chunk_range_consumer( + cloud_storage::remote_segment& remote_segment, + cloud_storage::segment_chunk_range range) + : _segment{remote_segment} + , _range{std::move(range)} {} + + ss::future + operator()(uint64_t size, ss::input_stream stream) { + for (const auto [start, end] : _range) { + const auto bytes_to_read = end.value_or(_segment._size - 1) - start + + 1; + auto reservation = co_await _segment._cache.reserve_space( + bytes_to_read, 1); + vlog( + cst_log.trace, + "making stream from byte offset {} for {} bytes", + start, + bytes_to_read); + auto dsi = std::make_unique(stream, bytes_to_read); + auto stream_upto = ss::input_stream{ + ss::data_source{std::move(dsi)}}; + co_await _segment.put_chunk_in_cache( + reservation, std::move(stream_upto), start); + } + co_await stream.close(); + co_return size; + } + +private: + cloud_storage::remote_segment& _segment; + cloud_storage::segment_chunk_range _range; +}; + std::filesystem::path generate_index_path(const cloud_storage::remote_segment_path& p) { return fmt::format("{}.index", p().native()); @@ -430,8 +465,7 @@ ss::future remote_segment::put_segment_in_cache( co_return size_bytes; } -ss::future remote_segment::put_chunk_in_cache( - uint64_t size, +ss::future<> remote_segment::put_chunk_in_cache( space_reservation_guard& reservation, ss::input_stream stream, chunk_start_offset_t chunk_start) { @@ -447,8 +481,6 @@ ss::future remote_segment::put_chunk_in_cache( put_exception); std::rethrow_exception(put_exception); } - - co_return size; } ss::future<> remote_segment::do_hydrate_segment() { @@ -905,28 +937,6 @@ ss::future<> remote_segment::hydrate() { .discard_result(); } -remote_segment::consume_stream::consume_stream( - remote_segment& remote_segment, segment_chunk_range range) - : _segment{remote_segment} - , _range{std::move(range)} {} - -ss::future remote_segment::consume_stream::operator()( - uint64_t size, ss::input_stream stream) { - for (const auto [start, end] : _range) { - const auto space_required = end.value_or(_segment._size - 1) - start - + 1; - auto reservation = co_await _segment._cache.reserve_space( - space_required, 1); - auto dsi = std::make_unique( - stream, end.value_or(_segment._size)); - auto stream_upto = ss::input_stream{ - ss::data_source{std::move(dsi)}}; - co_await _segment.put_chunk_in_cache( - size, reservation, std::move(stream_upto), start); - } - co_return size; -} - ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) { const auto start = range.first_offset(); const auto path_to_start = get_path_to_chunk(start); @@ -954,7 +964,8 @@ ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) { cache_hydration_timeout, cache_hydration_backoff, &_rtc}; const auto end = range.last_offset().value_or(_size - 1); - auto consumer = consume_stream{*this, std::move(range)}; + auto consumer = split_segment_into_chunk_range_consumer{ + *this, std::move(range)}; auto res = co_await _api.download_segment( _bucket, _path, std::move(consumer), rtc, std::make_pair(start, end)); if (res != download_result::success) { diff --git a/src/v/cloud_storage/remote_segment.h b/src/v/cloud_storage/remote_segment.h index 174c9034c0d2..257c3f605601 100644 --- a/src/v/cloud_storage/remote_segment.h +++ b/src/v/cloud_storage/remote_segment.h @@ -213,8 +213,7 @@ class remote_segment final { /// Stores a segment chunk in cache. The chunk is stored in a path derived /// from the segment path: _chunks/chunk_start_file_offset. - ss::future put_chunk_in_cache( - uint64_t, + ss::future<> put_chunk_in_cache( space_reservation_guard&, ss::input_stream, chunk_start_offset_t chunk_start); @@ -303,18 +302,7 @@ class remote_segment final { std::optional _chunks_api; std::optional _coarse_index; - class consume_stream { - public: - consume_stream( - remote_segment& remote_segment, segment_chunk_range range); - - ss::future - operator()(uint64_t, ss::input_stream stream); - - private: - remote_segment& _segment; - segment_chunk_range _range; - }; + friend class split_segment_into_chunk_range_consumer; }; class remote_segment_batch_consumer;