Skip to content

Commit

Permalink
cloud_storage: add timestamps to remote segment index
Browse files Browse the repository at this point in the history
This is a prerequisite for making timequeries seek
to the proper chunk, instead of reading from the
start of segments.

The timestamp we store is each batch's max timestamp,
because a timequery needs to find a batch that is
definitely before the target timestamp (i.e. one whose
max timestamp precedes it), and start its scan from there.

Related: #11801
  • Loading branch information
jcsp committed Jul 11, 2023
1 parent e111746 commit 0263a82
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 7 deletions.
37 changes: 33 additions & 4 deletions src/v/cloud_storage/remote_segment_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ offset_index::offset_index(
: _rp_offsets{}
, _kaf_offsets{}
, _file_offsets{}
, _time_offsets{}
, _pos{}
, _initial_rp(initial_rp)
, _initial_kaf(initial_kaf)
Expand All @@ -36,16 +37,21 @@ offset_index::offset_index(
, _min_file_pos_step(file_pos_step) {}

void offset_index::add(
model::offset rp_offset, kafka::offset kaf_offset, int64_t file_offset) {
model::offset rp_offset,
kafka::offset kaf_offset,
int64_t file_offset,
model::timestamp max_timestamp) {
auto ix = index_mask & _pos++;
_rp_offsets.at(ix) = rp_offset();
_kaf_offsets.at(ix) = kaf_offset();
_file_offsets.at(ix) = file_offset;
_time_offsets.at(ix) = max_timestamp.value();
try {
if ((_pos & index_mask) == 0) {
_rp_index.add(_rp_offsets);
_kaf_index.add(_kaf_offsets);
_file_index.add(_file_offsets);
_time_index.add(_time_offsets);
}
} catch (...) {
// Get rid of the corrupted state in the encoders.
Expand All @@ -59,6 +65,7 @@ void offset_index::add(
_kaf_index = encoder_t(_initial_kaf);
_file_index = foffset_encoder_t(
_initial_file_pos, delta_delta_t(_min_file_pos_step));
_time_index = encoder_t(_initial_time.value());
throw;
}
}
Expand Down Expand Up @@ -242,7 +249,7 @@ offset_index::coarse_index_t offset_index::build_coarse_index(
struct offset_index_header
: serde::envelope<
offset_index_header,
serde::version<1>,
serde::version<2>,
serde::compat_version<1>> {
int64_t min_file_pos_step;
uint64_t num_elements;
Expand All @@ -258,6 +265,12 @@ struct offset_index_header
iobuf rp_index;
iobuf kaf_index;
iobuf file_index;

// Version 2 fields
int64_t base_time{model::timestamp::missing().value()};
int64_t last_time{model::timestamp::missing().value()};
std::vector<int64_t> time_write_buf;
iobuf time_index;
};

iobuf offset_index::to_iobuf() {
Expand All @@ -279,7 +292,11 @@ iobuf offset_index::to_iobuf() {
.rp_index = _rp_index.copy(),
.kaf_index = _kaf_index.copy(),
.file_index = _file_index.copy(),
};
.base_time = _initial_time.value(),
.last_time = _time_index.get_last_value(),
.time_write_buf = std::vector<int64_t>(
_time_offsets.begin(), _time_offsets.end()),
.time_index = _time_index.copy()};
return serde::to_iobuf(std::move(hdr));
}

Expand Down Expand Up @@ -310,6 +327,17 @@ void offset_index::from_iobuf(iobuf b) {
std::move(hdr.file_index),
delta_delta_t(_min_file_pos_step));
_min_file_pos_step = hdr.min_file_pos_step;

_initial_time = model::timestamp(hdr.base_time);
_time_index = encoder_t(
_initial_time.value(),
num_rows,
hdr.last_time,
std::move(hdr.time_index));
std::copy(
hdr.time_write_buf.begin(),
hdr.time_write_buf.end(),
_time_offsets.begin());
}

std::optional<offset_index::index_value>
Expand Down Expand Up @@ -362,7 +390,8 @@ void remote_segment_index_builder::consume_batch_start(
_ix.add(
hdr.base_offset,
hdr.base_offset - _running_delta,
static_cast<int64_t>(physical_base_offset));
static_cast<int64_t>(physical_base_offset),
hdr.max_timestamp);
_window = 0;
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/v/cloud_storage/remote_segment_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ class offset_index {
int64_t file_pos_step);

/// Add new tuple to the index.
void
add(model::offset rp_offset, kafka::offset kaf_offset, int64_t file_offset);
void add(
model::offset rp_offset,
kafka::offset kaf_offset,
int64_t file_offset,
model::timestamp);

struct find_result {
model::offset rp_offset;
Expand Down Expand Up @@ -142,10 +145,12 @@ class offset_index {
std::array<int64_t, buffer_depth> _rp_offsets;
std::array<int64_t, buffer_depth> _kaf_offsets;
std::array<int64_t, buffer_depth> _file_offsets;
std::array<int64_t, buffer_depth> _time_offsets;
uint64_t _pos;
model::offset _initial_rp;
kafka::offset _initial_kaf;
int64_t _initial_file_pos;
model::timestamp _initial_time;

using encoder_t = deltafor_encoder<int64_t>;
using decoder_t = deltafor_decoder<int64_t>;
Expand All @@ -156,6 +161,7 @@ class offset_index {
encoder_t _rp_index;
encoder_t _kaf_index;
foffset_encoder_t _file_index;
encoder_t _time_index;
int64_t _min_file_pos_step;

friend class offset_index_accessor;
Expand Down
10 changes: 9 additions & 1 deletion src/v/cloud_storage/tests/remote_segment_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) {
std::vector<model::offset> rp_offsets;
std::vector<kafka::offset> kaf_offsets;
std::vector<size_t> file_offsets;
std::vector<model::timestamp> timestamps;
int64_t rp = segment_base_rp_offset();
int64_t kaf = segment_base_kaf_offset();
size_t fpos = random_generators::get_int(1000, 2000);
model::timestamp timestamp{123456};
bool is_config = false;
for (size_t i = 0; i < segment_num_batches; i++) {
if (!is_config) {
rp_offsets.push_back(model::offset(rp));
kaf_offsets.push_back(kafka::offset(kaf));
file_offsets.push_back(fpos);
timestamps.push_back(timestamp);
}
// The test queries every element using the key that matches the element
// exactly and then it queries the element using the key which is
Expand All @@ -63,6 +66,7 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) {
rp += batch_size;
kaf += is_config ? batch_size - 1 : batch_size;
fpos += random_generators::get_int(1000, 2000);
timestamp = model::timestamp(timestamp.value() + 1);
}

offset_index tmp_index(
Expand All @@ -71,7 +75,11 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) {
kafka::offset klast;
size_t flast;
for (size_t i = 0; i < rp_offsets.size(); i++) {
tmp_index.add(rp_offsets.at(i), kaf_offsets.at(i), file_offsets.at(i));
tmp_index.add(
rp_offsets.at(i),
kaf_offsets.at(i),
file_offsets.at(i),
timestamps.at(i));
last = rp_offsets.at(i);
klast = kaf_offsets.at(i);
flast = file_offsets.at(i);
Expand Down

0 comments on commit 0263a82

Please sign in to comment.