diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index 141198832a028..09fd7071622fb 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -901,8 +901,6 @@ ss::future<> remote_segment::hydrate() { .discard_result(); } -ss::future<> remote_segment::hydrate_chunk( - chunk_start_offset_t start, std::optional end) { remote_segment::consume_stream::consume_stream( remote_segment& remote_segment, segment_chunk_range range) : _segment{remote_segment} @@ -920,20 +918,20 @@ ss::future remote_segment::consume_stream::operator()( } co_return size; } + +ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) { retry_chain_node rtc{ cache_hydration_timeout, cache_hydration_backoff, &_rtc}; - const auto space_required = end.value_or(_size - 1) - start + 1; + const auto start = range.first_offset(); + const auto end = range.last_offset().value_or(_size - 1); + + const auto space_required = end - start + 1; const auto reserved = co_await _cache.reserve_space(space_required); + auto consumer = consume_stream{*this, std::move(range)}; auto res = co_await _api.download_segment( - _bucket, - _path, - [this, start](auto size, auto stream) { - return put_chunk_in_cache(size, std::move(stream), start); - }, - rtc, - std::make_pair(start, end.value_or(_size - 1))); + _bucket, _path, std::move(consumer), rtc, std::make_pair(start, end)); if (res != download_result::success) { throw download_exception{res, _path}; } diff --git a/src/v/cloud_storage/remote_segment.h b/src/v/cloud_storage/remote_segment.h index 4cc575dbae99c..e8681df091e65 100644 --- a/src/v/cloud_storage/remote_segment.h +++ b/src/v/cloud_storage/remote_segment.h @@ -130,11 +130,10 @@ class remote_segment final { /// implementation, but the index is still required to be present first. ss::future<> hydrate(); - /// Hydrate a part of a segment, identified by the given start and end - /// offsets. If the end offset is std::nullopt, the last offset in the file - /// is used as the end offset. - ss::future<> hydrate_chunk( - chunk_start_offset_t start, std::optional end); + /// Hydrate a part of a segment, identified by the given range. The range + /// can contain data for multiple contiguous chunks, in which case multiple + /// files are written to cache. + ss::future<> hydrate_chunk(segment_chunk_range range); /// Loads the segment chunk file from cache into an open file handle. If the /// file is not present in cache, the returned file handle is unopened. diff --git a/src/v/cloud_storage/segment_chunk_api.cc b/src/v/cloud_storage/segment_chunk_api.cc index b39ae4de2c95c..707c90054bf6f 100644 --- a/src/v/cloud_storage/segment_chunk_api.cc +++ b/src/v/cloud_storage/segment_chunk_api.cc @@ -121,7 +121,10 @@ segment_chunks::do_hydrate_and_materialize(chunk_start_offset_t chunk_start) { chunk_end = next->first - 1; } - co_await _segment.hydrate_chunk(chunk_start, chunk_end); + // TODO (abhijat) - use property here! + size_t prefetch = 3; + co_await _segment.hydrate_chunk( + segment_chunk_range{_chunks, prefetch, chunk_start}); co_return co_await _segment.materialize_chunk(chunk_start); }