diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index 3b1fc7f844f15..bae4330fa02bc 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -278,6 +278,7 @@ remote_segment::offset_data_stream( ss::gate::holder g(_gate); co_await hydrate(); offset_index::find_result pos; + std::optional prefetch_override = std::nullopt; if (first_timestamp) { // Time queries are linear search from front of the segment. The // dominant cost of a time query on a remote partition is promoting @@ -290,6 +291,7 @@ remote_segment::offset_data_stream( .kaf_offset = _base_rp_offset - _base_offset_delta, .file_pos = 0, }; + prefetch_override = 0; } else { pos = maybe_get_offsets(start).value_or(offset_index::find_result{ .rp_offset = _base_rp_offset, @@ -320,7 +322,8 @@ remote_segment::offset_data_stream( pos.kaf_offset, end, pos.file_pos, - std::move(options)); + std::move(options), + prefetch_override); data_stream = ss::input_stream{ ss::data_source{std::move(chunk_ds)}}; } diff --git a/src/v/cloud_storage/segment_chunk_api.cc b/src/v/cloud_storage/segment_chunk_api.cc index 1a1080196f7ee..8f41c093b8eb7 100644 --- a/src/v/cloud_storage/segment_chunk_api.cc +++ b/src/v/cloud_storage/segment_chunk_api.cc @@ -110,8 +110,8 @@ bool segment_chunks::downloads_in_progress() const { }); } -ss::future -segment_chunks::do_hydrate_and_materialize(chunk_start_offset_t chunk_start) { +ss::future segment_chunks::do_hydrate_and_materialize( + chunk_start_offset_t chunk_start, std::optional prefetch_override) { gate_guard g{_gate}; vassert(_started, "chunk API is not started"); @@ -121,15 +121,15 @@ segment_chunks::do_hydrate_and_materialize(chunk_start_offset_t chunk_start) { chunk_end = next->first - 1; } - co_await _segment.hydrate_chunk(segment_chunk_range{ - _chunks, - config::shard_local_cfg().cloud_storage_chunk_prefetch, - chunk_start}); + const auto prefetch = prefetch_override.value_or( + config::shard_local_cfg().cloud_storage_chunk_prefetch); + co_await _segment.hydrate_chunk( + segment_chunk_range{_chunks, prefetch, chunk_start}); co_return co_await _segment.materialize_chunk(chunk_start); } -ss::future -segment_chunks::hydrate_chunk(chunk_start_offset_t chunk_start) { +ss::future segment_chunks::hydrate_chunk( + chunk_start_offset_t chunk_start, std::optional prefetch_override) { gate_guard g{_gate}; vassert(_started, "chunk API is not started"); @@ -161,7 +161,8 @@ segment_chunks::hydrate_chunk(chunk_start_offset_t chunk_start) { // Keep retrying if materialization fails. bool done = false; while (!done) { - auto handle = co_await do_hydrate_and_materialize(chunk_start); + auto handle = co_await do_hydrate_and_materialize( + chunk_start, prefetch_override); if (handle) { done = true; chunk.handle = ss::make_lw_shared(std::move(handle)); @@ -433,6 +434,8 @@ segment_chunk_range::segment_chunk_range( size_t prefetch, chunk_start_offset_t start) { auto it = chunks.find(start); + vassert( + it != chunks.end(), "failed to find {} in chunk start offsets", start); auto n_it = std::next(it); // We need one chunk which will be downloaded for the current read, plus the diff --git a/src/v/cloud_storage/segment_chunk_api.h b/src/v/cloud_storage/segment_chunk_api.h index 311786813201e..5191ac21b4428 100644 --- a/src/v/cloud_storage/segment_chunk_api.h +++ b/src/v/cloud_storage/segment_chunk_api.h @@ -45,8 +45,9 @@ class segment_chunks { // hydration. The waiters are managed per chunk in `segment_chunk::waiters`. // The first reader to request hydration queues the download. The next // readers are added to wait list. - ss::future - hydrate_chunk(chunk_start_offset_t chunk_start); + ss::future hydrate_chunk( + chunk_start_offset_t chunk_start, + std::optional prefetch_override = std::nullopt); // For all chunks between first and last, increment the // required_by_readers_in_future value by one, and increment the @@ -76,8 +77,9 @@ class segment_chunks { // Attempts to download chunk into cache and return the file handle for // segment_chunk. Should be retried if there is a failure due to cache // eviction between download and opening the file handle. - ss::future - do_hydrate_and_materialize(chunk_start_offset_t chunk_start); + ss::future do_hydrate_and_materialize( + chunk_start_offset_t chunk_start, + std::optional prefetch_override = std::nullopt); // Periodically closes chunk file handles for the space to be reclaimable by // cache eviction. The chunks are evicted when they are no longer opened for diff --git a/src/v/cloud_storage/segment_chunk_data_source.cc b/src/v/cloud_storage/segment_chunk_data_source.cc index 8008b29a7951f..75b4cf7eb4fa7 100644 --- a/src/v/cloud_storage/segment_chunk_data_source.cc +++ b/src/v/cloud_storage/segment_chunk_data_source.cc @@ -22,7 +22,8 @@ chunk_data_source_impl::chunk_data_source_impl( kafka::offset start, kafka::offset end, int64_t begin_stream_at, - ss::file_input_stream_options stream_options) + ss::file_input_stream_options stream_options, + std::optional prefetch_override) : _chunks(chunks) , _segment(segment) , _first_chunk_start(_segment.get_chunk_start_for_kafka_offset(start)) @@ -31,7 +32,8 @@ chunk_data_source_impl::chunk_data_source_impl( , _current_chunk_start(_first_chunk_start) , _stream_options(std::move(stream_options)) , _rtc{_as} - , _ctxlog{cst_log, _rtc, _segment.get_segment_path()().native()} { + , _ctxlog{cst_log, _rtc, _segment.get_segment_path()().native()} + , _prefetch_override{prefetch_override} { vlog( _ctxlog.trace, "chunk data source initialized with file position {} to {}", @@ -73,7 +75,8 @@ ss::future> chunk_data_source_impl::get() { ss::future<> chunk_data_source_impl::load_chunk_handle(chunk_start_offset_t chunk_start) { try { - _current_data_file = co_await _chunks.hydrate_chunk(chunk_start); + _current_data_file = co_await _chunks.hydrate_chunk( + chunk_start, _prefetch_override); } catch (const ss::abort_requested_exception& ex) { throw; } catch (const ss::gate_closed_exception& ex) { diff --git a/src/v/cloud_storage/segment_chunk_data_source.h b/src/v/cloud_storage/segment_chunk_data_source.h index bb77a2592e157..fbe0e25347726 100644 --- a/src/v/cloud_storage/segment_chunk_data_source.h +++ b/src/v/cloud_storage/segment_chunk_data_source.h @@ -27,7 +27,8 @@ class chunk_data_source_impl final : public ss::data_source_impl { kafka::offset start, kafka::offset end, int64_t begin_stream_at, - ss::file_input_stream_options stream_options); + ss::file_input_stream_options stream_options, + std::optional prefetch_override = std::nullopt); chunk_data_source_impl(const chunk_data_source_impl&) = delete; chunk_data_source_impl& operator=(const chunk_data_source_impl&) = delete; @@ -68,6 +69,7 @@ class chunk_data_source_impl final : public ss::data_source_impl { ss::abort_source _as; retry_chain_node _rtc; retry_chain_logger _ctxlog; + std::optional _prefetch_override; }; } // namespace cloud_storage