Add runner to wait until snapshot has been created #1047

Merged: 4 commits, Aug 14, 2020
Changes from 1 commit
19 changes: 18 additions & 1 deletion docs/track.rst
@@ -1119,7 +1119,24 @@ With the operation ``create-snapshot`` you can `create a snapshot <https://www.e
 * ``request-params`` (optional): A structure containing HTTP request parameters.

 .. note::
-    When ``wait-for-completion`` is set to ``true`` Rally will report the achieved throughput in byte/s.
+    It's not recommend to rely on ``wait-for-completion=true``. Instead you should keep the default value (``False``) and use an additional ``wait-for-snapshot-create`` operation in the next step.
Member

nit: "not recommend" -> "not recommended"

Contributor Author

Addressed in 32ee4ab

+    This is mandatory on the `Elastic Cloud <https://www.elastic.co/cloud>`_ or environments where Elasticsearch is sitting behind a network element that may terminate the blocking connection after a timeout.
Member

  • on the Elastic Cloud -> on Elastic Cloud?
  • "sitting behind a network element" -> "connected via intermediate network components, such as proxies, that may terminate ..."?

Contributor Author

Addressed in 32ee4ab


+wait-for-snapshot-create
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+With the operation ``wait-for-snapshot-create`` you can wait until a snapshot has finished successfully.
+Typically you'll use this operation directly after a ``create-snapshot`` operation.
+
+It supports the following parameters:
+
+* ``repository`` (mandatory): The name of the snapshot repository to use.
+* ``snapshot`` (mandatory): The name of the snapshot that this operation will wait until it succeeds.
+* ``completion-recheck-wait-period`` (optional, defaults to 1 second): Time in seconds to wait in between consecutive attempts.
+
+Rally will report the achieved throughput in byte/s, the duration in seconds, the start and stop time in milliseconds and the total amount of files snapshotted as returned by the the `Elasticsearch snapshot status API call <https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html>`_.
Member

I am not sure I'd document all the metadata that we return (except for the throughput)? Otherwise we should probably also document the respective metric keys and make clear that these metadata are only available with an Elasticsearch metrics store (contrary to the throughput).

Contributor Author

Addressed in 32ee4ab


+This operation is :ref:`retryable <track_operations>`.

 restore-snapshot
 ~~~~~~~~~~~~~~~~
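
The documentation change above recommends leaving ``wait-for-completion`` at its default and following ``create-snapshot`` with a ``wait-for-snapshot-create`` operation. The sketch below shows one way the two operation definitions could be paired, built in Python and serialized to JSON. The parameter names come from the documentation in this diff. The helper `snapshot_operations`, the `name` / `operation-type` layout and the sample values (`backups`, `snapshot-001`, `logs-*`) are illustrative assumptions rather than an excerpt from a real track.

```python
import json


def snapshot_operations(repository, snapshot, indices, recheck_seconds=1):
    """Build a create-snapshot operation followed by a wait-for-snapshot-create one.

    Hypothetical helper for illustration; only the parameter names are taken from the docs.
    """
    create = {
        "name": "create-snapshot",
        "operation-type": "create-snapshot",
        "repository": repository,
        "snapshot": snapshot,
        "body": {"indices": indices},
        # Keep the default so the HTTP request returns without blocking.
        "wait-for-completion": False,
    }
    wait = {
        "name": "wait-for-snapshot-create",
        "operation-type": "wait-for-snapshot-create",
        "repository": repository,
        "snapshot": snapshot,
        # Seconds between two consecutive status checks.
        "completion-recheck-wait-period": recheck_seconds,
    }
    return [create, wait]


if __name__ == "__main__":
    print(json.dumps(snapshot_operations("backups", "snapshot-001", "logs-*"), indent=2))
```

Both operations have to name the same repository and snapshot, which is why the sketch derives them from a single set of arguments.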
78 changes: 56 additions & 22 deletions esrally/driver/runner.py
@@ -63,6 +63,7 @@ def register_default_runners():
register_runner(track.OperationType.CloseMlJob.name, Retry(CloseMlJob()), async_runner=True)
register_runner(track.OperationType.DeleteSnapshotRepository.name, Retry(DeleteSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.CreateSnapshotRepository.name, Retry(CreateSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.WaitForSnapshotCreate.name, Retry(WaitForSnapshotCreate()), async_runner=True)
register_runner(track.OperationType.WaitForRecovery.name, Retry(IndicesRecovery()), async_runner=True)
register_runner(track.OperationType.PutSettings.name, Retry(PutSettings()), async_runner=True)
register_runner(track.OperationType.CreateTransform.name, Retry(CreateTransform()), async_runner=True)
@@ -1464,33 +1465,66 @@ async def __call__(self, es, params):
         repository = mandatory(params, "repository", repr(self))
         snapshot = mandatory(params, "snapshot", repr(self))
         body = mandatory(params, "body", repr(self))
-        response = await es.snapshot.create(repository=repository,
-                                            snapshot=snapshot,
-                                            body=body,
-                                            params=request_params,
-                                            wait_for_completion=wait_for_completion)
-
-        # We can derive a more useful throughput metric if the snapshot has successfully completed
-        if wait_for_completion and response.get("snapshot", {}).get("state") == "SUCCESS":
-            stats_response = await es.snapshot.status(repository=repository,
-                                                      snapshot=snapshot)
-            size = stats_response["snapshots"][0]["stats"]["total"]["size_in_bytes"]
-            # while the actual service time as determined by Rally should be pretty accurate, the actual time it took
-            # to restore allows for a correct calculation of achieved throughput.
-            time_in_millis = stats_response["snapshots"][0]["stats"]["time_in_millis"]
-            time_in_seconds = time_in_millis / 1000
-            return {
-                "weight": size,
-                "unit": "byte",
-                "success": True,
-                "service_time": time_in_seconds,
-                "time_period": time_in_seconds
-            }
+        await es.snapshot.create(repository=repository,
+                                 snapshot=snapshot,
+                                 body=body,
+                                 params=request_params,
+                                 wait_for_completion=wait_for_completion)

     def __repr__(self, *args, **kwargs):
         return "create-snapshot"
+
+
+class WaitForSnapshotCreate(Runner):
+    async def __call__(self, es, params):
+        repository = mandatory(params, "repository", repr(self))
+        snapshot = mandatory(params, "snapshot", repr(self))
+        wait_period = params.get("completion-recheck-wait-period", 1)
+
+        snapshot_done = False
+        stats = {}
+
+        while not snapshot_done:
+            response = await es.snapshot.status(repository=repository,
+                                                snapshot=snapshot,
+                                                ignore_unavailable=True)
+
+            if "snapshots" in response:
+                response_state = response["snapshots"][0]["state"]
+                # Possible states:
+                # https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html#get-snapshot-status-api-response-body
+                if response_state == "FAILED":
+                    self.logger.error("Snapshot [%s] failed. Response status:\n%s", snapshot, json.dumps(response))
Member

  • "Response status" -> "Response"? (It is the actual, full response.) I also think we should pretty-print it, e.g. by specifying indent=2 in json.dumps.

Contributor Author

Addressed in 32ee4ab

+                    raise exceptions.RallyAssertionError(
Member

Nit: Any reason why it is formatted this way? I tried putting everything on the same line and ended up with a line width of 110 which should still be fine?

Contributor Author

Addressed in 32ee4ab

+                        f"Snapshot [{snapshot}] failed. Please check logs."
+                    )
+                snapshot_done = response_state == "SUCCESS"
+                stats = response["snapshots"][0]["stats"]
+
+            if not snapshot_done:
+                await asyncio.sleep(wait_period)
+
+        size = stats["total"]["size_in_bytes"]
+        file_count = stats["total"]["file_count"]
+        start_time_in_millis = stats["start_time_in_millis"]
+        duration_in_millis = stats["time_in_millis"]
+        duration_in_seconds = duration_in_millis / 1000
+
+        return {
+            "weight": size,
+            "unit": "byte",
+            "success": True,
+            "throughput": size / duration_in_seconds,
+            "start_time_millis": start_time_in_millis,
+            "stop_time_millis": start_time_in_millis + duration_in_millis,
+            "duration": duration_in_millis,
+            "file_count": file_count
+        }
+
+    def __repr__(self, *args, **kwargs):
+        return "wait-for-snapshot-create"


 class RestoreSnapshot(Runner):
     """
     Restores a snapshot from an already registered repository
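
The `WaitForSnapshotCreate` runner added in the hunk above polls the snapshot status API until the snapshot reports `SUCCESS`, and aborts on `FAILED`. The following is a minimal standalone sketch of that polling pattern, not the runner itself. `FakeSnapshotClient` and `wait_until_snapshot_done` are hypothetical names introduced for illustration, the fake client stands in for `es.snapshot`, and `RuntimeError` replaces Rally's `RallyAssertionError` so that the example has no dependencies.

```python
import asyncio
import json


class FakeSnapshotClient:
    """Hypothetical stand-in for ``es.snapshot``: returns canned responses in order."""

    def __init__(self, responses):
        self._responses = list(responses)
        self.calls = 0

    async def status(self, repository, snapshot, ignore_unavailable=True):
        self.calls += 1
        return self._responses.pop(0)


async def wait_until_snapshot_done(client, repository, snapshot, wait_period=1):
    """Poll the status API until the snapshot succeeds; raise if it fails."""
    while True:
        response = await client.status(repository=repository,
                                       snapshot=snapshot,
                                       ignore_unavailable=True)
        # An empty response means the snapshot is not visible yet, so keep polling.
        if "snapshots" in response:
            info = response["snapshots"][0]
            if info["state"] == "FAILED":
                raise RuntimeError(
                    f"Snapshot [{snapshot}] failed. Response:\n{json.dumps(response, indent=2)}")
            if info["state"] == "SUCCESS":
                return info["stats"]
        await asyncio.sleep(wait_period)


if __name__ == "__main__":
    client = FakeSnapshotClient([
        {},
        {"snapshots": [{"state": "STARTED", "stats": {}}]},
        {"snapshots": [{"state": "SUCCESS", "stats": {"time_in_millis": 200}}]},
    ])
    print(asyncio.run(wait_until_snapshot_done(client, "backups", "snapshot-001", wait_period=0)))
```

Like the code in the diff, the sketch keeps polling for any state other than `SUCCESS` or `FAILED`, so a caller that needs an upper bound on the waiting time would have to add a timeout of its own.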
4 changes: 3 additions & 1 deletion esrally/track/track.py
@@ -420,7 +420,7 @@ class OperationType(Enum):
RawRequest = 5
WaitForRecovery = 6
CreateSnapshot = 7
Member

I think we could move CreateSnapshot again to administrative actions (i.e. anything > 1000) because Rally would not report request metrics by default for administrative operations (it's usually not interesting to know).

Contributor Author

Addressed in 32ee4ab


WaitForSnapshotCreate = 8

# administrative actions
ForceMerge = 1001
@@ -510,6 +510,8 @@ def from_hyphenated_string(cls, v):
return OperationType.CreateSnapshotRepository
elif v == "create-snapshot":
return OperationType.CreateSnapshot
elif v == "wait-for-snapshot-create":
return OperationType.WaitForSnapshotCreate
elif v == "restore-snapshot":
return OperationType.RestoreSnapshot
elif v == "wait-for-recovery":
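
The review comment on this file suggests moving `CreateSnapshot` into the administrative range (anything > 1000) so that no request metrics are reported for it by default. The sketch below illustrates that convention. The enum `SketchOperationType`, the property name `is_administrative` and the value `1002` are assumptions made for this example and are not taken from the diff or from Rally's actual enum.

```python
from enum import Enum


class SketchOperationType(Enum):
    # Regular operations: request metrics are reported.
    WaitForSnapshotCreate = 8
    # Administrative actions: values above 1000.
    ForceMerge = 1001
    CreateSnapshot = 1002  # hypothetical value, chosen only for this sketch

    @property
    def is_administrative(self):
        """True for operations in the administrative range (value > 1000)."""
        return self.value > 1000


if __name__ == "__main__":
    for op in SketchOperationType:
        print(op.name, op.is_administrative)
```

With this split, the operation that triggers the snapshot is treated as setup work, while the waiting operation is the one whose throughput is reported.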
191 changes: 173 additions & 18 deletions tests/driver/runner_test.py
@@ -17,6 +17,7 @@

import io
import json
import logging
import random
import unittest.mock as mock
from unittest import TestCase
@@ -2756,6 +2757,140 @@ async def test_create_snapshot_wait_for_completion(self, es):
}
})

params = {
Member

Unfortunately I cannot comment at the specific line but in test_create_snapshot_no_wait we still mock es.snapshot.status and assert later on that it is not called. IMHO we should remove this now because there is no chance we'd ever call it anymore in the new runner implementation.

Contributor Author

Addressed in 32ee4ab + 2b242b2 + 6523ec9

"repository": "backups",
"snapshot": "snapshot-001",
"body": {
"indices": "logs-*"
},
"wait-for-completion": True,
"request-params": {
"request_timeout": 7200
}
}

r = runner.CreateSnapshot()
await r(es, params)

es.snapshot.create.assert_called_once_with(repository="backups",
snapshot="snapshot-001",
body={
"indices": "logs-*"
},
params={"request_timeout": 7200},
wait_for_completion=True)


class WaitForSnapshotCreateTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_entire_lifecycle(self, es):
es.snapshot.status.side_effect = [
# empty response
as_future({}),
# active snapshot
as_future({
"snapshots": [{
"snapshot": "restore_speed_snapshot",
"repository": "restore_speed",
"uuid": "92efRcQxRCCwJuuC2lb-Ow",
"state": "STARTED",
"include_global_state": True,
"shards_stats": {
"initializing": 0,
"started": 10,
"finalizing": 0,
"done": 3,
"failed": 0,
"total": 13},
"stats": {
"incremental": {
"file_count": 222,
"size_in_bytes": 243468220144
},
"processed": {
"file_count": 18,
"size_in_bytes": 82839346
},
"total": {
"file_count": 222,
"size_in_bytes": 243468220144
},
"start_time_in_millis": 1597319858606,
"time_in_millis": 6606
},
"indices": {
# skipping content as we don"t parse this
}
}]
}
),
# completed
as_future({
"snapshots": [{
"snapshot": "restore_speed_snapshot",
"repository": "restore_speed",
"uuid": "6gDpGbxOTpWKIutWdpWCFw",
"state": "SUCCESS",
"include_global_state": True,
"shards_stats": {
"initializing": 0,
"started": 0,
"finalizing": 0,
"done": 13,
"failed": 0,
"total": 13
},
"stats": {
"incremental": {
"file_count": 204,
"size_in_bytes": 243468188055
},
"total": {
"file_count": 204,
"size_in_bytes": 243468188055
},
"start_time_in_millis": 1597317564956,
"time_in_millis": 1113462
},
"indices": {
# skipping content here as don"t parse this
}
}]
})
]

basic_params = {
"repository": "restore_speed",
"snapshot": "restore_speed_snapshot",
"completion-recheck-wait-period": 0
}

r = runner.WaitForSnapshotCreate()
result = await r(es, basic_params)

es.snapshot.status.assert_called_with(
repository="restore_speed",
snapshot="restore_speed_snapshot",
ignore_unavailable=True
)

self.assertDictEqual({
"weight": 243468188055,
"unit": "byte",
"success": True,
"duration": 1113462,
"file_count": 204,
"throughput": 218658731.10622546,
"start_time_millis": 1597317564956,
"stop_time_millis": 1597317564956 + 1113462
}, result)

self.assertEqual(3, es.snapshot.status.call_count)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_immediate_success(self, es):
es.snapshot.status.return_value = as_future({
"snapshots": [
{
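
The values asserted in the lifecycle test above follow directly from the `stats` section of the final status response. The sketch below redoes that arithmetic in isolation. The function name `snapshot_metrics` is hypothetical; the field names and the input numbers are the ones used in the diff. Note that `duration` carries milliseconds here, as in the runner, whereas the documentation text in this pull request speaks of seconds.

```python
def snapshot_metrics(stats):
    """Derive the runner's result dict from the ``stats`` section of a status response."""
    size = stats["total"]["size_in_bytes"]
    start_ms = stats["start_time_in_millis"]
    duration_ms = stats["time_in_millis"]
    # Throughput is reported in byte/s, so convert the duration to seconds first.
    duration_s = duration_ms / 1000
    return {
        "weight": size,
        "unit": "byte",
        "success": True,
        "throughput": size / duration_s,
        "start_time_millis": start_ms,
        "stop_time_millis": start_ms + duration_ms,
        "duration": duration_ms,
        "file_count": stats["total"]["file_count"],
    }


if __name__ == "__main__":
    completed = {
        "total": {"file_count": 204, "size_in_bytes": 243468188055},
        "start_time_in_millis": 1597317564956,
        "time_in_millis": 1113462,
    }
    print(snapshot_metrics(completed))
```

As in the diff, a reported duration of 0 ms would raise a `ZeroDivisionError`; the sketch does not guard against that case.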
@@ -2779,36 +2914,56 @@ async def test_create_snapshot_wait_for_completion(self, es):
         params = {
             "repository": "backups",
             "snapshot": "snapshot-001",
-            "body": {
-                "indices": "logs-*"
-            },
-            "wait-for-completion": True,
-            "request-params": {
-                "request_timeout": 7200
-            }
         }

-        r = runner.CreateSnapshot()
+        r = runner.WaitForSnapshotCreate()
         result = await r(es, params)

         self.assertDictEqual({
             "weight": 9399505,
             "unit": "byte",
             "success": True,
-            "service_time": 0.2,
-            "time_period": 0.2
+            "duration": 200,
+            "file_count": 70,
+            "throughput": 46997525.0,
+            "start_time_millis": 1591776481060,
+            "stop_time_millis": 1591776481060 + 200
         }, result)

-        es.snapshot.create.assert_called_once_with(repository="backups",
+        es.snapshot.status.assert_called_once_with(repository="backups",
                                                    snapshot="snapshot-001",
-                                                   body={
-                                                       "indices": "logs-*"
-                                                   },
-                                                   params={"request_timeout": 7200},
-                                                   wait_for_completion=True)
+                                                   ignore_unavailable=True)

-        es.snapshot.status.assert_called_once_with(repository="backups",
-                                                   snapshot="snapshot-001")
+    @mock.patch("elasticsearch.Elasticsearch")
+    @run_async
+    async def test_wait_for_snapshot_create_failure(self, es):
+        snapshot_status = {
+            "snapshots": [
+                {
+                    "snapshot": "snapshot-001",
+                    "repository": "backups",
+                    "state": "FAILED",
+                    "include_global_state": False
+                }
+            ]
+        }
+        es.snapshot.status.return_value = as_future(snapshot_status)
+
+        params = {
+            "repository": "backups",
+            "snapshot": "snapshot-001",
+        }
+
+        r = runner.WaitForSnapshotCreate()
+
+        logger = logging.getLogger("esrally.driver.runner")
Member

How about we just use the runner's logger with logger = r.logger (you could then probably inline it in the with statement below)? This would also make it more refactoring-safe.

Contributor Author

Addressed in 32ee4ab

+        with mock.patch.object(logger, "error") as mocked_error_logger:
+            with self.assertRaises(exceptions.RallyAssertionError) as ctx:
+                await r(es, params)
+            self.assertEqual("Snapshot [snapshot-001] failed. Please check logs.", ctx.exception.args[0])
+            mocked_error_logger.assert_has_calls([
+                mock.call("Snapshot [%s] failed. Response status:\n%s", "snapshot-001", json.dumps(snapshot_status))
+            ])


 class RestoreSnapshotTests(TestCase):
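
The review comment on the failure test proposes patching the runner's own logger (`r.logger`) instead of looking the logger up by its module name. The sketch below shows that pattern in a self-contained form. `SketchRunner`, its logger name and `check_failure_is_logged` are hypothetical stand-ins, and `RuntimeError` is used in place of `RallyAssertionError`; it assumes only that the runner exposes a `logger` attribute, as the comment implies.

```python
import asyncio
import json
import logging
from unittest import mock


class SketchRunner:
    """Hypothetical runner that exposes a ``logger`` attribute."""

    def __init__(self):
        self.logger = logging.getLogger("sketch.runner")

    async def __call__(self, response):
        if response["snapshots"][0]["state"] == "FAILED":
            self.logger.error("Snapshot failed. Response:\n%s", json.dumps(response, indent=2))
            raise RuntimeError("Snapshot failed. Please check logs.")


def check_failure_is_logged():
    """Run the failing runner with its logger patched and return the error message."""
    r = SketchRunner()
    response = {"snapshots": [{"state": "FAILED"}]}
    # Patching r.logger keeps the test valid even if the runner's module is renamed.
    with mock.patch.object(r.logger, "error") as mocked_error:
        try:
            asyncio.run(r(response))
        except RuntimeError as e:
            message = str(e)
        else:
            raise AssertionError("expected the runner to raise")
        mocked_error.assert_called_once_with(
            "Snapshot failed. Response:\n%s", json.dumps(response, indent=2))
    return message


if __name__ == "__main__":
    print(check_failure_is_logged())
```

The assertions on the exception message and on the logger call sit outside the block that expects the exception, so they still run after the runner has raised.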