
rptest: perform internal scrub before stopping
This commit updates the RedpandaService such that it performs an
internal scrub (as opposed to an external one via rp-storage-tool) at
the end of all tiered storage tests. This is achieved by configuring
the scrubber to be aggressive until it reaches the end of the log. Once
that happens, the scrubber pauses for the full scrub interval (10
minutes in this commit).
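
As a rough, hedged sketch of how a tiered storage test exercises this path (the SiSettings and RedpandaService constructor arguments here are illustrative; only stop() and the scrub helpers in the diff below come from this commit):

# Hypothetical ducktape test body.
si_settings = SiSettings(test_context)
redpanda = RedpandaService(test_context, num_brokers=3, si_settings=si_settings)
redpanda.start()
# ... produce and consume against tiered storage topics ...
# stop() now waits for manifest uploads and a full internal scrub of all
# partitions before shutting the nodes down.
redpanda.stop()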
Vlad Lazar committed Oct 23, 2023
1 parent 13e273f commit e66dc85
Showing 1 changed file with 177 additions and 45 deletions: tests/rptest/services/redpanda.py
@@ -613,6 +613,9 @@ def set_expected_damage(self, damage_types: set[str]):
def is_damage_expected(self, damage_types: set[str]):
return (damage_types & self._expected_damage_types) == damage_types

def get_expected_damage(self):
return self._expected_damage_types
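
For illustration, a hedged sketch of how a test might use the expected-damage accessors shown in this hunk (the SiSettings constructor argument is an assumption):

# Hypothetical test setup: the test intends to delete segments from the
# bucket, so it declares that damage up front.
si_settings = SiSettings(test_context)
si_settings.set_expected_damage({"missing_segments"})

# The scrub helpers later consult the declared damage.
assert si_settings.is_damage_expected({"missing_segments"})
assert "missing_segments" in si_settings.get_expected_damage()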

@classmethod
def cache_size_for_throughput(cls, throughput_bytes: int) -> int:
"""
@@ -675,8 +678,8 @@ def __init__(self):
# sasl is required
def sasl_enabled(self):
return (self.kafka_enable_authorization is None and self.enable_sasl
        and self.endpoint_authn_method is None
        ) or self.endpoint_authn_method == "sasl"

# principal is extracted from mtls distinguished name
def mtls_identity_enabled(self):
@@ -2407,10 +2410,8 @@ def set_cluster_config(self,
admin_client = self._admin

patch_result = admin_client.patch_cluster_config(
    upsert={k: v
            for k, v in values.items() if v is not None},
    remove=[k for k, v in values.items() if v is None])
new_version = patch_result['config_version']

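A hedged usage sketch of the upsert/remove split shown above (the service handle name is illustrative): non-None values are upserted, while a None value removes the key from the cluster config.

# Illustrative call: the first key is upserted, the second is removed
# because its value is None.
redpanda.set_cluster_config({
    "cloud_storage_housekeeping_interval_ms": 100,
    "cloud_storage_enable_segment_merging": None,
})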
@@ -2805,6 +2806,11 @@ def stop(self, **kwargs):
"""
Override default stop() to execute stop_node in parallel
"""

# If tiered storage is enabled, wait for Redpanda to perform
# a full internal scrub of all partitions
self.maybe_do_internal_scrub()

self._stop_time = time.time() # The last time stop is invoked
self.logger.info("%s: exporting cluster config" % self.who_am_i())

@@ -3771,6 +3777,72 @@ def stop_and_scrub_object_storage(self, run_timeout=300):
# :param run_timeout timeout for the execution of rp-storage-tool.
# can be set to None for no timeout

# We stop because the scrubbing routine would otherwise interpret
# ongoing uploads as inconsistency. In future, we may replace this
# stop with a flush, when Redpanda gets an admin API for explicitly
# flushing data to remote storage.
self.wait_for_manifest_uploads()
self.stop()

scrub_timeout = max(run_timeout, self.cloud_storage_scrub_timeout_s)
report = self._get_object_storage_report(timeout=scrub_timeout)

# It is legal for tiered storage to leak objects under
# certain circumstances: this will remain the case until
# we implement background scrub + modify test code to
# insist on Redpanda completing its own scrub before
# we externally validate
# (https://github.com/redpanda-data/redpanda/issues/9072)
permitted_anomalies = {"segments_outside_manifest"}

# Whether any anomalies were found
any_anomalies = any(len(v) for v in report['anomalies'].values())

# List of fatal anomalies found
fatal_anomalies = set(k for k, v in report['anomalies'].items()
if len(v) and k not in permitted_anomalies)

if not any_anomalies:
self.logger.info(f"No anomalies in object storage scrub")
elif not fatal_anomalies:
self.logger.warn(
f"Non-fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
)
else:
# Tests may declare that they expect some anomalies, e.g. if they
# intentionally damage the data.
if self._si_settings.is_damage_expected(fatal_anomalies):
self.logger.warn(
f"Tolerating anomalies in remote storage: {json.dumps(report, indent=2)}"
)
else:
self.logger.error(
f"Fatal anomalies in remote storage: {json.dumps(report, indent=2)}"
)
raise RuntimeError(
f"Object storage scrub detected fatal anomalies of type {fatal_anomalies}"
)

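# _internal_scrub_results is populated by maybe_do_internal_scrub(),
# which stop() runs before the nodes are shut down.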
if not self._internal_scrub_results:
self.logger.info(f"No anomalies in internal object storage scrub")
else:
self.logger.error(
"Fatal anomalies reported by internal scrub: "
f"{json.dumps(self._internal_scrub_results, indent=2)}")
raise RuntimeError(
f"Internal object storage scrub detected fatal anomalies: {self._internal_scrub_results}"
)

def maybe_do_internal_scrub(self):
if self._si_settings:
cloud_partitions = self.wait_for_manifest_uploads()
# Reported by RedpandaService.stop_and_scrub_object_storage
self._internal_scrub_results = self.wait_for_internal_scrub(
cloud_partitions)

def wait_for_manifest_uploads(self) -> set[Partition]:
cloud_storage_partitions: set[Partition] = set()

def all_partitions_uploaded_manifest():
manifest_not_uploaded = []
for p in self.partitions():
@@ -3788,6 +3860,16 @@ def all_partitions_uploaded_manifest():
remote_write = status["cloud_storage_mode"] in {
"full", "write_only"
}

if remote_write:
# TODO(vlad): do this differently?
# Create new partition tuples since the replicas list is not hashable
cloud_storage_partitions.add(
Partition(topic=p.topic,
index=p.index,
leader=p.leader,
replicas=None))

has_uploaded_manifest = status[
"metadata_update_pending"] is False or status.get(
'ms_since_last_manifest_upload', None) is not None
@@ -3813,50 +3895,100 @@ def all_partitions_uploaded_manifest():
timeout_sec=30 + timeout,
backoff_sec=1)

return cloud_storage_partitions

def wait_for_internal_scrub(self, cloud_storage_partitions):
"""
Configure the scrubber such that it will run aggressively
until the entire partition is scrubbed. Once that happens,
the scrubber will pause due to the `cloud_storage_full_scrub_interval_ms`
config. Returns the aggregated anomalies for all partitions after applying
filtering for expected damage.
"""
self.set_cluster_config({
    "cloud_storage_partial_scrub_interval_ms": 100,
    "cloud_storage_full_scrub_interval_ms": 1000 * 60 * 10,
    "cloud_storage_scrubbing_interval_jitter_ms": 100,
    "cloud_storage_background_jobs_quota": 5000,
    "cloud_storage_housekeeping_interval_ms": 100,
    "cloud_storage_enable_segment_merging": False
})

unavailable = set()
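# Reset each partition's scrub metadata so that the
# "last_complete_scrub_at" check below reflects a fresh scrub rather
# than one completed earlier in the test.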
for p in cloud_storage_partitions:
try:
self._admin.reset_scrubbing_metadata(namespace="kafka",
topic=p.topic,
partition=p.index,
node=p.leader)
except HTTPError as he:
if he.response.status_code == 404:
# Old redpanda, doesn't have this endpoint, so we can't
# run the internal scrub check for this partition.
unavailable.add(p)
continue
else:
raise

cloud_storage_partitions -= unavailable
scrubbed = set()
all_anomalies = []

allowed_keys = set([
    "ns", "topic", "partition", "revision_id", "last_complete_scrub_at"
])

expected_damage = self._si_settings.get_expected_damage()

def filter_anomalies(detected):
    bad_delta_types = set(
        ["non_monotonical_delta", "missing_delta", "end_delta_smaller"])

    for anomaly_type in expected_damage:
        if anomaly_type == "ntpr_no_manifest":
            detected.pop("missing_partition_manifest", None)
        if anomaly_type == "missing_segments":
            detected.pop("missing_segments", None)
        if anomaly_type == "ntpr_bad_deltas":
            if metas := detected.get("segment_metadata_anomalies"):
                # Write the filtered list back so the filtering takes effect
                detected["segment_metadata_anomalies"] = [
                    m for m in metas if m["type"] not in bad_delta_types
                ]
        if anomaly_type == "ntpr_overlap_offsets":
            if metas := detected.get("segment_metadata_anomalies"):
                detected["segment_metadata_anomalies"] = [
                    m for m in metas if m["type"] != "offset_overlap"
                ]
        if anomaly_type == "metadata_offset_gaps":
            if metas := detected.get("segment_metadata_anomalies"):
                detected["segment_metadata_anomalies"] = [
                    m for m in metas if m["type"] != "offset_gap"
                ]

    # If every remaining metadata anomaly was filtered out, drop the key
    # so the allowed_keys comparison below does not flag an empty list.
    if detected.get("segment_metadata_anomalies") == []:
        detected.pop("segment_metadata_anomalies")

def all_partitions_scrubbed():
waiting_for = cloud_storage_partitions - scrubbed
self.logger.info(
f"Waiting for {len(waiting_for)} partitions to be scrubbed")
for p in waiting_for:
result = self._admin.get_cloud_storage_anomalies(
namespace="kafka", topic=p.topic, partition=p.index)
if "last_complete_scrub_at" in result:
scrubbed.add(p)

filter_anomalies(result)
if set(result.keys()) != allowed_keys:
all_anomalies.append(result)

return len(waiting_for) == 0

wait_until(all_partitions_scrubbed, timeout_sec=60, backoff_sec=1)

return all_anomalies
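
For reference, a sketch of the shape of an entry that can end up in the returned list, inferred only from the keys this method inspects; the field values are illustrative, not taken from a real admin API response.

# Illustrative only -- shape inferred from the keys used above.
example_entry = {
    "ns": "kafka",
    "topic": "topic-a",
    "partition": 0,
    "last_complete_scrub_at": 1698000000000,
    "missing_segments": ["..."],
    "segment_metadata_anomalies": [{"type": "offset_gap"}],
}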

def set_expected_controller_records(self, max_records: Optional[int]):
self._expect_max_controller_records = max_records
