diff --git a/tests/rptest/services/cluster.py b/tests/rptest/services/cluster.py
index 928f832bf50ce..5387ebf1ab196 100644
--- a/tests/rptest/services/cluster.py
+++ b/tests/rptest/services/cluster.py
@@ -156,6 +156,7 @@ def wrapped(self, *args, **kwargs):
 
                 if self.redpanda.si_settings is not None:
                     try:
+                        self.redpanda.maybe_do_internal_scrub()
                         self.redpanda.stop_and_scrub_object_storage()
                     except:
                         self.redpanda.cloud_storage_diagnostics()
diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py
index 02d068cb38904..5819cfd5126b3 100644
--- a/tests/rptest/services/redpanda.py
+++ b/tests/rptest/services/redpanda.py
@@ -619,6 +619,9 @@ def set_expected_damage(self, damage_types: set[str]):
     def is_damage_expected(self, damage_types: set[str]):
         return (damage_types & self._expected_damage_types) == damage_types
 
+    def get_expected_damage(self):
+        return self._expected_damage_types
+
     @classmethod
     def cache_size_for_throughput(cls, throughput_bytes: int) -> int:
         """
@@ -681,8 +684,8 @@ def __init__(self):
     # sasl is required
     def sasl_enabled(self):
         return (self.kafka_enable_authorization is None and self.enable_sasl
-                and self.endpoint_authn_method
-                is None) or self.endpoint_authn_method == "sasl"
+                and self.endpoint_authn_method is None
+                ) or self.endpoint_authn_method == "sasl"
 
     # principal is extracted from mtls distinguished name
     def mtls_identity_enabled(self):
@@ -2413,10 +2416,8 @@ def set_cluster_config(self,
             admin_client = self._admin
 
         patch_result = admin_client.patch_cluster_config(
-            upsert={
-                k: v
-                for k, v in values.items() if v is not None
-            },
+            upsert={k: v
+                    for k, v in values.items() if v is not None},
             remove=[k for k, v in values.items() if v is None])
 
         new_version = patch_result['config_version']
@@ -3777,6 +3778,73 @@ def stop_and_scrub_object_storage(self, run_timeout=300):
         # :param run_timeout timeout for the execution of rp-storage-tool.
         #        can be set to None for no timeout
 
+        # We stop because the scrubbing routine would otherwise interpret
+        # ongoing uploads as inconsistency. In future, we may replace this
+        # stop with a flush, when Redpanda gets an admin API for explicitly
+        # flushing data to remote storage.
+        self.wait_for_manifest_uploads()
+        self.stop()
+
+        scrub_timeout = max(run_timeout, self.cloud_storage_scrub_timeout_s)
+        report = self._get_object_storage_report(timeout=scrub_timeout)
+
+        # It is legal for tiered storage to leak objects under
+        # certain circumstances: this will remain the case until
+        # we implement background scrub + modify test code to
+        # insist on Redpanda completing its own scrub before
+        # we externally validate
+        # (https://github.com/redpanda-data/redpanda/issues/9072)
+        permitted_anomalies = {"segments_outside_manifest"}
+
+        # Whether any anomalies were found
+        any_anomalies = any(len(v) for v in report['anomalies'].values())
+
+        # List of fatal anomalies found
+        fatal_anomalies = set(k for k, v in report['anomalies'].items()
+                              if len(v) and k not in permitted_anomalies)
+
+        if not any_anomalies:
+            self.logger.info(f"No anomalies in object storage scrub")
+        elif not fatal_anomalies:
+            self.logger.warn(
+                f"Non-fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
+            )
+        else:
+            # Tests may declare that they expect some anomalies, e.g. if they
+            # intentionally damage the data.
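+            # (Note: is_damage_expected() is a subset check, so every fatal
+            # anomaly type found here must have been declared up front via
+            # set_expected_damage().)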
+            if self._si_settings.is_damage_expected(fatal_anomalies):
+                self.logger.warn(
+                    f"Tolerating anomalies in remote storage: {json.dumps(report, indent=2)}"
+                )
+            else:
+                self.logger.error(
+                    f"Fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
+                )
+                raise RuntimeError(
+                    f"Object storage scrub detected fatal anomalies of type {fatal_anomalies}"
+                )
+
+    def maybe_do_internal_scrub(self):
+        if not self._si_settings:
+            return
+
+        cloud_partitions = self.wait_for_manifest_uploads()
+        results = self.wait_for_internal_scrub(cloud_partitions)
+
+        if results:
+            self.logger.error(
+                "Fatal anomalies reported by internal scrub: "
+                f"{json.dumps(results, indent=2)}")
+            raise RuntimeError(
+                f"Internal object storage scrub detected fatal anomalies: {results}"
+            )
+        else:
+            self.logger.info(
+                "No anomalies in internal object storage scrub")
+
+    def wait_for_manifest_uploads(self) -> set[Partition]:
+        cloud_storage_partitions: set[Partition] = set()
+
         def all_partitions_uploaded_manifest():
             manifest_not_uploaded = []
             for p in self.partitions():
@@ -3794,6 +3862,16 @@ def all_partitions_uploaded_manifest():
                 remote_write = status["cloud_storage_mode"] in {
                     "full", "write_only"
                 }
+
+                if remote_write:
+                    # TODO(vlad): do this differently?
+                    # Create new partition tuples since the replicas list is not hashable
+                    cloud_storage_partitions.add(
+                        Partition(topic=p.topic,
+                                  index=p.index,
+                                  leader=p.leader,
+                                  replicas=None))
+
                 has_uploaded_manifest = status[
                     "metadata_update_pending"] is False or status.get(
                         'ms_since_last_manifest_upload', None) is not None
@@ -3819,50 +3897,119 @@ def all_partitions_uploaded_manifest():
                    timeout_sec=30 + timeout,
                    backoff_sec=1)
 
-        # We stop because the scrubbing routine would otherwise interpret
-        # ongoing uploads as inconsistency. In future, we may replace this
-        # stop with a flush, when Redpanda gets an admin API for explicitly
-        # flushing data to remote storage.
-        self.stop()
+        return cloud_storage_partitions
 
-        scrub_timeout = max(run_timeout, self.cloud_storage_scrub_timeout_s)
-        report = self._get_object_storage_report(timeout=scrub_timeout)
+    def wait_for_internal_scrub(self, cloud_storage_partitions):
+        """
+        Configure the scrubber such that it will run aggressively
+        until the entire partition is scrubbed. Once that happens,
+        the scrubber will pause due to the `cloud_storage_full_scrub_interval_ms`
+        config. Returns the aggregated anomalies for all partitions after applying
+        filtering for expected damage.
+        """
+        if not cloud_storage_partitions:
+            return None
 
-        # It is legal for tiered storage to leak objects under
-        # certain circumstances: this will remain the case until
-        # we implement background scrub + modify test code to
-        # insist on Redpanda completing its own scrub before
-        # we externally validate
-        # (https://github.com/redpanda-data/redpanda/issues/9072)
-        permitted_anomalies = {"segments_outside_manifest"}
+        self.set_cluster_config({
+            "cloud_storage_partial_scrub_interval_ms":
+            100,
+            "cloud_storage_full_scrub_interval_ms":
+            1000 * 60 * 10,
+            "cloud_storage_scrubbing_interval_jitter_ms":
+            100,
+            "cloud_storage_background_jobs_quota":
+            5000,
+            "cloud_storage_housekeeping_interval_ms":
+            100,
+            "cloud_storage_enable_segment_merging":
+            False
+        })
+
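+        # With the aggressive intervals above, the scrubber runs
+        # back-to-back until a full pass completes; the much longer
+        # cloud_storage_full_scrub_interval_ms then effectively pauses it,
+        # and each fully-scrubbed partition reports last_complete_scrub_at
+        # through the anomalies endpoint (checked below).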
+        unavailable = set()
+        for p in cloud_storage_partitions:
+            try:
+                self._admin.reset_scrubbing_metadata(namespace="kafka",
+                                                     topic=p.topic,
+                                                     partition=p.index,
+                                                     node=p.leader)
+            except HTTPError as he:
+                if he.response.status_code == 404:
+                    # Old redpanda, doesn't have this endpoint. We can't
+                    # do our upload check.
+                    unavailable.add(p)
+                    continue
+                else:
+                    raise
+
+        cloud_storage_partitions -= unavailable
+        scrubbed = set()
+        all_anomalies = []
+
+        allowed_keys = set([
+            "ns", "topic", "partition", "revision_id", "last_complete_scrub_at"
+        ])
+
+        expected_damage = self._si_settings.get_expected_damage()
+
+        def filter_anomalies(detected):
+            bad_delta_types = set(
+                ["non_monotonical_delta", "mising_delta", "end_delta_smaller"])
+
+            for anomaly_type in expected_damage:
+                if anomaly_type == "ntpr_no_manifest":
+                    detected.pop("missing_partition_manifest", None)
+                if anomaly_type == "missing_segments":
+                    detected.pop("missing_segments", None)
+                if anomaly_type == "missing_spillover_manifests":
+                    detected.pop("missing_spillover_manifests", None)
+                if anomaly_type == "ntpr_bad_deltas":
+                    if metas := detected.get("segment_metadata_anomalies"):
+                        metas = [
+                            m for m in metas
+                            if m["type"] not in bad_delta_types
+                        ]
+                        if metas:
+                            detected["segment_metadata_anomalies"] = metas
+                        else:
+                            detected.pop("segment_metadata_anomalies", None)
+                if anomaly_type == "ntpr_overlap_offsets":
+                    if metas := detected.get("segment_metadata_anomalies"):
+                        metas = [
+                            m for m in metas if m["type"] != "offset_overlap"
+                        ]
+                        if metas:
+                            detected["segment_metadata_anomalies"] = metas
+                        else:
+                            detected.pop("segment_metadata_anomalies", None)
+                if anomaly_type == "metadata_offset_gaps":
+                    if metas := detected.get("segment_metadata_anomalies"):
+                        metas = [m for m in metas if m["type"] != "offset_gap"]
+                        if metas:
+                            detected["segment_metadata_anomalies"] = metas
+                        else:
+                            detected.pop("segment_metadata_anomalies", None)
+
+        def all_partitions_scrubbed():
+            waiting_for = cloud_storage_partitions - scrubbed
+            self.logger.info(
+                f"Waiting for {len(waiting_for)} partitions to be scrubbed")
+            for p in waiting_for:
+                result = self._admin.get_cloud_storage_anomalies(
+                    namespace="kafka", topic=p.topic, partition=p.index)
+                if "last_complete_scrub_at" in result:
+                    scrubbed.add(p)
 
-        # Whether any anomalies were found
-        any_anomalies = any(len(v) for v in report['anomalies'].values())
+                    filter_anomalies(result)
+                    if set(result.keys()) != allowed_keys:
+                        all_anomalies.append(result)
 
-        # List of fatal anomalies found
-        fatal_anomalies = set(k for k, v in report['anomalies'].items()
-                              if len(v) and k not in permitted_anomalies)
+            return len(waiting_for) == 0
 
-        if not any_anomalies:
-            self.logger.info(f"No anomalies in object storage scrub")
-        elif not fatal_anomalies:
-            self.logger.warn(
-                f"Non-fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
-            )
-        else:
-            # Tests may declare that they expect some anomalies, e.g. if they
-            # intentionally damage the data.
-            if self._si_settings.is_damage_expected(fatal_anomalies):
-                self.logger.warn(
-                    f"Tolerating anomalies in remote storage: {json.dumps(report, indent=2)}"
-                )
-            else:
-                self.logger.error(
-                    f"Fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
-                )
-                raise RuntimeError(
-                    f"Object storage scrub detected fatal anomalies of type {fatal_anomalies}"
-                )
+        n_partitions = len(cloud_storage_partitions)
+        timeout = (n_partitions // 100 + 1) * 60
+        wait_until(all_partitions_scrubbed, timeout_sec=timeout, backoff_sec=5)
+
+        return all_anomalies
 
     def set_expected_controller_records(self, max_records: Optional[int]):
         self._expect_max_controller_records = max_records
diff --git a/tests/rptest/tests/cloud_storage_scrubber_test.py b/tests/rptest/tests/cloud_storage_scrubber_test.py
index f9d89a3f61a8d..3303773719c71 100644
--- a/tests/rptest/tests/cloud_storage_scrubber_test.py
+++ b/tests/rptest/tests/cloud_storage_scrubber_test.py
@@ -368,4 +368,4 @@ def test_scrubber(self, cloud_storage_type):
         # and fudges the manifest. rp-storage-tool also picks
         # up on some of these things.
         self.redpanda.si_settings.set_expected_damage(
-            {"missing_segments", "metadata_offset_gaps"})
+            {"missing_segments", "metadata_offset_gaps", "missing_spillover_manifests"})
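
Reviewer note: below is a minimal usage sketch (hypothetical, not part of this diff) of how the new teardown hook is exercised end to end. It assumes the standard rptest harness (`RedpandaTest` constructed with tiered-storage `si_settings`, plus the `@cluster` decorator patched above); the class name, node count and test body are illustrative.

    from rptest.services.cluster import cluster
    from rptest.tests.redpanda_test import RedpandaTest


    class ExpectedDamageExampleTest(RedpandaTest):
        @cluster(num_nodes=3)
        def test_tolerated_anomalies(self):
            # ... test body intentionally deletes segments and fudges
            # manifests in the bucket ...

            # Declare the damage inflicted on purpose. On teardown the
            # @cluster wrapper first calls maybe_do_internal_scrub(), whose
            # filter_anomalies() drops the matching anomaly types, and then
            # stop_and_scrub_object_storage(), which tolerates them via
            # is_damage_expected().
            self.redpanda.si_settings.set_expected_damage(
                {"missing_segments", "metadata_offset_gaps"})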