Allow to specify an indices stats condition (#925)
With this commit we expand the existing `indices-stats` operation and
allow passing a condition based on a path and an expected value. This
has a variety of use cases:

* Verify that the number of documents in an index matches the expected
number of documents
* Wait until an operation has finished (e.g. no ongoing segment merges)
by specifying additional retry properties.
danielmitterdorfer authored Mar 3, 2020
1 parent 5584df0 commit a395a44
Showing 3 changed files with 177 additions and 3 deletions.
65 changes: 64 additions & 1 deletion docs/track.rst
@@ -301,6 +301,14 @@ Each operation consists of the following properties:
* ``operation-type`` (mandatory): Type of this operation. See below for the operation types that are supported out of the box in Rally. You can also add arbitrary operations by defining :doc:`custom runners </adding_tracks>`.
* ``include-in-reporting`` (optional, defaults to ``true`` for normal operations and to ``false`` for administrative operations): Whether or not this operation should be included in the command line report. For example you might want Rally to create an index for you but you are not interested in detailed metrics about it. Note that Rally will still record all metrics in the metrics store.

Some of the operations below are also retryable (marked accordingly below). Retryable operations expose the following properties:

* ``retries`` (optional, defaults to 0): The number of times the operation is retried.
* ``retry-until-success`` (optional, defaults to ``false``): Retries until the operation returns a success. This will also forcibly set ``retry-on-error`` to ``true``.
* ``retry-wait-period`` (optional, defaults to 0.5): The time in seconds to wait between retry attempts.
* ``retry-on-timeout`` (optional, defaults to ``true``): Whether to retry on connection timeout.
* ``retry-on-error`` (optional, defaults to ``false``): Whether to retry on errors (e.g. when an index could not be deleted).
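
The retry properties above can be pictured as a small loop around the operation. The following is a minimal sketch, not Rally's actual implementation: ``call_with_retries`` and ``flaky`` are hypothetical names, the operation is assumed to return a dict with an optional ``success`` key, and the handling of ``retry-on-timeout`` and ``retry-on-error`` is left out.

```python
import time


def call_with_retries(operation, retries=0, retry_until_success=False,
                      retry_wait_period=0.5, sleep=time.sleep):
    """Call ``operation`` until it reports success or the attempts run out.

    ``operation`` is any zero-argument callable returning a dict with an
    optional ``success`` key (a missing key counts as success).
    Hypothetical helper for illustration only.
    """
    attempt = 0
    while True:
        result = operation()
        if result.get("success", True):
            return result
        # Give up once the retry budget is used, unless retrying indefinitely.
        if not retry_until_success and attempt >= retries:
            return result
        attempt += 1
        sleep(retry_wait_period)


# Usage: a stand-in operation that succeeds on its third call.
calls = []


def flaky():
    calls.append(1)
    return {"weight": 1, "unit": "ops", "success": len(calls) >= 3}


result = call_with_retries(flaky, retries=5, retry_wait_period=0,
                           sleep=lambda seconds: None)
print(result["success"], len(calls))
```

In this sketch ``retries=0`` means the operation is called exactly once, and ``retry_until_success=True`` ignores the retry budget entirely.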

Depending on the operation type a couple of further parameters can be specified.

bulk
@@ -351,10 +359,27 @@ This is an administrative operation. Metrics are not reported by default. If rep
index-stats
~~~~~~~~~~~

With the operation type ``index-stats`` you can call the `indices stats API <http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html>`_. It does not support any parameters.
With the operation type ``index-stats`` you can call the `indices stats API <http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html>`_. It supports the following properties:

* ``index`` (optional, defaults to ``_all``): An `index pattern <https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-index.html>`_ that defines which indices should be targeted by this operation.
* ``condition`` (optional, defaults to no condition): A structured object with the properties ``path`` and ``expected-value``. If the actual value returned by the indices stats API at the provided path equals the expected value, this operation returns successfully. See below for an example of how this can be used.

In the following example the ``index-stats`` operation will wait until there are no ongoing segment merges::

    {
      "operation-type": "index-stats",
      "index": "_all",
      "condition": {
        "path": "_all.total.merges.current",
        "expected-value": 0
      },
      "retry-until-success": true
    }

Throughput will be reported as the number of completed ``index-stats`` operations per second.

This operation is :ref:`retryable <track_operations>`.
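
The condition check can be pictured as a path lookup followed by an equality test. The sketch below applies it to the document-count use case. It is an illustration under assumptions: ``resolve_path`` and ``condition_met`` are hypothetical names, and ``sample_stats`` is a hand-written, heavily trimmed stand-in for an indices stats response with a made-up document count.

```python
from functools import reduce


def resolve_path(document, path):
    """Follow a dot-separated path through nested dicts."""
    return reduce(lambda node, key: node[key], path.split("."), document)


def condition_met(stats, condition):
    """Return True if the value at condition["path"] equals the expected value."""
    return resolve_path(stats, condition["path"]) == condition["expected-value"]


# Hand-written stand-in for a response; real responses contain many more fields.
sample_stats = {"_all": {"primaries": {"docs": {"count": 1000}}}}

condition = {"path": "_all.primaries.docs.count", "expected-value": 1000}
print(condition_met(sample_stats, condition))
```

A path that does not exist in the response raises a ``KeyError`` in this sketch, so it pays to verify the path against a real response first.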

node-stats
~~~~~~~~~~

@@ -452,6 +477,8 @@ This example requires that the ``ingest-geoip`` Elasticsearch plugin is installe

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

put-settings
~~~~~~~~~~~~

@@ -475,6 +502,8 @@ Example::

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

cluster-health
~~~~~~~~~~~~~~

@@ -499,6 +528,8 @@ Example::

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

refresh
~~~~~~~

@@ -508,6 +539,8 @@ With the operation ``refresh`` you can execute the `refresh API <https://www.ela

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

create-index
~~~~~~~~~~~~

@@ -566,6 +599,8 @@ With the following snippet we will create a new index that is not defined in the

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

delete-index
~~~~~~~~~~~~

@@ -607,6 +642,8 @@ With the following snippet we will delete all ``logs-*`` indices::

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

create-index-template
~~~~~~~~~~~~~~~~~~~~~

@@ -662,6 +699,8 @@ With the following snippet we will create a new index template that is not defin

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

delete-index-template
~~~~~~~~~~~~~~~~~~~~~

@@ -705,6 +744,8 @@ With the following snippet we will delete the `default`` index template::

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

shrink-index
~~~~~~~~~~~~

@@ -733,6 +774,8 @@ Example::

This will shrink the index ``src`` to ``target``. The target index will consist of one shard and have one replica. With ``shrink-node`` we also explicitly specify the name of the node where we want the source index to be relocated to.

This operation is :ref:`retryable <track_operations>`.

delete-ml-datafeed
~~~~~~~~~~~~~~~~~~

@@ -755,6 +798,8 @@ With the operation ``create-ml-datafeed`` you can execute the `create datafeeds

This operation works only if `machine-learning <https://www.elastic.co/products/stack/machine-learning>`__ is properly installed and enabled. This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

start-ml-datafeed
~~~~~~~~~~~~~~~~~

@@ -768,6 +813,8 @@ With the operation ``start-ml-datafeed`` you can execute the `start datafeeds AP

This operation works only if `machine-learning <https://www.elastic.co/products/stack/machine-learning>`__ is properly installed and enabled. This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

stop-ml-datafeed
~~~~~~~~~~~~~~~~

@@ -779,6 +826,8 @@ With the operation ``stop-ml-datafeed`` you can execute the `stop datafeed API <

This operation works only if `machine-learning <https://www.elastic.co/products/stack/machine-learning>`__ is properly installed and enabled. This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

delete-ml-job
~~~~~~~~~~~~~

@@ -791,6 +840,8 @@ This runner will intentionally ignore 404s from Elasticsearch so it is safe to e

This operation works only if `machine-learning <https://www.elastic.co/products/stack/machine-learning>`__ is properly installed and enabled. This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

create-ml-job
~~~~~~~~~~~~~

@@ -801,6 +852,8 @@ With the operation ``create-ml-job`` you can execute the `create jobs API <https

This operation works only if `machine-learning <https://www.elastic.co/products/stack/machine-learning>`__ is properly installed and enabled. This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

open-ml-job
~~~~~~~~~~~

@@ -810,6 +863,8 @@ With the operation ``open-ml-job`` you can execute the `open jobs API <https://w

This operation works only if `machine-learning <https://www.elastic.co/products/stack/machine-learning>`__ is properly installed and enabled. This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

close-ml-job
~~~~~~~~~~~~

@@ -821,6 +876,8 @@ With the operation ``close-ml-job`` you can execute the `close jobs API. The ``c

This operation works only if `machine-learning <https://www.elastic.co/products/stack/machine-learning>`__ is properly installed and enabled. This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

raw-request
~~~~~~~~~~~

@@ -854,6 +911,8 @@ With the operation ``delete-snapshot-repository`` you can delete an existing sna

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

create-snapshot-repository
~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -865,6 +924,8 @@ With the operation ``create-snapshot-repository`` you can create a new snapshot

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

restore-snapshot
~~~~~~~~~~~~~~~~

@@ -912,6 +973,8 @@ With the operation ``wait-for-recovery`` you can wait until an ongoing shard rec

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

This operation is :ref:`retryable <track_operations>`.

schedule
~~~~~~~~

37 changes: 35 additions & 2 deletions esrally/driver/runner.py
@@ -32,7 +32,7 @@
def register_default_runners():
    register_runner(track.OperationType.Bulk.name, BulkIndex())
    register_runner(track.OperationType.ForceMerge.name, ForceMerge())
    register_runner(track.OperationType.IndicesStats.name, IndicesStats())
    register_runner(track.OperationType.IndicesStats.name, Retry(IndicesStats()))
    register_runner(track.OperationType.NodesStats.name, NodeStats())
    register_runner(track.OperationType.Search.name, Query())
    register_runner(track.OperationType.RawRequest.name, RawRequest())
@@ -607,8 +607,39 @@ class IndicesStats(Runner):
    Gather index stats for all indices.
    """

    def _get(self, v, path):
        if len(path) == 1:
            return v[path[0]]
        else:
            return self._get(v[path[0]], path[1:])

    def __call__(self, es, params):
        es.indices.stats(metric="_all")
        index = params.get("index", "_all")
        condition = params.get("condition")

        response = es.indices.stats(index=index, metric="_all")
        if condition:
            path = mandatory(condition, "path", repr(self))
            expected_value = mandatory(condition, "expected-value", repr(self))
            actual_value = self._get(response, path.split("."))
            return {
                "weight": 1,
                "unit": "ops",
                "condition": {
                    "path": path,
                    # avoid mapping issues in the ES metrics store by always rendering values as strings
                    "actual-value": str(actual_value),
                    "expected-value": str(expected_value)
                },
                # currently we only support "==" as a predicate but that might change in the future
                "success": actual_value == expected_value
            }
        else:
            return {
                "weight": 1,
                "unit": "ops",
                "success": True
            }

    def __repr__(self, *args, **kwargs):
        return "indices-stats"
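
One consequence of the runner code above: the comparison uses the raw values, and the conversion to strings happens only for reporting. An ``expected-value`` of a different type than the stats value therefore never matches, even though both render identically. The sketch below isolates just those lines; ``evaluate`` is a hypothetical helper, not part of Rally.

```python
def evaluate(actual_value, expected_value):
    """Mirror the comparison and string rendering used in the runner above."""
    return {
        "actual-value": str(actual_value),
        "expected-value": str(expected_value),
        "success": actual_value == expected_value,
    }


# Integer against integer: the condition holds.
print(evaluate(0, 0))
# Integer against string: both render as "0", yet the comparison fails.
print(evaluate(0, "0"))
```

Combined with ``retry-until-success``, such a type mismatch would make the operation retry indefinitely, so numeric stats should be given as JSON numbers in the track.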
@@ -1433,8 +1464,10 @@ def __call__(self, es, params):
# we can determine success if and only if the runner returns a dict. Otherwise, we have to assume it was fine.
elif isinstance(return_value, dict):
    if return_value.get("success", True):
        self.logger.debug("%s has returned successfully", repr(self.delegate))
        return return_value
    else:
        self.logger.debug("%s has returned with an error: %s.", repr(self.delegate), return_value)
        time.sleep(sleep_time)
else:
    return return_value
78 changes: 78 additions & 0 deletions tests/driver/runner_test.py
@@ -830,6 +830,84 @@ def test_optimize_with_params(self, es):
params={"request_timeout": 17000})


class IndicesStatsRunnerTests(TestCase):
    @mock.patch("elasticsearch.Elasticsearch")
    def test_indices_stats_without_parameters(self, es):
        indices_stats = runner.IndicesStats()
        result = indices_stats(es, params={})
        self.assertEqual(1, result["weight"])
        self.assertEqual("ops", result["unit"])
        self.assertTrue(result["success"])

        es.indices.stats.assert_called_once_with(index="_all", metric="_all")

    @mock.patch("elasticsearch.Elasticsearch")
    def test_indices_stats_with_failed_condition(self, es):
        es.indices.stats.return_value = {
            "_all": {
                "total": {
                    "merges": {
                        "current": 2,
                        "current_docs": 292698,
                    }
                }
            }
        }

        indices_stats = runner.IndicesStats()

        result = indices_stats(es, params={
            "index": "logs-*",
            "condition": {
                "path": "_all.total.merges.current",
                "expected-value": 0
            }
        })
        self.assertEqual(1, result["weight"])
        self.assertEqual("ops", result["unit"])
        self.assertFalse(result["success"])
        self.assertDictEqual({
            "path": "_all.total.merges.current",
            "actual-value": "2",
            "expected-value": "0"
        }, result["condition"])

        es.indices.stats.assert_called_once_with(index="logs-*", metric="_all")

    @mock.patch("elasticsearch.Elasticsearch")
    def test_indices_stats_with_successful_condition(self, es):
        es.indices.stats.return_value = {
            "_all": {
                "total": {
                    "merges": {
                        "current": 0,
                        "current_docs": 292698,
                    }
                }
            }
        }

        indices_stats = runner.IndicesStats()

        result = indices_stats(es, params={
            "index": "logs-*",
            "condition": {
                "path": "_all.total.merges.current",
                "expected-value": 0
            }
        })
        self.assertEqual(1, result["weight"])
        self.assertEqual("ops", result["unit"])
        self.assertTrue(result["success"])
        self.assertDictEqual({
            "path": "_all.total.merges.current",
            "actual-value": "0",
            "expected-value": "0"
        }, result["condition"])

        es.indices.stats.assert_called_once_with(index="logs-*", metric="_all")
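
The tests above cover a single call each. To exercise the "wait until merges have finished" scenario across several polls, a mock's ``side_effect`` can supply a sequence of responses. The following is a self-contained sketch rather than part of the test suite: ``merges_finished`` and ``stats_with_merges`` are hypothetical stand-ins, used here so the example does not depend on ``esrally``.

```python
from unittest import mock


def merges_finished(es, index="_all"):
    """Stand-in check: True once the stats response reports no ongoing merges."""
    response = es.indices.stats(index=index, metric="_all")
    return response["_all"]["total"]["merges"]["current"] == 0


def stats_with_merges(current):
    """Build a minimal fake indices stats response."""
    return {"_all": {"total": {"merges": {"current": current}}}}


es = mock.MagicMock()
# Each call to es.indices.stats returns the next response in this list.
es.indices.stats.side_effect = [
    stats_with_merges(2),
    stats_with_merges(1),
    stats_with_merges(0),
]

outcomes = [merges_finished(es, index="logs-*") for _ in range(3)]
print(outcomes)
print(es.indices.stats.call_count)
```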


class QueryRunnerTests(TestCase):
    @mock.patch("elasticsearch.Elasticsearch")
    def test_query_match_only_request_body_defined(self, es):