Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: fix timequery when local log is empty #21312

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,13 @@ partition::timequery(storage::timequery_config cfg) {
local_query_cfg.min_offset = std::max(
log()->from_log_offset(_raft->start_offset()),
local_query_cfg.min_offset);

// If the min_offset is ahead of max_offset, the local log is empty
// or was truncated since the timequery_config was created.
if (local_query_cfg.min_offset > local_query_cfg.max_offset) {
co_return std::nullopt;
}

auto result = co_await local_timequery(
local_query_cfg, may_answer_from_cloud);
if (result.has_value()) {
Expand All @@ -545,6 +552,13 @@ partition::timequery(storage::timequery_config cfg) {
local_query_cfg.min_offset = std::max(
log()->from_log_offset(_raft->start_offset()),
local_query_cfg.min_offset);

// If the min_offset is ahead of max_offset, the local log is empty
// or was truncated since the timequery_config was created.
if (local_query_cfg.min_offset > local_query_cfg.max_offset) {
co_return std::nullopt;
}

co_return co_await local_timequery(local_query_cfg, false);
}
}
Expand Down
58 changes: 56 additions & 2 deletions tests/rptest/tests/timequery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
# by the Apache License, Version 2.0

import concurrent.futures
import datetime
import re
import threading
from logging import Logger
import time
from typing import Callable

from rptest.services.admin import Admin
Expand All @@ -21,8 +23,8 @@
from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool
from rptest.clients.kafka_cat import KafkaCat
from rptest.util import (wait_until, wait_for_local_storage_truncate,
wait_until_result)
from rptest.util import (segments_count, wait_until,
wait_for_local_storage_truncate, wait_until_result)

from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.utils.si_utils import BucketView, NTP
Expand Down Expand Up @@ -623,6 +625,58 @@ def test_timequery_with_spillover_gc_delayed(self):
else:
assert offset == o, f"Expected {o}, got {offset}"

@cluster(num_nodes=4)
def test_timequery_empty_local_log(self):
self.set_up_cluster(cloud_storage=True,
batch_cache=False,
spillover=False)

total_segments = 3
record_size = 1024
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size
local_retention = 1 # Any value works for this test.
topic, timestamps = self._create_and_produce(self.redpanda, True,
local_retention, base_ts,
record_size, msg_count)

# Confirm messages written
rpk = RpkTool(self.redpanda)
p = next(rpk.describe_topic(topic.name))
assert p.high_watermark == msg_count

# Restart the cluster to force segment roll. The newly created segment
# will have no user data which is what we want to test.
self.redpanda.restart_nodes(self.redpanda.nodes)
wait_until(lambda: len(list(rpk.describe_topic(topic.name))) > 0,
30,
Comment on lines +648 to +652
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: mentioned in slack, you can also trigger leadership changes and that should also force a roll

backoff_sec=2)

wait_until(
lambda: next(segments_count(self.redpanda, topic.name, 0)) == 1,
timeout_sec=30,
backoff_sec=2,
err_msg="Expected only one segment to be present")

kcat = KafkaCat(self.redpanda)

# Query below valid timestamps the offset of the first message.
offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
assert offset == 0, f"Expected 0, got {offset}"

# Query with a timestamp in-between cloud log and the configuration
# batch present in the local log.
offset = kcat.query_offset(topic.name, 0,
timestamps[msg_count - 1] + 1000)
assert offset == -1, f"Expected -1, got {offset}"

# Query with a timestamp in the future.
offset = kcat.query_offset(
topic.name, 0,
int(time.time() + datetime.timedelta(days=1).total_seconds()) *
1000)
assert offset == -1, f"Expected -1, got {offset}"


class TimeQueryKafkaTest(Test, BaseTimeQuery):
"""
Expand Down
Loading