Skip to content

Commit

Permalink
kafka: fix timequery when local log is empty
Browse files Browse the repository at this point in the history
Before querying the local log we bump the `timequery_config::min_offset`.
This offset is inclusive. When the local log is empty, the min_offset points
one unit past the end of the log. In this PR, we add a check to return
early when that is the case.
  • Loading branch information
nvartolomei committed Jul 9, 2024
1 parent 26b6202 commit 3dd1c3a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 2 deletions.
14 changes: 14 additions & 0 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,13 @@ partition::timequery(storage::timequery_config cfg) {
local_query_cfg.min_offset = std::max(
log()->from_log_offset(_raft->start_offset()),
local_query_cfg.min_offset);

// If the min_offset is ahead of max_offset, the local log is empty
// or was truncated since the timequery_config was created.
if (local_query_cfg.min_offset > local_query_cfg.max_offset) {
co_return std::nullopt;
}

auto result = co_await local_timequery(
local_query_cfg, may_answer_from_cloud);
if (result.has_value()) {
Expand All @@ -545,6 +552,13 @@ partition::timequery(storage::timequery_config cfg) {
local_query_cfg.min_offset = std::max(
log()->from_log_offset(_raft->start_offset()),
local_query_cfg.min_offset);

// If the min_offset is ahead of max_offset, the local log is empty
// or was truncated since the timequery_config was created.
if (local_query_cfg.min_offset > local_query_cfg.max_offset) {
co_return std::nullopt;
}

co_return co_await local_timequery(local_query_cfg, false);
}
}
Expand Down
58 changes: 56 additions & 2 deletions tests/rptest/tests/timequery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
# by the Apache License, Version 2.0

import concurrent.futures
import datetime
import re
import threading
from logging import Logger
import time
from typing import Callable

from rptest.services.admin import Admin
Expand All @@ -21,8 +23,8 @@
from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool
from rptest.clients.kafka_cat import KafkaCat
from rptest.util import (wait_until, wait_for_local_storage_truncate,
wait_until_result)
from rptest.util import (segments_count, wait_until,
wait_for_local_storage_truncate, wait_until_result)

from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.utils.si_utils import BucketView, NTP
Expand Down Expand Up @@ -623,6 +625,58 @@ def test_timequery_with_spillover_gc_delayed(self):
else:
assert offset == o, f"Expected {o}, got {offset}"

@cluster(num_nodes=4)
def test_timequery_empty_local_log(self):
    """Exercise timequery against a partition whose local log contains no
    user data (only a configuration batch after a restart-forced segment
    roll), so answers must come from the cloud log or be "not found"."""
    self.set_up_cluster(cloud_storage=True,
                        batch_cache=False,
                        spillover=False)

    # Produce enough records to fill several segments.
    total_segments = 3
    record_size = 1024
    base_ts = 1664453149000
    local_retention = 1  # Any value works for this test.
    msg_count = self.log_segment_size * total_segments // record_size
    topic, timestamps = self._create_and_produce(self.redpanda, True,
                                                 local_retention, base_ts,
                                                 record_size, msg_count)

    # Sanity check: every produced message is visible via the HWM.
    rpk = RpkTool(self.redpanda)
    first_partition = next(rpk.describe_topic(topic.name))
    assert first_partition.high_watermark == msg_count

    # Restart the cluster to force segment roll. The newly created segment
    # will have no user data which is what we want to test.
    self.redpanda.restart_nodes(self.redpanda.nodes)
    wait_until(lambda: len(list(rpk.describe_topic(topic.name))) > 0,
               30,
               backoff_sec=2)

    wait_until(
        lambda: next(segments_count(self.redpanda, topic.name, 0)) == 1,
        timeout_sec=30,
        backoff_sec=2,
        err_msg="Expected only one segment to be present")

    kcat = KafkaCat(self.redpanda)

    # A timestamp below all valid timestamps resolves to the offset of
    # the first message.
    offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
    assert offset == 0, f"Expected 0, got {offset}"

    # A timestamp between the end of the cloud log and the configuration
    # batch present in the local log must find nothing.
    offset = kcat.query_offset(topic.name, 0,
                               timestamps[msg_count - 1] + 1000)
    assert offset == -1, f"Expected -1, got {offset}"

    # A timestamp in the future (one day ahead) must also find nothing.
    future_ts_ms = int(time.time() +
                       datetime.timedelta(days=1).total_seconds()) * 1000
    offset = kcat.query_offset(topic.name, 0, future_ts_ms)
    assert offset == -1, f"Expected -1, got {offset}"


class TimeQueryKafkaTest(Test, BaseTimeQuery):
"""
Expand Down

0 comments on commit 3dd1c3a

Please sign in to comment.