From 766a497af4688da1d97dce058c8824046b6505cf Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 23 Oct 2023 12:50:44 +0100 Subject: [PATCH] rptest: perform internal scrub before stopping This commit updates the RedpandaService such that it will perform an internal scrub (as opposed to external via rp-storage-tool) at the end of all tiered storage tests. This is achieved by configuring the scrubber to be aggresive until it reaches the end of the log. Once that happens, it will wait for the full scrub timeout (10 minutes in this commit). --- tests/rptest/services/cluster.py | 1 + tests/rptest/services/redpanda.py | 237 ++++++++++++++---- .../tests/cloud_storage_scrubber_test.py | 2 +- 3 files changed, 194 insertions(+), 46 deletions(-) diff --git a/tests/rptest/services/cluster.py b/tests/rptest/services/cluster.py index 928f832bf50ce..5387ebf1ab196 100644 --- a/tests/rptest/services/cluster.py +++ b/tests/rptest/services/cluster.py @@ -156,6 +156,7 @@ def wrapped(self, *args, **kwargs): if self.redpanda.si_settings is not None: try: + self.redpanda.maybe_do_internal_scrub() self.redpanda.stop_and_scrub_object_storage() except: self.redpanda.cloud_storage_diagnostics() diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 02d068cb38904..5819cfd5126b3 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -619,6 +619,9 @@ def set_expected_damage(self, damage_types: set[str]): def is_damage_expected(self, damage_types: set[str]): return (damage_types & self._expected_damage_types) == damage_types + def get_expected_damage(self): + return self._expected_damage_types + @classmethod def cache_size_for_throughput(cls, throughput_bytes: int) -> int: """ @@ -681,8 +684,8 @@ def __init__(self): # sasl is required def sasl_enabled(self): return (self.kafka_enable_authorization is None and self.enable_sasl - and self.endpoint_authn_method - is None) or self.endpoint_authn_method == "sasl" + and self.endpoint_authn_method is None + ) or self.endpoint_authn_method == "sasl" # principal is extracted from mtls distinguished name def mtls_identity_enabled(self): @@ -2413,10 +2416,8 @@ def set_cluster_config(self, admin_client = self._admin patch_result = admin_client.patch_cluster_config( - upsert={ - k: v - for k, v in values.items() if v is not None - }, + upsert={k: v + for k, v in values.items() if v is not None}, remove=[k for k, v in values.items() if v is None]) new_version = patch_result['config_version'] @@ -3777,6 +3778,73 @@ def stop_and_scrub_object_storage(self, run_timeout=300): # :param run_timeout timeout for the execution of rp-storage-tool. # can be set to None for no timeout + # We stop because the scrubbing routine would otherwise interpret + # ongoing uploads as inconsistency. In future, we may replace this + # stop with a flush, when Redpanda gets an admin API for explicitly + # flushing data to remote storage. + self.wait_for_manifest_uploads() + self.stop() + + scrub_timeout = max(run_timeout, self.cloud_storage_scrub_timeout_s) + report = self._get_object_storage_report(timeout=scrub_timeout) + + # It is legal for tiered storage to leak objects under + # certain circumstances: this will remain the case until + # we implement background scrub + modify test code to + # insist on Redpanda completing its own scrub before + # we externally validate + # (https://github.com/redpanda-data/redpanda/issues/9072) + permitted_anomalies = {"segments_outside_manifest"} + + # Whether any anomalies were found + any_anomalies = any(len(v) for v in report['anomalies'].values()) + + # List of fatal anomalies found + fatal_anomalies = set(k for k, v in report['anomalies'].items() + if len(v) and k not in permitted_anomalies) + + if not any_anomalies: + self.logger.info(f"No anomalies in object storage scrub") + elif not fatal_anomalies: + self.logger.warn( + f"Non-fatal anomalies in remote storage: {json.dumps(report, indent=2)}" + ) + else: + # Tests may declare that they expect some anomalies, e.g. if they + # intentionally damage the data. + if self._si_settings.is_damage_expected(fatal_anomalies): + self.logger.warn( + f"Tolerating anomalies in remote storage: {json.dumps(report, indent=2)}" + ) + else: + self.logger.error( + f"Fatal anomalies in remote storage: {json.dumps(report, indent=2)}" + ) + raise RuntimeError( + f"Object storage scrub detected fatal anomalies of type {fatal_anomalies}" + ) + + def maybe_do_internal_scrub(self): + if not self._si_settings: + return + + cloud_partitions = self.wait_for_manifest_uploads() + results = self.wait_for_internal_scrub(cloud_partitions) + + if results: + self.logger.error( + "Fatal anomalies reported by internal scrub: " + f"{json.dumps(results, indent=2)}") + raise RuntimeError( + f"Internal object storage scrub detected fatal anomalies: {results}" + ) + else: + self.logger.info( + f"No anomalies in internal object storage scrub") + + def wait_for_manifest_uploads(self) -> set[Partition]: + cloud_storage_partitions: set[Partition] = set() + def all_partitions_uploaded_manifest(): manifest_not_uploaded = [] for p in self.partitions(): @@ -3794,6 +3862,16 @@ def all_partitions_uploaded_manifest(): remote_write = status["cloud_storage_mode"] in { "full", "write_only" } + + if remote_write: + # TODO(vlad): do this differently? + # Create new partition tuples since the replicas list is not hashable + cloud_storage_partitions.add( + Partition(topic=p.topic, + index=p.index, + leader=p.leader, + replicas=None)) + has_uploaded_manifest = status[ "metadata_update_pending"] is False or status.get( 'ms_since_last_manifest_upload', None) is not None @@ -3819,50 +3897,119 @@ def all_partitions_uploaded_manifest(): timeout_sec=30 + timeout, backoff_sec=1) - # We stop because the scrubbing routine would otherwise interpret - # ongoing uploads as inconsistency. In future, we may replace this - # stop with a flush, when Redpanda gets an admin API for explicitly - # flushing data to remote storage. - self.stop() + return cloud_storage_partitions - scrub_timeout = max(run_timeout, self.cloud_storage_scrub_timeout_s) - report = self._get_object_storage_report(timeout=scrub_timeout) + def wait_for_internal_scrub(self, cloud_storage_partitions): + """ + Configure the scrubber such that it will run aggresively + until the entire partition is scrubbed. Once that happens, + the scrubber will pause due to the `cloud_storage_full_scrub_interval_ms` + config. Returns the aggregated anomalies for all partitions after applying + filtering for expected damage. + """ + if not cloud_storage_partitions: + return None - # It is legal for tiered storage to leak objects under - # certain circumstances: this will remain the case until - # we implement background scrub + modify test code to - # insist on Redpanda completing its own scrub before - # we externally validate - # (https://github.com/redpanda-data/redpanda/issues/9072) - permitted_anomalies = {"segments_outside_manifest"} + self.set_cluster_config({ + "cloud_storage_partial_scrub_interval_ms": + 100, + "cloud_storage_full_scrub_interval_ms": + 1000 * 60 * 10, + "cloud_storage_scrubbing_interval_jitter_ms": + 100, + "cloud_storage_background_jobs_quota": + 5000, + "cloud_storage_housekeeping_interval_ms": + 100, + "cloud_storage_enable_segment_merging": + False + }) + + unavailable = set() + for p in cloud_storage_partitions: + try: + self._admin.reset_scrubbing_metadata(namespace="kafka", + topic=p.topic, + partition=p.index, + node=p.leader) + except HTTPError as he: + if he.response.status_code == 404: + # Old redpanda, doesn't have this endpoint. We can't + # do our upload check. + unavailable.add(p) + continue + else: + raise + + cloud_storage_partitions -= unavailable + scrubbed = set() + all_anomalies = [] + + allowed_keys = set([ + "ns", "topic", "partition", "revision_id", "last_complete_scrub_at" + ]) + + expected_damage = self._si_settings.get_expected_damage() + + def filter_anomalies(detected): + bad_delta_types = set( + ["non_monotonical_delta", "mising_delta", "end_delta_smaller"]) + + for anomaly_type in expected_damage: + if anomaly_type == "ntpr_no_manifest": + detected.pop("missing_partition_manifest", None) + if anomaly_type == "missing_segments": + detected.pop("missing_segments", None) + if anomaly_type == "missing_spillover_manifests": + detected.pop("missing_spillover_manifests", None) + if anomaly_type == "ntpr_bad_deltas": + if metas := detected.get("segment_metadata_anomalies"): + metas = [ + m for m in metas + if m["type"] not in bad_delta_types + ] + if metas: + detected["segment_metadata_anomalies"] = metas + else: + detected.pop("segment_metadata_anomalies", None) + if anomaly_type == "ntpr_overlap_offsets": + if metas := detected.get("segment_metadata_anomalies"): + metas = [ + m for m in metas if m["type"] != "offset_overlap" + ] + if metas: + detected["segment_metadata_anomalies"] = metas + else: + detected.pop("segment_metadata_anomalies", None) + if anomaly_type == "metadata_offset_gaps": + if metas := detected.get("segment_metadata_anomalies"): + metas = [m for m in metas if m["type"] != "offset_gap"] + if metas: + detected["segment_metadata_anomalies"] = metas + else: + detected.pop("segment_metadata_anomalies", None) + + def all_partitions_scrubbed(): + waiting_for = cloud_storage_partitions - scrubbed + self.logger.info( + f"Waiting for {len(waiting_for)} partitions to be scrubbed") + for p in waiting_for: + result = self._admin.get_cloud_storage_anomalies( + namespace="kafka", topic=p.topic, partition=p.index) + if "last_complete_scrub_at" in result: + scrubbed.add(p) - # Whether any anomalies were found - any_anomalies = any(len(v) for v in report['anomalies'].values()) + filter_anomalies(result) + if set(result.keys()) != allowed_keys: + all_anomalies.append(result) - # List of fatal anomalies found - fatal_anomalies = set(k for k, v in report['anomalies'].items() - if len(v) and k not in permitted_anomalies) + return len(waiting_for) == 0 - if not any_anomalies: - self.logger.info(f"No anomalies in object storage scrub") - elif not fatal_anomalies: - self.logger.warn( - f"Non-fatal anomalies in remote storage: {json.dumps(report, indent=2)}" - ) - else: - # Tests may declare that they expect some anomalies, e.g. if they - # intentionally damage the data. - if self._si_settings.is_damage_expected(fatal_anomalies): - self.logger.warn( - f"Tolerating anomalies in remote storage: {json.dumps(report, indent=2)}" - ) - else: - self.logger.error( - f"Fatal anomalies in remote storage: {json.dumps(report, indent=2)}" - ) - raise RuntimeError( - f"Object storage scrub detected fatal anomalies of type {fatal_anomalies}" - ) + n_partitions = len(cloud_storage_partitions) + timeout = (n_partitions // 100 + 1) * 60 + wait_until(all_partitions_scrubbed, timeout_sec=timeout, backoff_sec=5) + + return all_anomalies def set_expected_controller_records(self, max_records: Optional[int]): self._expect_max_controller_records = max_records diff --git a/tests/rptest/tests/cloud_storage_scrubber_test.py b/tests/rptest/tests/cloud_storage_scrubber_test.py index f9d89a3f61a8d..3303773719c71 100644 --- a/tests/rptest/tests/cloud_storage_scrubber_test.py +++ b/tests/rptest/tests/cloud_storage_scrubber_test.py @@ -368,4 +368,4 @@ def test_scrubber(self, cloud_storage_type): # and fudges the manifest. rp-storage-tool also picks # up on some of these things. self.redpanda.si_settings.set_expected_damage( - {"missing_segments", "metadata_offset_gaps"}) + {"missing_segments", "metadata_offset_gaps", "missing_spillover_manifests"})