Skip to content

Commit

Permalink
ducktape/cloud_storage: Wait for downloads in test
Browse files Browse the repository at this point in the history
In reader stress test the number of active kafka connections is expected
to be zero after a deadline. However when chunked read path is used,
readers are blocked while downloading chunks from cloud storage. The
readers will only detect that kafka connection has aborted after an
active download is finished.

This change adds a wait for all hydrations to finish before asserting
that connections are down to zero. This wait is for 15 seconds which may
be a little more than necessary, but it is hard to predict how long
current downloads will take.

On the other hand the wait for connections to go down after downloads
are finished is reduced to three seconds. In practice the connections
should go down much sooner but three seconds is a conservative estimate.

Additionally after all connections are done, we assert that no new
downloads are started.
  • Loading branch information
abhijat committed Nov 6, 2023
1 parent ac0546e commit 30c3f3a
Showing 1 changed file with 42 additions and 20 deletions.
62 changes: 42 additions & 20 deletions tests/rptest/scale_tests/tiered_storage_reader_stress_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.tests.redpanda_test import RedpandaTest
import concurrent.futures
import threading
import time

from rptest.clients.rpk import RpkTool, RpkException
from rptest.services.cluster import cluster
from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.redpanda import SISettings, MetricsEndpoint, ResourceSettings
from rptest.clients.rpk import RpkTool, RpkException
import concurrent.futures
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.si_utils import quiesce_uploads
import time
import threading


class TieredStorageReaderStressTest(RedpandaTest):
Expand Down Expand Up @@ -81,7 +82,7 @@ def _produce_and_quiesce(self, topic_name: str, msg_size: int,
**kwargs)
produce_duration = time.time() - t1
self.logger.info(
f"Produced {data_size} bytes in {produce_duration} seconds, {(data_size/produce_duration)/1000000.0:.2f}MB/s"
f"Produced {data_size} bytes in {produce_duration} seconds, {(data_size / produce_duration) / 1000000.0:.2f}MB/s"
)

quiesce_uploads(
Expand Down Expand Up @@ -117,6 +118,7 @@ def _get_stats(self):
segment_reader_delay_count = 0
materialize_segment_delay_count = 0
connection_count = 0
hydrations_count = 0
for family in metrics:
for sample in family.samples:
if sample.name == "redpanda_cloud_storage_readers":
Expand All @@ -132,6 +134,11 @@ def _get_stats(self):
elif sample.name == "redpanda_rpc_active_connections":
if sample.labels["redpanda_server"] == "kafka":
connection_count += int(sample.value)
for family in self.redpanda.metrics(
node, metrics_endpoint=MetricsEndpoint.METRICS):
for sample in family.samples:
if sample.name == "vectorized_cloud_storage_read_path_hydrations_in_progress_total":
hydrations_count += int(sample.value)

stats = {
"segment_readers": segment_reader_count,
Expand All @@ -140,7 +147,8 @@ def _get_stats(self):
"partition_readers_delayed": partition_reader_delay_count,
"materialize_segments_delayed":
materialize_segment_delay_count,
"connections": connection_count
"connections": connection_count,
"hydrations_count": hydrations_count
}
self.logger.debug(f"stats[{node.name}] = {stats}")
results[node] = stats
Expand Down Expand Up @@ -293,25 +301,39 @@ def stats_watcher():
# all have been trimmed.
assert stats['segment_readers']

def connections_closed():
def wait_for_stat(fn, label):
for node, stats in self._get_stats().items():
if stats['connections'] > 0:
if metric := fn(stats):
self.logger.debug(
f"Node {node.name} still has {stats['connections']} connections open"
)
f"Node {node.name} still has {metric} {label}")
return False

return True

self.logger.info("Waiting for all Kafka connections to close")
"""
Active chunk hydrations block remote segment readers during reader creation,
which in turn keep kafka connections open. A connection can only be closed
once the download finishes, so we wait for active downloads to finish first.
"""
self.logger.info("Waiting for active hydrations to finish")
self.redpanda.wait_until(
lambda: wait_for_stat(lambda s: s['hydrations_count'],
'active hydrations'),
timeout_sec=15,
backoff_sec=1,
err_msg="Waiting for active hydrations to finish")

default_timequery_timeout = 5
# Once downloads are done, connections should be closed very shortly after.
self.logger.info("Waiting for all Kafka connections to close")
self.redpanda.wait_until(
connections_closed,
timeout_sec=default_timequery_timeout,
lambda: wait_for_stat(lambda s: s["connections"],
"kafka connections"),
timeout_sec=3,
backoff_sec=1,
err_msg="Waiting for Kafka connections to close")

# TODO: we should assert here that the connection count is at zero,
# and we should assert that no new downloads start after all our
# clients have stopped.
# No new downloads should be started once all reader activity is done
for _ in range(10):
time.sleep(0.5)
assert wait_for_stat(
lambda s: s['hydrations_count'], 'active hydrations'
), 'found an active hydration where none expected'

0 comments on commit 30c3f3a

Please sign in to comment.