cloud_storage: Read offset arrays when creating coarse index
When creating the coarse index, in addition to reading the index fields we
also read the offset fields. This ensures that in cases where the index is
very small and all of its data is still contained in the offset fields, a
mini index can still be generated.
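
For illustration, here is a minimal sketch of the two-tier layout this relies on (simplified, made-up types, not the actual offset_index class): recent entries sit in a small write buffer until enough of them accumulate to be flushed into the compressed index rows, so a pass that only decodes the compressed rows sees nothing for a segment whose entries never left the buffer.

    // Simplified stand-in for the real index: `write_buffer` plays the role
    // of the _file_offsets array and `compressed_rows` the role of the
    // encoded _file_index column. Names and sizes are illustrative only.
    #include <array>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct tiered_offsets {
        std::vector<int64_t> compressed_rows;   // entries already flushed
        std::array<int64_t, 16> write_buffer{}; // entries not yet flushed
        size_t buffered{0};

        void add(int64_t offset) {
            write_buffer[buffered++] = offset;
            if (buffered == write_buffer.size()) {
                compressed_rows.insert(
                  compressed_rows.end(),
                  write_buffer.begin(),
                  write_buffer.end());
                buffered = 0;
            }
        }

        // Visiting only compressed_rows misses a small index entirely; the
        // unflushed tail has to be scanned as well, which is what the final
        // populate_index(_file_offsets, _kaf_offsets, ...) call in the diff
        // below is for.
        template<typename Fn>
        void for_each(Fn&& fn) const {
            for (auto v : compressed_rows) {
                fn(v);
            }
            for (size_t i = 0; i < buffered; ++i) {
                fn(write_buffer[i]);
            }
        }
    };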

Additionally, the calculation of the offsets to add to the index is changed
so that we no longer skip offsets when the current offset modulo the step
size does not decrease across a step boundary, a bug that the previous
iteration of the code was susceptible to.
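
To see why the old modulo-based check could skip boundaries, here is a small self-contained comparison using made-up file positions (not code from this commit): the old rule only fires when the position modulo the step size decreases, so a jump that crosses a step boundary while the modulus keeps growing records nothing, whereas the span-based rule fires whenever at least one full step has been covered since the last entry.

    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
        const uint64_t step = 100;
        // Hypothetical file positions of successive batches.
        const std::vector<uint64_t> positions{10, 40, 160, 230, 420};

        // Old rule: add an entry when the modulus wraps around. The jump
        // from 40 to 160 crosses the 100 boundary, but 160 % 100 == 60 > 40,
        // so no entry is recorded there; the first entry appears only at 230.
        uint64_t prev_mod = 0;
        for (auto pos : positions) {
            auto mod = pos % step;
            if (mod < prev_mod) {
                std::cout << "old rule adds entry at " << pos << '\n';
            }
            prev_mod = mod;
        }

        // New rule: track an explicit span and add an entry whenever the
        // span covers at least one full step; 160 is no longer skipped.
        uint64_t span_start = 0;
        for (auto pos : positions) {
            if (pos > span_start && pos - span_start + 1 >= step) {
                std::cout << "new rule adds entry at " << pos << '\n';
                span_start = pos + 1;
            }
        }
        return 0;
    }
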
abhijat committed Jun 27, 2023
1 parent 71fa060 commit 87f4a6e
Showing 3 changed files with 117 additions and 21 deletions.
src/v/cloud_storage/remote_segment_index.cc (27 additions, 12 deletions)
@@ -175,6 +175,10 @@ offset_index::find_kaf_offset(kafka::offset upper_bound) {

 offset_index::coarse_index_t
 offset_index::build_coarse_index(uint64_t step_size) const {
+    vlog(
+      cst_log.trace,
+      "building coarse index from file offset index with {} rows",
+      _file_index.get_row_count());
     vassert(
       step_size > static_cast<uint64_t>(_min_file_pos_step),
       "step size {} cannot be less than or equal to index step size {}",
@@ -195,28 +199,39 @@ offset_index::build_coarse_index(uint64_t step_size) const {
     std::array<int64_t, buffer_depth> kafka_row{};
 
     coarse_index_t index;
-    size_t curr_mod_step_sz{0};
-    while (file_dec.read(file_row) && kaf_dec.read(kafka_row)) {
-        for (auto it = file_row.cbegin(), kit = kafka_row.cbegin();
-             it != file_row.cend() && kit != kafka_row.cend();
+    auto populate_index = [step_size, &index](
+                            const auto& file_offsets,
+                            const auto& kafka_offsets,
+                            auto& span_start,
+                            auto& span_end) {
+        for (auto it = file_offsets.cbegin(), kit = kafka_offsets.cbegin();
+             it != file_offsets.cend() && kit != kafka_offsets.cend();
              ++it, ++kit) {
-            auto curr_fpos = *it;
-            auto crossed_step_sz = curr_fpos % step_size;
-            if (crossed_step_sz < curr_mod_step_sz) {
+            span_end = *it;
+            auto delta = span_end - span_start + 1;
+            if (span_end > span_start && delta >= step_size) {
                 vlog(
                   cst_log.trace,
                   "adding entry to coarse index, current file pos: {}, step "
-                  "size: {}, curr mod step size: {}",
-                  curr_fpos,
+                  "size: {}, span size: {}",
+                  span_end,
                   step_size,
-                  curr_mod_step_sz);
-                index[kafka::offset{*kit}] = curr_fpos;
+                  delta);
+                index[kafka::offset{*kit}] = span_end;
+                span_start = span_end + 1;
             }
-            curr_mod_step_sz = crossed_step_sz;
         }
+    };
+
+    size_t start{0};
+    size_t end{0};
+    while (file_dec.read(file_row) && kaf_dec.read(kafka_row)) {
+        populate_index(file_row, kafka_row, start, end);
+        file_row = {};
+        kafka_row = {};
     }
 
+    populate_index(_file_offsets, _kaf_offsets, start, end);
     return index;
 }

src/v/cloud_storage/remote_segment_index.h (2 additions, 0 deletions)
@@ -155,6 +155,8 @@ class offset_index {
     encoder_t _kaf_index;
     foffset_encoder_t _file_index;
     int64_t _min_file_pos_step;
+
+    friend class offset_index_accessor;
 };
 
 class remote_segment_index_builder : public storage::batch_consumer {
src/v/cloud_storage/tests/remote_segment_index_test.cc (88 additions, 9 deletions)
@@ -11,10 +11,8 @@
#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "cloud_storage/remote_segment_index.h"
#include "common_def.h"
#include "model/record_batch_types.h"
#include "cloud_storage/tests/common_def.h"
#include "random/generators.h"
#include "vlog.h"

#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
@@ -155,8 +153,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_index_builder) {
 }
 
 SEASTAR_THREAD_TEST_CASE(test_remote_segment_build_coarse_index) {
-    static const model::offset base_offset{100};
-    static const kafka::offset kbase_offset{100};
+    const model::offset base_offset{100};
+    const kafka::offset kbase_offset{100};
     std::vector<batch_t> batches;
     for (int i = 0; i < 1000; i++) {
         auto num_records = random_generators::get_int(1, 20);
@@ -191,13 +189,94 @@
       [](auto pair) { return std::make_pair(pair.second, pair.first); });
 
     // Assert that all entries in the mini-map are approximately as far away as
-    // step size. We cannot be sure that entries are exactly as far away,
-    // because of the way the entries are recorded using mod. Additionally all
-    // kafka offsets should be ascending.
+    // step size. Additionally all kafka offsets should be ascending.
     for (auto it_a = file_to_koffset.cbegin(), it_b = std::next(it_a);
          it_b != file_to_koffset.cend();
          ++it_a, ++it_b) {
         BOOST_REQUIRE_GE(it_b->first - it_a->first, 100_KiB);
         BOOST_REQUIRE_GT(it_b->second, it_a->second);
     }
 }

+namespace cloud_storage {
+class offset_index_accessor {
+public:
+    explicit offset_index_accessor(const offset_index& ix)
+      : _ix{ix} {}
+
+    size_t file_offset_index_size() const {
+        return _ix._file_index.get_row_count();
+    }
+
+    size_t file_offset_array_size() const {
+        auto sz = 0;
+        for (const auto& offset : _ix._file_offsets) {
+            if (offset != 0) {
+                ++sz;
+            }
+        }
+        return sz;
+    }
+
+private:
+    const offset_index& _ix;
+};
+} // namespace cloud_storage
+
+SEASTAR_THREAD_TEST_CASE(
+  test_remote_segment_build_coarse_index_from_offset_fields) {
+    // This test asserts that for an index where all the data is contained in
+    // the `_offsets` arrays and the `_index` fields are empty, the coarse index
+    // is still generated correctly.
+    const model::offset base_offset{100};
+    const kafka::offset kbase_offset{100};
+
+    std::vector<batch_t> batches;
+    // Create only 10 batches, so that the index fields in remote segment index
+    // remain empty
+    for (int i = 0; i < 10; i++) {
+        auto num_records = random_generators::get_int(10, 20);
+        std::vector<size_t> record_sizes;
+        record_sizes.reserve(num_records);
+        for (int i = 0; i < num_records; i++) {
+            // The record size is large enough that we exceed the coarse
+            // index threshold
+            record_sizes.push_back(random_generators::get_int(950, 1000));
+        }
+        batch_t batch = {
+          .num_records = num_records,
+          .type = model::record_batch_type::raft_data,
+          .record_sizes = std::move(record_sizes),
+        };
+        batches.push_back(std::move(batch));
+    }
+    auto segment = generate_segment(base_offset, batches);
+    auto is = make_iobuf_input_stream(std::move(segment));
+    offset_index ix(base_offset, kbase_offset, 0, 0);
+    auto parser = make_remote_segment_index_builder(
+      std::move(is), ix, model::offset_delta(0), 0);
+    auto pclose = ss::defer([&parser] { parser->close().get(); });
+    auto result = parser->consume().get();
+    BOOST_REQUIRE(result.has_value());
+    BOOST_REQUIRE_NE(result.value(), 0);
+
+    offset_index_accessor acc{ix};
+    BOOST_REQUIRE_EQUAL(acc.file_offset_index_size(), 0);
+    BOOST_REQUIRE_GT(acc.file_offset_array_size(), 0);
+
+    auto mini_ix = ix.build_coarse_index(70_KiB);
+    absl::btree_map<int64_t, kafka::offset> file_to_koffset;
+    std::transform(
+      std::make_move_iterator(mini_ix.begin()),
+      std::make_move_iterator(mini_ix.end()),
+      std::inserter(file_to_koffset, file_to_koffset.end()),
+      [](auto pair) { return std::make_pair(pair.second, pair.first); });
+
+    BOOST_REQUIRE_GT(file_to_koffset.size(), 0);
+    for (auto it_a = file_to_koffset.cbegin(), it_b = std::next(it_a);
+         it_b != file_to_koffset.cend();
+         ++it_a, ++it_b) {
+        BOOST_REQUIRE_GE(it_b->first - it_a->first, 90_KiB);
+        BOOST_REQUIRE_GE(it_b->first - it_a->first, 70_KiB);
+        BOOST_REQUIRE_GT(it_b->second, it_a->second);
+    }
+}
