From 30c3f3ad51b1ed7162464cb7a8a60f433dd2f976 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Fri, 3 Nov 2023 17:05:50 +0530 Subject: [PATCH] ducktape/cloud_storage: Wait for downloads in test In reader stress test the number of active kafka connections is expected to be zero after a deadline. However when chunked read path is used, readers are blocked while downloading chunks from cloud storage. The readers will only detect that kafka connection has aborted after an active download is finished. This change adds a wait for all hydrations to finish before asserting that connections are down to zero. This wait is for 15 seconds which may be a little more than necessary, but it is hard to predict how long current downloads will take. On the other hand the wait for connections to go down after downloads are finished is reduced to three seconds. In practice the connections should go down much sooner but three seconds is a conservative estimate. Additionally after all connections are done, we assert that no new downloads are started. --- .../tiered_storage_reader_stress_test.py | 62 +++++++++++++------ 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py b/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py index 904ce0feece10..46f22ed496a3b 100644 --- a/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py +++ b/tests/rptest/scale_tests/tiered_storage_reader_stress_test.py @@ -7,15 +7,16 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from rptest.tests.redpanda_test import RedpandaTest +import concurrent.futures +import threading +import time + +from rptest.clients.rpk import RpkTool, RpkException from rptest.services.cluster import cluster -from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer +from rptest.services.kgo_verifier_services import KgoVerifierProducer from rptest.services.redpanda import SISettings, MetricsEndpoint, ResourceSettings -from rptest.clients.rpk import RpkTool, RpkException -import concurrent.futures +from rptest.tests.redpanda_test import RedpandaTest from rptest.utils.si_utils import quiesce_uploads -import time -import threading class TieredStorageReaderStressTest(RedpandaTest): @@ -81,7 +82,7 @@ def _produce_and_quiesce(self, topic_name: str, msg_size: int, **kwargs) produce_duration = time.time() - t1 self.logger.info( - f"Produced {data_size} bytes in {produce_duration} seconds, {(data_size/produce_duration)/1000000.0:.2f}MB/s" + f"Produced {data_size} bytes in {produce_duration} seconds, {(data_size / produce_duration) / 1000000.0:.2f}MB/s" ) quiesce_uploads( @@ -117,6 +118,7 @@ def _get_stats(self): segment_reader_delay_count = 0 materialize_segment_delay_count = 0 connection_count = 0 + hydrations_count = 0 for family in metrics: for sample in family.samples: if sample.name == "redpanda_cloud_storage_readers": @@ -132,6 +134,11 @@ def _get_stats(self): elif sample.name == "redpanda_rpc_active_connections": if sample.labels["redpanda_server"] == "kafka": connection_count += int(sample.value) + for family in self.redpanda.metrics( + node, metrics_endpoint=MetricsEndpoint.METRICS): + for sample in family.samples: + if sample.name == "vectorized_cloud_storage_read_path_hydrations_in_progress_total": + hydrations_count += int(sample.value) stats = { "segment_readers": segment_reader_count, @@ -140,7 +147,8 @@ def _get_stats(self): "partition_readers_delayed": partition_reader_delay_count, "materialize_segments_delayed": materialize_segment_delay_count, - "connections": connection_count + "connections": connection_count, + "hydrations_count": hydrations_count } self.logger.debug(f"stats[{node.name}] = {stats}") results[node] = stats @@ -293,25 +301,39 @@ def stats_watcher(): # all have been trimmed. assert stats['segment_readers'] - def connections_closed(): + def wait_for_stat(fn, label): for node, stats in self._get_stats().items(): - if stats['connections'] > 0: + if metric := fn(stats): self.logger.debug( - f"Node {node.name} still has {stats['connections']} connections open" - ) + f"Node {node.name} still has {metric} {label}") return False - return True - self.logger.info("Waiting for all Kafka connections to close") + """ + Active chunk hydrations block remote segment readers during reader creation, + which in turn keep kafka connections open. A connection can only be closed + once the download finishes, so we wait for active downloads to finish first. + """ + self.logger.info("Waiting for active hydrations to finish") + self.redpanda.wait_until( + lambda: wait_for_stat(lambda s: s['hydrations_count'], + 'active hydrations'), + timeout_sec=15, + backoff_sec=1, + err_msg="Waiting for active hydrations to finish") - default_timequery_timeout = 5 + # Once downloads are done, connections should be closed very shortly after. + self.logger.info("Waiting for all Kafka connections to close") self.redpanda.wait_until( - connections_closed, - timeout_sec=default_timequery_timeout, + lambda: wait_for_stat(lambda s: s["connections"], + "kafka connections"), + timeout_sec=3, backoff_sec=1, err_msg="Waiting for Kafka connections to close") - # TODO: we should assert here that the connection count is at zero, - # and we should assert that no new downloads start after all our - # clients have stopped. + # No new downloads should be started once all reader activity is done + for _ in range(10): + time.sleep(0.5) + assert wait_for_stat( + lambda s: s['hydrations_count'], 'active hydrations' + ), 'found an active hydration where none expected'