Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud_storage: use remote index in cloud timequery #13011

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/v/cloud_storage/read_path_probes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ ts_read_path_probe::ts_read_path_probe() {
"Spillover manifest materialization latency histogram"))
.aggregate(aggregate_labels),

sm::make_counter(
"chunks_hydrated",
[this] { return _chunks_hydrated; },
sm::description("Total number of hydrated chunks (some may have been "
"evicted from the cache)"))
.aggregate(aggregate_labels),

sm::make_histogram(
"chunk_hydration_latency",
[this] {
Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_storage/read_path_probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class ts_read_path_probe {
return _spillover_mat_latency.auto_measure();
}

/// Account for \p num chunks having been hydrated (increments the
/// counter backing the "chunks_hydrated" metric).
void on_chunks_hydration(size_t num) { _chunks_hydrated += num; }

/// Returns an auto-measurement handle for the chunk hydration latency
/// histogram; the elapsed time is recorded by the handle.
auto chunk_hydration_latency() {
return _chunk_hydration_latency.auto_measure();
}
Expand All @@ -84,6 +86,7 @@ class ts_read_path_probe {
/// Spillover manifest materialization latency
hist_t _spillover_mat_latency;

size_t _chunks_hydrated = 0;
hist_t _chunk_hydration_latency;

ssx::metrics::metric_groups _metrics
Expand Down
66 changes: 48 additions & 18 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,28 +312,32 @@ remote_segment::offset_data_stream(
vlog(_ctxlog.debug, "remote segment file input stream at offset {}", start);
ss::gate::holder g(_gate);
co_await hydrate();
offset_index::find_result pos;

std::optional<offset_index::find_result> indexed_pos;
std::optional<uint16_t> prefetch_override = std::nullopt;

// Perform index lookup by timestamp or offset. This reduces the number
// of hydrated chunks required to serve the request.
if (first_timestamp) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it odd that first_timestamp wasn't used in this conditional before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first_timestamp indicates that this is a time-query. By this point, we have already resolved to the correct segment for the time query. Since we didn't previously use the index, it makes sense that first_timestamp wasn't used (we'd just start from the beginning of the segment).

// Time queries are linear search from front of the segment. The
// dominant cost of a time query on a remote partition is promoting
// the segment into our local cache: once it's here, the cost of
// a scan is comparatively small. For workloads that do many time
// queries in close proximity on the same partition, an additional
// index could be added here, for hydrated segments.
pos = {
.rp_offset = _base_rp_offset,
.kaf_offset = _base_rp_offset - _base_offset_delta,
.file_pos = 0,
};
// The dominant cost of a time query on a remote partition is promoting
// the chunks into our local cache: once they're here, the cost of a
// scan is comparatively small.

prefetch_override = 0;
indexed_pos = maybe_get_offsets(*first_timestamp);
} else {
pos = maybe_get_offsets(start).value_or(offset_index::find_result{
.rp_offset = _base_rp_offset,
.kaf_offset = _base_rp_offset - _base_offset_delta,
.file_pos = 0,
});
indexed_pos = maybe_get_offsets(start);
}

// If the index lookup failed, scan the entire segment starting from the
// first chunk.
offset_index::find_result pos = indexed_pos.value_or(
offset_index::find_result{
.rp_offset = _base_rp_offset,
.kaf_offset = _base_rp_offset - _base_offset_delta,
.file_pos = 0,
});

vlog(
_ctxlog.debug,
"Offset data stream start reading at {}, log offset {}, delta {}",
Expand Down Expand Up @@ -381,7 +385,8 @@ remote_segment::maybe_get_offsets(kafka::offset kafka_offset) {
}
vlog(
_ctxlog.debug,
"Using index to locate {}, the result is rp-offset: {}, kafka-offset: "
"Using index to locate Kafka offset {}, the result is rp-offset: {}, "
"kafka-offset: "
"{}, file-pos: {}",
kafka_offset,
pos->rp_offset,
Expand All @@ -390,6 +395,27 @@ remote_segment::maybe_get_offsets(kafka::offset kafka_offset) {
return pos;
}

// Translate a timestamp into a position inside the segment using the
// remote index, when one is available. Returns nullopt when there is no
// index or the index has no entry below the timestamp.
std::optional<offset_index::find_result>
remote_segment::maybe_get_offsets(model::timestamp ts) {
    if (!_index) {
        return {};
    }
    auto result = _index->find_timestamp(ts);
    if (result.has_value()) {
        vlog(
          _ctxlog.debug,
          "Using index to locate timestamp {}, the result is rp-offset: {}, "
          "kafka-offset: "
          "{}, file-pos: {}",
          ts,
          result->rp_offset,
          result->kaf_offset,
          result->file_pos);
    }
    return result;
}

/**
* Called by do_hydrate_segment on the stream the S3 remote creates for
 * a GET response: pass the data through into the cache.
Expand Down Expand Up @@ -944,6 +970,8 @@ ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
retry_chain_node rtc{
cache_hydration_timeout, cache_hydration_backoff, &_rtc};

const auto chunk_count = range.chunk_count();

const auto end = range.last_offset().value_or(_size - 1);
auto consumer = split_segment_into_chunk_range_consumer{
*this, std::move(range)};
Expand All @@ -955,6 +983,8 @@ ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
measurement->cancel();
throw download_exception{res, _path};
}

_ts_probe.on_chunks_hydration(chunk_count);
}

ss::future<ss::file>
Expand Down
5 changes: 5 additions & 0 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ class remote_segment final {
std::optional<offset_index::find_result>
maybe_get_offsets(kafka::offset kafka_offset);

/// Get the file offset corresponding to the timestamp
/// if the index is available
std::optional<offset_index::find_result>
maybe_get_offsets(model::timestamp);

/// Sets the results of the waiters of this segment as the given error.
void set_waiter_errors(const std::exception_ptr& err);

Expand Down
151 changes: 96 additions & 55 deletions src/v/cloud_storage/remote_segment_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,73 +115,114 @@ std::

/// Find the index entry for the raft (redpanda) offset upper bound.
///
/// Returns nullopt when the index has no entry below the bound; otherwise
/// returns the rp/kafka offsets and the file position of the entry.
/// NOTE: the rendered block contained the pre-refactor implementation
/// followed by an unreachable duplicate `return ss::visit(...)`; only the
/// visit-based implementation is kept here.
std::optional<offset_index::find_result>
offset_index::find_rp_offset(model::offset upper_bound) {
    auto search_result = maybe_find_offset(upper_bound, _rp_index, _rp_offsets);

    return ss::visit(
      search_result,
      [](std::monostate) -> std::optional<find_result> { return std::nullopt; },
      [](find_result result) -> std::optional<find_result> { return result; },
      [this](index_value index_result) -> std::optional<find_result> {
          find_result res{};

          size_t ix = index_result.ix;
          res.rp_offset = model::offset(index_result.value);

          // Decode the parallel kafka-offset and file-position columns at
          // the same row index to complete the result.
          decoder_t kaf_dec(
            _kaf_index.get_initial_value(),
            _kaf_index.get_row_count(),
            _kaf_index.copy());
          auto kaf_offset = _fetch_ix(std::move(kaf_dec), ix);
          vassert(kaf_offset.has_value(), "Inconsistent index state");
          res.kaf_offset = kafka::offset(*kaf_offset);
          foffset_decoder_t file_dec(
            _file_index.get_initial_value(),
            _file_index.get_row_count(),
            _file_index.copy(),
            delta_delta_t(_min_file_pos_step));
          auto file_pos = _fetch_ix(std::move(file_dec), ix);
          // Check before dereferencing, consistent with the other lookups.
          vassert(file_pos.has_value(), "Inconsistent index state");
          res.file_pos = *file_pos;
          return res;
      });
}

/// Find the index entry for the kafka offset upper bound.
///
/// Returns nullopt when the index has no entry below the bound; otherwise
/// returns the rp/kafka offsets and the file position of the entry.
/// Removed: dead leftover locals (`ix`, `res`) from the pre-refactor
/// implementation and a stray trailing `holds_alternative` line.
std::optional<offset_index::find_result>
offset_index::find_kaf_offset(kafka::offset upper_bound) {
    auto search_result = maybe_find_offset(
      upper_bound, _kaf_index, _kaf_offsets);
    return ss::visit(
      search_result,
      [](std::monostate) -> std::optional<find_result> { return std::nullopt; },
      [](find_result result) -> std::optional<find_result> { return result; },
      [this](index_value index_result) -> std::optional<find_result> {
          find_result res{};

          size_t ix = index_result.ix;
          res.kaf_offset = kafka::offset(index_result.value);

          // Decode the parallel rp-offset and file-position columns at the
          // same row index to complete the result.
          decoder_t rp_dec(
            _rp_index.get_initial_value(),
            _rp_index.get_row_count(),
            _rp_index.copy());
          auto rp_offset = _fetch_ix(std::move(rp_dec), ix);
          vassert(rp_offset.has_value(), "Inconsistent index state");
          res.rp_offset = model::offset(*rp_offset);
          foffset_decoder_t file_dec(
            _file_index.get_initial_value(),
            _file_index.get_row_count(),
            _file_index.copy(),
            delta_delta_t(_min_file_pos_step));
          auto file_pos = _fetch_ix(std::move(file_dec), ix);
          // Check before dereferencing, consistent with the other lookups.
          vassert(file_pos.has_value(), "Inconsistent index state");
          res.file_pos = *file_pos;
          return res;
      });
}
/// Find the index entry which is strictly lower than the timestamp.
///
/// Returns nullopt when the index predates timestamp indexing (version 1)
/// or when every indexed entry is above \p upper_bound; otherwise returns
/// the rp/kafka offsets and file position of the entry.
/// Removed: review-comment prose and pre-refactor leftover code that
/// referenced `search_result` before its declaration.
std::optional<offset_index::find_result>
offset_index::find_timestamp(model::timestamp upper_bound) {
    if (_initial_time == model::timestamp::missing()) {
        // Bail out early if this is a version 1 index
        // that does not index timestamps.
        return std::nullopt;
    }

    auto search_result = maybe_find_offset(
      upper_bound.value(), _time_index, _time_offsets);

    return ss::visit(
      search_result,
      [](std::monostate) -> std::optional<find_result> { return std::nullopt; },
      [](find_result result) -> std::optional<find_result> { return result; },
      [this](index_value index_result) -> std::optional<find_result> {
          size_t ix = index_result.ix;

          // Decode all offset indices to build up the result.
          decoder_t rp_dec(
            _rp_index.get_initial_value(),
            _rp_index.get_row_count(),
            _rp_index.copy());
          auto rp_offset = _fetch_ix(std::move(rp_dec), ix);
          vassert(rp_offset.has_value(), "Inconsistent index state");

          decoder_t kaf_dec(
            _kaf_index.get_initial_value(),
            _kaf_index.get_row_count(),
            _kaf_index.copy());
          auto kaf_offset = _fetch_ix(std::move(kaf_dec), ix);
          vassert(kaf_offset.has_value(), "Inconsistent index state");

          foffset_decoder_t file_dec(
            _file_index.get_initial_value(),
            _file_index.get_row_count(),
            _file_index.copy(),
            delta_delta_t(_min_file_pos_step));
          auto file_pos = _fetch_ix(std::move(file_dec), ix);
          vassert(file_pos.has_value(), "Inconsistent index state");

          return offset_index::find_result{
            .rp_offset = model::offset(*rp_offset),
            .kaf_offset = kafka::offset(*kaf_offset),
            .file_pos = *file_pos};
      });
}

offset_index::coarse_index_t offset_index::build_coarse_index(
Expand Down
8 changes: 8 additions & 0 deletions src/v/cloud_storage/remote_segment_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ class offset_index {
/// returned.
std::optional<find_result> find_kaf_offset(kafka::offset upper_bound);

/// Find index entry which is strictly lower than the timestamp
///
/// The returned value has timestamp less than upper_bound.
/// If all elements are larger than 'upper_bound' nullopt is returned.
/// If all elements are smaller than 'upper_bound' the last value is
/// returned.
std::optional<find_result> find_timestamp(model::timestamp upper_bound);

/// Builds a coarse index mapping kafka offsets to file positions. The step
/// size is the resolution of the index. So given a step size of 16MiB, the
/// result contains mappings of kafka offset to file position from the index
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_storage/segment_chunk_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ chunk_start_offset_t segment_chunk_range::first_offset() const {
return _chunks.begin()->first;
}

// Number of chunks contained in this range.
size_t segment_chunk_range::chunk_count() const {
    return _chunks.size();
}

segment_chunk_range::map_t::iterator segment_chunk_range::begin() {
return _chunks.begin();
}
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/segment_chunk_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class segment_chunk_range {

std::optional<chunk_start_offset_t> last_offset() const;
chunk_start_offset_t first_offset() const;
size_t chunk_count() const;

map_t::iterator begin();
map_t::iterator end();
Expand Down
Loading