ducktape/cloud_storage: Track active hydrations
In the reader stress test, the number of active Kafka connections is
expected to be zero after a deadline. However, when the chunked read path
is used, readers are blocked while downloading chunks from cloud storage,
and they only detect that a Kafka connection has been aborted once the
active download finishes.

This change adds a wait for all hydrations to finish before asserting
that connections are down to zero. The wait is 15 seconds, which may be a
little longer than necessary, but it is hard to predict how long in-flight
downloads will take.

On the other hand, the wait for connections to go down after downloads
have finished is reduced to three seconds. In practice the connections
should go down much sooner, so three seconds is a conservative estimate.
abhijat committed Nov 3, 2023
1 parent 55c32d0 commit be358b8
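
In outline, the change amounts to a two-phase wait, sketched below as a
minimal standalone illustration (the real code is in the diff further down;
wait_until here is a simplified stand-in for ducktape's polling helper, and
get_stats is a hypothetical stand-in for the test's _get_stats):

import time

def wait_until(cond, timeout_sec, backoff_sec, err_msg):
    # Minimal stand-in for ducktape's wait_until: poll `cond` until it
    # returns True or the deadline expires.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if cond():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)

def get_stats():
    # Hypothetical stand-in for the test's _get_stats(): node name -> stats.
    return {"node-1": {"hydrations_count": 0, "connections": 0}}

def wait_for_stat(fn, label):
    # False while any node still reports a non-zero value for the stat.
    for node, stats in get_stats().items():
        if metric := fn(stats):
            print(f"Node {node} still has {metric} {label}")
            return False
    return True

# Phase 1: generous 15 s budget, since it is hard to predict how long
# in-flight chunk downloads will take.
wait_until(lambda: wait_for_stat(lambda s: s["hydrations_count"],
                                 "active hydrations"),
           timeout_sec=15, backoff_sec=1,
           err_msg="Waiting for active hydrations to finish")

# Phase 2: once downloads are done, connections should close almost
# immediately; three seconds is already conservative.
wait_until(lambda: wait_for_stat(lambda s: s["connections"],
                                 "kafka connections"),
           timeout_sec=3, backoff_sec=1,
           err_msg="Waiting for Kafka connections to close")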
Showing 1 changed file with 27 additions and 17 deletions.

tests/rptest/scale_tests/tiered_storage_reader_stress_test.py
@@ -122,6 +122,7 @@ def _get_stats(self):
             segment_reader_delay_count = 0
             materialize_segment_delay_count = 0
             connection_count = 0
+            hydrations_count = 0
             for family in metrics:
                 for sample in family.samples:
                     if sample.name == "redpanda_cloud_storage_readers":
@@ -137,6 +138,11 @@
                     elif sample.name == "redpanda_rpc_active_connections":
                         if sample.labels["redpanda_server"] == "kafka":
                             connection_count += int(sample.value)
+            for family in self.redpanda.metrics(
+                    node, metrics_endpoint=MetricsEndpoint.METRICS):
+                for sample in family.samples:
+                    if sample.name == "vectorized_cloud_storage_read_path_hydrations_in_progress_total":
+                        hydrations_count += int(sample.value)
 
             stats = {
                 "segment_readers": segment_reader_count,
@@ -145,7 +151,8 @@
                 "partition_readers_delayed": partition_reader_delay_count,
                 "materialize_segments_delayed":
                 materialize_segment_delay_count,
-                "connections": connection_count
+                "connections": connection_count,
+                "hydrations_count": hydrations_count
             }
             self.logger.debug(f"stats[{node.name}] = {stats}")
             results[node] = stats
@@ -298,29 +305,32 @@ def stats_watcher():
                 # all have been trimmed.
                 assert stats['segment_readers']
 
-        def connections_closed():
-            for process in psutil.process_iter():
-                if 'rpk' in process.name():
-                    parent = psutil.Process(process.ppid())
-                    self.logger.warn(f'An rpk is still running: {process}, '
-                                     f'cmd: {process.cmdline()}, '
-                                     f'parent: {parent}, '
-                                     f'parent cmd: {parent.cmdline()}')
+        def wait_for_stat(fn, label):
             for node, stats in self._get_stats().items():
-                if stats['connections'] > 0:
+                if metric := fn(stats):
                     self.logger.debug(
-                        f"Node {node.name} still has {stats['connections']} connections open"
-                    )
+                        f"Node {node.name} still has {metric} {label}")
                     return False
 
             return True
 
-        self.logger.info("Waiting for all Kafka connections to close")
+        """
+        Active chunk hydrations block remote segment readers, which in turn keep kafka connections open.
+        A connection can only be closed once the download finishes, so we wait for active downloads to finish first.
+        """
+        self.logger.info("Waiting for active hydrations to finish")
+        self.redpanda.wait_until(
+            lambda: wait_for_stat(lambda s: s['hydrations_count'],
+                                  'active hydrations'),
+            timeout_sec=15,
+            backoff_sec=1,
+            err_msg="Waiting for active hydrations to finish")
 
-        default_timequery_timeout = 5
+        # Once downloads are done, connections should be closed very shortly after.
+        self.logger.info("Waiting for all Kafka connections to close")
         self.redpanda.wait_until(
-            connections_closed,
-            timeout_sec=default_timequery_timeout,
+            lambda: wait_for_stat(lambda s: s["connections"],
+                                  "kafka connections"),
+            timeout_sec=3,
            backoff_sec=1,
            err_msg="Waiting for Kafka connections to close")
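
The family/sample iteration added to _get_stats matches the
prometheus_client data model; below is a standalone sketch of the same
counting logic under that assumption. The exposition text is hypothetical;
text_string_to_metric_families is the real parser entry point from
prometheus_client:

from prometheus_client.parser import text_string_to_metric_families

# Hypothetical /metrics exposition as one node's shards might report it.
exposition = """\
# TYPE vectorized_cloud_storage_read_path_hydrations_in_progress_total gauge
vectorized_cloud_storage_read_path_hydrations_in_progress_total{shard="0"} 2
vectorized_cloud_storage_read_path_hydrations_in_progress_total{shard="1"} 1
"""

# Same shape as the diff: iterate families -> samples, summing the match.
hydrations_count = 0
for family in text_string_to_metric_families(exposition):
    for sample in family.samples:
        if sample.name == "vectorized_cloud_storage_read_path_hydrations_in_progress_total":
            hydrations_count += int(sample.value)

print(hydrations_count)  # -> 3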
