diff --git a/tests/rptest/tests/cluster_metadata_upload_test.py b/tests/rptest/tests/cluster_metadata_upload_test.py
index 74eefe0e5c398..207ff2da0595a 100644
--- a/tests/rptest/tests/cluster_metadata_upload_test.py
+++ b/tests/rptest/tests/cluster_metadata_upload_test.py
@@ -153,8 +153,7 @@ def test_uploads_after_wipe(self, cloud_storage_type):
         wait_until(lambda: self.bucket_has_metadata(1),
                    timeout_sec=10,
                    backoff_sec=1)
-        orig_cluster_uuid_resp: str = admin.get_cluster_uuid(
-            self.redpanda.nodes[0])
+        orig_cluster_uuid_resp: str = admin.get_cluster_uuid(self.redpanda.nodes[0])
 
         # Wipe the directory away, simulating a full cluster outage.
         self.redpanda.stop()
diff --git a/tests/rptest/tests/topic_recovery_test.py b/tests/rptest/tests/topic_recovery_test.py
index cb9fce6ab06ad..4d4890cac5e65 100644
--- a/tests/rptest/tests/topic_recovery_test.py
+++ b/tests/rptest/tests/topic_recovery_test.py
@@ -109,12 +109,14 @@ def restore_redpanda(self,
                          baseline,
                          controller_checksums,
                          topics_spec: Sequence[TopicSpec] | None = None,
-                         topics_overrides: dict[str, str] | None = None):
+                         topics_overrides: dict[str, str] | None = None,
+                         original_cluster_uuid: Optional[str] = None):
         """Run restore procedure. Default implementation runs it for every topic
         that it can find in S3."""
 
         topics_spec = topics_spec or self.expected_recovered_topics
-        topic_manifests = list(self._get_all_topic_manifests())
+        topic_manifests = list(
+            self._get_all_topic_manifests(original_cluster_uuid))
         self.logger.info(f"topic_manifests: {topic_manifests}")
         for topic in topics_spec:
             for _, manifest in topic_manifests:
@@ -126,7 +128,10 @@ def validate_node(self, host, baseline, restored):
         The checksums are sampled from data directory before and after recovery."""
         pass
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         """This method is invoked after the recovery and partition validation
         are done."""
         pass
@@ -152,14 +157,15 @@ def after_restart_validation(self):
         """
         pass
 
-    def _validate_partition_last_offset(self):
+    def _validate_partition_last_offset(self,
+                                        remote_label: Optional[str] = None):
         """Validate restored partition by comparing high watermark to
         manifests last offset value.
         We expect high_watermark to be exactly equal to last_offset because
         redpanda removes the raft_configuration batch from the log.
         The log should have only one configuration batch.
""" - view = BucketView(redpanda=self._redpanda) + view = BucketView(redpanda=self._redpanda, remote_label=remote_label) # Prompt the view to scan the bucket and load all manifests, so that we # don't have to calculate the revision ID @@ -225,7 +231,7 @@ def _list_objects(self): self.logger.info(f"ListObjects: {results}") return results - def _get_all_topic_manifests(self): + def _get_all_topic_manifests(self, cluster_uuid: Optional[str] = None): """Retrieves all topics stored in S3 bucket""" self.logger.info(f"List all objects in {self._bucket}") keys = [ @@ -233,7 +239,7 @@ def _get_all_topic_manifests(self): if key.endswith('topic_manifest.json') or key.endswith('topic_manifest.bin') ] - view = BucketView(self._redpanda) + view = BucketView(self._redpanda, remote_label=cluster_uuid) for key in keys: m = view.get_topic_manifest_from_path(key) self.logger.info(f"Topic manifest found at {key}, content:\n{m}") @@ -311,7 +317,10 @@ def validate_node(self, host, baseline, restored): assert len(new_b) == 1 assert "0-1-v1.log" in new_b - def validate_cluster(self, baseline, restored): + def validate_cluster(self, + baseline, + restored, + original_cluster_uuid: Optional[str] = None): """This method is invoked after the recovery and partition validation are done. Check that high_watermark is 0 for the new partition.""" @@ -358,7 +367,10 @@ def generate_baseline(self): self._redpanda.start_node(node) time.sleep(1) - def validate_cluster(self, baseline, restored): + def validate_cluster(self, + baseline, + restored, + original_cluster_uuid: Optional[str] = None): """This method is invoked after the recovery and partition validation are done. Check that high_watermark is 0 for the new partition.""" @@ -409,7 +421,10 @@ def create_initial_topics(self): """Simulate missing topic manifest by not creating any topics""" pass - def restore_redpanda(self, baseline, controller_checksums): + def restore_redpanda(self, + baseline, + controller_checksums, + original_cluster_uuid: Optional[str] = None): """Run restore with fake topic manifest path""" topic_manifest = "d0000000/meta/kafka/panda-topic/topic_manifest.json" self.logger.info(f"recovering {topic_manifest} that doesn't exist") @@ -419,7 +434,10 @@ def restore_redpanda(self, baseline, controller_checksums): partition_count=1, replication_factor=3), ) - def validate_cluster(self, baseline, restored): + def validate_cluster(self, + baseline, + restored, + original_cluster_uuid: Optional[str] = None): """This method is invoked after the recovery and partition validation are done. Check that high_watermark is 0 for the new partition.""" @@ -468,7 +486,8 @@ def _delete(self, key): self.logger.info(f"deleting manifest file {key}") self._s3.delete_object(self._bucket, key, True) - def _find_and_remove_partition_manifest(self): + def _find_and_remove_partition_manifest( + self, original_cluster_uuid: Optional[str] = None): """Find and delete manifest for partition 0. 
         This method have to poll the bucket to make sure that the data is
         deleted from Minio (deletes are eventually consistent in Minio S3
         implementation)."""
@@ -476,7 +495,9 @@ def _find_and_remove_partition_manifest(self):
         ntp_0 = NTP(ns='kafka', topic=self.topic_name, partition=0)
         ntp_1 = NTP(ns='kafka', topic=self.topic_name, partition=1)
 
-        view = BucketView(self._redpanda)
+        self.logger.info(
+            f"finding partition manifest with remote label {original_cluster_uuid}")
+        view = BucketView(self._redpanda, remote_label=original_cluster_uuid)
 
         manifests = view.partition_manifests
         manifest_0 = manifests[ntp_0]
@@ -486,20 +507,29 @@ def _find_and_remove_partition_manifest(self):
             max([seg_meta['delta_offset_end']
                  for _, seg_meta in manifest_1['segments'].items()])
 
         manifest_0_path = view.gen_manifest_path(
-            ntp_0.to_ntpr(manifest_0['revision']))
+            ntp_0.to_ntpr(manifest_0['revision']),
+            remote_label=original_cluster_uuid)
         self._delete(manifest_0_path)
         self.logger.info(
             f"manifest {manifest_0_path} is removed, partition-1 last offset is {self._part1_offset}"
         )
 
-    def restore_redpanda(self, baseline, controller_checksums):
+    def restore_redpanda(self,
+                         baseline,
+                         controller_checksums,
+                         original_cluster_uuid: Optional[str] = None):
         """Run restore but first remove partition manifest from the bucket."""
-        self._find_and_remove_partition_manifest()
-        super(MissingPartition, self).restore_redpanda(baseline,
-                                                       controller_checksums)
+        self._find_and_remove_partition_manifest(original_cluster_uuid)
+        super(MissingPartition, self).restore_redpanda(
+            baseline,
+            controller_checksums,
+            original_cluster_uuid=original_cluster_uuid)
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         """This method is invoked after the recovery and partition validation
         are done.
         Check that high_watermark is 0 for the new partition."""
@@ -577,13 +607,21 @@ def _find_and_remove_segment(self):
         else:
             assert False, "No segments found in the bucket"
 
-    def restore_redpanda(self, baseline, controller_checksums):
+    def restore_redpanda(self,
+                         baseline,
+                         controller_checksums,
+                         original_cluster_uuid: Optional[str] = None):
         """Run restore but remove the segment first."""
         self._find_and_remove_segment()
-        super(MissingSegment, self).restore_redpanda(baseline,
-                                                     controller_checksums)
+        super(MissingSegment, self).restore_redpanda(
+            baseline,
+            controller_checksums,
+            original_cluster_uuid=original_cluster_uuid)
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         """Check that the topic is writeable"""
         self.logger.info(
             f"MissingSegment.validate_cluster - baseline - {baseline}")
@@ -675,11 +713,14 @@ def _collect_non_data_batch_sizes(self, expected_topics):
             f'{pprint.pformat(dict(non_data_batches_per_ntp), indent=2)}')
         return non_data_batches_per_ntp
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         """Check that the topic is writeable"""
         self.logger.info(f"FastCheck.validate_cluster - baseline - {baseline}")
         self.logger.info(f"FastCheck.validate_cluster - restored - {restored}")
-        self._validate_partition_last_offset()
+        self._validate_partition_last_offset(original_cluster_uuid)
         expected_topics = [
             topic.name for topic in self.expected_recovered_topics
         ]
@@ -734,7 +775,10 @@ def generate_baseline(self):
                             message_size)
             self._kafka_tools.produce(topic.name, num_messages, message_size)
 
-    def restore_redpanda(self, baseline, controller_checksums):
+    def restore_redpanda(self,
+                         baseline,
+                         controller_checksums,
+                         original_cluster_uuid: Optional[str] = None):
         """Run restore procedure. Default implementation runs it for every topic
         that it can find in S3."""
 
@@ -750,7 +794,8 @@ def restore_redpanda(self, baseline, controller_checksums):
         assert is_close_size(size_bytes, self.max_size_bytes), \
             f"Not enoug bytes produced, expected {self.max_size_bytes} got {size_bytes}"
 
-        topic_manifests = list(self._get_all_topic_manifests())
+        topic_manifests = list(
+            self._get_all_topic_manifests(original_cluster_uuid))
         self.logger.info(f"topic_manifests: {topic_manifests}")
         for topic in self.expected_recovered_topics:
             for _, manifest in topic_manifests:
@@ -760,7 +805,10 @@ def restore_redpanda(self, baseline, controller_checksums):
                 self.restored_size_bytes
             })
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         """Check size of every partition."""
         self.logger.info(
             f"SizeBasedRetention.validate_cluster - baseline - {baseline}")
@@ -856,11 +904,15 @@ def generate_baseline(self):
         producer.wait()
         producer.free()
 
-    def restore_redpanda(self, baseline, controller_checksums):
+    def restore_redpanda(self,
+                         baseline,
+                         controller_checksums,
+                         original_cluster_uuid: Optional[str] = None):
         """Run restore procedure.
 
         Default implementation runs it for every topic that it can find in S3."""
-        topic_manifests = list(self._get_all_topic_manifests())
+        topic_manifests = list(
+            self._get_all_topic_manifests(original_cluster_uuid))
         self.logger.info(f"topic_manifests: {topic_manifests}")
         for topic in self.expected_recovered_topics:
             for _, manifest in topic_manifests:
@@ -869,7 +921,10 @@ def restore_redpanda(self, baseline, controller_checksums):
                 self._restore_topic(manifest,
                                     {"retention.local.target.ms": 3600000})
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         """Check that the topic is writeable"""
         self.logger.info(
             f"TimeBasedRetention.validate_cluster - baseline - {baseline}")
@@ -929,14 +984,21 @@ def create_initial_topics(self):
                     topic.virtual_cluster_id
                 })
 
-    def restore_redpanda(self, baseline, controller_checksums):
+    def restore_redpanda(self, baseline, controller_checksums,
+                         original_cluster_uuid: Optional[str]):
         self._redpanda.set_cluster_config({"enable_mpx_extensions": True})
-        return super().restore_redpanda(baseline, controller_checksums)
+        return super().restore_redpanda(
+            baseline,
+            controller_checksums,
+            original_cluster_uuid=original_cluster_uuid)
 
     def validate_node(self, host, baseline, restored):
         pass
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         topic_cfg = self._rpk.describe_topic_configs("panda-topic")
         self.logger.info(f"cfg: {topic_cfg}")
 
@@ -996,7 +1058,7 @@ def wait_for_status():
         assert status['request']['retention_bytes'] == -1
         assert status['request']['retention_ms'] == 500000
 
-    def restore_redpanda(self, *_):
+    def restore_redpanda(self, *_, original_cluster_uuid: Optional[str]):
         payload = {'retention_ms': 500000}
         response = self.admin.initiate_topic_scan_and_recovery(payload=payload)
         assert response.status_code == requests.status_codes.codes[
@@ -1004,13 +1066,19 @@ def restore_redpanda(self, *_):
         self._assert_duplicate_request_is_rejected()
         self._assert_status()
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         """Check that the topic is writeable"""
         self.logger.info(
             f"AdminApiBasedRestore.validate_cluster - baseline - {baseline}")
         self.logger.info(
             f"AdminApiBasedRestore.validate_cluster - restored - {restored}")
-        self._validate_partition_last_offset()
+        self.logger.info(
+            f"AdminApiBasedRestore.validate_cluster - original_cluster_uuid - {original_cluster_uuid}"
+        )
+        self._validate_partition_last_offset(original_cluster_uuid)
 
     def after_restart_validation(self):
         super().after_restart_validation()
@@ -1056,13 +1124,17 @@ def generate_baseline(self):
         quiesce_uploads(self._redpanda, [topic.name for topic in self.topics],
                         400)
 
-    def restore_redpanda(self, baseline, controller_checksums):
+    def restore_redpanda(self, baseline, controller_checksums,
+                         original_cluster_uuid: Optional[str]):
         # set check mode, start recovery of topics
         self._redpanda.set_cluster_config(
             values={
                 'cloud_storage_recovery_topic_validation_mode':
                 self.check_mode
             })
-        return super().restore_redpanda(baseline, controller_checksums)
+        return super().restore_redpanda(
+            baseline,
+            controller_checksums,
+            original_cluster_uuid=original_cluster_uuid)
 
 
 class PreventRecoveryOfDamagedPartitionsCase(BaseCase):
@@ -1142,13 +1214,17 @@ def _find_and_remove_newest_segments(self, topic: TopicSpec, skip: int):
             self._delete(victim[0][1])
 
-    def validate_cluster(self, baseline, restored):
+    def validate_cluster(self,
+                         baseline,
+                         restored,
+                         original_cluster_uuid: Optional[str] = None):
         """Check that self unrecoverable_topic is not present"""
         assert len(
             list(self._rpk.describe_topic(self.unrecoverable_topic.name))
         ) == 0, f"topic {self.unrecoverable_topic.name} is present"
 
-    def restore_redpanda(self, baseline, controller_checksums):
+    def restore_redpanda(self, baseline, controller_checksums,
+                         original_cluster_uuid: Optional[str]):
         # skip 4 segments before deleting one. validation_depth=10 should fail the recovery
         self._find_and_remove_newest_segments(self.unrecoverable_topic,
                                               skip=4)
@@ -1162,14 +1238,18 @@ def restore_redpanda(self, baseline, controller_checksums):
             })
 
         # restore recoverable topic. this should not fail
-        super().restore_redpanda(baseline, controller_checksums,
-                                 [self.recoverable_topic])
+        super().restore_redpanda(baseline,
+                                 controller_checksums,
+                                 [self.recoverable_topic],
+                                 original_cluster_uuid=original_cluster_uuid)
 
         # restore unrecoverable topic. this should fail
         exception = None
         try:
-            super().restore_redpanda(baseline, controller_checksums,
-                                     [self.unrecoverable_topic])
+            super().restore_redpanda(
+                baseline,
+                controller_checksums, [self.unrecoverable_topic],
+                original_cluster_uuid=original_cluster_uuid)
         except RpkException as e:
             exception = e
         assert exception is not None, f"RpkException expected for unrecoverable {self.unrecoverable_topic.name}"
@@ -1517,6 +1597,7 @@ def do_run(self, test_case: BaseCase, upload_delay_sec=60):
                          test_case.topics,
                          timeout=datetime.timedelta(seconds=upload_delay_sec))
 
+        original_cluster_uuid = self.redpanda._admin.get_cluster_uuid()
         self._stop_redpanda_nodes()
 
         baseline = self._collect_file_checksums()
@@ -1525,9 +1606,13 @@ def do_run(self, test_case: BaseCase, upload_delay_sec=60):
 
         self._start_redpanda_nodes()
 
-        self.logger.info("Restoring topic data")
+        self.logger.info(
+            f"Restoring topic data, original cluster uuid {original_cluster_uuid}"
+        )
         controller_cs = self._collect_controller_log_checksums()
-        test_case.restore_redpanda(baseline, controller_cs)
+        test_case.restore_redpanda(baseline,
+                                   controller_cs,
+                                   original_cluster_uuid=original_cluster_uuid)
 
         self.logger.info("Waiting while topic is created")
         self._wait_for_topic(test_case.expected_recovered_topics)
@@ -1541,8 +1626,11 @@ def do_run(self, test_case: BaseCase, upload_delay_sec=60):
             self.logger.info(f"Validating node {host}")
             test_case.validate_node(host, baseline[host], restored[host])
 
-        self.logger.info("Validate all")
-        test_case.validate_cluster(baseline, restored)
+        self.logger.info(
+            f"Validate all, original cluster uuid: {original_cluster_uuid}")
+        test_case.validate_cluster(baseline,
+                                   restored,
+                                   original_cluster_uuid=original_cluster_uuid)
 
         if test_case.second_restart_needed:
             self._stop_redpanda_nodes()
diff --git a/tests/rptest/tests/upgrade_test.py b/tests/rptest/tests/upgrade_test.py
index 26a82b21f374b..35c93be288d22 100644
--- a/tests/rptest/tests/upgrade_test.py
+++ b/tests/rptest/tests/upgrade_test.py
@@ -519,6 +519,13 @@ def verify():
             },
             node=new_version_node)
 
+        # 24.1.x -> 24.2.x: versions 24.1 and below do not support remote
+        # labels, so we'll explicitly tune our bucket view to not filter with
+        # labels.
+        initial_supports_remote_label = False
+        if initial_version > Version("24.2.0"):
+            initial_supports_remote_label = True
+
         if block_uploads_during_upgrade:
             # If uploads are blocked during upgrade, we expect the new
             # nodes not to be able to trim their local logs.
@@ -546,7 +553,8 @@ def verify():
                    timeout_sec=60)
 
         # capture the cloud storage state to run a progress check later
-        bucket_view = BucketView(self.redpanda)
+        bucket_view = BucketView(
+            self.redpanda, with_remote_labels=initial_supports_remote_label)
         manifest_mid_upgrade = bucket_view.manifest_for_ntp(
             topic=topic, partition=newdata_p)
 
diff --git a/tests/rptest/tests/workload_producer_consumer.py b/tests/rptest/tests/workload_producer_consumer.py
index 1f866785b8093..0e9a8c308fd4f 100644
--- a/tests/rptest/tests/workload_producer_consumer.py
+++ b/tests/rptest/tests/workload_producer_consumer.py
@@ -8,6 +8,8 @@
 # by the Apache License, Version 2.0
 
 import uuid
+from packaging.version import Version
+
 from rptest.clients.rpk import RpkTool
 from rptest.clients.types import TopicSpec
 from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer
@@ -71,6 +73,8 @@ def begin(self):
         assert topic_cfg["redpanda.remote.read"].value == "true" and topic_cfg[
             "redpanda.remote.write"].value == "true"
 
+        self.begin_version = Version(
+            self.ctx.redpanda.get_version(self.ctx.redpanda.nodes[0]))
         self._producer.start(clean=False)
         self._producer.wait_for_offset_map()
         self._seq_consumer.start(clean=False)
@@ -92,13 +96,20 @@ def on_cluster_upgraded(self, version: tuple[int, int, int]) -> int:
         query remote storage and compute uploaded_kafka_offset, to check that
         progress is made
         """
         major_version = version[0:2]
-        quiesce_uploads(self.ctx.redpanda, [self.topic.name], 120)
+        begin_supports_remote_label = self.begin_version > Version("24.2.0")
+        quiesce_uploads(self.ctx.redpanda, [self.topic.name],
+                        120,
+                        with_remote_labels=begin_supports_remote_label)
 
         # get the manifest for a topic and do some checking
         topic_description = self.rpk.describe_topic(self.topic.name)
         partition_zero = next(topic_description)
 
-        bucket = BucketView(self.ctx.redpanda)
+        # If topics were created on a version that doesn't support labels, opt
+        # out of using them in the bucket view.
+        bucket = BucketView(self.ctx.redpanda,
+                            with_remote_labels=begin_supports_remote_label)
+
         manifest = bucket.manifest_for_ntp(self.topic.name, partition_zero.id)
 
         if major_version < (23, 2):