diff --git a/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py b/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py index e10f16bbc4a33..805256532637a 100644 --- a/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py +++ b/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py @@ -122,6 +122,7 @@ def _get_stats(self): segment_reader_delay_count = 0 materialize_segment_delay_count = 0 connection_count = 0 + hydrations_count = 0 for family in metrics: for sample in family.samples: if sample.name == "redpanda_cloud_storage_readers": @@ -137,6 +138,11 @@ def _get_stats(self): elif sample.name == "redpanda_rpc_active_connections": if sample.labels["redpanda_server"] == "kafka": connection_count += int(sample.value) + for family in self.redpanda.metrics( + node, metrics_endpoint=MetricsEndpoint.METRICS): + for sample in family.samples: + if sample.name == "vectorized_cloud_storage_read_path_hydrations_in_progress_total": + hydrations_count += int(sample.value) stats = { "segment_readers": segment_reader_count, @@ -145,7 +151,8 @@ def _get_stats(self): "partition_readers_delayed": partition_reader_delay_count, "materialize_segments_delayed": materialize_segment_delay_count, - "connections": connection_count + "connections": connection_count, + "hydrations_count": hydrations_count } self.logger.debug(f"stats[{node.name}] = {stats}") results[node] = stats @@ -298,29 +305,32 @@ def stats_watcher(): # all have been trimmed. assert stats['segment_readers'] - def connections_closed(): - for process in psutil.process_iter(): - if 'rpk' in process.name(): - parent = psutil.Process(process.ppid()) - self.logger.warn(f'An rpk is still running: {process}, ' - f'cmd: {process.cmdline()}, ' - f'parent: {parent}, ' - f'parent cmd: {parent.cmdline()}') + def wait_for_stat(fn, label): for node, stats in self._get_stats().items(): - if stats['connections'] > 0: + if metric := fn(stats): self.logger.debug( - f"Node {node.name} still has {stats['connections']} connections open" - ) + f"Node {node.name} still has {metric} {label}") return False - return True - self.logger.info("Waiting for all Kafka connections to close") + """ + Active chunk hydrations block remote segment readers, which in turn keep kafka connections open. + A connection can only be closed once the download finishes, so we wait for active downloads to finish first. + """ + self.logger.info("Waiting for active hydrations to finish") + self.redpanda.wait_until( + lambda: wait_for_stat(lambda s: s['hydrations_count'], + 'active hydrations'), + timeout_sec=15, + backoff_sec=1, + err_msg="Waiting for active hydrations to finish") - default_timequery_timeout = 5 + # Once downloads are done, connections should be closed very shortly after. + self.logger.info("Waiting for all Kafka connections to close") self.redpanda.wait_until( - connections_closed, - timeout_sec=default_timequery_timeout, + lambda: wait_for_stat(lambda s: s["connections"], + "kafka connections"), + timeout_sec=3, backoff_sec=1, err_msg="Waiting for Kafka connections to close")