Skip to content

Commit

Permalink
cloud_storage: Use chunk range when hydrating
Browse files Browse the repository at this point in the history
This change wires up the chunk range and the bounded stream and consumer
to hydrate a range of chunks instead of one chunk at a time.
  • Loading branch information
abhijat committed May 23, 2023
1 parent 39bb240 commit 227bde3
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
18 changes: 8 additions & 10 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -901,8 +901,6 @@ ss::future<> remote_segment::hydrate() {
.discard_result();
}

ss::future<> remote_segment::hydrate_chunk(
chunk_start_offset_t start, std::optional<chunk_start_offset_t> end) {
remote_segment::consume_stream::consume_stream(
remote_segment& remote_segment, segment_chunk_range range)
: _segment{remote_segment}
Expand All @@ -920,20 +918,20 @@ ss::future<uint64_t> remote_segment::consume_stream::operator()(
}
co_return size;
}

ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
retry_chain_node rtc{
cache_hydration_timeout, cache_hydration_backoff, &_rtc};

const auto space_required = end.value_or(_size - 1) - start + 1;
const auto start = range.first_offset();
const auto end = range.last_offset().value_or(_size - 1);

const auto space_required = end - start + 1;
const auto reserved = co_await _cache.reserve_space(space_required);

auto consumer = consume_stream{*this, std::move(range)};
auto res = co_await _api.download_segment(
_bucket,
_path,
[this, start](auto size, auto stream) {
return put_chunk_in_cache(size, std::move(stream), start);
},
rtc,
std::make_pair(start, end.value_or(_size - 1)));
_bucket, _path, std::move(consumer), rtc, std::make_pair(start, end));
if (res != download_result::success) {
throw download_exception{res, _path};
}
Expand Down
9 changes: 4 additions & 5 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,10 @@ class remote_segment final {
/// implementation, but the index is still required to be present first.
ss::future<> hydrate();

/// Hydrate a part of a segment, identified by the given start and end
/// offsets. If the end offset is std::nullopt, the last offset in the file
/// is used as the end offset.
ss::future<> hydrate_chunk(
chunk_start_offset_t start, std::optional<chunk_start_offset_t> end);
/// Hydrate a part of a segment, identified by the given range. The range
/// can contain data for multiple contiguous chunks, in which case multiple
/// files are written to cache.
ss::future<> hydrate_chunk(segment_chunk_range range);

/// Loads the segment chunk file from cache into an open file handle. If the
/// file is not present in cache, the returned file handle is unopened.
Expand Down
5 changes: 4 additions & 1 deletion src/v/cloud_storage/segment_chunk_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ segment_chunks::do_hydrate_and_materialize(chunk_start_offset_t chunk_start) {
chunk_end = next->first - 1;
}

co_await _segment.hydrate_chunk(chunk_start, chunk_end);
// TODO (abhijat) - use property here!
size_t prefetch = 3;
co_await _segment.hydrate_chunk(
segment_chunk_range{_chunks, prefetch, chunk_start});
co_return co_await _segment.materialize_chunk(chunk_start);
}

Expand Down

0 comments on commit 227bde3

Please sign in to comment.