Skip to content

Commit

Permalink
cloud_storage: Allow overriding prefetch size
Browse files Browse the repository at this point in the history
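Allow overriding the chunk prefetch size per reader. The main purpose is
to allow disabling prefetch (by setting the override to zero) for time
queries; readers that pass no override keep using the configured
cloud_storage_chunk_prefetch value.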
  • Loading branch information
abhijat committed Jun 29, 2023
1 parent c9cd373 commit c39d558
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 18 deletions.
5 changes: 4 additions & 1 deletion src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ remote_segment::offset_data_stream(
ss::gate::holder g(_gate);
co_await hydrate();
offset_index::find_result pos;
std::optional<uint16_t> prefetch_override = std::nullopt;
if (first_timestamp) {
// Time queries are linear search from front of the segment. The
// dominant cost of a time query on a remote partition is promoting
Expand All @@ -290,6 +291,7 @@ remote_segment::offset_data_stream(
.kaf_offset = _base_rp_offset - _base_offset_delta,
.file_pos = 0,
};
prefetch_override = 0;
} else {
pos = maybe_get_offsets(start).value_or(offset_index::find_result{
.rp_offset = _base_rp_offset,
Expand Down Expand Up @@ -320,7 +322,8 @@ remote_segment::offset_data_stream(
pos.kaf_offset,
end,
pos.file_pos,
std::move(options));
std::move(options),
prefetch_override);
data_stream = ss::input_stream<char>{
ss::data_source{std::move(chunk_ds)}};
}
Expand Down
21 changes: 12 additions & 9 deletions src/v/cloud_storage/segment_chunk_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ bool segment_chunks::downloads_in_progress() const {
});
}

ss::future<ss::file>
segment_chunks::do_hydrate_and_materialize(chunk_start_offset_t chunk_start) {
ss::future<ss::file> segment_chunks::do_hydrate_and_materialize(
chunk_start_offset_t chunk_start, std::optional<uint16_t> prefetch_override) {
gate_guard g{_gate};
vassert(_started, "chunk API is not started");

Expand All @@ -121,15 +121,15 @@ segment_chunks::do_hydrate_and_materialize(chunk_start_offset_t chunk_start) {
chunk_end = next->first - 1;
}

co_await _segment.hydrate_chunk(segment_chunk_range{
_chunks,
config::shard_local_cfg().cloud_storage_chunk_prefetch,
chunk_start});
const auto prefetch = prefetch_override.value_or(
config::shard_local_cfg().cloud_storage_chunk_prefetch);
co_await _segment.hydrate_chunk(
segment_chunk_range{_chunks, prefetch, chunk_start});
co_return co_await _segment.materialize_chunk(chunk_start);
}

ss::future<segment_chunk::handle_t>
segment_chunks::hydrate_chunk(chunk_start_offset_t chunk_start) {
ss::future<segment_chunk::handle_t> segment_chunks::hydrate_chunk(
chunk_start_offset_t chunk_start, std::optional<uint16_t> prefetch_override) {
gate_guard g{_gate};
vassert(_started, "chunk API is not started");

Expand Down Expand Up @@ -161,7 +161,8 @@ segment_chunks::hydrate_chunk(chunk_start_offset_t chunk_start) {
// Keep retrying if materialization fails.
bool done = false;
while (!done) {
auto handle = co_await do_hydrate_and_materialize(chunk_start);
auto handle = co_await do_hydrate_and_materialize(
chunk_start, prefetch_override);
if (handle) {
done = true;
chunk.handle = ss::make_lw_shared(std::move(handle));
Expand Down Expand Up @@ -433,6 +434,8 @@ segment_chunk_range::segment_chunk_range(
size_t prefetch,
chunk_start_offset_t start) {
auto it = chunks.find(start);
vassert(
it != chunks.end(), "failed to find {} in chunk start offsets", start);
auto n_it = std::next(it);

// We need one chunk which will be downloaded for the current read, plus the
Expand Down
10 changes: 6 additions & 4 deletions src/v/cloud_storage/segment_chunk_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ class segment_chunks {
// hydration. The waiters are managed per chunk in `segment_chunk::waiters`.
// The first reader to request hydration queues the download. The next
// readers are added to wait list.
ss::future<segment_chunk::handle_t>
hydrate_chunk(chunk_start_offset_t chunk_start);
ss::future<segment_chunk::handle_t> hydrate_chunk(
chunk_start_offset_t chunk_start,
std::optional<uint16_t> prefetch_override = std::nullopt);

// For all chunks between first and last, increment the
// required_by_readers_in_future value by one, and increment the
Expand Down Expand Up @@ -76,8 +77,9 @@ class segment_chunks {
// Attempts to download chunk into cache and return the file handle for
// segment_chunk. Should be retried if there is a failure due to cache
// eviction between download and opening the file handle.
ss::future<ss::file>
do_hydrate_and_materialize(chunk_start_offset_t chunk_start);
ss::future<ss::file> do_hydrate_and_materialize(
chunk_start_offset_t chunk_start,
std::optional<uint16_t> prefetch_override = std::nullopt);

// Periodically closes chunk file handles for the space to be reclaimable by
// cache eviction. The chunks are evicted when they are no longer opened for
Expand Down
9 changes: 6 additions & 3 deletions src/v/cloud_storage/segment_chunk_data_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ chunk_data_source_impl::chunk_data_source_impl(
kafka::offset start,
kafka::offset end,
int64_t begin_stream_at,
ss::file_input_stream_options stream_options)
ss::file_input_stream_options stream_options,
std::optional<uint16_t> prefetch_override)
: _chunks(chunks)
, _segment(segment)
, _first_chunk_start(_segment.get_chunk_start_for_kafka_offset(start))
Expand All @@ -31,7 +32,8 @@ chunk_data_source_impl::chunk_data_source_impl(
, _current_chunk_start(_first_chunk_start)
, _stream_options(std::move(stream_options))
, _rtc{_as}
, _ctxlog{cst_log, _rtc, _segment.get_segment_path()().native()} {
, _ctxlog{cst_log, _rtc, _segment.get_segment_path()().native()}
, _prefetch_override{prefetch_override} {
vlog(
_ctxlog.trace,
"chunk data source initialized with file position {} to {}",
Expand Down Expand Up @@ -73,7 +75,8 @@ ss::future<ss::temporary_buffer<char>> chunk_data_source_impl::get() {
ss::future<>
chunk_data_source_impl::load_chunk_handle(chunk_start_offset_t chunk_start) {
try {
_current_data_file = co_await _chunks.hydrate_chunk(chunk_start);
_current_data_file = co_await _chunks.hydrate_chunk(
chunk_start, _prefetch_override);
} catch (const ss::abort_requested_exception& ex) {
throw;
} catch (const ss::gate_closed_exception& ex) {
Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_storage/segment_chunk_data_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class chunk_data_source_impl final : public ss::data_source_impl {
kafka::offset start,
kafka::offset end,
int64_t begin_stream_at,
ss::file_input_stream_options stream_options);
ss::file_input_stream_options stream_options,
std::optional<uint16_t> prefetch_override = std::nullopt);

chunk_data_source_impl(const chunk_data_source_impl&) = delete;
chunk_data_source_impl& operator=(const chunk_data_source_impl&) = delete;
Expand Down Expand Up @@ -68,6 +69,7 @@ class chunk_data_source_impl final : public ss::data_source_impl {
ss::abort_source _as;
retry_chain_node _rtc;
retry_chain_logger _ctxlog;
std::optional<uint16_t> _prefetch_override;
};

} // namespace cloud_storage

0 comments on commit c39d558

Please sign in to comment.