Skip to content

Commit

Permalink
cloud_storage: supply a base_timestamp to remote index constructor
Browse files — browse the repository at this point in the history
  • Loading branch information
jcsp committed Jul 11, 2023
1 parent 0263a82 commit b3e5b48
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 18 deletions.
10 changes: 8 additions & 2 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,11 @@ ss::future<ntp_archiver_upload_result> ntp_archiver::upload_segment(

auto index_path = make_index_path(path);
auto make_idx_fut = make_segment_index(
candidate.starting_offset, _rtclog, index_path, std::move(stream_index));
candidate.starting_offset,
candidate.base_timestamp,
_rtclog,
index_path,
std::move(stream_index));

auto [upload_res, idx_res] = co_await ss::when_all_succeed(
std::move(upload_fut), std::move(make_idx_fut));
Expand Down Expand Up @@ -1222,6 +1226,7 @@ ss::future<ntp_archiver_upload_result> ntp_archiver::upload_tx(
ss::future<std::optional<ntp_archiver::make_segment_index_result>>
ntp_archiver::make_segment_index(
model::offset base_rp_offset,
model::timestamp base_timestamp,
retry_chain_logger& ctxlog,
std::string_view index_path,
ss::input_stream<char> stream) {
Expand All @@ -1232,7 +1237,8 @@ ntp_archiver::make_segment_index(
base_rp_offset,
base_kafka_offset,
0,
cloud_storage::remote_segment_sampling_step_bytes};
cloud_storage::remote_segment_sampling_step_bytes,
base_timestamp};

vlog(ctxlog.debug, "creating remote segment index: {}", index_path);
cloud_storage::segment_record_stats stats{};
Expand Down
1 change: 1 addition & 0 deletions src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ class ntp_archiver {
/// \return An index on success, nullopt on failure
ss::future<std::optional<make_segment_index_result>> make_segment_index(
model::offset base_rp_offset,
model::timestamp base_timestamp,
retry_chain_logger& ctxlog,
std::string_view index_path,
ss::input_stream<char> stream);
Expand Down
17 changes: 12 additions & 5 deletions src/v/archival/tests/service_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ void segment_matcher<Fixture>::verify_index(
meta->base_offset,
meta->base_kafka_offset(),
0,
cloud_storage::remote_segment_sampling_step_bytes};
cloud_storage::remote_segment_sampling_step_bytes,
meta->base_timestamp};

auto builder = cloud_storage::make_remote_segment_index_builder(
ntp,
Expand All @@ -400,16 +401,22 @@ void segment_matcher<Fixture>::verify_index(
builder->consume().finally([&builder] { return builder->close(); }).get();
reader_handle.close().get();

auto actual = iobuf_to_bytes(ix.to_iobuf());
auto ix_buf = ix.to_iobuf();
iobuf expected_buf;
expected_buf.append((const uint8_t*)(expected.data()), expected.size());

vlog(
fixt_log.info,
"expected {} bytes, got {}",
expected.size(),
actual.size());
ix_buf.size_bytes());

if (ix_buf != expected_buf) {
vlog(fixt_log.info, "ix_buf: {}", ix_buf.hexdump(1024));

auto a = ss::sstring{actual.begin(), actual.end()};
BOOST_REQUIRE(a == expected); // NOLINT
vlog(fixt_log.info, "expected: {}", expected_buf.hexdump(1024));
}
BOOST_REQUIRE(ix_buf == expected_buf); // NOLINT
}

template<class Fixture>
Expand Down
10 changes: 7 additions & 3 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ remote_segment::remote_segment(
, _base_offset_delta(std::clamp(
meta.delta_offset, model::offset_delta(0), model::offset_delta::max()))
, _max_rp_offset(meta.committed_offset)
, _base_timestamp(meta.base_timestamp)
, _size(meta.size_bytes)
, _rtc(&parent)
, _ctxlog(cst_log, _rtc, generate_log_prefix(meta, ntp))
Expand Down Expand Up @@ -402,7 +403,8 @@ ss::future<uint64_t> remote_segment::put_segment_in_cache_and_create_index(
get_base_rp_offset(),
get_base_kafka_offset(),
0,
remote_segment_sampling_step_bytes);
remote_segment_sampling_step_bytes,
_base_timestamp);
auto [sparse, sput] = input_stream_fanout<2>(std::move(s), 1);
auto parser = make_remote_segment_index_builder(
get_ntp(),
Expand Down Expand Up @@ -523,7 +525,8 @@ ss::future<> remote_segment::do_hydrate_index() {
_base_rp_offset,
_base_rp_offset - _base_offset_delta,
0,
remote_segment_sampling_step_bytes);
remote_segment_sampling_step_bytes,
_base_timestamp);

auto result = co_await _api.download_index(
_bucket, remote_segment_path{_index_path}, ix, local_rtc);
Expand Down Expand Up @@ -689,7 +692,8 @@ ss::future<bool> remote_segment::maybe_materialize_index() {
_base_rp_offset,
_base_rp_offset - _base_offset_delta,
0,
remote_segment_sampling_step_bytes);
remote_segment_sampling_step_bytes,
_base_timestamp);
if (auto cache_item = co_await _cache.get(path); cache_item.has_value()) {
// The cache item is expected to be present if the segment is present
// so it's very unlikely that we will call this method if there is no
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ class remote_segment final {
model::offset _base_rp_offset;
model::offset_delta _base_offset_delta;
model::offset _max_rp_offset;
model::timestamp _base_timestamp;

// The expected size according to the manifest entry for the segment
size_t _size{0};
Expand Down
5 changes: 4 additions & 1 deletion src/v/cloud_storage/remote_segment_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ offset_index::offset_index(
model::offset initial_rp,
kafka::offset initial_kaf,
int64_t initial_file_pos,
int64_t file_pos_step)
int64_t file_pos_step,
model::timestamp initial_time)
: _rp_offsets{}
, _kaf_offsets{}
, _file_offsets{}
Expand All @@ -31,9 +32,11 @@ offset_index::offset_index(
, _initial_rp(initial_rp)
, _initial_kaf(initial_kaf)
, _initial_file_pos(initial_file_pos)
, _initial_time(initial_time)
, _rp_index(initial_rp)
, _kaf_index(initial_kaf)
, _file_index(initial_file_pos, delta_delta_t(file_pos_step))
, _time_index(initial_time.value())
, _min_file_pos_step(file_pos_step) {}

void offset_index::add(
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/remote_segment_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class offset_index {
model::offset initial_rp,
kafka::offset initial_kaf,
int64_t initial_file_pos,
int64_t file_pos_step);
int64_t file_pos_step,
model::timestamp initial_time);

/// Add new tuple to the index.
void add(
Expand Down
21 changes: 16 additions & 5 deletions src/v/cloud_storage/tests/remote_segment_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) {
}

offset_index tmp_index(
segment_base_rp_offset, segment_base_kaf_offset, 0U, 1000);
segment_base_rp_offset,
segment_base_kaf_offset,
0U,
1000,
model::timestamp{0xdeadbeef});
model::offset last;
kafka::offset klast;
size_t flast;
Expand All @@ -86,7 +90,11 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) {
}

offset_index index(
segment_base_rp_offset, segment_base_kaf_offset, 0U, 1000);
segment_base_rp_offset,
segment_base_kaf_offset,
0U,
1000,
model::timestamp{0xdeadbeef});
auto buf = tmp_index.to_iobuf();
index.from_iobuf(std::move(buf));

Expand Down Expand Up @@ -144,7 +152,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_index_builder) {
}
auto segment = generate_segment(base_offset, batches);
auto is = make_iobuf_input_stream(std::move(segment));
offset_index ix(base_offset, kbase_offset, 0, 0);
offset_index ix(
base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef});
auto parser = make_remote_segment_index_builder(
test_ntp, std::move(is), ix, model::offset_delta(0), 0);
auto result = parser->consume().get();
Expand Down Expand Up @@ -192,7 +201,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_build_coarse_index) {
expected_conf_records + expected_data_records - 1);
auto segment = generate_segment(base_offset, batches);
auto is = make_iobuf_input_stream(std::move(segment));
offset_index ix(base_offset, kbase_offset, 0, 0);
offset_index ix(
base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef});
segment_record_stats stats{};
auto parser = make_remote_segment_index_builder(
test_ntp, std::move(is), ix, model::offset_delta(0), 0, std::ref(stats));
Expand Down Expand Up @@ -278,7 +288,8 @@ SEASTAR_THREAD_TEST_CASE(
}
auto segment = generate_segment(base_offset, batches);
auto is = make_iobuf_input_stream(std::move(segment));
offset_index ix(base_offset, kbase_offset, 0, 0);
offset_index ix(
base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef});
auto parser = make_remote_segment_index_builder(
test_ntp, std::move(is), ix, model::offset_delta(0), 0);
auto pclose = ss::defer([&parser] { parser->close().get(); });
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/remote_segment_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ void upload_index(
meta.base_offset,
meta.base_kafka_offset(),
0,
remote_segment_sampling_step_bytes};
remote_segment_sampling_step_bytes,
meta.base_timestamp};

auto builder = make_remote_segment_index_builder(
manifest_ntp,
Expand Down

0 comments on commit b3e5b48

Please sign in to comment.