Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cloud_storage: add timestamps to remote segment index #11802

Merged
merged 2 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,11 @@ ss::future<ntp_archiver_upload_result> ntp_archiver::upload_segment(

auto index_path = make_index_path(path);
auto make_idx_fut = make_segment_index(
candidate.starting_offset, _rtclog, index_path, std::move(stream_index));
candidate.starting_offset,
candidate.base_timestamp,
_rtclog,
index_path,
std::move(stream_index));

auto [upload_res, idx_res] = co_await ss::when_all_succeed(
std::move(upload_fut), std::move(make_idx_fut));
Expand Down Expand Up @@ -1222,6 +1226,7 @@ ss::future<ntp_archiver_upload_result> ntp_archiver::upload_tx(
ss::future<std::optional<ntp_archiver::make_segment_index_result>>
ntp_archiver::make_segment_index(
model::offset base_rp_offset,
model::timestamp base_timestamp,
retry_chain_logger& ctxlog,
std::string_view index_path,
ss::input_stream<char> stream) {
Expand All @@ -1232,7 +1237,8 @@ ntp_archiver::make_segment_index(
base_rp_offset,
base_kafka_offset,
0,
cloud_storage::remote_segment_sampling_step_bytes};
cloud_storage::remote_segment_sampling_step_bytes,
base_timestamp};

vlog(ctxlog.debug, "creating remote segment index: {}", index_path);
cloud_storage::segment_record_stats stats{};
Expand Down
1 change: 1 addition & 0 deletions src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ class ntp_archiver {
/// \return An index on success, nullopt on failure
ss::future<std::optional<make_segment_index_result>> make_segment_index(
model::offset base_rp_offset,
model::timestamp base_timestamp,
retry_chain_logger& ctxlog,
std::string_view index_path,
ss::input_stream<char> stream);
Expand Down
17 changes: 12 additions & 5 deletions src/v/archival/tests/service_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ void segment_matcher<Fixture>::verify_index(
meta->base_offset,
meta->base_kafka_offset(),
0,
cloud_storage::remote_segment_sampling_step_bytes};
cloud_storage::remote_segment_sampling_step_bytes,
meta->base_timestamp};

auto builder = cloud_storage::make_remote_segment_index_builder(
ntp,
Expand All @@ -400,16 +401,22 @@ void segment_matcher<Fixture>::verify_index(
builder->consume().finally([&builder] { return builder->close(); }).get();
reader_handle.close().get();

auto actual = iobuf_to_bytes(ix.to_iobuf());
auto ix_buf = ix.to_iobuf();
iobuf expected_buf;
expected_buf.append((const uint8_t*)(expected.data()), expected.size());

vlog(
fixt_log.info,
"expected {} bytes, got {}",
expected.size(),
actual.size());
ix_buf.size_bytes());

if (ix_buf != expected_buf) {
vlog(fixt_log.info, "ix_buf: {}", ix_buf.hexdump(1024));

auto a = ss::sstring{actual.begin(), actual.end()};
BOOST_REQUIRE(a == expected); // NOLINT
vlog(fixt_log.info, "expected: {}", expected_buf.hexdump(1024));
}
BOOST_REQUIRE(ix_buf == expected_buf); // NOLINT
}

template<class Fixture>
Expand Down
10 changes: 7 additions & 3 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ remote_segment::remote_segment(
, _base_offset_delta(std::clamp(
meta.delta_offset, model::offset_delta(0), model::offset_delta::max()))
, _max_rp_offset(meta.committed_offset)
, _base_timestamp(meta.base_timestamp)
, _size(meta.size_bytes)
, _rtc(&parent)
, _ctxlog(cst_log, _rtc, generate_log_prefix(meta, ntp))
Expand Down Expand Up @@ -402,7 +403,8 @@ ss::future<uint64_t> remote_segment::put_segment_in_cache_and_create_index(
get_base_rp_offset(),
get_base_kafka_offset(),
0,
remote_segment_sampling_step_bytes);
remote_segment_sampling_step_bytes,
_base_timestamp);
auto [sparse, sput] = input_stream_fanout<2>(std::move(s), 1);
auto parser = make_remote_segment_index_builder(
get_ntp(),
Expand Down Expand Up @@ -523,7 +525,8 @@ ss::future<> remote_segment::do_hydrate_index() {
_base_rp_offset,
_base_rp_offset - _base_offset_delta,
0,
remote_segment_sampling_step_bytes);
remote_segment_sampling_step_bytes,
_base_timestamp);

auto result = co_await _api.download_index(
_bucket, remote_segment_path{_index_path}, ix, local_rtc);
Expand Down Expand Up @@ -689,7 +692,8 @@ ss::future<bool> remote_segment::maybe_materialize_index() {
_base_rp_offset,
_base_rp_offset - _base_offset_delta,
0,
remote_segment_sampling_step_bytes);
remote_segment_sampling_step_bytes,
_base_timestamp);
if (auto cache_item = co_await _cache.get(path); cache_item.has_value()) {
// The cache item is expected to be present if the segment is present
// so it's very unlikely that we will call this method if there is no
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ class remote_segment final {
model::offset _base_rp_offset;
model::offset_delta _base_offset_delta;
model::offset _max_rp_offset;
model::timestamp _base_timestamp;

// The expected size according to the manifest entry for the segment
size_t _size{0};
Expand Down
42 changes: 37 additions & 5 deletions src/v/cloud_storage/remote_segment_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,39 @@ offset_index::offset_index(
model::offset initial_rp,
kafka::offset initial_kaf,
int64_t initial_file_pos,
int64_t file_pos_step)
int64_t file_pos_step,
model::timestamp initial_time)
: _rp_offsets{}
, _kaf_offsets{}
, _file_offsets{}
, _time_offsets{}
, _pos{}
, _initial_rp(initial_rp)
, _initial_kaf(initial_kaf)
, _initial_file_pos(initial_file_pos)
, _initial_time(initial_time)
, _rp_index(initial_rp)
, _kaf_index(initial_kaf)
, _file_index(initial_file_pos, delta_delta_t(file_pos_step))
, _time_index(initial_time.value())
, _min_file_pos_step(file_pos_step) {}

void offset_index::add(
model::offset rp_offset, kafka::offset kaf_offset, int64_t file_offset) {
model::offset rp_offset,
kafka::offset kaf_offset,
int64_t file_offset,
model::timestamp max_timestamp) {
auto ix = index_mask & _pos++;
_rp_offsets.at(ix) = rp_offset();
_kaf_offsets.at(ix) = kaf_offset();
_file_offsets.at(ix) = file_offset;
_time_offsets.at(ix) = max_timestamp.value();
try {
if ((_pos & index_mask) == 0) {
_rp_index.add(_rp_offsets);
_kaf_index.add(_kaf_offsets);
_file_index.add(_file_offsets);
_time_index.add(_time_offsets);
jcsp marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (...) {
// Get rid of the corrupted state in the encoders.
Expand All @@ -59,6 +68,7 @@ void offset_index::add(
_kaf_index = encoder_t(_initial_kaf);
_file_index = foffset_encoder_t(
_initial_file_pos, delta_delta_t(_min_file_pos_step));
_time_index = encoder_t(_initial_time.value());
throw;
}
}
Expand Down Expand Up @@ -242,7 +252,7 @@ offset_index::coarse_index_t offset_index::build_coarse_index(
struct offset_index_header
: serde::envelope<
offset_index_header,
serde::version<1>,
serde::version<2>,
serde::compat_version<1>> {
int64_t min_file_pos_step;
uint64_t num_elements;
Expand All @@ -258,6 +268,12 @@ struct offset_index_header
iobuf rp_index;
iobuf kaf_index;
iobuf file_index;

// Version 2 fields
int64_t base_time{model::timestamp::missing().value()};
int64_t last_time{model::timestamp::missing().value()};
std::vector<int64_t> time_write_buf;
iobuf time_index;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle version<1> -> version<2> compatibility here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're using serde's default read/write implementations, and I believe serde::envelope's default behavior is that if it runs out of data on decode, it does not populate the later fields (so these values remain at their defaults).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I have also updated base_time and last_time to have explicit initialization -- previously one would have had to use the vector or iobuf to check whether the timestamp fields are set, because the ints would otherwise have had indeterminate contents.)

};

iobuf offset_index::to_iobuf() {
Expand All @@ -279,7 +295,11 @@ iobuf offset_index::to_iobuf() {
.rp_index = _rp_index.copy(),
.kaf_index = _kaf_index.copy(),
.file_index = _file_index.copy(),
};
.base_time = _initial_time.value(),
.last_time = _time_index.get_last_value(),
.time_write_buf = std::vector<int64_t>(
_time_offsets.begin(), _time_offsets.end()),
.time_index = _time_index.copy()};
return serde::to_iobuf(std::move(hdr));
}

Expand Down Expand Up @@ -310,6 +330,17 @@ void offset_index::from_iobuf(iobuf b) {
std::move(hdr.file_index),
delta_delta_t(_min_file_pos_step));
_min_file_pos_step = hdr.min_file_pos_step;

_initial_time = model::timestamp(hdr.base_time);
_time_index = encoder_t(
_initial_time.value(),
num_rows,
hdr.last_time,
std::move(hdr.time_index));
std::copy(
hdr.time_write_buf.begin(),
hdr.time_write_buf.end(),
_time_offsets.begin());
}

std::optional<offset_index::index_value>
Expand Down Expand Up @@ -362,7 +393,8 @@ void remote_segment_index_builder::consume_batch_start(
_ix.add(
hdr.base_offset,
hdr.base_offset - _running_delta,
static_cast<int64_t>(physical_base_offset));
static_cast<int64_t>(physical_base_offset),
hdr.max_timestamp);
_window = 0;
}
}
Expand Down
13 changes: 10 additions & 3 deletions src/v/cloud_storage/remote_segment_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ class offset_index {
model::offset initial_rp,
kafka::offset initial_kaf,
int64_t initial_file_pos,
int64_t file_pos_step);
int64_t file_pos_step,
model::timestamp initial_time);

/// Add new tuple to the index.
void
add(model::offset rp_offset, kafka::offset kaf_offset, int64_t file_offset);
void add(
model::offset rp_offset,
kafka::offset kaf_offset,
int64_t file_offset,
model::timestamp);

struct find_result {
model::offset rp_offset;
Expand Down Expand Up @@ -142,10 +146,12 @@ class offset_index {
std::array<int64_t, buffer_depth> _rp_offsets;
std::array<int64_t, buffer_depth> _kaf_offsets;
std::array<int64_t, buffer_depth> _file_offsets;
std::array<int64_t, buffer_depth> _time_offsets;
uint64_t _pos;
model::offset _initial_rp;
kafka::offset _initial_kaf;
int64_t _initial_file_pos;
model::timestamp _initial_time;

using encoder_t = deltafor_encoder<int64_t>;
using decoder_t = deltafor_decoder<int64_t>;
Expand All @@ -156,6 +162,7 @@ class offset_index {
encoder_t _rp_index;
encoder_t _kaf_index;
foffset_encoder_t _file_index;
encoder_t _time_index;
int64_t _min_file_pos_step;

friend class offset_index_accessor;
Expand Down
31 changes: 25 additions & 6 deletions src/v/cloud_storage/tests/remote_segment_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) {
std::vector<model::offset> rp_offsets;
std::vector<kafka::offset> kaf_offsets;
std::vector<size_t> file_offsets;
std::vector<model::timestamp> timestamps;
int64_t rp = segment_base_rp_offset();
int64_t kaf = segment_base_kaf_offset();
size_t fpos = random_generators::get_int(1000, 2000);
model::timestamp timestamp{123456};
bool is_config = false;
for (size_t i = 0; i < segment_num_batches; i++) {
if (!is_config) {
rp_offsets.push_back(model::offset(rp));
kaf_offsets.push_back(kafka::offset(kaf));
file_offsets.push_back(fpos);
timestamps.push_back(timestamp);
}
// The test queries every element using the key that matches the element
// exactly and then it queries the element using the key which is
Expand All @@ -63,22 +66,35 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) {
rp += batch_size;
kaf += is_config ? batch_size - 1 : batch_size;
fpos += random_generators::get_int(1000, 2000);
timestamp = model::timestamp(timestamp.value() + 1);
}

offset_index tmp_index(
segment_base_rp_offset, segment_base_kaf_offset, 0U, 1000);
segment_base_rp_offset,
segment_base_kaf_offset,
0U,
1000,
model::timestamp{0xdeadbeef});
model::offset last;
kafka::offset klast;
size_t flast;
for (size_t i = 0; i < rp_offsets.size(); i++) {
tmp_index.add(rp_offsets.at(i), kaf_offsets.at(i), file_offsets.at(i));
tmp_index.add(
rp_offsets.at(i),
kaf_offsets.at(i),
file_offsets.at(i),
timestamps.at(i));
last = rp_offsets.at(i);
klast = kaf_offsets.at(i);
flast = file_offsets.at(i);
}

offset_index index(
segment_base_rp_offset, segment_base_kaf_offset, 0U, 1000);
segment_base_rp_offset,
segment_base_kaf_offset,
0U,
1000,
model::timestamp{0xdeadbeef});
auto buf = tmp_index.to_iobuf();
index.from_iobuf(std::move(buf));

Expand Down Expand Up @@ -136,7 +152,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_index_builder) {
}
auto segment = generate_segment(base_offset, batches);
auto is = make_iobuf_input_stream(std::move(segment));
offset_index ix(base_offset, kbase_offset, 0, 0);
offset_index ix(
base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef});
auto parser = make_remote_segment_index_builder(
test_ntp, std::move(is), ix, model::offset_delta(0), 0);
auto result = parser->consume().get();
Expand Down Expand Up @@ -184,7 +201,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_build_coarse_index) {
expected_conf_records + expected_data_records - 1);
auto segment = generate_segment(base_offset, batches);
auto is = make_iobuf_input_stream(std::move(segment));
offset_index ix(base_offset, kbase_offset, 0, 0);
offset_index ix(
base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef});
segment_record_stats stats{};
auto parser = make_remote_segment_index_builder(
test_ntp, std::move(is), ix, model::offset_delta(0), 0, std::ref(stats));
Expand Down Expand Up @@ -270,7 +288,8 @@ SEASTAR_THREAD_TEST_CASE(
}
auto segment = generate_segment(base_offset, batches);
auto is = make_iobuf_input_stream(std::move(segment));
offset_index ix(base_offset, kbase_offset, 0, 0);
offset_index ix(
base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef});
auto parser = make_remote_segment_index_builder(
test_ntp, std::move(is), ix, model::offset_delta(0), 0);
auto pclose = ss::defer([&parser] { parser->close().get(); });
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/remote_segment_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ void upload_index(
meta.base_offset,
meta.base_kafka_offset(),
0,
remote_segment_sampling_step_bytes};
remote_segment_sampling_step_bytes,
meta.base_timestamp};

auto builder = make_remote_segment_index_builder(
manifest_ntp,
Expand Down