Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick up data streams in disk-usage-stats telemetry #1455

Merged
merged 2 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ def create_es_clients(self):
es[cluster_name] = self.es_client_factory(cluster_hosts, cluster_client_options).create()
return es

def prepare_telemetry(self, es, enable, index_names):
def prepare_telemetry(self, es, enable, index_names, data_stream_names):
enabled_devices = self.config.opts("telemetry", "devices")
telemetry_params = self.config.opts("telemetry", "params")
log_root = paths.race_root(self.config)
Expand All @@ -612,7 +612,7 @@ def prepare_telemetry(self, es, enable, index_names):
telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store),
telemetry.DataStreamStats(telemetry_params, es, self.metrics_store),
telemetry.IngestPipelineStats(es, self.metrics_store),
telemetry.DiskUsageStats(telemetry_params, es_default, self.metrics_store, index_names),
telemetry.DiskUsageStats(telemetry_params, es_default, self.metrics_store, index_names, data_stream_names),
]
else:
devices = []
Expand Down Expand Up @@ -659,7 +659,12 @@ def prepare_benchmark(self, t):

# Avoid issuing any requests to the target cluster when static responses are enabled. The results
# are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
self.prepare_telemetry(es_clients, enable=not uses_static_responses, index_names=self.track.index_names())
self.prepare_telemetry(
es_clients,
enable=not uses_static_responses,
index_names=self.track.index_names(),
data_stream_names=self.track.data_stream_names(),
)

for host in self.config.opts("driver", "load_driver_hosts"):
host_config = {
Expand Down
20 changes: 16 additions & 4 deletions esrally/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2238,28 +2238,40 @@ class DiskUsageStats(TelemetryDevice):
human_name = "Disk usage of each field"
help = "Runs the indices disk usage API after benchmarking"

def __init__(self, telemetry_params, client, metrics_store, index_names):
def __init__(self, telemetry_params, client, metrics_store, index_names, data_stream_names):
"""
:param telemetry_params: The configuration object for telemetry_params.
May specify:
``disk-usage-stats-indices``: Comma separated list of indices who's disk
usage to fetch. Default is all indices in the track.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param index_names: Names of indices defined by this track
:param data_stream_names: Names of data streams defined by this track
"""
super().__init__()
self.telemetry_params = telemetry_params
self.client = client
self.metrics_store = metrics_store
self.index_names = index_names
self.data_stream_names = data_stream_names

def on_benchmark_start(self):
self.indices = self.telemetry_params.get("disk-usage-stats-indices", ",".join(self.index_names + self.data_stream_names))
if not self.indices:
msg = (
"No indices defined for disk-usage-stats. Set disk-usage-stats-indices "
"telemetry param or add indices or data streams to the track config."
)
self.logger.exception(msg)
raise exceptions.RallyError(msg)

def on_benchmark_stop(self):
# pylint: disable=import-outside-toplevel
import elasticsearch

indices = self.telemetry_params.get("disk-usage-stats-indices", ",".join(self.index_names))
found = False
for index in indices.split(","):
for index in self.indices.split(","):
self.logger.debug("Gathering disk usage for [%s]", index)
try:
response = self.client.transport.perform_request("POST", f"/{index}/_disk_usage", params={"run_expensive_tasks": "true"})
Expand All @@ -2274,7 +2286,7 @@ def on_benchmark_stop(self):
found = True
self.handle_telemetry_usage(response)
if not found:
msg = f"Couldn't find any indices for disk usage {indices}"
msg = f"Couldn't find any indices for disk usage {self.indices}"
self.logger.exception(msg)
raise exceptions.RallyError(msg)

Expand Down
3 changes: 3 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ def uncompressed_size_in_bytes(self):
def index_names(self):
return [i.name for i in self.indices]

def data_stream_names(self):
return [i.name for i in self.data_streams]

def __str__(self):
return self.name

Expand Down
51 changes: 37 additions & 14 deletions tests/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4150,24 +4150,31 @@ def test_logs_warning_on_missing_stats(self, metrics_put_value_cluster_level, me

class TestDiskUsageStats:
@mock.patch("elasticsearch.Elasticsearch")
def test_uses_indices_param_by_default(self, es):
def test_uses_indices_and_data_streams_by_default(self, es):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you split this case, please? Per the docs, we can only use one or the other

The indices section contains a list of all indices that are used by this track. Cannot be used if the data-streams section is specified.

cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
tc = TransportClient(response={"_shards": {"failed": 0}})
es = Client(transport_client=tc)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=["baz", "bork"])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
assert tc.args == [("POST", "/foo/_disk_usage"), ("POST", "/bar/_disk_usage")]
assert tc.args == [
("POST", "/foo/_disk_usage"),
("POST", "/bar/_disk_usage"),
("POST", "/baz/_disk_usage"),
("POST", "/bork/_disk_usage"),
]

@mock.patch("elasticsearch.Elasticsearch")
def test_uses_indices_param_if_specified(self, es):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
tc = TransportClient(response={"_shards": {"failed": 0}})
es = Client(transport_client=tc)
device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, ["baz"])
device = telemetry.DiskUsageStats(
{"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=["baz"], data_stream_names=["not_this"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise would be good here to only use index_names or data_stream_names

)
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand All @@ -4183,13 +4190,29 @@ def test_error_on_retrieval_does_not_store_metrics(self, metrics_store_cluster_l
error=elasticsearch.RequestError,
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
with pytest.raises(exceptions.RallyError):
t.on_benchmark_stop()
assert metrics_store_cluster_level.call_count == 0

@mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level")
def test_no_indices_fails(self, metrics_store_cluster_level):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
es = Client(
transport_client=TransportClient(
force_error=True,
error=elasticsearch.RequestError,
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
with pytest.raises(exceptions.RallyError):
t.on_benchmark_start()
assert metrics_store_cluster_level.call_count == 0

@mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level")
def test_missing_all_fails(self, metrics_store_cluster_level):
cfg = create_config()
Expand All @@ -4200,7 +4223,7 @@ def test_missing_all_fails(self, metrics_store_cluster_level):
error=elasticsearch.RequestError,
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
with pytest.raises(exceptions.RallyError):
Expand Down Expand Up @@ -4245,7 +4268,7 @@ def perform_request(self, *args, **kwargs):
)

es = Client(transport_client=TwoTransportClients(not_found_transport_client, successful_client))
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo", "bar"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand All @@ -4262,7 +4285,7 @@ def test_successful_shards(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand All @@ -4279,7 +4302,7 @@ def test_unsuccessful_shards(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
with pytest.raises(exceptions.RallyError):
Expand All @@ -4305,7 +4328,7 @@ def test_source(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down Expand Up @@ -4334,7 +4357,7 @@ def test_id(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down Expand Up @@ -4371,7 +4394,7 @@ def test_empty_field(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo"}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo"}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down Expand Up @@ -4399,7 +4422,7 @@ def test_number(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down Expand Up @@ -4429,7 +4452,7 @@ def test_keyword(self, metrics_store_cluster_level):
}
)
)
device = telemetry.DiskUsageStats({}, es, metrics_store, ["foo"])
device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[])
t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device])
t.on_benchmark_start()
t.on_benchmark_stop()
Expand Down