Skip to content

Commit

Permalink
Pick up data streams in disk-usage-stats telemetry (#1455)
Browse files Browse the repository at this point in the history
Some of our tracks just declare data streams.
  • Loading branch information
nik9000 authored Mar 22, 2022
1 parent ae3c940 commit 45e43e2
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 21 deletions.
11 changes: 8 additions & 3 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ def create_es_clients(self):
es[cluster_name] = self.es_client_factory(cluster_hosts, cluster_client_options).create()
return es

def prepare_telemetry(self, es, enable, index_names):
def prepare_telemetry(self, es, enable, index_names, data_stream_names):
enabled_devices = self.config.opts("telemetry", "devices")
telemetry_params = self.config.opts("telemetry", "params")
log_root = paths.race_root(self.config)
Expand All @@ -612,7 +612,7 @@ def prepare_telemetry(self, es, enable, index_names):
telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store),
telemetry.DataStreamStats(telemetry_params, es, self.metrics_store),
telemetry.IngestPipelineStats(es, self.metrics_store),
telemetry.DiskUsageStats(telemetry_params, es_default, self.metrics_store, index_names),
telemetry.DiskUsageStats(telemetry_params, es_default, self.metrics_store, index_names, data_stream_names),
]
else:
devices = []
Expand Down Expand Up @@ -659,7 +659,12 @@ def prepare_benchmark(self, t):

# Avoid issuing any requests to the target cluster when static responses are enabled. The results
# are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
self.prepare_telemetry(es_clients, enable=not uses_static_responses, index_names=self.track.index_names())
self.prepare_telemetry(
es_clients,
enable=not uses_static_responses,
index_names=self.track.index_names(),
data_stream_names=self.track.data_stream_names(),
)

for host in self.config.opts("driver", "load_driver_hosts"):
host_config = {
Expand Down
20 changes: 16 additions & 4 deletions esrally/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2238,28 +2238,40 @@ class DiskUsageStats(TelemetryDevice):
human_name = "Disk usage of each field"
help = "Runs the indices disk usage API after benchmarking"

def __init__(self, telemetry_params, client, metrics_store, index_names):
def __init__(self, telemetry_params, client, metrics_store, index_names, data_stream_names):
"""
:param telemetry_params: The configuration object for telemetry_params.
May specify:
``disk-usage-stats-indices``: Comma separated list of indices who's disk
usage to fetch. Default is all indices in the track.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param index_names: Names of indices defined by this track
:param data_stream_names: Names of data streams defined by this track
"""
super().__init__()
self.telemetry_params = telemetry_params
self.client = client
self.metrics_store = metrics_store
self.index_names = index_names
self.data_stream_names = data_stream_names

def on_benchmark_start(self):
self.indices = self.telemetry_params.get("disk-usage-stats-indices", ",".join(self.index_names + self.data_stream_names))
if not self.indices:
msg = (
"No indices defined for disk-usage-stats. Set disk-usage-stats-indices "
"telemetry param or add indices or data streams to the track config."
)
self.logger.exception(msg)
raise exceptions.RallyError(msg)

def on_benchmark_stop(self):
# pylint: disable=import-outside-toplevel
import elasticsearch

indices = self.telemetry_params.get("disk-usage-stats-indices", ",".join(self.index_names))
found = False
for index in indices.split(","):
for index in self.indices.split(","):
self.logger.debug("Gathering disk usage for [%s]", index)
try:
response = self.client.transport.perform_request("POST", f"/{index}/_disk_usage", params={"run_expensive_tasks": "true"})
Expand All @@ -2274,7 +2286,7 @@ def on_benchmark_stop(self):
found = True
self.handle_telemetry_usage(response)
if not found:
msg = f"Couldn't find any indices for disk usage {indices}"
msg = f"Couldn't find any indices for disk usage {self.indices}"
self.logger.exception(msg)
raise exceptions.RallyError(msg)

Expand Down
3 changes: 3 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,9 @@ def uncompressed_size_in_bytes(self):
def index_names(self):
return [i.name for i in self.indices]

def data_stream_names(self):
return [i.name for i in self.data_streams]

def __str__(self):
return self.name

Expand Down
78 changes: 64 additions & 14 deletions tests/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4150,24 +4150,58 @@ def test_logs_warning_on_missing_stats(self, metrics_put_value_cluster_level, me

class TestDiskUsageStats:
@mock.patch("elasticsearch.Elasticsearch")
def test_uses_indices_param_by_default(self, es):
def test_uses_indices_by_default(self, es):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
tc = TransportClient(response={"_shards": {"failed": 0}})
es = Client(transport_client=tc)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
assert tc.args == [
("POST", "/foo/_disk_usage"),
("POST", "/bar/_disk_usage"),
]

@mock.patch("elasticsearch.Elasticsearch")
def test_uses_data_streams_by_default(self, es):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
tc = TransportClient(response={"_shards": {"failed": 0}})
es = Client(transport_client=tc)
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=["foo", "bar"])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
assert tc.args == [
("POST", "/foo/_disk_usage"),
("POST", "/bar/_disk_usage"),
]

@mock.patch("elasticsearch.Elasticsearch")
def test_uses_indices_param_if_specified_instead_of_index_names(self, es):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
tc = TransportClient(response={"_shards": {"failed": 0}})
es = Client(transport_client=tc)
device = telemetry.DiskUsageStats(
{"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=["baz"], data_stream_names=[]
)
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
assert tc.args == [("POST", "/foo/_disk_usage"), ("POST", "/bar/_disk_usage")]

@mock.patch("elasticsearch.Elasticsearch")
def test_uses_indices_param_if_specified(self, es):
def test_uses_indices_param_if_specified_instead_of_data_stream_names(self, es):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
tc = TransportClient(response={"_shards": {"failed": 0}})
es = Client(transport_client=tc)
device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, ["baz"])
device = telemetry.DiskUsageStats(
{"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=[], data_stream_names=["baz"]
)
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand All @@ -4183,13 +4217,29 @@ def test_error_on_retrieval_does_not_store_metrics(self, metrics_store_cluster_l
error=elasticsearch.RequestError,
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
with pytest.raises(exceptions.RallyError):
t.on_benchmark_stop()
assert metrics_store_cluster_level.call_count == 0

@mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level")
def test_no_indices_fails(self, metrics_store_cluster_level):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
es = Client(
transport_client=TransportClient(
force_error=True,
error=elasticsearch.RequestError,
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
with pytest.raises(exceptions.RallyError):
t.on_benchmark_start()
assert metrics_store_cluster_level.call_count == 0

@mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level")
def test_missing_all_fails(self, metrics_store_cluster_level):
cfg = create_config()
Expand All @@ -4200,7 +4250,7 @@ def test_missing_all_fails(self, metrics_store_cluster_level):
error=elasticsearch.RequestError,
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
with pytest.raises(exceptions.RallyError):
Expand Down Expand Up @@ -4245,7 +4295,7 @@ def perform_request(self, *args, **kwargs):
)

es = Client(transport_client=TwoTransportClients(not_found_transport_client, successful_client))
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand All @@ -4262,7 +4312,7 @@ def test_successful_shards(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand All @@ -4279,7 +4329,7 @@ def test_unsuccessful_shards(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
with pytest.raises(exceptions.RallyError):
Expand All @@ -4305,7 +4355,7 @@ def test_source(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down Expand Up @@ -4334,7 +4384,7 @@ def test_id(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down Expand Up @@ -4371,7 +4421,7 @@ def test_empty_field(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo"}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo"}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down Expand Up @@ -4399,7 +4449,7 @@ def test_number(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down Expand Up @@ -4429,7 +4479,7 @@ def test_keyword(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down

0 comments on commit 45e43e2

Please sign in to comment.