From d99da37bc4dcdde192faaa5722195dba6231069c Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 9 Mar 2022 11:45:09 -0500 Subject: [PATCH 1/2] Pick up data streams in disk-usage-stats telemetry Some of our tracks just declare data streams. --- esrally/driver/driver.py | 11 ++++++--- esrally/telemetry.py | 20 ++++++++++++---- esrally/track/track.py | 3 +++ tests/telemetry_test.py | 51 +++++++++++++++++++++++++++++----------- 4 files changed, 64 insertions(+), 21 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 808fedc5f..2fa16ed96 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -588,7 +588,7 @@ def create_es_clients(self): es[cluster_name] = self.es_client_factory(cluster_hosts, cluster_client_options).create() return es - def prepare_telemetry(self, es, enable, index_names): + def prepare_telemetry(self, es, enable, index_names, data_stream_names): enabled_devices = self.config.opts("telemetry", "devices") telemetry_params = self.config.opts("telemetry", "params") log_root = paths.race_root(self.config) @@ -612,7 +612,7 @@ def prepare_telemetry(self, es, enable, index_names): telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store), telemetry.DataStreamStats(telemetry_params, es, self.metrics_store), telemetry.IngestPipelineStats(es, self.metrics_store), - telemetry.DiskUsageStats(telemetry_params, es_default, self.metrics_store, index_names), + telemetry.DiskUsageStats(telemetry_params, es_default, self.metrics_store, index_names, data_stream_names), ] else: devices = [] @@ -659,7 +659,12 @@ def prepare_benchmark(self, t): # Avoid issuing any requests to the target cluster when static responses are enabled. The results # are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs. - self.prepare_telemetry(es_clients, enable=not uses_static_responses, index_names=self.track.index_names()) + self.prepare_telemetry( + es_clients, + enable=not uses_static_responses, + index_names=self.track.index_names(), + data_stream_names=self.track.data_stream_names(), + ) for host in self.config.opts("driver", "load_driver_hosts"): host_config = { diff --git a/esrally/telemetry.py b/esrally/telemetry.py index 61ef988ce..2441e9202 100644 --- a/esrally/telemetry.py +++ b/esrally/telemetry.py @@ -2238,7 +2238,7 @@ class DiskUsageStats(TelemetryDevice): human_name = "Disk usage of each field" help = "Runs the indices disk usage API after benchmarking" - def __init__(self, telemetry_params, client, metrics_store, index_names): + def __init__(self, telemetry_params, client, metrics_store, index_names, data_stream_names): """ :param telemetry_params: The configuration object for telemetry_params. May specify: @@ -2246,20 +2246,32 @@ def __init__(self, telemetry_params, client, metrics_store, index_names): usage to fetch. Default is all indices in the track. :param client: The Elasticsearch client for this cluster. :param metrics_store: The configured metrics store we write to. + :param index_names: Names of indices defined by this track + :param data_stream_names: Names of data streams defined by this track """ super().__init__() self.telemetry_params = telemetry_params self.client = client self.metrics_store = metrics_store self.index_names = index_names + self.data_stream_names = data_stream_names + + def on_benchmark_start(self): + self.indices = self.telemetry_params.get("disk-usage-stats-indices", ",".join(self.index_names + self.data_stream_names)) + if not self.indices: + msg = ( + "No indices defined for disk-usage-stats. Set disk-usage-stats-indices " + "telemetry param or add indices or data streams to the track config." + ) + self.logger.exception(msg) + raise exceptions.RallyError(msg) def on_benchmark_stop(self): # pylint: disable=import-outside-toplevel import elasticsearch - indices = self.telemetry_params.get("disk-usage-stats-indices", ",".join(self.index_names)) found = False - for index in indices.split(","): + for index in self.indices.split(","): self.logger.debug("Gathering disk usage for [%s]", index) try: response = self.client.transport.perform_request("POST", f"/{index}/_disk_usage", params={"run_expensive_tasks": "true"}) @@ -2274,7 +2286,7 @@ def on_benchmark_stop(self): found = True self.handle_telemetry_usage(response) if not found: - msg = f"Couldn't find any indices for disk usage {indices}" + msg = f"Couldn't find any indices for disk usage {self.indices}" self.logger.exception(msg) raise exceptions.RallyError(msg) diff --git a/esrally/track/track.py b/esrally/track/track.py index 21505d3b9..05ee5a78c 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -533,6 +533,9 @@ def uncompressed_size_in_bytes(self): def index_names(self): return [i.name for i in self.indices] + def data_stream_names(self): + return [i.name for i in self.data_streams] + def __str__(self): return self.name diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index 0f1dfd366..346525328 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -4150,16 +4150,21 @@ def test_logs_warning_on_missing_stats(self, metrics_put_value_cluster_level, me class TestDiskUsageStats: @mock.patch("elasticsearch.Elasticsearch") - def test_uses_indices_param_by_default(self, es): + def test_uses_indices_and_data_streams_by_default(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) tc = TransportClient(response={"_shards": {"failed": 0}}) es = Client(transport_client=tc) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=["baz", "bork"]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - assert tc.args == [("POST", "/foo/_disk_usage"), ("POST", "/bar/_disk_usage")] + assert tc.args == [ + ("POST", "/foo/_disk_usage"), + ("POST", "/bar/_disk_usage"), + ("POST", "/baz/_disk_usage"), + ("POST", "/bork/_disk_usage"), + ] @mock.patch("elasticsearch.Elasticsearch") def test_uses_indices_param_if_specified(self, es): @@ -4167,7 +4172,9 @@ def test_uses_indices_param_if_specified(self, es): metrics_store = metrics.EsMetricsStore(cfg) tc = TransportClient(response={"_shards": {"failed": 0}}) es = Client(transport_client=tc) - device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, ["baz"]) + device = telemetry.DiskUsageStats( + {"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=["baz"], data_stream_names=["not_this"] + ) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() @@ -4183,13 +4190,29 @@ def test_error_on_retrieval_does_not_store_metrics(self, metrics_store_cluster_l error=elasticsearch.RequestError, ) ) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() with pytest.raises(exceptions.RallyError): t.on_benchmark_stop() assert metrics_store_cluster_level.call_count == 0 + @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") + def test_no_indices_fails(self, metrics_store_cluster_level): + cfg = create_config() + metrics_store = metrics.EsMetricsStore(cfg) + es = Client( + transport_client=TransportClient( + force_error=True, + error=elasticsearch.RequestError, + ) + ) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=[]) + t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) + with pytest.raises(exceptions.RallyError): + t.on_benchmark_start() + assert metrics_store_cluster_level.call_count == 0 + @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") def test_missing_all_fails(self, metrics_store_cluster_level): cfg = create_config() @@ -4200,7 +4223,7 @@ def test_missing_all_fails(self, metrics_store_cluster_level): error=elasticsearch.RequestError, ) ) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() with pytest.raises(exceptions.RallyError): @@ -4245,7 +4268,7 @@ def perform_request(self, *args, **kwargs): ) es = Client(transport_client=TwoTransportClients(not_found_transport_client, successful_client)) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() @@ -4262,7 +4285,7 @@ def test_successful_shards(self, metrics_store_cluster_level): } ) ) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() @@ -4279,7 +4302,7 @@ def test_unsuccessful_shards(self, metrics_store_cluster_level): } ) ) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() with pytest.raises(exceptions.RallyError): @@ -4305,7 +4328,7 @@ def test_source(self, metrics_store_cluster_level): } ) ) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() @@ -4334,7 +4357,7 @@ def test_id(self, metrics_store_cluster_level): } ) ) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() @@ -4371,7 +4394,7 @@ def test_empty_field(self, metrics_store_cluster_level): } ) ) - device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo"}, es, metrics_store, ["foo"]) + device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo"}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() @@ -4399,7 +4422,7 @@ def test_number(self, metrics_store_cluster_level): } ) ) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() @@ -4429,7 +4452,7 @@ def test_keyword(self, metrics_store_cluster_level): } ) ) - device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() From 2bb9570d2fe5704f5faf182a6a62ace67dae7221 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 21 Mar 2022 16:32:42 -0400 Subject: [PATCH 2/2] One or the other --- tests/telemetry_test.py | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index 346525328..63ef9ff06 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -4150,30 +4150,57 @@ def test_logs_warning_on_missing_stats(self, metrics_put_value_cluster_level, me class TestDiskUsageStats: @mock.patch("elasticsearch.Elasticsearch") - def test_uses_indices_and_data_streams_by_default(self, es): + def test_uses_indices_by_default(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) tc = TransportClient(response={"_shards": {"failed": 0}}) es = Client(transport_client=tc) - device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=["baz", "bork"]) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) + t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) + t.on_benchmark_start() + t.on_benchmark_stop() + assert tc.args == [ + ("POST", "/foo/_disk_usage"), + ("POST", "/bar/_disk_usage"), + ] + + @mock.patch("elasticsearch.Elasticsearch") + def test_uses_data_streams_by_default(self, es): + cfg = create_config() + metrics_store = metrics.EsMetricsStore(cfg) + tc = TransportClient(response={"_shards": {"failed": 0}}) + es = Client(transport_client=tc) + device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=["foo", "bar"]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() assert tc.args == [ ("POST", "/foo/_disk_usage"), ("POST", "/bar/_disk_usage"), - ("POST", "/baz/_disk_usage"), - ("POST", "/bork/_disk_usage"), ] @mock.patch("elasticsearch.Elasticsearch") - def test_uses_indices_param_if_specified(self, es): + def test_uses_indices_param_if_specified_instead_of_index_names(self, es): + cfg = create_config() + metrics_store = metrics.EsMetricsStore(cfg) + tc = TransportClient(response={"_shards": {"failed": 0}}) + es = Client(transport_client=tc) + device = telemetry.DiskUsageStats( + {"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=["baz"], data_stream_names=[] + ) + t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) + t.on_benchmark_start() + t.on_benchmark_stop() + assert tc.args == [("POST", "/foo/_disk_usage"), ("POST", "/bar/_disk_usage")] + + @mock.patch("elasticsearch.Elasticsearch") + def test_uses_indices_param_if_specified_instead_of_data_stream_names(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) tc = TransportClient(response={"_shards": {"failed": 0}}) es = Client(transport_client=tc) device = telemetry.DiskUsageStats( - {"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=["baz"], data_stream_names=["not_this"] + {"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=[], data_stream_names=["baz"] ) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start()