From 0263a82d3288e057d5e28481ed11a195f0f7cdac Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 30 Jun 2023 13:25:33 +0100 Subject: [PATCH 1/2] cloud_storage: add timestamps to remote segment index This is a prerequisite to make timequeries seek to the proper chunk, instead of reading from the start of segments. The timestamp we care about is the batch max timestamp, because timequeries will want to find a batch that is definitely before the target timestamp, to start their scan from there. Related: https://github.com/redpanda-data/redpanda/issues/11801 --- src/v/cloud_storage/remote_segment_index.cc | 37 +++++++++++++++++-- src/v/cloud_storage/remote_segment_index.h | 10 ++++- .../tests/remote_segment_index_test.cc | 10 ++++- 3 files changed, 50 insertions(+), 7 deletions(-) diff --git a/src/v/cloud_storage/remote_segment_index.cc b/src/v/cloud_storage/remote_segment_index.cc index 53434141434a..6f4b440bb807 100644 --- a/src/v/cloud_storage/remote_segment_index.cc +++ b/src/v/cloud_storage/remote_segment_index.cc @@ -26,6 +26,7 @@ offset_index::offset_index( : _rp_offsets{} , _kaf_offsets{} , _file_offsets{} + , _time_offsets{} , _pos{} , _initial_rp(initial_rp) , _initial_kaf(initial_kaf) @@ -36,16 +37,21 @@ offset_index::offset_index( , _min_file_pos_step(file_pos_step) {} void offset_index::add( - model::offset rp_offset, kafka::offset kaf_offset, int64_t file_offset) { + model::offset rp_offset, + kafka::offset kaf_offset, + int64_t file_offset, + model::timestamp max_timestamp) { auto ix = index_mask & _pos++; _rp_offsets.at(ix) = rp_offset(); _kaf_offsets.at(ix) = kaf_offset(); _file_offsets.at(ix) = file_offset; + _time_offsets.at(ix) = max_timestamp.value(); try { if ((_pos & index_mask) == 0) { _rp_index.add(_rp_offsets); _kaf_index.add(_kaf_offsets); _file_index.add(_file_offsets); + _time_index.add(_time_offsets); } } catch (...) { // Get rid of the corrupted state in the encoders. @@ -59,6 +65,7 @@ void offset_index::add( _kaf_index = encoder_t(_initial_kaf); _file_index = foffset_encoder_t( _initial_file_pos, delta_delta_t(_min_file_pos_step)); + _time_index = encoder_t(_initial_time.value()); throw; } } @@ -242,7 +249,7 @@ offset_index::coarse_index_t offset_index::build_coarse_index( struct offset_index_header : serde::envelope< offset_index_header, - serde::version<1>, + serde::version<2>, serde::compat_version<1>> { int64_t min_file_pos_step; uint64_t num_elements; @@ -258,6 +265,12 @@ struct offset_index_header iobuf rp_index; iobuf kaf_index; iobuf file_index; + + // Version 2 fields + int64_t base_time{model::timestamp::missing().value()}; + int64_t last_time{model::timestamp::missing().value()}; + std::vector time_write_buf; + iobuf time_index; }; iobuf offset_index::to_iobuf() { @@ -279,7 +292,11 @@ iobuf offset_index::to_iobuf() { .rp_index = _rp_index.copy(), .kaf_index = _kaf_index.copy(), .file_index = _file_index.copy(), - }; + .base_time = _initial_time.value(), + .last_time = _time_index.get_last_value(), + .time_write_buf = std::vector( + _time_offsets.begin(), _time_offsets.end()), + .time_index = _time_index.copy()}; return serde::to_iobuf(std::move(hdr)); } @@ -310,6 +327,17 @@ void offset_index::from_iobuf(iobuf b) { std::move(hdr.file_index), delta_delta_t(_min_file_pos_step)); _min_file_pos_step = hdr.min_file_pos_step; + + _initial_time = model::timestamp(hdr.base_time); + _time_index = encoder_t( + _initial_time.value(), + num_rows, + hdr.last_time, + std::move(hdr.time_index)); + std::copy( + hdr.time_write_buf.begin(), + hdr.time_write_buf.end(), + _time_offsets.begin()); } std::optional @@ -362,7 +390,8 @@ void remote_segment_index_builder::consume_batch_start( _ix.add( hdr.base_offset, hdr.base_offset - _running_delta, - static_cast(physical_base_offset)); + static_cast(physical_base_offset), + hdr.max_timestamp); _window = 0; } } diff --git a/src/v/cloud_storage/remote_segment_index.h b/src/v/cloud_storage/remote_segment_index.h index 6c28198dc7ae..371bc86f4673 100644 --- a/src/v/cloud_storage/remote_segment_index.h +++ b/src/v/cloud_storage/remote_segment_index.h @@ -56,8 +56,11 @@ class offset_index { int64_t file_pos_step); /// Add new tuple to the index. - void - add(model::offset rp_offset, kafka::offset kaf_offset, int64_t file_offset); + void add( + model::offset rp_offset, + kafka::offset kaf_offset, + int64_t file_offset, + model::timestamp); struct find_result { model::offset rp_offset; @@ -142,10 +145,12 @@ class offset_index { std::array _rp_offsets; std::array _kaf_offsets; std::array _file_offsets; + std::array _time_offsets; uint64_t _pos; model::offset _initial_rp; kafka::offset _initial_kaf; int64_t _initial_file_pos; + model::timestamp _initial_time; using encoder_t = deltafor_encoder; using decoder_t = deltafor_decoder; @@ -156,6 +161,7 @@ class offset_index { encoder_t _rp_index; encoder_t _kaf_index; foffset_encoder_t _file_index; + encoder_t _time_index; int64_t _min_file_pos_step; friend class offset_index_accessor; diff --git a/src/v/cloud_storage/tests/remote_segment_index_test.cc b/src/v/cloud_storage/tests/remote_segment_index_test.cc index 719e8fccffee..972a1a101212 100644 --- a/src/v/cloud_storage/tests/remote_segment_index_test.cc +++ b/src/v/cloud_storage/tests/remote_segment_index_test.cc @@ -43,15 +43,18 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) { std::vector rp_offsets; std::vector kaf_offsets; std::vector file_offsets; + std::vector timestamps; int64_t rp = segment_base_rp_offset(); int64_t kaf = segment_base_kaf_offset(); size_t fpos = random_generators::get_int(1000, 2000); + model::timestamp timestamp{123456}; bool is_config = false; for (size_t i = 0; i < segment_num_batches; i++) { if (!is_config) { rp_offsets.push_back(model::offset(rp)); kaf_offsets.push_back(kafka::offset(kaf)); file_offsets.push_back(fpos); + timestamps.push_back(timestamp); } // The test queries every element using the key that matches the element // exactly and then it queries the element using the key which is @@ -63,6 +66,7 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) { rp += batch_size; kaf += is_config ? batch_size - 1 : batch_size; fpos += random_generators::get_int(1000, 2000); + timestamp = model::timestamp(timestamp.value() + 1); } offset_index tmp_index( @@ -71,7 +75,11 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) { kafka::offset klast; size_t flast; for (size_t i = 0; i < rp_offsets.size(); i++) { - tmp_index.add(rp_offsets.at(i), kaf_offsets.at(i), file_offsets.at(i)); + tmp_index.add( + rp_offsets.at(i), + kaf_offsets.at(i), + file_offsets.at(i), + timestamps.at(i)); last = rp_offsets.at(i); klast = kaf_offsets.at(i); flast = file_offsets.at(i); From b3e5b48423d1ce5cd1df014b5f9e76cfce8cc200 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 4 Jul 2023 09:04:03 +0100 Subject: [PATCH 2/2] cloud_storage: supply a base_timestamp to remote index constructor --- src/v/archival/ntp_archiver_service.cc | 10 +++++++-- src/v/archival/ntp_archiver_service.h | 1 + src/v/archival/tests/service_fixture.cc | 17 ++++++++++----- src/v/cloud_storage/remote_segment.cc | 10 ++++++--- src/v/cloud_storage/remote_segment.h | 1 + src/v/cloud_storage/remote_segment_index.cc | 5 ++++- src/v/cloud_storage/remote_segment_index.h | 3 ++- .../tests/remote_segment_index_test.cc | 21 ++++++++++++++----- .../tests/remote_segment_test.cc | 3 ++- 9 files changed, 53 insertions(+), 18 deletions(-) diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index ef5c40786b1d..66728b9edfe4 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -1092,7 +1092,11 @@ ss::future ntp_archiver::upload_segment( auto index_path = make_index_path(path); auto make_idx_fut = make_segment_index( - candidate.starting_offset, _rtclog, index_path, std::move(stream_index)); + candidate.starting_offset, + candidate.base_timestamp, + _rtclog, + index_path, + std::move(stream_index)); auto [upload_res, idx_res] = co_await ss::when_all_succeed( std::move(upload_fut), std::move(make_idx_fut)); @@ -1222,6 +1226,7 @@ ss::future ntp_archiver::upload_tx( ss::future> ntp_archiver::make_segment_index( model::offset base_rp_offset, + model::timestamp base_timestamp, retry_chain_logger& ctxlog, std::string_view index_path, ss::input_stream stream) { @@ -1232,7 +1237,8 @@ ntp_archiver::make_segment_index( base_rp_offset, base_kafka_offset, 0, - cloud_storage::remote_segment_sampling_step_bytes}; + cloud_storage::remote_segment_sampling_step_bytes, + base_timestamp}; vlog(ctxlog.debug, "creating remote segment index: {}", index_path); cloud_storage::segment_record_stats stats{}; diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h index dc5317fe83a9..f628a3101d8d 100644 --- a/src/v/archival/ntp_archiver_service.h +++ b/src/v/archival/ntp_archiver_service.h @@ -494,6 +494,7 @@ class ntp_archiver { /// \return An index on success, nullopt on failure ss::future> make_segment_index( model::offset base_rp_offset, + model::timestamp base_timestamp, retry_chain_logger& ctxlog, std::string_view index_path, ss::input_stream stream); diff --git a/src/v/archival/tests/service_fixture.cc b/src/v/archival/tests/service_fixture.cc index 7ce3fc12eed2..503894ed6fde 100644 --- a/src/v/archival/tests/service_fixture.cc +++ b/src/v/archival/tests/service_fixture.cc @@ -388,7 +388,8 @@ void segment_matcher::verify_index( meta->base_offset, meta->base_kafka_offset(), 0, - cloud_storage::remote_segment_sampling_step_bytes}; + cloud_storage::remote_segment_sampling_step_bytes, + meta->base_timestamp}; auto builder = cloud_storage::make_remote_segment_index_builder( ntp, @@ -400,16 +401,22 @@ void segment_matcher::verify_index( builder->consume().finally([&builder] { return builder->close(); }).get(); reader_handle.close().get(); - auto actual = iobuf_to_bytes(ix.to_iobuf()); + auto ix_buf = ix.to_iobuf(); + iobuf expected_buf; + expected_buf.append((const uint8_t*)(expected.data()), expected.size()); vlog( fixt_log.info, "expected {} bytes, got {}", expected.size(), - actual.size()); + ix_buf.size_bytes()); + + if (ix_buf != expected_buf) { + vlog(fixt_log.info, "ix_buf: {}", ix_buf.hexdump(1024)); - auto a = ss::sstring{actual.begin(), actual.end()}; - BOOST_REQUIRE(a == expected); // NOLINT + vlog(fixt_log.info, "expected: {}", expected_buf.hexdump(1024)); + } + BOOST_REQUIRE(ix_buf == expected_buf); // NOLINT } template diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc index f569dfe620b3..ea51f74c5e42 100644 --- a/src/v/cloud_storage/remote_segment.cc +++ b/src/v/cloud_storage/remote_segment.cc @@ -178,6 +178,7 @@ remote_segment::remote_segment( , _base_offset_delta(std::clamp( meta.delta_offset, model::offset_delta(0), model::offset_delta::max())) , _max_rp_offset(meta.committed_offset) + , _base_timestamp(meta.base_timestamp) , _size(meta.size_bytes) , _rtc(&parent) , _ctxlog(cst_log, _rtc, generate_log_prefix(meta, ntp)) @@ -402,7 +403,8 @@ ss::future remote_segment::put_segment_in_cache_and_create_index( get_base_rp_offset(), get_base_kafka_offset(), 0, - remote_segment_sampling_step_bytes); + remote_segment_sampling_step_bytes, + _base_timestamp); auto [sparse, sput] = input_stream_fanout<2>(std::move(s), 1); auto parser = make_remote_segment_index_builder( get_ntp(), @@ -523,7 +525,8 @@ ss::future<> remote_segment::do_hydrate_index() { _base_rp_offset, _base_rp_offset - _base_offset_delta, 0, - remote_segment_sampling_step_bytes); + remote_segment_sampling_step_bytes, + _base_timestamp); auto result = co_await _api.download_index( _bucket, remote_segment_path{_index_path}, ix, local_rtc); @@ -689,7 +692,8 @@ ss::future remote_segment::maybe_materialize_index() { _base_rp_offset, _base_rp_offset - _base_offset_delta, 0, - remote_segment_sampling_step_bytes); + remote_segment_sampling_step_bytes, + _base_timestamp); if (auto cache_item = co_await _cache.get(path); cache_item.has_value()) { // The cache item is expected to be present if the segment is present // so it's very unlikely that we will call this method if there is no diff --git a/src/v/cloud_storage/remote_segment.h b/src/v/cloud_storage/remote_segment.h index 257c3f605601..39f0f8815851 100644 --- a/src/v/cloud_storage/remote_segment.h +++ b/src/v/cloud_storage/remote_segment.h @@ -263,6 +263,7 @@ class remote_segment final { model::offset _base_rp_offset; model::offset_delta _base_offset_delta; model::offset _max_rp_offset; + model::timestamp _base_timestamp; // The expected size according to the manifest entry for the segment size_t _size{0}; diff --git a/src/v/cloud_storage/remote_segment_index.cc b/src/v/cloud_storage/remote_segment_index.cc index 6f4b440bb807..b8312bc6e627 100644 --- a/src/v/cloud_storage/remote_segment_index.cc +++ b/src/v/cloud_storage/remote_segment_index.cc @@ -22,7 +22,8 @@ offset_index::offset_index( model::offset initial_rp, kafka::offset initial_kaf, int64_t initial_file_pos, - int64_t file_pos_step) + int64_t file_pos_step, + model::timestamp initial_time) : _rp_offsets{} , _kaf_offsets{} , _file_offsets{} @@ -31,9 +32,11 @@ offset_index::offset_index( , _initial_rp(initial_rp) , _initial_kaf(initial_kaf) , _initial_file_pos(initial_file_pos) + , _initial_time(initial_time) , _rp_index(initial_rp) , _kaf_index(initial_kaf) , _file_index(initial_file_pos, delta_delta_t(file_pos_step)) + , _time_index(initial_time.value()) , _min_file_pos_step(file_pos_step) {} void offset_index::add( diff --git a/src/v/cloud_storage/remote_segment_index.h b/src/v/cloud_storage/remote_segment_index.h index 371bc86f4673..4fe4f9d575e8 100644 --- a/src/v/cloud_storage/remote_segment_index.h +++ b/src/v/cloud_storage/remote_segment_index.h @@ -53,7 +53,8 @@ class offset_index { model::offset initial_rp, kafka::offset initial_kaf, int64_t initial_file_pos, - int64_t file_pos_step); + int64_t file_pos_step, + model::timestamp initial_time); /// Add new tuple to the index. void add( diff --git a/src/v/cloud_storage/tests/remote_segment_index_test.cc b/src/v/cloud_storage/tests/remote_segment_index_test.cc index 972a1a101212..e1fe7b9ed1e4 100644 --- a/src/v/cloud_storage/tests/remote_segment_index_test.cc +++ b/src/v/cloud_storage/tests/remote_segment_index_test.cc @@ -70,7 +70,11 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) { } offset_index tmp_index( - segment_base_rp_offset, segment_base_kaf_offset, 0U, 1000); + segment_base_rp_offset, + segment_base_kaf_offset, + 0U, + 1000, + model::timestamp{0xdeadbeef}); model::offset last; kafka::offset klast; size_t flast; @@ -86,7 +90,11 @@ BOOST_AUTO_TEST_CASE(remote_segment_index_search_test) { } offset_index index( - segment_base_rp_offset, segment_base_kaf_offset, 0U, 1000); + segment_base_rp_offset, + segment_base_kaf_offset, + 0U, + 1000, + model::timestamp{0xdeadbeef}); auto buf = tmp_index.to_iobuf(); index.from_iobuf(std::move(buf)); @@ -144,7 +152,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_index_builder) { } auto segment = generate_segment(base_offset, batches); auto is = make_iobuf_input_stream(std::move(segment)); - offset_index ix(base_offset, kbase_offset, 0, 0); + offset_index ix( + base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef}); auto parser = make_remote_segment_index_builder( test_ntp, std::move(is), ix, model::offset_delta(0), 0); auto result = parser->consume().get(); @@ -192,7 +201,8 @@ SEASTAR_THREAD_TEST_CASE(test_remote_segment_build_coarse_index) { expected_conf_records + expected_data_records - 1); auto segment = generate_segment(base_offset, batches); auto is = make_iobuf_input_stream(std::move(segment)); - offset_index ix(base_offset, kbase_offset, 0, 0); + offset_index ix( + base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef}); segment_record_stats stats{}; auto parser = make_remote_segment_index_builder( test_ntp, std::move(is), ix, model::offset_delta(0), 0, std::ref(stats)); @@ -278,7 +288,8 @@ SEASTAR_THREAD_TEST_CASE( } auto segment = generate_segment(base_offset, batches); auto is = make_iobuf_input_stream(std::move(segment)); - offset_index ix(base_offset, kbase_offset, 0, 0); + offset_index ix( + base_offset, kbase_offset, 0, 0, model::timestamp{0xdeadbeef}); auto parser = make_remote_segment_index_builder( test_ntp, std::move(is), ix, model::offset_delta(0), 0); auto pclose = ss::defer([&parser] { parser->close().get(); }); diff --git a/src/v/cloud_storage/tests/remote_segment_test.cc b/src/v/cloud_storage/tests/remote_segment_test.cc index 69d4eb683600..45dc59312dbb 100644 --- a/src/v/cloud_storage/tests/remote_segment_test.cc +++ b/src/v/cloud_storage/tests/remote_segment_test.cc @@ -170,7 +170,8 @@ void upload_index( meta.base_offset, meta.base_kafka_offset(), 0, - remote_segment_sampling_step_bytes}; + remote_segment_sampling_step_bytes, + meta.base_timestamp}; auto builder = make_remote_segment_index_builder( manifest_ntp,