diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc
index f65e177c2cdd2..840ab82357f0c 100644
--- a/src/v/cloud_storage/remote_partition.cc
+++ b/src/v/cloud_storage/remote_partition.cc
@@ -1242,11 +1242,13 @@ remote_partition::timequery(storage::timequery_config cfg) {
         co_return std::nullopt;
     }
 
-    auto start_offset = stm_manifest.full_log_start_kafka_offset().value();
+    auto start_offset = std::max(
+      cfg.min_offset,
+      kafka::offset_cast(stm_manifest.full_log_start_kafka_offset().value()));
 
     // Synthesize a log_reader_config from our timequery_config
     storage::log_reader_config config(
-      kafka::offset_cast(start_offset),
+      start_offset,
       cfg.max_offset,
       0,
       2048, // We just need one record batch
@@ -1269,7 +1271,8 @@ remote_partition::timequery(storage::timequery_config cfg) {
     vlog(_ctxlog.debug, "timequery: {} batches", batches.size());
     if (batches.size()) {
-        co_return storage::batch_timequery(*(batches.begin()), cfg.time);
+        co_return storage::batch_timequery(
+          *(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset);
     } else {
        co_return std::nullopt;
    }
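Aside, for readers of the hunk above: the new lower bound is simply the later of the query's `min_offset` and the manifest's first offset, so a trim-prefix point that moved the visible start of the log past the manifest start is respected on the cloud read path. A minimal standalone sketch of that rule; plain `int64_t` stands in for the strongly typed offsets and `clamp_cloud_start` is a hypothetical name, not the real API:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Stand-in for kafka offsets; the real code uses kafka::offset and
// kafka::offset_cast rather than raw integers.
using offset_t = int64_t;

// The reader must not start below the query's min_offset, even when the
// cloud manifest still covers earlier (trimmed-away) offsets.
offset_t clamp_cloud_start(offset_t min_offset, offset_t manifest_start) {
    return std::max(min_offset, manifest_start);
}

int main() {
    assert(clamp_cloud_start(250, 100) == 250); // log trimmed to offset 250
    assert(clamp_cloud_start(0, 100) == 100);   // no trim: manifest start wins
}
```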
diff --git a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
index 448f5640a13f3..42011d1e71dcd 100644
--- a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
+++ b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
@@ -711,7 +711,7 @@ FIXTURE_TEST(test_local_timequery, e2e_fixture) {
       bool expect_value = false,
       std::optional<model::offset> expected_o = std::nullopt) {
         auto timequery_conf = storage::timequery_config(
-          t, o, ss::default_priority_class(), std::nullopt);
+          model::offset(0), t, o, ss::default_priority_class(), std::nullopt);
 
         auto result = partition->timequery(timequery_conf).get();
@@ -798,7 +798,7 @@ FIXTURE_TEST(test_cloud_storage_timequery, e2e_fixture) {
       bool expect_value = false,
       std::optional<model::offset> expected_o = std::nullopt) {
         auto timequery_conf = storage::timequery_config(
-          t, o, ss::default_priority_class(), std::nullopt);
+          model::offset(0), t, o, ss::default_priority_class(), std::nullopt);
 
         auto result = partition->timequery(timequery_conf).get();
@@ -904,7 +904,7 @@ FIXTURE_TEST(test_mixed_timequery, e2e_fixture) {
       bool expect_value = false,
       std::optional<model::offset> expected_o = std::nullopt) {
         auto timequery_conf = storage::timequery_config(
-          t, o, ss::default_priority_class(), std::nullopt);
+          model::offset(0), t, o, ss::default_priority_class(), std::nullopt);
 
         auto result = partition->timequery(timequery_conf).get();
diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc
index 43f8266915546..0b8932f774ad1 100644
--- a/src/v/cluster/partition.cc
+++ b/src/v/cluster/partition.cc
@@ -506,14 +506,24 @@ partition::timequery(storage::timequery_config cfg) {
     const bool may_answer_from_cloud
       = may_read_from_cloud()
-        && _cloud_storage_partition->bounds_timestamp(cfg.time);
+        && _cloud_storage_partition->bounds_timestamp(cfg.time)
+        && cfg.min_offset < kafka::offset_cast(
+             _cloud_storage_partition->next_kafka_offset());
 
     if (_raft->log()->start_timestamp() <= cfg.time) {
         // The query is ahead of the local data's start_timestamp: this
         // means it _might_ hit on local data: start_timestamp is not
         // precise, so once we query we might still fall back to cloud
         // storage
-        auto result = co_await local_timequery(cfg);
+        //
+        // We also need to adjust the lower bound for the local query as the
+        // min_offset corresponds to the full log (including tiered storage).
+        auto local_query_cfg = cfg;
+        local_query_cfg.min_offset = std::max(
+          log()->from_log_offset(_raft->start_offset()),
+          local_query_cfg.min_offset);
+        auto result = co_await local_timequery(
+          local_query_cfg, may_answer_from_cloud);
         if (result.has_value()) {
             co_return result;
         } else {
@@ -526,9 +536,16 @@ partition::timequery(storage::timequery_config cfg) {
             // Timestamp is before local storage but within cloud storage
             co_return co_await cloud_storage_timequery(cfg);
         } else {
-            // No cloud data: queries earlier than the start of the log
-            // will hit on the start of the log.
-            co_return co_await local_timequery(cfg);
+            // No cloud data OR not allowed to read from cloud: queries earlier
+            // than the start of the log will hit on the start of the log.
+            //
+            // Adjust the lower bound for the local query as the min_offset
+            // corresponds to the full log (including tiered storage).
+            auto local_query_cfg = cfg;
+            local_query_cfg.min_offset = std::max(
+              log()->from_log_offset(_raft->start_offset()),
+              local_query_cfg.min_offset);
+            co_return co_await local_timequery(local_query_cfg, false);
         }
     }
 }
@@ -548,12 +565,7 @@ partition::cloud_storage_timequery(storage::timequery_config cfg) {
     // raft log is ahead of the query timestamp or the topic is a read
     // replica, so proceed to query the remote partition to try and
     // find the earliest data that has timestamp >= the query time.
-    vlog(
-      clusterlog.debug,
-      "timequery (cloud) {} t={} max_offset(k)={}",
-      _raft->ntp(),
-      cfg.time,
-      cfg.max_offset);
+    vlog(clusterlog.debug, "timequery (cloud) {} cfg(k)={}", _raft->ntp(), cfg);
 
     // remote_partition pre-translates offsets for us, so no call into
     // the offset translator here
@@ -561,45 +573,38 @@ partition::cloud_storage_timequery(storage::timequery_config cfg) {
     if (result.has_value()) {
         vlog(
           clusterlog.debug,
-          "timequery (cloud) {} t={} max_offset(r)={} result(r)={}",
+          "timequery (cloud) {} cfg(k)={} result(k)={}",
           _raft->ntp(),
-          cfg.time,
-          cfg.max_offset,
+          cfg,
           result->offset);
     }
     co_return result;
 }
 
-ss::future<std::optional<storage::timequery_result>>
-partition::local_timequery(storage::timequery_config cfg) {
-    vlog(
-      clusterlog.debug,
-      "timequery (raft) {} t={} max_offset(k)={}",
-      _raft->ntp(),
-      cfg.time,
-      cfg.max_offset);
+ss::future<std::optional<storage::timequery_result>> partition::local_timequery(
+  storage::timequery_config cfg, bool allow_cloud_fallback) {
+    vlog(clusterlog.debug, "timequery (raft) {} cfg(k)={}", _raft->ntp(), cfg);
 
+    cfg.min_offset = _raft->log()->to_log_offset(cfg.min_offset);
     cfg.max_offset = _raft->log()->to_log_offset(cfg.max_offset);
 
-    auto result = co_await _raft->timequery(cfg);
+    vlog(clusterlog.debug, "timequery (raft) {} cfg(r)={}", _raft->ntp(), cfg);
 
-    const bool may_answer_from_cloud
-      = may_read_from_cloud()
-        && _cloud_storage_partition->bounds_timestamp(cfg.time);
+    auto result = co_await _raft->timequery(cfg);
 
     if (result.has_value()) {
-        if (may_answer_from_cloud) {
+        if (allow_cloud_fallback) {
             // We need to test for cases in which we will fall back to querying
             // cloud storage.
             if (_raft->log()->start_timestamp() > cfg.time) {
                 // Query raced with prefix truncation
                 vlog(
                   clusterlog.debug,
-                  "timequery (raft) {} ts={} raced with truncation "
+                  "timequery (raft) {} cfg(r)={} raced with truncation "
                   "(start_timestamp {}, result {})",
                   _raft->ntp(),
-                  cfg.time,
+                  cfg,
                   _raft->log()->start_timestamp(),
                   result->time);
                 co_return std::nullopt;
@@ -618,11 +623,11 @@ partition::local_timequery(storage::timequery_config cfg) {
                 // https://github.com/redpanda-data/redpanda/issues/9669
                 vlog(
                   clusterlog.debug,
-                  "Timequery (raft) {} ts={} miss on local log "
+                  "Timequery (raft) {} cfg(r)={} miss on local log "
                   "(start_timestamp "
                   "{}, result {})",
                   _raft->ntp(),
-                  cfg.time,
+                  cfg,
                   _raft->log()->start_timestamp(),
                   result->time);
                 co_return std::nullopt;
@@ -635,15 +640,15 @@ partition::local_timequery(storage::timequery_config cfg) {
             // have the same timestamp and are present in cloud storage.
             vlog(
               clusterlog.debug,
-              "Timequery (raft) {} ts={} hit start_offset in local log "
+              "Timequery (raft) {} cfg(r)={} hit start_offset in local log "
               "(start_offset {} start_timestamp {}, result {})",
               _raft->ntp(),
+              cfg,
               _raft->log()->offsets().start_offset,
-              cfg.time,
               _raft->log()->start_timestamp(),
               cfg.time);
 
-            if (may_answer_from_cloud) {
+            if (allow_cloud_fallback) {
                 // Even though we hit data with the desired timestamp, we
                 // cannot be certain that this is the _first_ batch with
                 // the desired timestamp: return null so that the caller
@@ -654,10 +659,9 @@ partition::local_timequery(storage::timequery_config cfg) {
 
         vlog(
           clusterlog.debug,
-          "timequery (raft) {} t={} max_offset(r)={} result(r)={}",
+          "timequery (raft) {} cfg(r)={} result(r)={}",
           _raft->ntp(),
-          cfg.time,
-          cfg.max_offset,
+          cfg,
           result->offset);
         result->offset = _raft->log()->from_log_offset(result->offset);
     }
diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h
index 5b95254746a1c..0b503a136bb95 100644
--- a/src/v/cluster/partition.h
+++ b/src/v/cluster/partition.h
@@ -345,7 +345,7 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
     bool may_read_from_cloud() const;
 
     ss::future<std::optional<storage::timequery_result>>
-    local_timequery(storage::timequery_config);
+    local_timequery(storage::timequery_config, bool allow_cloud_fallback);
 
     consensus_ptr _raft;
     ss::shared_ptr<log_eviction_stm> _log_eviction_stm;
diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc
index f8499092b8fda..54c5a52cd1db8 100644
--- a/src/v/kafka/server/handlers/list_offsets.cc
+++ b/src/v/kafka/server/handlers/list_offsets.cc
@@ -130,8 +130,9 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
           kafka_partition->leader_epoch());
     }
 
     auto res = co_await kafka_partition->timequery(storage::timequery_config{
+      kafka_partition->start_offset(),
       timestamp,
-      offset,
+      model::prev_offset(offset),
       kafka_read_priority(),
       {model::record_batch_type::raft_data},
       octx.rctx.abort_source().local()});
diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc
index 094fd4eddfa15..c6fd3fa590afb 100644
--- a/src/v/kafka/server/replicated_partition.cc
+++ b/src/v/kafka/server/replicated_partition.cc
@@ -356,7 +356,8 @@ replicated_partition::timequery(storage::timequery_config cfg) {
     if (batches.empty()) {
         co_return std::nullopt;
     }
-    co_return storage::batch_timequery(*(batches.begin()), cfg.time);
+    co_return storage::batch_timequery(
+      *(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset);
 }
 
 ss::future<result<model::offset>> replicated_partition::replicate(
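Aside on the list_offsets.cc hunk: the handler previously forwarded `offset` directly, but `timequery_config::max_offset` is an inclusive bound while the offset the handler holds (typically the high watermark) is one past the last readable record, hence `model::prev_offset(offset)`. A standalone sketch of that inclusive/exclusive conversion, with illustrative names rather than the real API:

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for model::offset; the real prev_offset operates on a
// strongly-typed offset.
using offset_t = int64_t;

// Convert an exclusive upper bound (first offset *not* readable) into the
// inclusive max_offset that timequery_config expects.
offset_t prev_offset(offset_t o) { return o - 1; }

int main() {
    offset_t high_watermark = 42; // records 0..41 are readable
    assert(prev_offset(high_watermark) == 41);
}
```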
diff --git a/src/v/model/offset_interval.h b/src/v/model/offset_interval.h
new file mode 100644
index 0000000000000..be1711e4d50bb
--- /dev/null
+++ b/src/v/model/offset_interval.h
@@ -0,0 +1,80 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+#pragma once
+
+#include "base/vassert.h"
+#include "model/fundamental.h"
+
+namespace model {
+/// A non-empty, bounded, closed interval of offsets [min offset, max offset].
+///
+/// This property helps to simplify the logic and the instructions required to
+/// check for overlaps, containment, etc. It is the responsibility of the
+/// caller to ensure these properties hold before constructing an instance of
+/// this class.
+///
+/// To represent a potentially empty range, wrap it in an optional.
+class bounded_offset_interval {
+public:
+    static bounded_offset_interval
+    unchecked(model::offset min, model::offset max) noexcept {
+        return {min, max};
+    }
+
+    static bounded_offset_interval
+    checked(model::offset min, model::offset max) {
+        if (min < model::offset(0) || max < model::offset(0) || min > max) {
+            throw std::invalid_argument(fmt::format(
+              "Invalid arguments for constructing a non-empty bounded offset "
+              "interval: min({}) <= max({})",
+              min,
+              max));
+        }
+
+        return {min, max};
+    }
+
+    inline bool overlaps(const bounded_offset_interval& other) const noexcept {
+        return _min <= other._max && _max >= other._min;
+    }
+
+    inline bool contains(model::offset o) const noexcept {
+        return _min <= o && o <= _max;
+    }
+
+    friend std::ostream&
+    operator<<(std::ostream& o, const bounded_offset_interval& r) {
+        fmt::print(o, "{{min: {}, max: {}}}", r._min, r._max);
+        return o;
+    }
+
+    inline model::offset min() const noexcept { return _min; }
+    inline model::offset max() const noexcept { return _max; }
+
+private:
+    bounded_offset_interval(model::offset min, model::offset max) noexcept
+      : _min(min)
+      , _max(max) {
+#ifndef NDEBUG
+        vassert(
+          min >= model::offset(0), "Offset interval min({}) must be >= 0", min);
+        vassert(
+          min <= max,
+          "Offset interval invariant not satisfied: min({}) <= max({})",
+          min,
+          max);
+#endif
+    }
+
+    model::offset _min;
+    model::offset _max;
+};
+
+} // namespace model
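A quick usage sketch of the new class (assuming `model/offset_interval.h` above is on the include path), exercising the inclusive-bounds semantics that the lock manager and `batch_timequery()` changes below rely on:

```cpp
#include "model/offset_interval.h"

#include <cassert>

int main() {
    // checked() validates 0 <= min <= max and throws std::invalid_argument
    // otherwise; unchecked() skips the runtime check (debug-only vasserts).
    auto query = model::bounded_offset_interval::checked(
      model::offset(10), model::offset(20));
    auto segment = model::bounded_offset_interval::unchecked(
      model::offset(15), model::offset(30));

    assert(query.overlaps(segment));           // [10,20] meets [15,30]
    assert(query.contains(model::offset(20))); // both bounds are inclusive
    assert(!query.contains(model::offset(21)));
}
```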
diff --git a/src/v/model/tests/random_batch.cc b/src/v/model/tests/random_batch.cc
index 4157c6df084ed..12a546b515876 100644
--- a/src/v/model/tests/random_batch.cc
+++ b/src/v/model/tests/random_batch.cc
@@ -119,7 +119,10 @@ make_random_batch(model::offset o, int num_records, bool allow_compression) {
 model::record_batch make_random_batch(record_batch_spec spec) {
     auto ts = spec.timestamp.value_or(model::timestamp::now());
-    auto max_ts = model::timestamp(ts() + spec.count - 1);
+    auto max_ts = ts;
+    if (!spec.all_records_have_same_timestamp) {
+        max_ts = model::timestamp(ts() + spec.count - 1);
+    }
     auto header = model::record_batch_header{
       .size_bytes = 0, // computed later
       .base_offset = spec.offset,
@@ -235,7 +238,9 @@ make_random_batches(record_batch_spec spec) {
         auto num_records = spec.records ? *spec.records : get_int(2, 30);
         auto batch_spec = spec;
         batch_spec.timestamp = ts;
-        ts = model::timestamp(ts() + num_records);
+        if (!batch_spec.all_records_have_same_timestamp) {
+            ts = model::timestamp(ts() + num_records);
+        }
         batch_spec.offset = o;
         batch_spec.count = num_records;
         if (spec.enable_idempotence) {
diff --git a/src/v/model/tests/random_batch.h b/src/v/model/tests/random_batch.h
index c7dbccad0443e..d9df64dba393a 100644
--- a/src/v/model/tests/random_batch.h
+++ b/src/v/model/tests/random_batch.h
@@ -34,6 +34,7 @@ struct record_batch_spec {
     bool is_transactional{false};
     std::optional<std::vector<size_t>> record_sizes;
     std::optional<model::timestamp> timestamp;
+    bool all_records_have_same_timestamp{false};
 };
 
 model::record make_random_record(int, iobuf);
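For reference, this is how the new `all_records_have_same_timestamp` flag is used; the fragment mirrors the spec-style call site in the `timequery_multiple_messages_per_batch` test added further down, and only sets fields that appear in these hunks:

```cpp
// Sketch: generate a 10-record batch whose records all carry timestamp 1000,
// so header().first_timestamp == header().max_timestamp == 1000. Without the
// flag, record timestamps would increase by one per record.
auto batch = model::test::make_random_batch(model::test::record_batch_spec{
  .offset = model::offset(0),
  .allow_compression = false, // compressed batches can't be scanned per record
  .count = 10,
  .timestamp = model::timestamp(1000),
  .all_records_have_same_timestamp = true,
});
```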
diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index b6aa1ef56fd1d..4e179acce977e 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -2341,7 +2341,8 @@ disk_log_impl::make_reader(timequery_config config) {
     vassert(!_closed, "make_reader on closed log - {}", *this);
     return _lock_mngr.range_lock(config).then(
       [this, cfg = config](std::unique_ptr<lock_manager::lease> lease) {
-          auto start_offset = _start_offset;
+          auto start_offset = cfg.min_offset;
+
           if (!lease->range.empty()) {
              const ss::lw_shared_ptr<segment>& segment = *lease->range.begin();
              std::optional<segment_index::entry> index_entry = std::nullopt;
@@ -2456,7 +2457,8 @@ disk_log_impl::timequery(timequery_config cfg) {
           if (
             !batches.empty()
             && batches.front().header().max_timestamp >= cfg.time) {
-              return ret_t(batch_timequery(batches.front(), cfg.time));
+              return ret_t(batch_timequery(
+                batches.front(), cfg.min_offset, cfg.time, cfg.max_offset));
           }
           return ret_t();
       });
diff --git a/src/v/storage/lock_manager.cc b/src/v/storage/lock_manager.cc
index 2a6d6dda2576d..32f4b058aafda 100644
--- a/src/v/storage/lock_manager.cc
+++ b/src/v/storage/lock_manager.cc
@@ -9,6 +9,7 @@
 
 #include "storage/lock_manager.h"
 
+#include "model/offset_interval.h"
 #include "storage/segment.h"
 
 #include
@@ -38,14 +39,27 @@ range(segment_set::underlying_t segs) {
 
 ss::future<std::unique_ptr<lease>>
 lock_manager::range_lock(const timequery_config& cfg) {
+    auto query_interval = model::bounded_offset_interval::checked(
+      cfg.min_offset, cfg.max_offset);
+
     segment_set::underlying_t tmp;
+    // Copy segments that have timestamps >= cfg.time and overlap with the
+    // offset range [min_offset, max_offset].
     std::copy_if(
       _set.lower_bound(cfg.time),
       _set.end(),
       std::back_inserter(tmp),
-      [&cfg](ss::lw_shared_ptr<segment>& s) {
-          // must be base offset
-          return s->offsets().get_base_offset() <= cfg.max_offset;
+      [&query_interval](ss::lw_shared_ptr<segment>& s) {
+          if (s->empty()) {
+              return false;
+          }
+
+          // Safety: unchecked is safe here because we did already check
+          // `!s->empty()` above to ensure that the segment has data.
+          auto segment_interval = model::bounded_offset_interval::unchecked(
+            s->offsets().get_base_offset(), s->offsets().get_dirty_offset());
+
+          return segment_interval.overlaps(query_interval);
       });
     return range(std::move(tmp));
 }
diff --git a/src/v/storage/log_reader.cc b/src/v/storage/log_reader.cc
index 17ece5e993b2d..14f0194f12e26 100644
--- a/src/v/storage/log_reader.cc
+++ b/src/v/storage/log_reader.cc
@@ -13,6 +13,7 @@
 #include "base/vlog.h"
 #include "bytes/iobuf.h"
 #include "model/fundamental.h"
+#include "model/offset_interval.h"
 #include "model/record.h"
 #include "storage/logger.h"
 #include "storage/offset_translator_state.h"
@@ -533,23 +534,28 @@ bool log_reader::is_done() {
           || is_finished_offset(_lease->range, _config.start_offset);
 }
 
-timequery_result
-batch_timequery(const model::record_batch& b, model::timestamp t) {
+timequery_result batch_timequery(
+  const model::record_batch& b,
+  model::offset min_offset,
+  model::timestamp t,
+  model::offset max_offset) {
+    auto query_interval = model::bounded_offset_interval::checked(
+      min_offset, max_offset);
+
     // If the timestamp matches something mid-batch, then
     // parse into the batch far enough to find it: this
     // happens when we had CreateTime input, such that
     // records in the batch have different timestamps.
     model::offset result_o = b.base_offset();
     model::timestamp result_t = b.header().first_timestamp;
-    if (b.header().first_timestamp < t && !b.compressed()) {
+    if (!b.compressed()) {
         b.for_each_record(
-          [&result_o, &result_t, t, &b](
+          [&result_o, &result_t, &b, query_interval, t](
            const model::record& r) -> ss::stop_iteration {
+              auto record_o = model::offset{r.offset_delta()} + b.base_offset();
               auto record_t = model::timestamp(
                 b.header().first_timestamp() + r.timestamp_delta());
-              if (record_t >= t) {
-                  auto record_o = model::offset{r.offset_delta()}
-                                  + b.base_offset();
+              if (record_t >= t && query_interval.contains(record_o)) {
                   result_o = record_o;
                   result_t = record_t;
                   return ss::stop_iteration::yes;
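The behavioral core of the new `batch_timequery()` above is the per-record scan: the first record with `timestamp >= t` whose offset also falls inside the closed interval `[min_offset, max_offset]` wins. A standalone analog with plain structs (the record layout here is illustrative, not the real one):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

struct record { int64_t offset; int64_t timestamp; };

// Analog of the new scan: honor both the timestamp predicate and the closed
// offset interval, so a batch prefix hidden by trim-prefix is never returned.
std::optional<record> scan(
  const std::vector<record>& batch,
  int64_t min_offset,
  int64_t t,
  int64_t max_offset) {
    for (const auto& r : batch) {
        if (r.timestamp >= t && r.offset >= min_offset && r.offset <= max_offset) {
            return r;
        }
    }
    return std::nullopt;
}

int main() {
    // All four records share timestamp 5; the log was prefix-truncated to
    // offset 12, which falls in the middle of the batch.
    std::vector<record> batch{{10, 5}, {11, 5}, {12, 5}, {13, 5}};
    auto hit = scan(batch, /*min_offset=*/12, /*t=*/5, /*max_offset=*/13);
    assert(hit && hit->offset == 12); // not 10, the batch's base offset
}
```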
diff --git a/src/v/storage/log_reader.h b/src/v/storage/log_reader.h
index 4683692f2fe21..94ab687088653 100644
--- a/src/v/storage/log_reader.h
+++ b/src/v/storage/log_reader.h
@@ -260,14 +260,33 @@ class log_reader final : public model::record_batch_reader::impl {
 
 /**
  * Assuming caller has already determined that this batch contains
- * the record that should be the result to the timequery, traverse
- * the batch to find which record matches.
+ * the record that should be the result to the timequery (critical!),
+ * traverse the batch to find the record with timestamp >= \ref t.
+ *
+ * The min and max offsets are used to limit the search to a specific
+ * range inside the batch. This is necessary to support the case where
+ * the log was requested to be prefix-truncated (trim-prefix) to an offset
+ * which lies in the middle of a batch.
+ *
+ * If the preconditions aren't met, the result is the timestamp of the first
+ * record in the batch.
  *
  * This is used by both storage's disk_log_impl and by cloud_storage's
  * remote_partition, to seek to their final result after finding
  * the batch.
+ *
+ * To read more about trim-prefix:
+ * https://docs.redpanda.com/current/reference/rpk/rpk-topic/rpk-topic-trim-prefix/
+ *
+ * \param b The batch to search in.
+ * \param min_offset The minimum offset to consider
+ * \param t The timestamp to search for
+ * \param max_offset The maximum offset to consider
  */
-timequery_result
-batch_timequery(const model::record_batch& b, model::timestamp t);
+timequery_result batch_timequery(
+  const model::record_batch& b,
+  model::offset min_offset,
+  model::timestamp t,
+  model::offset max_offset);
 
 } // namespace storage
diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc
index 70343c68d9ab2..e4aef420752f3 100644
--- a/src/v/storage/tests/storage_e2e_test.cc
+++ b/src/v/storage/tests/storage_e2e_test.cc
@@ -289,10 +289,20 @@ FIXTURE_TEST(test_reading_range_from_a_log, storage_test_fixture) {
     BOOST_REQUIRE_EQUAL(range.size(), 5);
     BOOST_REQUIRE_EQUAL(range.front().header().crc, batches[3].header().crc);
     BOOST_REQUIRE_EQUAL(range.back().header().crc, batches[7].header().crc);
-    // range from base of beging to the middle of end
+
+    // Range that starts and ends in the middle of the same batch.
     range = read_range_to_vector(
       log,
-      batches[3].base_offset(),
+      batches[3].base_offset() + model::offset(batches[3].record_count() / 3),
+      batches[3].base_offset()
+        + model::offset(batches[3].record_count() / 3 * 2LL));
+    BOOST_REQUIRE_EQUAL(range.size(), 1);
+    BOOST_REQUIRE_EQUAL(range.front().header().crc, batches[3].header().crc);
+
+    // Range that starts and ends in the middle of batches.
+    range = read_range_to_vector(
+      log,
+      batches[3].base_offset() + model::offset(batches[3].record_count() / 2),
       batches[7].base_offset() + model::offset(batches[7].record_count() / 2));
     BOOST_REQUIRE_EQUAL(range.size(), 5);
     BOOST_REQUIRE_EQUAL(range.front().header().crc, batches[3].header().crc);
diff --git a/src/v/storage/tests/timequery_test.cc b/src/v/storage/tests/timequery_test.cc
index 4c2a95d9415eb..69c7fa526eeb7 100644
--- a/src/v/storage/tests/timequery_test.cc
+++ b/src/v/storage/tests/timequery_test.cc
@@ -8,17 +8,24 @@
 // by the Apache License, Version 2.0
 
 #include "config/configuration.h"
+#include "model/fundamental.h"
 #include "model/tests/random_batch.h"
+#include "model/timestamp.h"
 #include "storage/tests/disk_log_builder_fixture.h"
 #include "test_utils/fixture.h"
 
 #include
 
+#include
+
 namespace {
 // Make a batch that is big enough to trigger the indexing threshold.
-model::record_batch
-make_random_batch(model::offset o, bool big_enough_for_index = true) {
+model::record_batch make_random_batch(
+  model::offset o,
+  model::timestamp ts,
+  int num_records = 1,
+  bool big_enough_for_index = true) {
     auto batch_size = storage::segment_index::default_data_buffer_step + 1;
     if (!big_enough_for_index) {
         batch_size = 1024;
@@ -26,11 +33,11 @@ make_random_batch(model::offset o, bool big_enough_for_index = true) {
 
     return model::test::make_random_batch(
       model::offset(o),
-      1,
+      num_records,
       false,
       model::record_batch_type::raft_data,
-      std::vector<size_t>{batch_size},
-      std::nullopt);
+      std::vector<size_t>(num_records, batch_size),
+      ts);
 }
 } // namespace
@@ -43,23 +50,20 @@ FIXTURE_TEST(timequery, log_builder_fixture) {
     // seg0: timestamps 0..99, offset = timestamp
     b | add_segment(0);
     for (auto ts = 0; ts < 100; ts++) {
-        auto batch = make_random_batch(model::offset(ts));
-        batch.header().first_timestamp = model::timestamp(ts);
-        batch.header().max_timestamp = model::timestamp(ts);
+        auto batch = make_random_batch(model::offset(ts), model::timestamp(ts));
         b | add_batch(std::move(batch));
     }
 
     // seg1: [(offset, ts)..]
     //   - (100, 100), (101, 100), ... (104, 100)
-    //   - (105, 101), (105, 101), ... (109, 101)
+    //   - (105, 101), (106, 101), ... (109, 101)
     //   ...
     //   - (195, 119), (196, 119), ... (200, 119)
     b | add_segment(100);
     for (auto offset = 100; offset <= 200; offset++) {
         auto ts = 100 + (offset - 100) / 5;
-        auto batch = make_random_batch(model::offset(offset));
-        batch.header().first_timestamp = model::timestamp(ts);
-        batch.header().max_timestamp = model::timestamp(ts);
+        auto batch = make_random_batch(
+          model::offset(offset), model::timestamp(ts));
         b | add_batch(std::move(batch));
     }
 
@@ -67,11 +71,35 @@ FIXTURE_TEST(timequery, log_builder_fixture) {
     for (const auto& seg : b.get_log_segments()) {
         BOOST_TEST(seg->index().batch_timestamps_are_monotonic());
     }
 
+    BOOST_TEST_CONTEXT(
+      "undershoot the timestamp but keep increasing the start offset") {
+        auto log = b.get_log();
+        for (auto start_offset = log->offsets().start_offset;
+             start_offset < model::offset(10);
+             start_offset++) {
+            BOOST_TEST_INFO_SCOPE(
+              fmt::format("start_offset: {}", start_offset));
+
+            storage::timequery_config config(
+              start_offset,
+              model::timestamp(0),
+              log->offsets().dirty_offset,
+              ss::default_priority_class(),
+              std::nullopt);
+
+            auto res = log->timequery(config).get0();
+            BOOST_TEST(res);
+            BOOST_TEST(res->time == model::timestamp(start_offset()));
+            BOOST_TEST(res->offset == start_offset);
+        }
+    }
+
     // in the first segment check that query(ts) -> batch.offset = ts.
     for (auto ts = 0; ts < 100; ts++) {
         auto log = b.get_log();
 
         storage::timequery_config config(
+          log->offsets().start_offset,
           model::timestamp(ts),
           log->offsets().dirty_offset,
           ss::default_priority_class(),
@@ -94,6 +122,7 @@ FIXTURE_TEST(timequery, log_builder_fixture) {
         auto log = b.get_log();
 
         storage::timequery_config config(
+          log->offsets().start_offset,
           model::timestamp(ts),
           log->offsets().dirty_offset,
           ss::default_priority_class(),
@@ -110,6 +139,68 @@ FIXTURE_TEST(timequery, log_builder_fixture) {
     b | stop();
 }
 
+FIXTURE_TEST(timequery_multiple_messages_per_batch, log_builder_fixture) {
+    using namespace storage; // NOLINT
+
+    b | start();
+
+    b | add_segment(0);
+
+    int num_batches = 10;
+    int records_per_batch = 10;
+
+    // Half share the same timestamp.
+    for (auto ts = 0; ts < num_batches * records_per_batch / 2;
+         ts += records_per_batch) {
+        b
+          | add_batch(
+            model::test::make_random_batch(model::test::record_batch_spec{
+              .offset = model::offset(ts),
+              // It is sad but we can't properly query for timestamps inside
+              // compressed batches.
+              .allow_compression = false,
+              .count = records_per_batch,
+              .timestamp = model::timestamp(ts),
+              .all_records_have_same_timestamp = true,
+            }));
+    }
+
+    // Half have different timestamps.
+    for (auto ts = num_batches * records_per_batch / 2;
+         ts < num_batches * records_per_batch;
+         ts += records_per_batch) {
+        auto batch = make_random_batch(
+          model::offset(ts), model::timestamp(ts), records_per_batch);
+        b | add_batch(std::move(batch));
+    }
+
+    for (const auto& seg : b.get_log_segments()) {
+        BOOST_TEST(seg->index().batch_timestamps_are_monotonic());
+    }
+
+    auto log = b.get_log();
+
+    for (auto start_offset = log->offsets().start_offset;
+         start_offset < model::offset(num_batches * records_per_batch);
+         start_offset++) {
+        BOOST_TEST_INFO_SCOPE(fmt::format("start_offset: {}", start_offset));
+
+        storage::timequery_config config(
+          start_offset,
+          model::timestamp(0),
+          log->offsets().dirty_offset,
+          ss::default_priority_class(),
+          std::nullopt);
+
+        auto res = log->timequery(config).get0();
+        BOOST_TEST(res);
+        BOOST_TEST(res->time == model::timestamp(start_offset()));
+        BOOST_TEST(res->offset == start_offset);
+    }
+
+    b | stop();
+}
+
 FIXTURE_TEST(timequery_single_value, log_builder_fixture) {
     using namespace storage; // NOLINT
@@ -118,15 +209,15 @@ FIXTURE_TEST(timequery_single_value, log_builder_fixture) {
     // seg0: timestamps [1000...1099], offsets = [0...99]
     b | add_segment(0);
     for (auto offset = 0; offset < 100; ++offset) {
-        auto batch = make_random_batch(model::offset(offset));
-        batch.header().first_timestamp = model::timestamp(offset + 1000);
-        batch.header().max_timestamp = model::timestamp(offset + 1000);
+        auto batch = make_random_batch(
+          model::offset(offset), model::timestamp(offset + 1000));
         b | add_batch(std::move(batch));
     }
 
     // ask for time greater than last timestamp f.e 1200
     auto log = b.get_log();
     storage::timequery_config config(
+      log->offsets().start_offset,
       model::timestamp(1200),
       log->offsets().dirty_offset,
       ss::default_priority_class(),
@@ -151,20 +242,15 @@ FIXTURE_TEST(timequery_sparse_index, log_builder_fixture) {
     b | start();
     b | add_segment(0);
 
-    auto batch1 = make_random_batch(model::offset(0));
-    batch1.header().first_timestamp = model::timestamp(1000);
-    batch1.header().max_timestamp = model::timestamp(1000);
+    auto batch1 = make_random_batch(model::offset(0), model::timestamp(1000));
     b | add_batch(std::move(batch1));
 
     // This batch will not be indexed.
-    auto batch2 = make_random_batch(model::offset(1), false);
-    batch2.header().first_timestamp = model::timestamp(1600);
-    batch2.header().max_timestamp = model::timestamp(1600);
+    auto batch2 = make_random_batch(
+      model::offset(1), model::timestamp(1600), 1, false);
     b | add_batch(std::move(batch2));
 
-    auto batch3 = make_random_batch(model::offset(2));
-    batch3.header().first_timestamp = model::timestamp(2000);
-    batch3.header().max_timestamp = model::timestamp(2000);
+    auto batch3 = make_random_batch(model::offset(2), model::timestamp(2000));
     b | add_batch(std::move(batch3));
 
     const auto& seg = b.get_log_segments().front();
@@ -173,6 +259,7 @@ FIXTURE_TEST(timequery_sparse_index, log_builder_fixture) {
 
     auto log = b.get_log();
     storage::timequery_config config(
+      log->offsets().start_offset,
       model::timestamp(1600),
       log->offsets().dirty_offset,
       ss::default_priority_class(),
@@ -195,9 +282,8 @@ FIXTURE_TEST(timequery_one_element_index, log_builder_fixture) {
 
     // This batch doesn't trigger the size indexing threshold,
     // but it's the first one so it gets indexed regardless.
-    auto batch = make_random_batch(model::offset(0), false);
-    batch.header().first_timestamp = model::timestamp(1000);
-    batch.header().max_timestamp = model::timestamp(1000);
+    auto batch = make_random_batch(
+      model::offset(0), model::timestamp(1000), 1, false);
     b | add_batch(std::move(batch));
 
     const auto& seg = b.get_log_segments().front();
@@ -206,6 +292,7 @@ FIXTURE_TEST(timequery_one_element_index, log_builder_fixture) {
 
     auto log = b.get_log();
     storage::timequery_config config(
+      log->offsets().start_offset,
       model::timestamp(1000),
       log->offsets().dirty_offset,
       ss::default_priority_class(),
@@ -242,9 +329,7 @@ FIXTURE_TEST(timequery_non_monotonic_log, log_builder_fixture) {
 
     b | add_segment(0);
     for (const auto& [offset, ts] : batch_spec) {
-        auto batch = make_random_batch(model::offset(offset));
-        batch.header().first_timestamp = model::timestamp(ts);
-        batch.header().max_timestamp = model::timestamp(ts);
+        auto batch = make_random_batch(offset, ts);
         b | add_batch(std::move(batch));
     }
 
@@ -255,6 +340,7 @@ FIXTURE_TEST(timequery_non_monotonic_log, log_builder_fixture) {
     auto log = b.get_log();
     for (const auto& [offset, ts] : batch_spec) {
         storage::timequery_config config(
+          log->offsets().start_offset,
           model::timestamp(ts),
           log->offsets().dirty_offset,
           ss::default_priority_class(),
@@ -280,6 +366,7 @@ FIXTURE_TEST(timequery_non_monotonic_log, log_builder_fixture) {
     // Query for a bogus, really small timestamp.
     // We should return the first element in the log
     storage::timequery_config config(
+      log->offsets().start_offset,
       model::timestamp(-5000),
       log->offsets().dirty_offset,
       ss::default_priority_class(),
@@ -310,9 +397,7 @@ FIXTURE_TEST(timequery_clamp, log_builder_fixture) {
 
     b | add_segment(0);
     for (const auto& [offset, ts] : batch_spec) {
-        auto batch = make_random_batch(model::offset(offset));
-        batch.header().first_timestamp = model::timestamp(ts);
-        batch.header().max_timestamp = model::timestamp(ts);
+        auto batch = make_random_batch(offset, ts);
         b | add_batch(std::move(batch));
     }
 
@@ -322,6 +407,7 @@ FIXTURE_TEST(timequery_clamp, log_builder_fixture) {
     auto log = b.get_log();
     storage::timequery_config config(
+      log->offsets().start_offset,
       model::timestamp(storage::offset_time_index::delta_time_max * 2 + 1),
       log->offsets().dirty_offset,
       ss::default_priority_class(),
diff --git a/src/v/storage/types.cc b/src/v/storage/types.cc
index 5d1e0562a8473..e4d2c0e8345fe 100644
--- a/src/v/storage/types.cc
+++ b/src/v/storage/types.cc
@@ -93,8 +93,8 @@ std::ostream& operator<<(std::ostream& o, const timequery_result& a) {
     return o << "{offset:" << a.offset << ", time:" << a.time << "}";
 }
 std::ostream& operator<<(std::ostream& o, const timequery_config& a) {
-    o << "{max_offset:" << a.max_offset << ", time:" << a.time
-      << ", type_filter:";
+    o << "{min_offset: " << a.min_offset << ", max_offset: " << a.max_offset
+      << ", time:" << a.time << ", type_filter:";
     if (a.type_filter) {
         o << *a.type_filter;
     } else {
diff --git a/src/v/storage/types.h b/src/v/storage/types.h
index d42c0ad9e3ee2..bcad9850cda93 100644
--- a/src/v/storage/types.h
+++ b/src/v/storage/types.h
@@ -226,20 +226,25 @@ using opt_abort_source_t
   = std::optional<std::reference_wrapper<ss::abort_source>>;
 using opt_client_address_t = std::optional<model::client_address_t>;
 
+/// A timequery configuration specifies the range of offsets to search for a
+/// record with a timestamp equal to or greater than the specified time.
 struct timequery_config {
     timequery_config(
+      model::offset min_offset,
       model::timestamp t,
-      model::offset o,
+      model::offset max_offset,
       ss::io_priority_class iop,
       std::optional<model::record_batch_type> type_filter,
       opt_abort_source_t as = std::nullopt,
       opt_client_address_t client_addr = std::nullopt) noexcept
-      : time(t)
-      , max_offset(o)
+      : min_offset(min_offset)
+      , time(t)
+      , max_offset(max_offset)
       , prio(iop)
       , type_filter(type_filter)
       , abort_source(as)
       , client_address(std::move(client_addr)) {}
 
+    model::offset min_offset;
     model::timestamp time;
     model::offset max_offset;
     ss::io_priority_class prio;
@@ -301,6 +306,8 @@ struct truncate_prefix_config {
     operator<<(std::ostream&, const truncate_prefix_config&);
 };
 
+using translate_offsets = ss::bool_class<struct translate_tag>;
+
 /**
  * Log reader configuration.
  *
@@ -312,9 +319,22 @@ struct truncate_prefix_config {
  * search when the size of the filter set is small (e.g. < 5). If you need to
  * use a larger filter then this design should be revisited.
  *
- * Start and max offset are inclusive.
+ * Start and max offset are inclusive. Because the reader only looks at batch
+ * headers, the first batch may start before the start offset and the last
+ * batch may end after the max offset.
+ *
+ * Consider the following case:
+ *
+ *        cfg = {start offset = 14, max offset = 17}
+ *                    +           +
+ *                    v           v
+ * //-------+-------------+------------+-------------+-------//
+ * \\ ...9  |   10...14   |   15..15   |  16.....22  | 23... \\
+ * //-------+-------------+------------+-------------+-------//
+ *                ^                           ^
+ *                |                           |
+ * The reader will actually return whole batches: [10, 14], [15, 15], [16, 22].
  */
-using translate_offsets = ss::bool_class<struct translate_tag>;
 struct log_reader_config {
     model::offset start_offset;
     model::offset max_offset;
diff --git a/src/v/transform/api.cc b/src/v/transform/api.cc
index e99c027608e13..b0898ac0b734b 100644
--- a/src/v/transform/api.cc
+++ b/src/v/transform/api.cc
@@ -150,6 +150,7 @@ class partition_source final : public source {
     ss::future<kafka::offset>
     offset_at_timestamp(model::timestamp ts, ss::abort_source* as) final {
         auto result = co_await _partition.timequery(storage::timequery_config(
+          _partition.start_offset(),
           ts,
           model::offset::max(),
           /*iop=*/wasm_read_priority(),
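Taken together with the updated call sites, every `timequery_config` constructor call now leads with the inclusive lower bound. A sketch of a typical caller, in the spirit of the transform/api.cc and list_offsets.cc hunks; the `partition` object and `query_ts` are placeholders, not real identifiers:

```cpp
// Query: first record with timestamp >= query_ts, restricted to the offsets
// the caller may legally see ([partition start, no upper limit]).
storage::timequery_config cfg(
  partition.start_offset(),      // NEW: inclusive min_offset
  query_ts,                      // timestamp to search for
  model::offset::max(),          // inclusive max_offset
  ss::default_priority_class(),
  std::nullopt);                 // no batch-type filter
```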
diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py
index ec23dac50623a..456619d8a8467 100644
--- a/tests/rptest/tests/timequery_test.py
+++ b/tests/rptest/tests/timequery_test.py
@@ -458,6 +458,66 @@ def query_slices(tid):
 
         assert not any([e > 0 for e in errors])
 
+    @cluster(num_nodes=4)
+    # @parametrize(cloud_storage=True, spillover=False)
+    # @parametrize(cloud_storage=True, spillover=True)
+    @parametrize(cloud_storage=False, spillover=False)
+    def test_timequery_with_trim_prefix(self, cloud_storage: bool,
+                                        spillover: bool):
+        self.set_up_cluster(cloud_storage=cloud_storage,
+                            batch_cache=False,
+                            spillover=spillover)
+        total_segments = 12
+        record_size = 1024
+        base_ts = 1664453149000
+        msg_count = (self.log_segment_size * total_segments) // record_size
+        local_retention = self.log_segment_size * 4
+        topic, timestamps = self._create_and_produce(self.redpanda, True,
+                                                     local_retention, base_ts,
+                                                     record_size, msg_count)
+
+        # Confirm messages written
+        rpk = RpkTool(self.redpanda)
+        p = next(rpk.describe_topic(topic.name))
+        assert p.high_watermark == msg_count
+
+        if cloud_storage:
+            # If using cloud storage, we must wait for some segments
+            # to fall out of local storage, to ensure we are really
+            # hitting the cloud storage read path when querying.
+            wait_for_local_storage_truncate(redpanda=self.redpanda,
+                                            topic=topic.name,
+                                            target_bytes=local_retention)
+
+        num_batches_per_segment = self.log_segment_size // record_size
+        new_lwm = int(num_batches_per_segment * 2.5)
+        trim_response = rpk.trim_prefix(topic.name,
+                                        offset=new_lwm,
+                                        partitions=[0])
+        assert len(trim_response) == 1
+        assert new_lwm == trim_response[0].new_start_offset
+
+        # Double-check that the start offset has advanced.
+        p = next(rpk.describe_topic(topic.name))
+        assert new_lwm == p.start_offset, f"Expected {new_lwm}, got {p.start_offset}"
+
+        # Query below the valid timestamps; expect the offset of the first
+        # message still in the log.
+        kcat = KafkaCat(self.redpanda)
+        offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
+        assert offset == new_lwm, f"Expected {new_lwm}, got {offset}"
+
+        # Leave just the last message in the log.
+        trim_response = rpk.trim_prefix(topic.name,
+                                        offset=p.high_watermark - 1,
+                                        partitions=[0])
+
+        # Query below the valid timestamps again; expect the offset of the
+        # only message left. This is an edge case where tiered storage, if in
+        # use, becomes completely irrelevant.
+        kcat = KafkaCat(self.redpanda)
+        offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
+        assert offset == msg_count - 1, f"Expected {msg_count - 1}, got {offset}"
+
 
 class TimeQueryKafkaTest(Test, BaseTimeQuery):
     """