From 3dd1c3a45432c6019b1e8c154dc47751fefd035b Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Tue, 9 Jul 2024 20:35:25 +0100 Subject: [PATCH] kafka: fix timequery when local log is empty Before querying local log we bump the `timequery_config::min_offset`. This offset is inclusive. When local log is empty, the min_offset points one unit above the end of the log. In this PR, we add a check to return early when this is the case. --- src/v/cluster/partition.cc | 14 +++++++ tests/rptest/tests/timequery_test.py | 58 +++++++++++++++++++++++++++- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index ec8d998b09e8..c100bc7fa284 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -522,6 +522,13 @@ partition::timequery(storage::timequery_config cfg) { local_query_cfg.min_offset = std::max( log()->from_log_offset(_raft->start_offset()), local_query_cfg.min_offset); + + // If the min_offset is ahead of max_offset, the local log is empty + // or was truncated since the timequery_config was created. + if (local_query_cfg.min_offset > local_query_cfg.max_offset) { + co_return std::nullopt; + } + auto result = co_await local_timequery( local_query_cfg, may_answer_from_cloud); if (result.has_value()) { @@ -545,6 +552,13 @@ partition::timequery(storage::timequery_config cfg) { local_query_cfg.min_offset = std::max( log()->from_log_offset(_raft->start_offset()), local_query_cfg.min_offset); + + // If the min_offset is ahead of max_offset, the local log is empty + // or was truncated since the timequery_config was created. + if (local_query_cfg.min_offset > local_query_cfg.max_offset) { + co_return std::nullopt; + } + co_return co_await local_timequery(local_query_cfg, false); } } diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py index 3f8cfe71f45c..ad7beaa511ac 100644 --- a/tests/rptest/tests/timequery_test.py +++ b/tests/rptest/tests/timequery_test.py @@ -8,9 +8,11 @@ # by the Apache License, Version 2.0 import concurrent.futures +import datetime import re import threading from logging import Logger +import time from typing import Callable from rptest.services.admin import Admin @@ -21,8 +23,8 @@ from rptest.clients.types import TopicSpec from rptest.clients.rpk import RpkTool from rptest.clients.kafka_cat import KafkaCat -from rptest.util import (wait_until, wait_for_local_storage_truncate, - wait_until_result) +from rptest.util import (segments_count, wait_until, + wait_for_local_storage_truncate, wait_until_result) from rptest.services.kgo_verifier_services import KgoVerifierProducer from rptest.utils.si_utils import BucketView, NTP @@ -623,6 +625,58 @@ def test_timequery_with_spillover_gc_delayed(self): else: assert offset == o, f"Expected {o}, got {offset}" + @cluster(num_nodes=4) + def test_timequery_empty_local_log(self): + self.set_up_cluster(cloud_storage=True, + batch_cache=False, + spillover=False) + + total_segments = 3 + record_size = 1024 + base_ts = 1664453149000 + msg_count = (self.log_segment_size * total_segments) // record_size + local_retention = 1 # Any value works for this test. + topic, timestamps = self._create_and_produce(self.redpanda, True, + local_retention, base_ts, + record_size, msg_count) + + # Confirm messages written + rpk = RpkTool(self.redpanda) + p = next(rpk.describe_topic(topic.name)) + assert p.high_watermark == msg_count + + # Restart the cluster to force segment roll. The newly created segment + # will have no user data which is what we want to test. + self.redpanda.restart_nodes(self.redpanda.nodes) + wait_until(lambda: len(list(rpk.describe_topic(topic.name))) > 0, + 30, + backoff_sec=2) + + wait_until( + lambda: next(segments_count(self.redpanda, topic.name, 0)) == 1, + timeout_sec=30, + backoff_sec=2, + err_msg="Expected only one segment to be present") + + kcat = KafkaCat(self.redpanda) + + # Query below valid timestamps the offset of the first message. + offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000) + assert offset == 0, f"Expected 0, got {offset}" + + # Query with a timestamp in-between cloud log and the configuration + # batch present in the local log. + offset = kcat.query_offset(topic.name, 0, + timestamps[msg_count - 1] + 1000) + assert offset == -1, f"Expected -1, got {offset}" + + # Query with a timestamp in the future. + offset = kcat.query_offset( + topic.name, 0, + int(time.time() + datetime.timedelta(days=1).total_seconds()) * + 1000) + assert offset == -1, f"Expected -1, got {offset}" + class TimeQueryKafkaTest(Test, BaseTimeQuery): """