Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud_storage: enable prefetching chunks #10950

Merged
merged 8 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 91 additions & 28 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,63 @@

#include <exception>

namespace {

/// A data source which yields at most `limit` bytes from an underlying
/// input stream. The underlying stream is held by reference and must
/// outlive this object; it is not closed here — closing remains the
/// caller's responsibility.
class bounded_stream final : public ss::data_source_impl {
public:
    bounded_stream(ss::input_stream<char>& stream, size_t limit)
      : _stream{stream}
      , _remaining{limit} {}

    /// Returns the next buffer, never exceeding the remaining byte budget.
    /// Once the budget reaches zero, an empty buffer is returned, which
    /// signals end-of-stream to the consumer.
    ss::future<ss::temporary_buffer<char>> get() override {
        auto buffer = co_await _stream.read_up_to(_remaining);
        _remaining -= buffer.size();
        co_return buffer;
    }

private:
    ss::input_stream<char>& _stream;
    size_t _remaining;
};

} // namespace

namespace cloud_storage {

/// Consumes the payload of a ranged segment download and splits it into
/// one cache file per chunk in the supplied range. Each chunk gets its own
/// cache space reservation before its bytes are carved off the shared
/// stream via a bounded view.
class split_segment_into_chunk_range_consumer {
public:
    split_segment_into_chunk_range_consumer(
      cloud_storage::remote_segment& remote_segment,
      cloud_storage::segment_chunk_range range)
      : _segment{remote_segment}
      , _range{std::move(range)} {}

    /// Writes each chunk of the range into the cache from `stream`.
    /// The stream is always closed before returning — including when a
    /// reservation or cache put throws — since seastar input streams must
    /// be closed on every path. The original exception (if any) is
    /// rethrown after the close completes.
    ss::future<uint64_t>
    operator()(uint64_t size, ss::input_stream<char> stream) {
        std::exception_ptr eptr;
        try {
            for (const auto [start, end] : _range) {
                // A nullopt end means "up to the end of the segment".
                const auto bytes_to_read = end.value_or(_segment._size - 1)
                                           - start + 1;
                auto reservation = co_await _segment._cache.reserve_space(
                  bytes_to_read, 1);
                vlog(
                  cst_log.trace,
                  "making stream from byte offset {} for {} bytes",
                  start,
                  bytes_to_read);
                // Bounded view over the shared stream: reads exactly this
                // chunk's bytes and leaves the rest for the next chunk.
                auto dsi = std::make_unique<bounded_stream>(
                  stream, bytes_to_read);
                auto stream_upto = ss::input_stream<char>{
                  ss::data_source{std::move(dsi)}};
                co_await _segment.put_chunk_in_cache(
                  reservation, std::move(stream_upto), start);
            }
        } catch (...) {
            eptr = std::current_exception();
        }
        co_await stream.close();
        if (eptr) {
            std::rethrow_exception(eptr);
        }
        co_return size;
    }

private:
    cloud_storage::remote_segment& _segment;
    cloud_storage::segment_chunk_range _range;
};

std::filesystem::path
generate_index_path(const cloud_storage::remote_segment_path& p) {
return fmt::format("{}.index", p().native());
Expand Down Expand Up @@ -258,6 +313,7 @@ remote_segment::offset_data_stream(
ss::gate::holder g(_gate);
co_await hydrate();
offset_index::find_result pos;
std::optional<uint16_t> prefetch_override = std::nullopt;
if (first_timestamp) {
// Time queries are linear search from front of the segment. The
// dominant cost of a time query on a remote partition is promoting
Expand All @@ -270,6 +326,7 @@ remote_segment::offset_data_stream(
.kaf_offset = _base_rp_offset - _base_offset_delta,
.file_pos = 0,
};
prefetch_override = 0;
} else {
pos = maybe_get_offsets(start).value_or(offset_index::find_result{
.rp_offset = _base_rp_offset,
Expand Down Expand Up @@ -300,7 +357,8 @@ remote_segment::offset_data_stream(
pos.kaf_offset,
end,
pos.file_pos,
std::move(options));
std::move(options),
prefetch_override);
data_stream = ss::input_stream<char>{
ss::data_source{std::move(chunk_ds)}};
}
Expand Down Expand Up @@ -407,8 +465,7 @@ ss::future<uint64_t> remote_segment::put_segment_in_cache(
co_return size_bytes;
}

ss::future<uint64_t> remote_segment::put_chunk_in_cache(
uint64_t size,
ss::future<> remote_segment::put_chunk_in_cache(
space_reservation_guard& reservation,
ss::input_stream<char> stream,
chunk_start_offset_t chunk_start) {
Expand All @@ -424,8 +481,6 @@ ss::future<uint64_t> remote_segment::put_chunk_in_cache(
put_exception);
std::rethrow_exception(put_exception);
}

co_return size;
}

ss::future<> remote_segment::do_hydrate_segment() {
Expand All @@ -440,13 +495,11 @@ ss::future<> remote_segment::do_hydrate_segment() {
_bucket,
_path,
[this, &reservation](uint64_t size_bytes, ss::input_stream<char> s) {
if (is_legacy_mode_engaged()) {
return put_segment_in_cache_and_create_index(
size_bytes, reservation, std::move(s));
} else {
return put_segment_in_cache(
size_bytes, reservation, std::move(s));
}
// Always create the index because we are in legacy mode if we ended
// up hydrating the segment. Legacy mode indicates a missing index, so
// we create it here on the fly using the downloaded segment.
return put_segment_in_cache_and_create_index(
size_bytes, reservation, std::move(s));
},
local_rtc);

Expand Down Expand Up @@ -882,27 +935,37 @@ ss::future<> remote_segment::hydrate() {
.discard_result();
}

ss::future<> remote_segment::hydrate_chunk(
chunk_start_offset_t start, std::optional<chunk_start_offset_t> end) {
retry_chain_node rtc{
cache_hydration_timeout, cache_hydration_backoff, &_rtc};

auto cache_status = co_await _cache.is_cached(get_path_to_chunk(start));
if (cache_status == cache_element_status::available) {
ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
const auto start = range.first_offset();
const auto path_to_start = get_path_to_chunk(start);

// It is possible that the chunk has already been downloaded during a
// prefetch operation. In this case we skip hydration and try to materialize
// the chunk. This also skips the prefetch of the successive chunks. So
// given a series of chunks A, B, C, D, E and a prefetch of 2, when A is
// fetched B,C are also fetched. Then hydration of B,C are no-ops and no
// prefetch is done during those no-ops. When D is fetched, hydration
// makes an HTTP GET call and E is also prefetched. So a total of two calls
// are made for the five chunks (ignoring any cache evictions during the
// process).
if (const auto status = co_await _cache.is_cached(path_to_start);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each is_cached call maps to an access sys call. We should be able to answer these queries by using the access time tracker in cache_service. I'll try to include this in #10855.

status == cache_element_status::available) {
vlog(
_ctxlog.debug,
"skipping chunk hydration for chunk path {}, it is already in "
"cache",
path_to_start);
co_return;
}

const auto space_required = end.value_or(_size - 1) - start + 1;
auto reserved = co_await _cache.reserve_space(space_required, 1);
retry_chain_node rtc{
cache_hydration_timeout, cache_hydration_backoff, &_rtc};

const auto end = range.last_offset().value_or(_size - 1);
auto consumer = split_segment_into_chunk_range_consumer{
*this, std::move(range)};
auto res = co_await _api.download_segment(
_bucket,
_path,
[this, start, &reserved](auto size, auto stream) {
return put_chunk_in_cache(size, reserved, std::move(stream), start);
},
rtc,
std::make_pair(start, end.value_or(_size - 1)));
_bucket, _path, std::move(consumer), rtc, std::make_pair(start, end));
if (res != download_result::success) {
throw download_exception{res, _path};
}
Expand Down
14 changes: 7 additions & 7 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,10 @@ class remote_segment final {
/// implementation, but the index is still required to be present first.
ss::future<> hydrate();

/// Hydrate a part of a segment, identified by the given start and end
/// offsets. If the end offset is std::nullopt, the last offset in the file
/// is used as the end offset.
ss::future<> hydrate_chunk(
chunk_start_offset_t start, std::optional<chunk_start_offset_t> end);
/// Hydrate a part of a segment, identified by the given range. The range
/// can contain data for multiple contiguous chunks, in which case multiple
/// files are written to cache.
ss::future<> hydrate_chunk(segment_chunk_range range);

/// Loads the segment chunk file from cache into an open file handle. If the
/// file is not present in cache, the returned file handle is unopened.
Expand Down Expand Up @@ -214,8 +213,7 @@ class remote_segment final {

/// Stores a segment chunk in cache. The chunk is stored in a path derived
/// from the segment path: <segment_path>_chunks/chunk_start_file_offset.
ss::future<uint64_t> put_chunk_in_cache(
uint64_t,
ss::future<> put_chunk_in_cache(
space_reservation_guard&,
ss::input_stream<char>,
chunk_start_offset_t chunk_start);
Expand Down Expand Up @@ -303,6 +301,8 @@ class remote_segment final {

std::optional<segment_chunks> _chunks_api;
std::optional<offset_index::coarse_index_t> _coarse_index;

friend class split_segment_into_chunk_range_consumer;
};

class remote_segment_batch_consumer;
Expand Down
73 changes: 67 additions & 6 deletions src/v/cloud_storage/segment_chunk_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ bool segment_chunks::downloads_in_progress() const {
});
}

ss::future<ss::file>
segment_chunks::do_hydrate_and_materialize(chunk_start_offset_t chunk_start) {
ss::future<ss::file> segment_chunks::do_hydrate_and_materialize(
chunk_start_offset_t chunk_start, std::optional<uint16_t> prefetch_override) {
gate_guard g{_gate};
vassert(_started, "chunk API is not started");

Expand All @@ -121,12 +121,15 @@ segment_chunks::do_hydrate_and_materialize(chunk_start_offset_t chunk_start) {
chunk_end = next->first - 1;
}

co_await _segment.hydrate_chunk(chunk_start, chunk_end);
const auto prefetch = prefetch_override.value_or(
config::shard_local_cfg().cloud_storage_chunk_prefetch);
co_await _segment.hydrate_chunk(
segment_chunk_range{_chunks, prefetch, chunk_start});
co_return co_await _segment.materialize_chunk(chunk_start);
}

ss::future<segment_chunk::handle_t>
segment_chunks::hydrate_chunk(chunk_start_offset_t chunk_start) {
ss::future<segment_chunk::handle_t> segment_chunks::hydrate_chunk(
chunk_start_offset_t chunk_start, std::optional<uint16_t> prefetch_override) {
gate_guard g{_gate};
vassert(_started, "chunk API is not started");

Expand Down Expand Up @@ -158,7 +161,8 @@ segment_chunks::hydrate_chunk(chunk_start_offset_t chunk_start) {
// Keep retrying if materialization fails.
bool done = false;
while (!done) {
auto handle = co_await do_hydrate_and_materialize(chunk_start);
auto handle = co_await do_hydrate_and_materialize(
chunk_start, prefetch_override);
if (handle) {
done = true;
chunk.handle = ss::make_lw_shared(std::move(handle));
Expand Down Expand Up @@ -425,4 +429,61 @@ std::unique_ptr<chunk_eviction_strategy> make_eviction_strategy(
}
}

/// Builds the set of {chunk start, optional chunk end} byte ranges that a
/// single download should hydrate: the chunk containing `start` plus up to
/// `prefetch` following chunks. Asserts that `start` is a valid chunk start
/// offset in `chunks`.
segment_chunk_range::segment_chunk_range(
  const segment_chunks::chunk_map_t& chunks,
  size_t prefetch,
  chunk_start_offset_t start) {
    auto it = chunks.find(start);
    vassert(
      it != chunks.end(), "failed to find {} in chunk start offsets", start);
    auto n_it = std::next(it);

    // We need one chunk which will be downloaded for the current read, plus
    // the prefetch count
    const size_t num_chunks_required = prefetch + 1;

    // Collects start and end file offsets to be hydrated for the given
    // prefetch by iterating over adjacent chunk start offsets. The chunk map
    // does not contain end offsets, so for a given chunk start offset in the
    // map, the corresponding end of chunk is the next entry in the map minus
    // one.
    for (size_t i = 0; i < num_chunks_required && it != chunks.end(); ++i) {
        // NOTE: named distinctly from the `start` parameter to avoid
        // shadowing it inside the loop.
        const auto chunk_start = it->first;

        // The last entry in the chunk map always represents data upto the
        // end of segment. A nullopt here is a signal to
        // split_segment_into_chunk_range_consumer (which does have access to
        // the segment size) to use the segment size as the end of the byte
        // range.
        std::optional<chunk_start_offset_t> chunk_end = std::nullopt;
        if (n_it != chunks.end()) {
            chunk_end = n_it->first - 1;
        }

        _chunks[chunk_start] = chunk_end;
        if (n_it == chunks.end()) {
            break;
        }
        ++it;
        ++n_it;
    }
}

/// End offset of the final chunk in the range. A nullopt result means the
/// range extends to the end of the segment (the map's last value carries
/// that sentinel). Requires a non-empty range, which the constructor
/// guarantees via its vassert.
std::optional<chunk_start_offset_t> segment_chunk_range::last_offset() const {
    return _chunks.rbegin()->second;
}

/// Start offset of the first chunk in the range. The map is ordered by
/// chunk start offset, so the first key is the lowest start.
chunk_start_offset_t segment_chunk_range::first_offset() const {
    return _chunks.cbegin()->first;
}

// Iteration yields {chunk start offset, optional chunk end offset} pairs,
// ordered by start offset.
segment_chunk_range::map_t::iterator segment_chunk_range::begin() {
    return _chunks.begin();
}

// Past-the-end iterator companion to begin(), enabling range-for over the
// chunk ranges.
segment_chunk_range::map_t::iterator segment_chunk_range::end() {
    return _chunks.end();
}

} // namespace cloud_storage
30 changes: 26 additions & 4 deletions src/v/cloud_storage/segment_chunk_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ class segment_chunks {
// hydration. The waiters are managed per chunk in `segment_chunk::waiters`.
// The first reader to request hydration queues the download. The next
// readers are added to wait list.
ss::future<segment_chunk::handle_t>
hydrate_chunk(chunk_start_offset_t chunk_start);
ss::future<segment_chunk::handle_t> hydrate_chunk(
chunk_start_offset_t chunk_start,
std::optional<uint16_t> prefetch_override = std::nullopt);

// For all chunks between first and last, increment the
// required_by_readers_in_future value by one, and increment the
Expand Down Expand Up @@ -76,8 +77,9 @@ class segment_chunks {
// Attempts to download chunk into cache and return the file handle for
// segment_chunk. Should be retried if there is a failure due to cache
// eviction between download and opening the file handle.
ss::future<ss::file>
do_hydrate_and_materialize(chunk_start_offset_t chunk_start);
ss::future<ss::file> do_hydrate_and_materialize(
chunk_start_offset_t chunk_start,
std::optional<uint16_t> prefetch_override = std::nullopt);

// Periodically closes chunk file handles for the space to be reclaimable by
// cache eviction. The chunks are evicted when they are no longer opened for
Expand Down Expand Up @@ -167,4 +169,24 @@ std::unique_ptr<chunk_eviction_strategy> make_eviction_strategy(
uint64_t max_chunks,
uint64_t hydrated_chunks);

class segment_chunk_range {
public:
Comment on lines +172 to +173
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing something, but given the public API of this, could this be a deque of pairs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean for storing the data inside this class (instead of a map), or for replacing this class entirely?

If the former, then probably yes it can be a deque of pairs and since we only ever iterate over it and never lookup keys themselves, it should be marginally faster, but I don't see that as enough of a speedup to change it (traversal through a tree should still be pretty fast).

If the latter, this class provides some convenience methods for calculating the bounds of the range to decide how much space to reserve in cache etc, it could maybe be done by free functions accepting the deque of pairs but I prefer a class.

using map_t = absl::
btree_map<chunk_start_offset_t, std::optional<chunk_start_offset_t>>;

segment_chunk_range(
const segment_chunks::chunk_map_t& chunks,
size_t prefetch,
chunk_start_offset_t start);

std::optional<chunk_start_offset_t> last_offset() const;
chunk_start_offset_t first_offset() const;

map_t::iterator begin();
map_t::iterator end();

private:
map_t _chunks;
};

} // namespace cloud_storage
9 changes: 6 additions & 3 deletions src/v/cloud_storage/segment_chunk_data_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ chunk_data_source_impl::chunk_data_source_impl(
kafka::offset start,
kafka::offset end,
int64_t begin_stream_at,
ss::file_input_stream_options stream_options)
ss::file_input_stream_options stream_options,
std::optional<uint16_t> prefetch_override)
: _chunks(chunks)
, _segment(segment)
, _first_chunk_start(_segment.get_chunk_start_for_kafka_offset(start))
Expand All @@ -31,7 +32,8 @@ chunk_data_source_impl::chunk_data_source_impl(
, _current_chunk_start(_first_chunk_start)
, _stream_options(std::move(stream_options))
, _rtc{_as}
, _ctxlog{cst_log, _rtc, _segment.get_segment_path()().native()} {
, _ctxlog{cst_log, _rtc, _segment.get_segment_path()().native()}
, _prefetch_override{prefetch_override} {
vlog(
_ctxlog.trace,
"chunk data source initialized with file position {} to {}",
Expand Down Expand Up @@ -73,7 +75,8 @@ ss::future<ss::temporary_buffer<char>> chunk_data_source_impl::get() {
ss::future<>
chunk_data_source_impl::load_chunk_handle(chunk_start_offset_t chunk_start) {
try {
_current_data_file = co_await _chunks.hydrate_chunk(chunk_start);
_current_data_file = co_await _chunks.hydrate_chunk(
chunk_start, _prefetch_override);
} catch (const ss::abort_requested_exception& ex) {
throw;
} catch (const ss::gate_closed_exception& ex) {
Expand Down
Loading