Skip to content

Commit

Permalink
ducktape: Wait for spillover to stabilize before collection
Browse files Browse the repository at this point in the history
When a manifest spillover happens it is uploaded to the cloud storage.
The original manifest, now truncated, is uploaded after a short period.
This means that there is a window during which the spilled over manifest
has been uploaded and the changed root manifest is not uploaded, and the
same offset range could be present in both the root and spillover in
cloud storage.

The test here constructs a reset manifest by analyzing the spillover and
root manifests. If it happens to collect these during such a window, it
ends up creating a reset manifest which is invalid, containing the same
offset range in both the segment set and the spillover set. From this
point onwards, no spillover command can succeed, because it always fails
at the overlapping range.

The change here waits for a set of manifests to appear in cloud storage
which do not have a common set of keys before creating the reset
manifest. This is done by comparing the root and spillover manifests
until a suitable combination is found.
  • Loading branch information
abhijat committed Sep 1, 2023
1 parent dc9af51 commit f426912
Showing 1 changed file with 64 additions and 39 deletions.
103 changes: 64 additions & 39 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import dataclasses
import json
import random
import re
Expand All @@ -17,18 +16,20 @@
from ducktape.tests.test import TestContext
from ducktape.utils.util import wait_until

from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.action_injector import random_process_kills
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer, KgoVerifierRandomConsumer, KgoVerifierSeqConsumer
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer, \
KgoVerifierRandomConsumer, KgoVerifierSeqConsumer
from rptest.services.metrics_check import MetricCheck
from rptest.services.redpanda import SISettings, get_cloud_storage_type, make_redpanda_service, CHAOS_LOG_ALLOW_LIST, MetricsEndpoint
from rptest.services.redpanda import SISettings, get_cloud_storage_type, make_redpanda_service, CHAOS_LOG_ALLOW_LIST, \
MetricsEndpoint
from rptest.tests.end_to_end import EndToEndTest
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import Scale, wait_until_segments
from rptest.util import (
produce_until_segments,
Expand Down Expand Up @@ -255,14 +256,46 @@ def all_partitions_spilled():
producer.wait(timeout_sec=60)
producer.free()

wait_until(lambda: self._all_uploads_done() == True,
wait_until(lambda: self._all_uploads_done(),
timeout_sec=60,
backoff_sec=5)

s3_snapshot = BucketView(self.redpanda, topics=self.topics)
manifest = s3_snapshot.manifest_for_ntp(self.topic, 0)
spillover_manifests = s3_snapshot.get_spillover_manifests(
NTP("kafka", self.topic, 0))
class Manifests:
def __init__(self, test_instance):
self.test_instance = test_instance
self.manifest = None
self.spillover_manifests = None

def __call__(self) -> bool:
s3_snapshot = BucketView(self.test_instance.redpanda,
topics=self.test_instance.topics)
self.manifest = s3_snapshot.manifest_for_ntp(
self.test_instance.topic, 0)
self.spillover_manifests = s3_snapshot.get_spillover_manifests(
NTP("kafka", self.test_instance.topic, 0))
if not self.spillover_manifests:
return False
manifest_keys = set(self.manifest['segments'].keys())
spillover_keys = set(
sm['segment'].keys()
for sm in self.spillover_manifests.values())
return manifest_keys.isdisjoint(spillover_keys)

def has_data(self) -> bool:
return self.manifest and self.spillover_manifests

manifests = Manifests(self)
wait_until(
lambda: manifests(),
backoff_sec=1,
timeout_sec=120,
err_msg='Could not find suitable manifest and spillover combination'
)

assert manifests.has_data(
), 'Manifests were not loaded from cloud storage'
manifest = manifests.manifest
spillover_manifests = manifests.spillover_manifests
# Enable aggressive local retention to remove local copy of the data
self.rpk.alter_topic_config(self.topic, 'retention.local.target.bytes',
self.segment_size * 5)
Expand All @@ -277,13 +310,8 @@ def all_partitions_spilled():
'false')
time.sleep(1)

# collect all spillover manifests
all_spillover_manifests = []
for manifest_meta, sm in spillover_manifests.items():
all_spillover_manifests.append(sm)

# sorted list containing spillover manifest metadata
all_spillover_manifests = sorted(all_spillover_manifests,
# collect+sort all spillover manifests
all_spillover_manifests = sorted(spillover_manifests.values(),
key=lambda sm: sm['start_offset'])

first_left = None
Expand All @@ -303,33 +331,30 @@ def all_partitions_spilled():
total_size = sum([s['size_bytes'] for _, s in segments])
first_segment_meta = segments[0][1]
last_segment_meta = segments[-1][1]
spillover_manifest_meta = {}

# fill spillover manifest meta with data from first segment
spillover_manifest_meta['ntp_revision'] = first_segment_meta[
'ntp_revision']
spillover_manifest_meta['base_offset'] = first_segment_meta[
'base_offset']
spillover_manifest_meta['base_timestamp'] = first_segment_meta[
'base_timestamp']
spillover_manifest_meta['delta_offset'] = first_segment_meta[
'delta_offset']

# override offsets with data from the last segment
spillover_manifest_meta['committed_offset'] = last_segment_meta[
'committed_offset']
spillover_manifest_meta['delta_offset_end'] = last_segment_meta[
'delta_offset_end']
spillover_manifest_meta['max_timestamp'] = last_segment_meta[
'max_timestamp']
spillover_manifest_meta['size_bytes'] = total_size
manifest['spillover'].append(spillover_manifest_meta)
manifest['spillover'].append({
'ntp_revision':
first_segment_meta['ntp_revision'],
'base_offset':
first_segment_meta['base_offset'],
'base_timestamp':
first_segment_meta['base_timestamp'],
'delta_offset':
first_segment_meta['delta_offset'],
'committed_offset':
last_segment_meta['committed_offset'],
'delta_offset_end':
last_segment_meta['delta_offset_end'],
'max_timestamp':
last_segment_meta['max_timestamp'],
'size_bytes':
total_size
})

# adjust archive fields
manifest['archive_start_offset'] = first_left['base_offset']
manifest['archive_clean_offset'] = first_left['base_offset']
manifest['archive_start_offset_delta'] = first_left['delta_offset']
for _, segment in all_spillover_manifests[0]['segments'].items():
for segment in all_spillover_manifests[0]['segments'].values():
manifest['archive_size_bytes'] -= segment['size_bytes']

json_man = json.dumps(manifest)
Expand All @@ -355,7 +380,7 @@ def all_partitions_spilled():
producer.free()

# wait for uploads from first
wait_until(lambda: self._all_uploads_done() == True,
wait_until(lambda: self._all_uploads_done(),
timeout_sec=60,
backoff_sec=5)
rpk = RpkTool(self.redpanda)
Expand Down

0 comments on commit f426912

Please sign in to comment.