Skip to content

Commit

Permalink
Merge pull request #19937 from nvartolomei/nv/timequery-emtpy-topic
Browse files Browse the repository at this point in the history
kafka: fix timequery failing for empty topics
  • Loading branch information
nvartolomei authored Jun 21, 2024
2 parents 03ccde6 + e7c0ddd commit 656db4a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
17 changes: 15 additions & 2 deletions src/v/kafka/server/handlers/list_offsets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "kafka/server/replicated_partition.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "resource_mgmt/io_priority.h"

Expand Down Expand Up @@ -129,10 +130,22 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
offset,
kafka_partition->leader_epoch());
}
auto min_offset = kafka_partition->start_offset();
auto max_offset = model::prev_offset(offset);

// Empty partition.
if (max_offset < min_offset) {
co_return list_offsets_response::make_partition(
ktp.get_partition(),
model::timestamp(-1),
model::offset(-1),
kafka_partition->leader_epoch());
}

auto res = co_await kafka_partition->timequery(storage::timequery_config{
kafka_partition->start_offset(),
min_offset,
timestamp,
model::prev_offset(offset),
max_offset,
kafka_read_priority(),
{model::record_batch_type::raft_data},
octx.rctx.abort_source().local()});
Expand Down
23 changes: 20 additions & 3 deletions tests/rptest/tests/timequery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size
local_retention = self.log_segment_size * 4
kcat = KafkaCat(cluster)

# Test the base case with an empty topic.
empty_topic = TopicSpec(name="tq_empty_topic",
partition_count=1,
replication_factor=3)
self.client().create_topic(empty_topic)
offset = kcat.query_offset(empty_topic.name, 0, base_ts)
self.logger.info(f"Time query returned offset {offset}")
assert offset == -1, f"Expected -1, got {offset}"

# Create a topic and produce a run of messages we will query.
topic, timestamps = self._create_and_produce(cluster, cloud_storage,
local_retention, base_ts,
record_size, msg_count)
Expand Down Expand Up @@ -163,7 +175,6 @@ def __init__(self, offset, ts=None, expect_read=True):
# offset should cause cloud downloads.
hit_offsets = set()

kcat = KafkaCat(cluster)
cloud_metrics = None
local_metrics = None

Expand Down Expand Up @@ -456,8 +467,8 @@ def query_slices(tid):
assert not any([e > 0 for e in errors])

@cluster(num_nodes=4)
# @parametrize(cloud_storage=True, spillover=False)
# @parametrize(cloud_storage=True, spillover=True)
@parametrize(cloud_storage=True, spillover=False)
@parametrize(cloud_storage=True, spillover=True)
@parametrize(cloud_storage=False, spillover=False)
def test_timequery_with_trim_prefix(self, cloud_storage: bool,
spillover: bool):
Expand Down Expand Up @@ -515,6 +526,12 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool,
offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
assert offset == msg_count - 1, f"Expected {msg_count - 1}, got {offset}"

# Trim everything, leaving an empty log.
rpk.trim_prefix(topic.name, offset=p.high_watermark, partitions=[0])
kcat = KafkaCat(self.redpanda)
offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
assert offset == -1, f"Expected -1, got {offset}"

@cluster(
num_nodes=4,
log_allow_list=["Failed to upload spillover manifest {timed_out}"])
Expand Down

0 comments on commit 656db4a

Please sign in to comment.