rptest: perform internal scrub before stopping
This commit updates the RedpandaService such that it will perform an
internal scrub (as opposed to an external one via rp-storage-tool) at the
end of all tiered storage tests. This is achieved by configuring the
scrubber to be aggressive until it reaches the end of the log. Once that
happens, the scrubber pauses for the full scrub interval (10 minutes in
this commit).
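
In practice the end-of-test hook flips a handful of cluster configs before waiting on the scrub. A minimal sketch of that configuration follows (values mirror the new wait_for_internal_scrub helper in the diff below; `redpanda` standing for a RedpandaService instance is an assumption of the sketch):

# Sketch only: the real call lives in RedpandaService.wait_for_internal_scrub().
redpanda.set_cluster_config({
    "cloud_storage_partial_scrub_interval_ms": 100,  # scrub almost continuously
    "cloud_storage_full_scrub_interval_ms": 1000 * 60 * 10,  # then pause 10 minutes after a full pass
    "cloud_storage_scrubbing_interval_jitter_ms": 100,
    "cloud_storage_background_jobs_quota": 5000,
    "cloud_storage_housekeeping_interval_ms": 100,
    "cloud_storage_enable_segment_merging": False  # keep segments stable while scrubbing
})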
Vlad Lazar committed Oct 25, 2023
1 parent 2dc6e19 commit 766a497
Showing 3 changed files with 194 additions and 46 deletions.
1 change: 1 addition & 0 deletions tests/rptest/services/cluster.py
@@ -156,6 +156,7 @@ def wrapped(self, *args, **kwargs):

if self.redpanda.si_settings is not None:
try:
self.redpanda.maybe_do_internal_scrub()
self.redpanda.stop_and_scrub_object_storage()
except:
self.redpanda.cloud_storage_diagnostics()
237 changes: 192 additions & 45 deletions tests/rptest/services/redpanda.py
@@ -619,6 +619,9 @@ def set_expected_damage(self, damage_types: set[str]):
def is_damage_expected(self, damage_types: set[str]):
return (damage_types & self._expected_damage_types) == damage_types

def get_expected_damage(self):
return self._expected_damage_types

@classmethod
def cache_size_for_throughput(cls, throughput_bytes: int) -> int:
"""
@@ -681,8 +684,8 @@ def __init__(self):
# sasl is required
def sasl_enabled(self):
return (self.kafka_enable_authorization is None and self.enable_sasl
and self.endpoint_authn_method
is None) or self.endpoint_authn_method == "sasl"
and self.endpoint_authn_method is None
) or self.endpoint_authn_method == "sasl"

# principal is extracted from mtls distinguished name
def mtls_identity_enabled(self):
@@ -2413,10 +2416,8 @@ def set_cluster_config(self,
admin_client = self._admin

patch_result = admin_client.patch_cluster_config(
upsert={
k: v
for k, v in values.items() if v is not None
},
upsert={k: v
for k, v in values.items() if v is not None},
remove=[k for k, v in values.items() if v is None])
new_version = patch_result['config_version']

@@ -3777,6 +3778,73 @@ def stop_and_scrub_object_storage(self, run_timeout=300):
# :param run_timeout timeout for the execution of rp-storage-tool.
# can be set to None for no timeout

# We stop because the scrubbing routine would otherwise interpret
# ongoing uploads as inconsistency. In future, we may replace this
# stop with a flush, when Redpanda gets an admin API for explicitly
# flushing data to remote storage.
self.wait_for_manifest_uploads()
self.stop()

scrub_timeout = max(run_timeout, self.cloud_storage_scrub_timeout_s)
report = self._get_object_storage_report(timeout=scrub_timeout)

# It is legal for tiered storage to leak objects under
# certain circumstances: this will remain the case until
# we implement background scrub + modify test code to
# insist on Redpanda completing its own scrub before
# we externally validate
# (https://github.com/redpanda-data/redpanda/issues/9072)
permitted_anomalies = {"segments_outside_manifest"}

# Whether any anomalies were found
any_anomalies = any(len(v) for v in report['anomalies'].values())

# List of fatal anomalies found
fatal_anomalies = set(k for k, v in report['anomalies'].items()
if len(v) and k not in permitted_anomalies)

if not any_anomalies:
self.logger.info(f"No anomalies in object storage scrub")
elif not fatal_anomalies:
self.logger.warn(
f"Non-fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
)
else:
# Tests may declare that they expect some anomalies, e.g. if they
# intentionally damage the data.
if self._si_settings.is_damage_expected(fatal_anomalies):
self.logger.warn(
f"Tolerating anomalies in remote storage: {json.dumps(report, indent=2)}"
)
else:
self.logger.error(
f"Fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
)
raise RuntimeError(
f"Object storage scrub detected fatal anomalies of type {fatal_anomalies}"
)

def maybe_do_internal_scrub(self):
if not self._si_settings:
return

cloud_partitions = self.wait_for_manifest_uploads()
results = self.wait_for_internal_scrub(cloud_partitions)

if results:
self.logger.error(
"Fatal anomalies reported by internal scrub: "
f"{json.dumps(results, indent=2)}")
raise RuntimeError(
f"Internal object storage scrub detected fatal anomalies: {results}"
)
else:
self.logger.info(
f"No anomalies in internal object storage scrub")

def wait_for_manifest_uploads(self) -> set[Partition]:
cloud_storage_partitions: set[Partition] = set()

def all_partitions_uploaded_manifest():
manifest_not_uploaded = []
for p in self.partitions():
@@ -3794,6 +3862,16 @@ def all_partitions_uploaded_manifest():
remote_write = status["cloud_storage_mode"] in {
"full", "write_only"
}

if remote_write:
# TODO(vlad): do this differently?
# Create new partition tuples since the replicas list is not hashable
cloud_storage_partitions.add(
Partition(topic=p.topic,
index=p.index,
leader=p.leader,
replicas=None))

has_uploaded_manifest = status[
"metadata_update_pending"] is False or status.get(
'ms_since_last_manifest_upload', None) is not None
@@ -3819,50 +3897,119 @@ def all_partitions_uploaded_manifest():
timeout_sec=30 + timeout,
backoff_sec=1)

# We stop because the scrubbing routine would otherwise interpret
# ongoing uploads as inconsistency. In future, we may replace this
# stop with a flush, when Redpanda gets an admin API for explicitly
# flushing data to remote storage.
self.stop()
return cloud_storage_partitions

scrub_timeout = max(run_timeout, self.cloud_storage_scrub_timeout_s)
report = self._get_object_storage_report(timeout=scrub_timeout)
def wait_for_internal_scrub(self, cloud_storage_partitions):
"""
Configure the scrubber such that it will run aggressively
until the entire partition is scrubbed. Once that happens,
the scrubber will pause due to the `cloud_storage_full_scrub_interval_ms`
config. Returns the aggregated anomalies for all partitions after
filtering out expected damage.
"""
if not cloud_storage_partitions:
return None

# It is legal for tiered storage to leak objects under
# certain circumstances: this will remain the case until
# we implement background scrub + modify test code to
# insist on Redpanda completing its own scrub before
# we externally validate
# (https://github.com/redpanda-data/redpanda/issues/9072)
permitted_anomalies = {"segments_outside_manifest"}
self.set_cluster_config({
"cloud_storage_partial_scrub_interval_ms":
100,
"cloud_storage_full_scrub_interval_ms":
1000 * 60 * 10,
"cloud_storage_scrubbing_interval_jitter_ms":
100,
"cloud_storage_background_jobs_quota":
5000,
"cloud_storage_housekeeping_interval_ms":
100,
"cloud_storage_enable_segment_merging":
False
})

unavailable = set()
for p in cloud_storage_partitions:
try:
self._admin.reset_scrubbing_metadata(namespace="kafka",
topic=p.topic,
partition=p.index,
node=p.leader)
except HTTPError as he:
if he.response.status_code == 404:
# Old redpanda, doesn't have this endpoint. We can't
# do our upload check.
unavailable.add(p)
continue
else:
raise

cloud_storage_partitions -= unavailable
scrubbed = set()
all_anomalies = []

allowed_keys = set([
"ns", "topic", "partition", "revision_id", "last_complete_scrub_at"
])

expected_damage = self._si_settings.get_expected_damage()

def filter_anomalies(detected):
bad_delta_types = set(
["non_monotonical_delta", "mising_delta", "end_delta_smaller"])

for anomaly_type in expected_damage:
if anomaly_type == "ntpr_no_manifest":
detected.pop("missing_partition_manifest", None)
if anomaly_type == "missing_segments":
detected.pop("missing_segments", None)
if anomaly_type == "missing_spillover_manifests":
detected.pop("missing_spillover_manifests", None)
if anomaly_type == "ntpr_bad_deltas":
if metas := detected.get("segment_metadata_anomalies"):
metas = [
m for m in metas
if m["type"] not in bad_delta_types
]
if metas:
detected["segment_metadata_anomalies"] = metas
else:
detected.pop("segment_metadata_anomalies", None)
if anomaly_type == "ntpr_overlap_offsets":
if metas := detected.get("segment_metadata_anomalies"):
metas = [
m for m in metas if m["type"] != "offset_overlap"
]
if metas:
detected["segment_metadata_anomalies"] = metas
else:
detected.pop("segment_metadata_anomalies", None)
if anomaly_type == "metadata_offset_gaps":
if metas := detected.get("segment_metadata_anomalies"):
metas = [m for m in metas if m["type"] != "offset_gap"]
if metas:
detected["segment_metadata_anomalies"] = metas
else:
detected.pop("segment_metadata_anomalies", None)

def all_partitions_scrubbed():
waiting_for = cloud_storage_partitions - scrubbed
self.logger.info(
f"Waiting for {len(waiting_for)} partitions to be scrubbed")
for p in waiting_for:
result = self._admin.get_cloud_storage_anomalies(
namespace="kafka", topic=p.topic, partition=p.index)
if "last_complete_scrub_at" in result:
scrubbed.add(p)

# Whether any anomalies were found
any_anomalies = any(len(v) for v in report['anomalies'].values())
filter_anomalies(result)
if set(result.keys()) != allowed_keys:
all_anomalies.append(result)

# List of fatal anomalies found
fatal_anomalies = set(k for k, v in report['anomalies'].items()
if len(v) and k not in permitted_anomalies)
return len(waiting_for) == 0

if not any_anomalies:
self.logger.info(f"No anomalies in object storage scrub")
elif not fatal_anomalies:
self.logger.warn(
f"Non-fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
)
else:
# Tests may declare that they expect some anomalies, e.g. if they
# intentionally damage the data.
if self._si_settings.is_damage_expected(fatal_anomalies):
self.logger.warn(
f"Tolerating anomalies in remote storage: {json.dumps(report, indent=2)}"
)
else:
self.logger.error(
f"Fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
)
raise RuntimeError(
f"Object storage scrub detected fatal anomalies of type {fatal_anomalies}"
)
n_partitions = len(cloud_storage_partitions)
timeout = (n_partitions // 100 + 1) * 60
wait_until(all_partitions_scrubbed, timeout_sec=timeout, backoff_sec=5)

return all_anomalies

def set_expected_controller_records(self, max_records: Optional[int]):
self._expect_max_controller_records = max_records
2 changes: 1 addition & 1 deletion tests/rptest/tests/cloud_storage_scrubber_test.py
@@ -368,4 +368,4 @@ def test_scrubber(self, cloud_storage_type):
# and fudges the manifest. rp-storage-tool also picks
# up on some of these things.
self.redpanda.si_settings.set_expected_damage(
{"missing_segments", "metadata_offset_gaps"})
{"missing_segments", "metadata_offset_gaps", "missing_spillover_manifests"})
