diff --git a/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py b/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py index 904ce0feece10..46f22ed496a3b 100644 --- a/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py +++ b/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py @@ -7,15 +7,16 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from rptest.tests.redpanda_test import RedpandaTest +import concurrent.futures +import threading +import time + +from rptest.clients.rpk import RpkTool, RpkException from rptest.services.cluster import cluster -from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer +from rptest.services.kgo_verifier_services import KgoVerifierProducer from rptest.services.redpanda import SISettings, MetricsEndpoint, ResourceSettings -from rptest.clients.rpk import RpkTool, RpkException -import concurrent.futures +from rptest.tests.redpanda_test import RedpandaTest from rptest.utils.si_utils import quiesce_uploads -import time -import threading class TieredStorageReaderStressTest(RedpandaTest): @@ -81,7 +82,7 @@ def _produce_and_quiesce(self, topic_name: str, msg_size: int, **kwargs) produce_duration = time.time() - t1 self.logger.info( - f"Produced {data_size} bytes in {produce_duration} seconds, {(data_size/produce_duration)/1000000.0:.2f}MB/s" + f"Produced {data_size} bytes in {produce_duration} seconds, {(data_size / produce_duration) / 1000000.0:.2f}MB/s" ) quiesce_uploads( @@ -117,6 +118,7 @@ def _get_stats(self): segment_reader_delay_count = 0 materialize_segment_delay_count = 0 connection_count = 0 + hydrations_count = 0 for family in metrics: for sample in family.samples: if sample.name == "redpanda_cloud_storage_readers": @@ -132,6 +134,11 @@ def _get_stats(self): elif sample.name == "redpanda_rpc_active_connections": if sample.labels["redpanda_server"] == "kafka": connection_count += int(sample.value) + for family in self.redpanda.metrics( + node, metrics_endpoint=MetricsEndpoint.METRICS): + for sample in family.samples: + if sample.name == "vectorized_cloud_storage_read_path_hydrations_in_progress_total": + hydrations_count += int(sample.value) stats = { "segment_readers": segment_reader_count, @@ -140,7 +147,8 @@ def _get_stats(self): "partition_readers_delayed": partition_reader_delay_count, "materialize_segments_delayed": materialize_segment_delay_count, - "connections": connection_count + "connections": connection_count, + "hydrations_count": hydrations_count } self.logger.debug(f"stats[{node.name}] = {stats}") results[node] = stats @@ -293,25 +301,39 @@ def stats_watcher(): # all have been trimmed. assert stats['segment_readers'] - def connections_closed(): + def wait_for_stat(fn, label): for node, stats in self._get_stats().items(): - if stats['connections'] > 0: + if metric := fn(stats): self.logger.debug( - f"Node {node.name} still has {stats['connections']} connections open" - ) + f"Node {node.name} still has {metric} {label}") return False - return True - self.logger.info("Waiting for all Kafka connections to close") + """ + Active chunk hydrations block remote segment readers during reader creation, + which in turn keep kafka connections open. A connection can only be closed + once the download finishes, so we wait for active downloads to finish first. + """ + self.logger.info("Waiting for active hydrations to finish") + self.redpanda.wait_until( + lambda: wait_for_stat(lambda s: s['hydrations_count'], + 'active hydrations'), + timeout_sec=15, + backoff_sec=1, + err_msg="Waiting for active hydrations to finish") - default_timequery_timeout = 5 + # Once downloads are done, connections should be closed very shortly after. + self.logger.info("Waiting for all Kafka connections to close") self.redpanda.wait_until( - connections_closed, - timeout_sec=default_timequery_timeout, + lambda: wait_for_stat(lambda s: s["connections"], + "kafka connections"), + timeout_sec=3, backoff_sec=1, err_msg="Waiting for Kafka connections to close") - # TODO: we should assert here that the connection count is at zero, - # and we should assert that no new downloads start after all our - # clients have stopped. + # No new downloads should be started once all reader activity is done + for _ in range(10): + time.sleep(0.5) + assert wait_for_stat( + lambda s: s['hydrations_count'], 'active hydrations' + ), 'found an active hydration where none expected'