Merge pull request #18112 from nvartolomei/nv/trim-prefix-timequery
Fix timequery returning the wrong offset after trim-prefix, which could lead to stuck consumers
nvartolomei authored May 7, 2024
2 parents 403b139 + 8f2de96 commit 6c630bd
Showing 19 changed files with 419 additions and 106 deletions.
9 changes: 6 additions & 3 deletions src/v/cloud_storage/remote_partition.cc
@@ -1242,11 +1242,13 @@ remote_partition::timequery(storage::timequery_config cfg) {
co_return std::nullopt;
}

auto start_offset = stm_manifest.full_log_start_kafka_offset().value();
auto start_offset = std::max(
cfg.min_offset,
kafka::offset_cast(stm_manifest.full_log_start_kafka_offset().value()));

// Synthesize a log_reader_config from our timequery_config
storage::log_reader_config config(
kafka::offset_cast(start_offset),
start_offset,
cfg.max_offset,
0,
2048, // We just need one record batch
@@ -1269,7 +1271,8 @@ remote_partition::timequery(storage::timequery_config cfg) {
vlog(_ctxlog.debug, "timequery: {} batches", batches.size());

if (batches.size()) {
co_return storage::batch_timequery(*(batches.begin()), cfg.time);
co_return storage::batch_timequery(
*(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset);
} else {
co_return std::nullopt;
}
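The hunk above clamps the reader's start position to the caller-supplied lower bound instead of always starting at the manifest's full-log start offset. A minimal standalone sketch of that clamping rule, using plain integers in place of kafka::offset rather than the actual Redpanda types:

#include <algorithm>
#include <cstdint>

// Plain-integer stand-in for the clamping above: the reader must not start
// below the caller-supplied lower bound (cfg.min_offset), nor below the first
// offset that still exists in the full log after a trim-prefix.
struct timequery_bounds {
    int64_t min_offset; // lower bound requested by the caller
    int64_t max_offset; // inclusive upper bound of the query
};

inline int64_t clamp_query_start(const timequery_bounds& cfg, int64_t full_log_start) {
    // Before this fix the reader always started at full_log_start, so a
    // concurrent trim-prefix could surface an offset below cfg.min_offset.
    return std::max(cfg.min_offset, full_log_start);
}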
6 changes: 3 additions & 3 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
@@ -711,7 +711,7 @@ FIXTURE_TEST(test_local_timequery, e2e_fixture) {
bool expect_value = false,
std::optional<model::offset> expected_o = std::nullopt) {
auto timequery_conf = storage::timequery_config(
t, o, ss::default_priority_class(), std::nullopt);
model::offset(0), t, o, ss::default_priority_class(), std::nullopt);

auto result = partition->timequery(timequery_conf).get();

@@ -798,7 +798,7 @@ FIXTURE_TEST(test_cloud_storage_timequery, e2e_fixture) {
bool expect_value = false,
std::optional<model::offset> expected_o = std::nullopt) {
auto timequery_conf = storage::timequery_config(
t, o, ss::default_priority_class(), std::nullopt);
model::offset(0), t, o, ss::default_priority_class(), std::nullopt);

auto result = partition->timequery(timequery_conf).get();

@@ -904,7 +904,7 @@ FIXTURE_TEST(test_mixed_timequery, e2e_fixture) {
bool expect_value = false,
std::optional<model::offset> expected_o = std::nullopt) {
auto timequery_conf = storage::timequery_config(
t, o, ss::default_priority_class(), std::nullopt);
model::offset(0), t, o, ss::default_priority_class(), std::nullopt);

auto result = partition->timequery(timequery_conf).get();

78 changes: 41 additions & 37 deletions src/v/cluster/partition.cc
@@ -506,14 +506,24 @@ partition::timequery(storage::timequery_config cfg) {

const bool may_answer_from_cloud
= may_read_from_cloud()
&& _cloud_storage_partition->bounds_timestamp(cfg.time);
&& _cloud_storage_partition->bounds_timestamp(cfg.time)
&& cfg.min_offset < kafka::offset_cast(
_cloud_storage_partition->next_kafka_offset());

if (_raft->log()->start_timestamp() <= cfg.time) {
// The query is ahead of the local data's start_timestamp: this
// means it _might_ hit on local data: start_timestamp is not
// precise, so once we query we might still fall back to cloud
// storage
auto result = co_await local_timequery(cfg);
//
// We also need to adjust the lower bound for the local query as the
// min_offset corresponds to the full log (including tiered storage).
auto local_query_cfg = cfg;
local_query_cfg.min_offset = std::max(
log()->from_log_offset(_raft->start_offset()),
local_query_cfg.min_offset);
auto result = co_await local_timequery(
local_query_cfg, may_answer_from_cloud);
if (result.has_value()) {
co_return result;
} else {
@@ -526,9 +536,16 @@
// Timestamp is before local storage but within cloud storage
co_return co_await cloud_storage_timequery(cfg);
} else {
// No cloud data: queries earlier than the start of the log
// will hit on the start of the log.
co_return co_await local_timequery(cfg);
// No cloud data OR not allowed to read from cloud: queries earlier
// than the start of the log will hit on the start of the log.
//
// Adjust the lower bound for the local query as the min_offset
// corresponds to the full log (including tiered storage).
auto local_query_cfg = cfg;
local_query_cfg.min_offset = std::max(
log()->from_log_offset(_raft->start_offset()),
local_query_cfg.min_offset);
co_return co_await local_timequery(local_query_cfg, false);
}
}
}
@@ -548,58 +565,46 @@ partition::cloud_storage_timequery(storage::timequery_config cfg) {
// raft log is ahead of the query timestamp or the topic is a read
// replica, so proceed to query the remote partition to try and
// find the earliest data that has timestamp >= the query time.
vlog(
clusterlog.debug,
"timequery (cloud) {} t={} max_offset(k)={}",
_raft->ntp(),
cfg.time,
cfg.max_offset);
vlog(clusterlog.debug, "timequery (cloud) {} cfg(k)={}", _raft->ntp(), cfg);

// remote_partition pre-translates offsets for us, so no call into
// the offset translator here
auto result = co_await _cloud_storage_partition->timequery(cfg);
if (result.has_value()) {
vlog(
clusterlog.debug,
"timequery (cloud) {} t={} max_offset(r)={} result(r)={}",
"timequery (cloud) {} cfg(k)={} result(k)={}",
_raft->ntp(),
cfg.time,
cfg.max_offset,
cfg,
result->offset);
}

co_return result;
}

ss::future<std::optional<storage::timequery_result>>
partition::local_timequery(storage::timequery_config cfg) {
vlog(
clusterlog.debug,
"timequery (raft) {} t={} max_offset(k)={}",
_raft->ntp(),
cfg.time,
cfg.max_offset);
ss::future<std::optional<storage::timequery_result>> partition::local_timequery(
storage::timequery_config cfg, bool allow_cloud_fallback) {
vlog(clusterlog.debug, "timequery (raft) {} cfg(k)={}", _raft->ntp(), cfg);

cfg.min_offset = _raft->log()->to_log_offset(cfg.min_offset);
cfg.max_offset = _raft->log()->to_log_offset(cfg.max_offset);

auto result = co_await _raft->timequery(cfg);
vlog(clusterlog.debug, "timequery (raft) {} cfg(r)={}", _raft->ntp(), cfg);

const bool may_answer_from_cloud
= may_read_from_cloud()
&& _cloud_storage_partition->bounds_timestamp(cfg.time);
auto result = co_await _raft->timequery(cfg);

if (result.has_value()) {
if (may_answer_from_cloud) {
if (allow_cloud_fallback) {
// We need to test for cases in which we will fall back to querying
// cloud storage.
if (_raft->log()->start_timestamp() > cfg.time) {
// Query raced with prefix truncation
vlog(
clusterlog.debug,
"timequery (raft) {} ts={} raced with truncation "
"timequery (raft) {} cfg(r)={} raced with truncation "
"(start_timestamp {}, result {})",
_raft->ntp(),
cfg.time,
cfg,
_raft->log()->start_timestamp(),
result->time);
co_return std::nullopt;
@@ -618,11 +623,11 @@
// https://github.com/redpanda-data/redpanda/issues/9669
vlog(
clusterlog.debug,
"Timequery (raft) {} ts={} miss on local log "
"Timequery (raft) {} cfg(r)={} miss on local log "
"(start_timestamp "
"{}, result {})",
_raft->ntp(),
cfg.time,
cfg,
_raft->log()->start_timestamp(),
result->time);
co_return std::nullopt;
@@ -635,15 +640,15 @@
// have the same timestamp and are present in cloud storage.
vlog(
clusterlog.debug,
"Timequery (raft) {} ts={} hit start_offset in local log "
"Timequery (raft) {} cfg(r)={} hit start_offset in local log "
"(start_offset {} start_timestamp {}, result {})",
_raft->ntp(),
cfg,
_raft->log()->offsets().start_offset,
cfg.time,
_raft->log()->start_timestamp(),
cfg.time);

if (may_answer_from_cloud) {
if (allow_cloud_fallback) {
// Even though we hit data with the desired timestamp, we
// cannot be certain that this is the _first_ batch with
// the desired timestamp: return null so that the caller
Expand All @@ -654,10 +659,9 @@ partition::local_timequery(storage::timequery_config cfg) {

vlog(
clusterlog.debug,
"timequery (raft) {} t={} max_offset(r)={} result(r)={}",
"timequery (raft) {} cfg(r)={} result(r)={}",
_raft->ntp(),
cfg.time,
cfg.max_offset,
cfg,
result->offset);
result->offset = _raft->log()->from_log_offset(result->offset);
}
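The comments in this file note that cfg.min_offset is expressed over the full log (tiered storage included), so the local query first raises it to the local log's start and then translates the bounds into raft log offsets. A hypothetical sketch of that adjust-then-translate flow, with a simplified single-delta translator standing in for the real offset translator:

#include <algorithm>
#include <cstdint>

// Hypothetical, simplified offset translator: a single delta stands in for the
// real translation state between raft (log) offsets and Kafka offsets.
struct offset_translator_sketch {
    int64_t delta; // number of non-data batches up to the translated point (assumed)

    int64_t from_log_offset(int64_t log_offset) const { return log_offset - delta; }
    int64_t to_log_offset(int64_t kafka_offset) const { return kafka_offset + delta; }
};

// Mirrors the shape of the adjustment above:
//   std::max(log()->from_log_offset(_raft->start_offset()), cfg.min_offset)
inline int64_t local_min_offset(
  const offset_translator_sketch& ot,
  int64_t raft_start_offset,
  int64_t cfg_min_offset) {
    return std::max(ot.from_log_offset(raft_start_offset), cfg_min_offset);
}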
2 changes: 1 addition & 1 deletion src/v/cluster/partition.h
@@ -345,7 +345,7 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
bool may_read_from_cloud() const;

ss::future<std::optional<storage::timequery_result>>
local_timequery(storage::timequery_config);
local_timequery(storage::timequery_config, bool allow_cloud_fallback);

consensus_ptr _raft;
ss::shared_ptr<cluster::log_eviction_stm> _log_eviction_stm;
3 changes: 2 additions & 1 deletion src/v/kafka/server/handlers/list_offsets.cc
@@ -130,8 +130,9 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
kafka_partition->leader_epoch());
}
auto res = co_await kafka_partition->timequery(storage::timequery_config{
kafka_partition->start_offset(),
timestamp,
offset,
model::prev_offset(offset),
kafka_read_priority(),
{model::record_batch_type::raft_data},
octx.rctx.abort_source().local()});
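The handler now bounds the lookup to the partition's start offset on the low side and to the offset just before the previously computed read bound on the high side. A plain-integer sketch of the prev_offset step, assuming that `offset` here is the exclusive end of readable data:

#include <algorithm>
#include <cstdint>

// Plain-integer stand-in for model::prev_offset as used above: the read bound
// is exclusive, so the inclusive max_offset for the timequery is one before it.
// Clamping an empty partition to 0 is an assumption, not taken from this diff.
inline int64_t prev_offset_sketch(int64_t exclusive_end) {
    return std::max<int64_t>(exclusive_end - 1, 0);
}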
3 changes: 2 additions & 1 deletion src/v/kafka/server/replicated_partition.cc
@@ -356,7 +356,8 @@ replicated_partition::timequery(storage::timequery_config cfg) {
if (batches.empty()) {
co_return std::nullopt;
}
co_return storage::batch_timequery(*(batches.begin()), cfg.time);
co_return storage::batch_timequery(
*(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset);
}

ss::future<result<model::offset>> replicated_partition::replicate(
80 changes: 80 additions & 0 deletions src/v/model/offset_interval.h
@@ -0,0 +1,80 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "base/vassert.h"
#include "model/fundamental.h"

namespace model {
/// A non-empty, bounded, closed interval of offsets [min offset, max offset].
///
/// This property helps to simplify the logic and the instructions required to
/// check for overlaps, containment, etc. It is the responsibility of the caller
/// to ensure these properties hold before constructing an instance of this
/// class.
///
/// To represent a potentially empty range, wrap it in an optional.
class bounded_offset_interval {
public:
static bounded_offset_interval
unchecked(model::offset min, model::offset max) noexcept {
return {min, max};
}

static bounded_offset_interval
checked(model::offset min, model::offset max) {
if (min < model::offset(0) || max < model::offset(0) || min > max) {
throw std::invalid_argument(fmt::format(
"Invalid arguments for constructing a non-empty bounded offset "
"interval: min({}) <= max({})",
min,
max));
}

return {min, max};
}

inline bool overlaps(const bounded_offset_interval& other) const noexcept {
return _min <= other._max && _max >= other._min;
}

inline bool contains(model::offset o) const noexcept {
return _min <= o && o <= _max;
}

friend std::ostream&
operator<<(std::ostream& o, const bounded_offset_interval& r) {
fmt::print(o, "{{min: {}, max: {}}}", r._min, r._max);
return o;
}

inline model::offset min() const noexcept { return _min; }
inline model::offset max() const noexcept { return _max; }

private:
bounded_offset_interval(model::offset min, model::offset max) noexcept
: _min(min)
, _max(max) {
#ifndef NDEBUG
vassert(
min >= model::offset(0), "Offset interval min({}) must be >= 0", min);
vassert(
min <= max,
"Offset interval invariant not satisfied: min({}) <= max({})",
min,
max);
#endif
}

model::offset _min;
model::offset _max;
};

} // namespace model
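A short usage sketch for the new interval helper; the interval API is taken from the header above, while the surrounding function is only illustrative:

#include "model/offset_interval.h"

inline void offset_interval_example() {
    auto queried = model::bounded_offset_interval::checked(
      model::offset(100), model::offset(200));
    auto batch = model::bounded_offset_interval::unchecked(
      model::offset(150), model::offset(180));

    bool hit = queried.overlaps(batch);            // true: [150, 180] intersects [100, 200]
    bool in = queried.contains(model::offset(42)); // false: 42 < queried.min()
    (void)hit;
    (void)in;

    // checked() throws std::invalid_argument for an empty or negative interval,
    // e.g. checked(model::offset(5), model::offset(1)).
}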
9 changes: 7 additions & 2 deletions src/v/model/tests/random_batch.cc
@@ -119,7 +119,10 @@ make_random_batch(model::offset o, int num_records, bool allow_compression) {

model::record_batch make_random_batch(record_batch_spec spec) {
auto ts = spec.timestamp.value_or(model::timestamp::now());
auto max_ts = model::timestamp(ts() + spec.count - 1);
auto max_ts = ts;
if (!spec.all_records_have_same_timestamp) {
max_ts = model::timestamp(ts() + spec.count - 1);
}
auto header = model::record_batch_header{
.size_bytes = 0, // computed later
.base_offset = spec.offset,
@@ -235,7 +238,9 @@
auto num_records = spec.records ? *spec.records : get_int(2, 30);
auto batch_spec = spec;
batch_spec.timestamp = ts;
ts = model::timestamp(ts() + num_records);
if (!batch_spec.all_records_have_same_timestamp) {
ts = model::timestamp(ts() + num_records);
}
batch_spec.offset = o;
batch_spec.count = num_records;
if (spec.enable_idempotence) {
1 change: 1 addition & 0 deletions src/v/model/tests/random_batch.h
@@ -34,6 +34,7 @@ struct record_batch_spec {
bool is_transactional{false};
std::optional<std::vector<size_t>> record_sizes;
std::optional<model::timestamp> timestamp;
bool all_records_have_same_timestamp{false};
};

model::record make_random_record(int, iobuf);
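A sketch of how a test might build a batch whose records all share one timestamp. The fields set here appear in this diff; the model::test namespace and the remaining defaults of record_batch_spec are assumed.

#include "model/tests/random_batch.h"

inline model::record_batch make_single_timestamp_batch() {
    model::test::record_batch_spec spec;
    spec.offset = model::offset(0);
    spec.count = 10;
    spec.timestamp = model::timestamp(1000);
    // With the flag set, the header's max_timestamp stays equal to the base
    // timestamp instead of growing by one per record.
    spec.all_records_have_same_timestamp = true;
    return model::test::make_random_batch(spec);
}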
6 changes: 4 additions & 2 deletions src/v/storage/disk_log_impl.cc
@@ -2341,7 +2341,8 @@ disk_log_impl::make_reader(timequery_config config) {
vassert(!_closed, "make_reader on closed log - {}", *this);
return _lock_mngr.range_lock(config).then(
[this, cfg = config](std::unique_ptr<lock_manager::lease> lease) {
auto start_offset = _start_offset;
auto start_offset = cfg.min_offset;

if (!lease->range.empty()) {
const ss::lw_shared_ptr<segment>& segment = *lease->range.begin();
std::optional<segment_index::entry> index_entry = std::nullopt;
@@ -2456,7 +2457,8 @@ disk_log_impl::timequery(timequery_config cfg) {
if (
!batches.empty()
&& batches.front().header().max_timestamp >= cfg.time) {
return ret_t(batch_timequery(batches.front(), cfg.time));
return ret_t(batch_timequery(
batches.front(), cfg.min_offset, cfg.time, cfg.max_offset));
}
return ret_t();
});
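The callers above now pass the query's offset bounds into batch_timequery. The following is an illustrative-only sketch of the semantics the bounded lookup has to respect; it is not the real storage::batch_timequery.

#include <cstdint>
#include <optional>
#include <vector>

struct record_view {
    int64_t offset;
    int64_t timestamp;
};

inline std::optional<int64_t> bounded_batch_timequery(
  const std::vector<record_view>& records,
  int64_t min_offset,
  int64_t query_ts,
  int64_t max_offset) {
    for (const auto& r : records) {
        if (r.offset < min_offset || r.offset > max_offset) {
            continue; // never return an offset outside [min_offset, max_offset]
        }
        if (r.timestamp >= query_ts) {
            return r.offset; // first in-range record at or after the timestamp
        }
    }
    return std::nullopt;
}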