diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index ef5c40786b1d3..66728b9edfe41 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -1092,7 +1092,11 @@ ss::future ntp_archiver::upload_segment( auto index_path = make_index_path(path); auto make_idx_fut = make_segment_index( - candidate.starting_offset, _rtclog, index_path, std::move(stream_index)); + candidate.starting_offset, + candidate.base_timestamp, + _rtclog, + index_path, + std::move(stream_index)); auto [upload_res, idx_res] = co_await ss::when_all_succeed( std::move(upload_fut), std::move(make_idx_fut)); @@ -1222,6 +1226,7 @@ ss::future ntp_archiver::upload_tx( ss::future> ntp_archiver::make_segment_index( model::offset base_rp_offset, + model::timestamp base_timestamp, retry_chain_logger& ctxlog, std::string_view index_path, ss::input_stream stream) { @@ -1232,7 +1237,8 @@ ntp_archiver::make_segment_index( base_rp_offset, base_kafka_offset, 0, - cloud_storage::remote_segment_sampling_step_bytes}; + cloud_storage::remote_segment_sampling_step_bytes, + base_timestamp}; vlog(ctxlog.debug, "creating remote segment index: {}", index_path); cloud_storage::segment_record_stats stats{}; diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h index dc5317fe83a92..f628a3101d8d6 100644 --- a/src/v/archival/ntp_archiver_service.h +++ b/src/v/archival/ntp_archiver_service.h @@ -494,6 +494,7 @@ class ntp_archiver { /// \return An index on success, nullopt on failure ss::future> make_segment_index( model::offset base_rp_offset, + model::timestamp base_timestamp, retry_chain_logger& ctxlog, std::string_view index_path, ss::input_stream stream); diff --git a/src/v/archival/tests/service_fixture.cc b/src/v/archival/tests/service_fixture.cc index 7ce3fc12eed28..503894ed6fde0 100644 --- a/src/v/archival/tests/service_fixture.cc +++ b/src/v/archival/tests/service_fixture.cc @@ -388,7 +388,8 @@ void segment_matcher::verify_index( meta->base_offset, meta->base_kafka_offset(), 0, - cloud_storage::remote_segment_sampling_step_bytes}; + cloud_storage::remote_segment_sampling_step_bytes, + meta->base_timestamp}; auto builder = cloud_storage::make_remote_segment_index_builder( ntp, @@ -400,16 +401,22 @@ void segment_matcher::verify_index( builder->consume().finally([&builder] { return builder->close(); }).get(); reader_handle.close().get(); - auto actual = iobuf_to_bytes(ix.to_iobuf()); + auto ix_buf = ix.to_iobuf(); + iobuf expected_buf; + expected_buf.append((const uint8_t*)(expected.data()), expected.size()); vlog( fixt_log.info, "expected {} bytes, got {}", expected.size(), - actual.size()); + ix_buf.size_bytes()); + + if (ix_buf != expected_buf) { + vlog(fixt_log.info, "ix_buf: {}", ix_buf.hexdump(1024)); - auto a = ss::sstring{actual.begin(), actual.end()}; - BOOST_REQUIRE(a == expected); // NOLINT + vlog(fixt_log.info, "expected: {}", expected_buf.hexdump(1024)); + } + BOOST_REQUIRE(ix_buf == expected_buf); // NOLINT } template diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index f569dfe620b36..ea51f74c5e42a 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -178,6 +178,7 @@ remote_segment::remote_segment( , _base_offset_delta(std::clamp( meta.delta_offset, model::offset_delta(0), model::offset_delta::max())) , _max_rp_offset(meta.committed_offset) + , _base_timestamp(meta.base_timestamp) , _size(meta.size_bytes) , _rtc(&parent) , _ctxlog(cst_log, _rtc, generate_log_prefix(meta, ntp)) @@ -402,7 +403,8 @@ ss::future remote_segment::put_segment_in_cache_and_create_index( get_base_rp_offset(), get_base_kafka_offset(), 0, - remote_segment_sampling_step_bytes); + remote_segment_sampling_step_bytes, + _base_timestamp); auto [sparse, sput] = input_stream_fanout<2>(std::move(s), 1); auto parser = make_remote_segment_index_builder( get_ntp(), @@ -523,7 +525,8 @@ ss::future<> remote_segment::do_hydrate_index() { _base_rp_offset, _base_rp_offset - _base_offset_delta, 0, - remote_segment_sampling_step_bytes); + remote_segment_sampling_step_bytes, + _base_timestamp); auto result = co_await _api.download_index( _bucket, remote_segment_path{_index_path}, ix, local_rtc); @@ -689,7 +692,8 @@ ss::future remote_segment::maybe_materialize_index() { _base_rp_offset, _base_rp_offset - _base_offset_delta, 0, - remote_segment_sampling_step_bytes); + remote_segment_sampling_step_bytes, + _base_timestamp); if (auto cache_item = co_await _cache.get(path); cache_item.has_value()) { // The cache item is expected to be present if the segment is present // so it's very unlikely that we will call this method if there is no diff --git a/src/v/cloud_storage/remote_segment.h b/src/v/cloud_storage/remote_segment.h index 257c3f605601a..39f0f88158519 100644 --- a/src/v/cloud_storage/remote_segment.h +++ b/src/v/cloud_storage/remote_segment.h @@ -263,6 +263,7 @@ class remote_segment final { model::offset _base_rp_offset; model::offset_delta _base_offset_delta; model::offset _max_rp_offset; + model::timestamp _base_timestamp; // The expected size according to the manifest entry for the segment size_t _size{0}; diff --git a/src/v/cloud_storage/remote_segment_index.cc b/src/v/cloud_storage/remote_segment_index.cc index 6f4b440bb8075..b8312bc6e6274 100644 --- a/src/v/cloud_storage/remote_segment_index.cc +++ b/src/v/cloud_storage/remote_segment_index.cc @@ -22,7 +22,8 @@ offset_index::offset_index( model::offset initial_rp, kafka::offset initial_kaf, int64_t initial_file_pos, - int64_t file_pos_step) + int64_t file_pos_step, + model::timestamp initial_time) : _rp_offsets{} , _kaf_offsets{} , _file_offsets{} @@ -31,9 +32,11 @@ offset_index::offset_index( , _initial_rp(initial_rp) , _initial_kaf(initial_kaf) , _initial_file_pos(initial_file_pos) + , _initial_time(initial_time) , _rp_index(initial_rp) , _kaf_index(initial_kaf) , _file_index(initial_file_pos, delta_delta_t(file_pos_step)) + , _time_index(initial_time.value()) , _min_file_pos_step(file_pos_step) {} void offset_index::add( diff --git a/src/v/cloud_storage/remote_segment_index.h b/src/v/cloud_storage/remote_segment_index.h index 371bc86f4673c..4fe4f9d575e8f 100644 --- a/src/v/cloud_storage/remote_segment_index.h +++ b/src/v/cloud_storage/remote_segment_index.h @@ -53,7 +53,8 @@ class offset_index { model::offset initial_rp, kafka::offset initial_kaf, int64_t initial_file_pos, - int64_t file_pos_step); + int64_t file_pos_step, + model::timestamp initial_time); /// Add new tuple to the index. void add( diff --git a/src/v/cloud_storage/tests/remote_segment_index_test.cc b/src/v/cloud_storage/tests/remote_segment_index_test.cc index 972a1a1012123..e1fe7b9ed1e4b 100644 --- a/src/v/cloud_storage/tests/remote_segment_index_test.cc +++ b/src/v/cloud_storage/tests/remote_segment_index_test.cc @@ -70,7 +70,11 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) { } offset_index tmp_index( - segment_base_rp_offset, segment_base_kaf_offset, 0U, 1000); + segment_base_rp_offset, + segment_base_kaf_offset, + 0U, + 1000, + model::timestamp{0xdeadbeef}); model::offset last; kafka::offset klast; size_t flast; @@ -86,7 +90,11 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) { } offset_index index( - segment_base_rp_offset, segment_base_kaf_offset, 0U, 1000); + segment_base_rp_offset, + segment_base_kaf_offset, + 0U, + 1000, + model::timestamp{0xdeadbeef}); auto buf = tmp_index.to_iobuf(); index.from_iobuf(std::move(buf)); @@ -144,7 +152,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_index_builder) { } auto segment = generate_segment(base_offset, batches); auto is = make_iobuf_input_stream(std::move(segment)); - offset_index ix(base_offset, kbase_offset, 0, 0); + offset_index ix( + base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef}); auto parser = make_remote_segment_index_builder( test_ntp, std::move(is), ix, model::offset_delta(0), 0); auto result = parser->consume().get(); @@ -192,7 +201,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_build_coarse_index) { expected_conf_records + expected_data_records - 1); auto segment = generate_segment(base_offset, batches); auto is = make_iobuf_input_stream(std::move(segment)); - offset_index ix(base_offset, kbase_offset, 0, 0); + offset_index ix( + base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef}); segment_record_stats stats{}; auto parser = make_remote_segment_index_builder( test_ntp, std::move(is), ix, model::offset_delta(0), 0, std::ref(stats)); @@ -278,7 +288,8 @@ SEASTAR_THREAD_TEST_CASE( } auto segment = generate_segment(base_offset, batches); auto is = make_iobuf_input_stream(std::move(segment)); - offset_index ix(base_offset, kbase_offset, 0, 0); + offset_index ix( + base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef}); auto parser = make_remote_segment_index_builder( test_ntp, std::move(is), ix, model::offset_delta(0), 0); auto pclose = ss::defer([&parser] { parser->close().get(); }); diff --git a/src/v/cloud_storage/tests/remote_segment_test.cc b/src/v/cloud_storage/tests/remote_segment_test.cc index 69d4eb6836009..45dc59312dbbe 100644 --- a/src/v/cloud_storage/tests/remote_segment_test.cc +++ b/src/v/cloud_storage/tests/remote_segment_test.cc @@ -170,7 +170,8 @@ void upload_index( meta.base_offset, meta.base_kafka_offset(), 0, - remote_segment_sampling_step_bytes}; + remote_segment_sampling_step_bytes, + meta.base_timestamp}; auto builder = make_remote_segment_index_builder( manifest_ntp,