Skip to content

Commit

Permalink
cloud_storage: visit index search result
Browse files Browse the repository at this point in the history
This commit refactors the handling of the index search result. If a new
result type is introduced, it's author will be reminded to handle it by
the assertions.
  • Loading branch information
Vlad Lazar committed Aug 29, 2023
1 parent 6d507cd commit de5aa53
Showing 1 changed file with 87 additions and 98 deletions.
185 changes: 87 additions & 98 deletions src/v/cloud_storage/remote_segment_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,73 +115,66 @@ std::

std::optional<offset_index::find_result>
offset_index::find_rp_offset(model::offset upper_bound) {
size_t ix = 0;
find_result res{};

auto search_result = maybe_find_offset(upper_bound, _rp_index, _rp_offsets);

if (std::holds_alternative<std::monostate>(search_result)) {
return std::nullopt;
} else if (std::holds_alternative<find_result>(search_result)) {
return std::get<find_result>(search_result);
}
auto maybe_ix = std::get<index_value>(search_result);

// Invariant: maybe_ix here can't be nullopt
ix = maybe_ix.ix;
res.rp_offset = model::offset(maybe_ix.value);

decoder_t kaf_dec(
_kaf_index.get_initial_value(),
_kaf_index.get_row_count(),
_kaf_index.copy());
auto kaf_offset = _fetch_ix(std::move(kaf_dec), ix);
vassert(kaf_offset.has_value(), "Inconsistent index state");
res.kaf_offset = kafka::offset(*kaf_offset);
foffset_decoder_t file_dec(
_file_index.get_initial_value(),
_file_index.get_row_count(),
_file_index.copy(),
delta_delta_t(_min_file_pos_step));
auto file_pos = _fetch_ix(std::move(file_dec), ix);
res.file_pos = *file_pos;
return res;
return ss::visit(
search_result,
[](std::monostate) -> std::optional<find_result> { return std::nullopt; },
[](find_result result) -> std::optional<find_result> { return result; },
[this](index_value index_result) -> std::optional<find_result> {
find_result res{};

size_t ix = index_result.ix;
res.rp_offset = model::offset(index_result.value);

decoder_t kaf_dec(
_kaf_index.get_initial_value(),
_kaf_index.get_row_count(),
_kaf_index.copy());
auto kaf_offset = _fetch_ix(std::move(kaf_dec), ix);
vassert(kaf_offset.has_value(), "Inconsistent index state");
res.kaf_offset = kafka::offset(*kaf_offset);
foffset_decoder_t file_dec(
_file_index.get_initial_value(),
_file_index.get_row_count(),
_file_index.copy(),
delta_delta_t(_min_file_pos_step));
auto file_pos = _fetch_ix(std::move(file_dec), ix);
res.file_pos = *file_pos;
return res;
});
}

std::optional<offset_index::find_result>
offset_index::find_kaf_offset(kafka::offset upper_bound) {
size_t ix = 0;
find_result res{};

auto search_result = maybe_find_offset(
upper_bound, _kaf_index, _kaf_offsets);

if (std::holds_alternative<std::monostate>(search_result)) {
return std::nullopt;
} else if (std::holds_alternative<find_result>(search_result)) {
return std::get<find_result>(search_result);
}
auto maybe_ix = std::get<index_value>(search_result);

// Invariant: maybe_ix here can't be nullopt
ix = maybe_ix.ix;
res.kaf_offset = kafka::offset(maybe_ix.value);

decoder_t rp_dec(
_rp_index.get_initial_value(),
_rp_index.get_row_count(),
_rp_index.copy());
auto rp_offset = _fetch_ix(std::move(rp_dec), ix);
vassert(rp_offset.has_value(), "Inconsistent index state");
res.rp_offset = model::offset(*rp_offset);
foffset_decoder_t file_dec(
_file_index.get_initial_value(),
_file_index.get_row_count(),
_file_index.copy(),
delta_delta_t(_min_file_pos_step));
auto file_pos = _fetch_ix(std::move(file_dec), ix);
res.file_pos = *file_pos;
return res;
return ss::visit(
search_result,
[](std::monostate) -> std::optional<find_result> { return std::nullopt; },
[](find_result result) -> std::optional<find_result> { return result; },
[this](index_value index_result) -> std::optional<find_result> {
find_result res{};

size_t ix = index_result.ix;
res.kaf_offset = kafka::offset(index_result.value);

decoder_t rp_dec(
_rp_index.get_initial_value(),
_rp_index.get_row_count(),
_rp_index.copy());
auto rp_offset = _fetch_ix(std::move(rp_dec), ix);
vassert(rp_offset.has_value(), "Inconsistent index state");
res.rp_offset = model::offset(*rp_offset);
foffset_decoder_t file_dec(
_file_index.get_initial_value(),
_file_index.get_row_count(),
_file_index.copy(),
delta_delta_t(_min_file_pos_step));
auto file_pos = _fetch_ix(std::move(file_dec), ix);
res.file_pos = *file_pos;
return res;
});
}

std::optional<offset_index::find_result>
Expand All @@ -192,48 +185,44 @@ offset_index::find_timestamp(model::timestamp upper_bound) {
return std::nullopt;
}

size_t ix = 0;

auto search_result = maybe_find_offset(
upper_bound.value(), _time_index, _time_offsets);

if (std::holds_alternative<std::monostate>(search_result)) {
return std::nullopt;
} else if (std::holds_alternative<find_result>(search_result)) {
return std::get<find_result>(search_result);
}
auto maybe_ix = std::get<index_value>(search_result);

// Invariant: maybe_ix here can't be nullopt
ix = maybe_ix.ix;

// Decode all offset indices to build up the result.
decoder_t rp_dec(
_rp_index.get_initial_value(),
_rp_index.get_row_count(),
_rp_index.copy());
auto rp_offset = _fetch_ix(std::move(rp_dec), ix);
vassert(rp_offset.has_value(), "Inconsistent index state");

decoder_t kaf_dec(
_kaf_index.get_initial_value(),
_kaf_index.get_row_count(),
_kaf_index.copy());
auto kaf_offset = _fetch_ix(std::move(kaf_dec), ix);
vassert(kaf_offset.has_value(), "Inconsistent index state");

foffset_decoder_t file_dec(
_file_index.get_initial_value(),
_file_index.get_row_count(),
_file_index.copy(),
delta_delta_t(_min_file_pos_step));
auto file_pos = _fetch_ix(std::move(file_dec), ix);
vassert(file_pos.has_value(), "Inconsistent index state");

return offset_index::find_result{
.rp_offset = model::offset(*rp_offset),
.kaf_offset = kafka::offset(*kaf_offset),
.file_pos = *file_pos};
return ss::visit(
search_result,
[](std::monostate) -> std::optional<find_result> { return std::nullopt; },
[](find_result result) -> std::optional<find_result> { return result; },
[this](index_value index_result) -> std::optional<find_result> {
size_t ix = index_result.ix;

// Decode all offset indices to build up the result.
decoder_t rp_dec(
_rp_index.get_initial_value(),
_rp_index.get_row_count(),
_rp_index.copy());
auto rp_offset = _fetch_ix(std::move(rp_dec), ix);
vassert(rp_offset.has_value(), "Inconsistent index state");

decoder_t kaf_dec(
_kaf_index.get_initial_value(),
_kaf_index.get_row_count(),
_kaf_index.copy());
auto kaf_offset = _fetch_ix(std::move(kaf_dec), ix);
vassert(kaf_offset.has_value(), "Inconsistent index state");

foffset_decoder_t file_dec(
_file_index.get_initial_value(),
_file_index.get_row_count(),
_file_index.copy(),
delta_delta_t(_min_file_pos_step));
auto file_pos = _fetch_ix(std::move(file_dec), ix);
vassert(file_pos.has_value(), "Inconsistent index state");

return offset_index::find_result{
.rp_offset = model::offset(*rp_offset),
.kaf_offset = kafka::offset(*kaf_offset),
.file_pos = *file_pos};
});
}

offset_index::coarse_index_t offset_index::build_coarse_index(
Expand Down

0 comments on commit de5aa53

Please sign in to comment.