Skip to content

Commit

Permalink
Add runner to wait until snapshot has been created
Browse files Browse the repository at this point in the history
For snapshot creation (and metrics) so far we've relied only on the
`create-snapshot` runner with a blocking call.

In this commit we are introducing a new runner
`wait-for-snapshot-create` to complement `create-snapshot`. This is
similar to `restore-snapshot` and `wait-for-recovery` and helps in cases
 where network connections may be terminated making a blocking call
 unsuitable.
  • Loading branch information
dliappis committed Aug 13, 2020
1 parent 77ec10b commit 293922e
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 42 deletions.
19 changes: 18 additions & 1 deletion docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,24 @@ With the operation ``create-snapshot`` you can `create a snapshot <https://www.e
* ``request-params`` (optional): A structure containing HTTP request parameters.

.. note::
When ``wait-for-completion`` is set to ``true`` Rally will report the achieved throughput in byte/s.
It's not recommended to rely on ``wait-for-completion=true``. Instead you should keep the default value (``False``) and use an additional ``wait-for-snapshot-create`` operation in the next step.
This is mandatory on the `Elastic Cloud <https://www.elastic.co/cloud>`_ or environments where Elasticsearch is sitting behind a network element that may terminate the blocking connection after a timeout.

wait-for-snapshot-create
~~~~~~~~~~~~~~~~~~~~~~~~

With the operation ``wait-for-snapshot-create`` you can wait until a snapshot has finished successfully.
Typically you'll use this operation directly after a ``create-snapshot`` operation.

It supports the following parameters:

* ``repository`` (mandatory): The name of the snapshot repository to use.
* ``snapshot`` (mandatory): The name of the snapshot that this operation will wait for until it has been created successfully.
* ``completion-recheck-wait-period`` (optional, defaults to 1 second): Time in seconds to wait in between consecutive attempts.

Rally will report the achieved throughput in byte/s, the duration in seconds, the start and stop time in milliseconds and the total amount of files snapshotted as returned by the `Elasticsearch snapshot status API call <https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html>`_.

This operation is :ref:`retryable <track_operations>`.

restore-snapshot
~~~~~~~~~~~~~~~~
Expand Down
78 changes: 56 additions & 22 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def register_default_runners():
register_runner(track.OperationType.CloseMlJob.name, Retry(CloseMlJob()), async_runner=True)
register_runner(track.OperationType.DeleteSnapshotRepository.name, Retry(DeleteSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.CreateSnapshotRepository.name, Retry(CreateSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.WaitForSnapshotCreate.name, Retry(WaitForSnapshotCreate()), async_runner=True)
register_runner(track.OperationType.WaitForRecovery.name, Retry(IndicesRecovery()), async_runner=True)
register_runner(track.OperationType.PutSettings.name, Retry(PutSettings()), async_runner=True)
register_runner(track.OperationType.CreateTransform.name, Retry(CreateTransform()), async_runner=True)
Expand Down Expand Up @@ -1464,33 +1465,66 @@ async def __call__(self, es, params):
repository = mandatory(params, "repository", repr(self))
snapshot = mandatory(params, "snapshot", repr(self))
body = mandatory(params, "body", repr(self))
response = await es.snapshot.create(repository=repository,
snapshot=snapshot,
body=body,
params=request_params,
wait_for_completion=wait_for_completion)

# We can derive a more useful throughput metric if the snapshot has successfully completed
if wait_for_completion and response.get("snapshot", {}).get("state") == "SUCCESS":
stats_response = await es.snapshot.status(repository=repository,
snapshot=snapshot)
size = stats_response["snapshots"][0]["stats"]["total"]["size_in_bytes"]
# while the actual service time as determined by Rally should be pretty accurate, the actual time it took
# to restore allows for a correct calculation of achieved throughput.
time_in_millis = stats_response["snapshots"][0]["stats"]["time_in_millis"]
time_in_seconds = time_in_millis / 1000
return {
"weight": size,
"unit": "byte",
"success": True,
"service_time": time_in_seconds,
"time_period": time_in_seconds
}
await es.snapshot.create(repository=repository,
snapshot=snapshot,
body=body,
params=request_params,
wait_for_completion=wait_for_completion)

def __repr__(self, *args, **kwargs):
return "create-snapshot"


class WaitForSnapshotCreate(Runner):
    """
    Polls the snapshot status API until the given snapshot has reached the state ``SUCCESS``.

    Supported parameters:

    * ``repository`` (mandatory): name of the snapshot repository.
    * ``snapshot`` (mandatory): name of the snapshot to wait for.
    * ``completion-recheck-wait-period`` (optional, defaults to 1): time in seconds to sleep
      between two consecutive status checks.

    Returns a dict with the snapshot size in bytes as ``weight``, the ``throughput`` in byte/s,
    ``start_time_millis`` / ``stop_time_millis``, the ``duration`` in milliseconds and the
    ``file_count``, all derived from the ``stats`` section of the status response.

    Raises ``exceptions.RallyAssertionError`` if the snapshot reports the state ``FAILED``.
    """

    async def __call__(self, es, params):
        repository = mandatory(params, "repository", repr(self))
        snapshot = mandatory(params, "snapshot", repr(self))
        wait_period = params.get("completion-recheck-wait-period", 1)

        # stays None until a status response reports SUCCESS
        stats = None

        # NOTE(review): only SUCCESS and FAILED terminate this loop; any other state is polled again.
        while stats is None:
            response = await es.snapshot.status(repository=repository,
                                                snapshot=snapshot,
                                                ignore_unavailable=True)

            # The snapshot may not be visible yet: tolerate both a missing "snapshots" key
            # and an empty list instead of failing with an IndexError.
            if "snapshots" in response and response["snapshots"]:
                snapshot_status = response["snapshots"][0]
                response_state = snapshot_status["state"]
                # Possible states:
                # https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html#get-snapshot-status-api-response-body
                if response_state == "FAILED":
                    self.logger.error("Snapshot [%s] failed. Response status:\n%s", snapshot, json.dumps(response))
                    raise exceptions.RallyAssertionError(
                        f"Snapshot [{snapshot}] failed. Please check logs."
                    )
                if response_state == "SUCCESS":
                    # only the stats of the completed snapshot are reported
                    stats = snapshot_status["stats"]

            if stats is None:
                await asyncio.sleep(wait_period)

        size = stats["total"]["size_in_bytes"]
        file_count = stats["total"]["file_count"]
        start_time_in_millis = stats["start_time_in_millis"]
        duration_in_millis = stats["time_in_millis"]
        duration_in_seconds = duration_in_millis / 1000

        # A snapshot that completed within the same millisecond reports a duration of zero;
        # throughput is undefined in that case so report None rather than dividing by zero.
        # NOTE(review): confirm that consumers of this result treat a None throughput as "unknown".
        throughput = size / duration_in_seconds if duration_in_millis > 0 else None

        return {
            "weight": size,
            "unit": "byte",
            "success": True,
            "throughput": throughput,
            "start_time_millis": start_time_in_millis,
            "stop_time_millis": start_time_in_millis + duration_in_millis,
            "duration": duration_in_millis,
            "file_count": file_count
        }

    def __repr__(self, *args, **kwargs):
        return "wait-for-snapshot-create"


class RestoreSnapshot(Runner):
"""
Restores a snapshot from an already registered repository
Expand Down
4 changes: 3 additions & 1 deletion esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ class OperationType(Enum):
RawRequest = 5
WaitForRecovery = 6
CreateSnapshot = 7

WaitForSnapshotCreate = 8

# administrative actions
ForceMerge = 1001
Expand Down Expand Up @@ -510,6 +510,8 @@ def from_hyphenated_string(cls, v):
return OperationType.CreateSnapshotRepository
elif v == "create-snapshot":
return OperationType.CreateSnapshot
elif v == "wait-for-snapshot-create":
return OperationType.WaitForSnapshotCreate
elif v == "restore-snapshot":
return OperationType.RestoreSnapshot
elif v == "wait-for-recovery":
Expand Down
191 changes: 173 additions & 18 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io
import json
import logging
import random
import unittest.mock as mock
from unittest import TestCase
Expand Down Expand Up @@ -2756,6 +2757,140 @@ async def test_create_snapshot_wait_for_completion(self, es):
}
})

params = {
"repository": "backups",
"snapshot": "snapshot-001",
"body": {
"indices": "logs-*"
},
"wait-for-completion": True,
"request-params": {
"request_timeout": 7200
}
}

r = runner.CreateSnapshot()
await r(es, params)

es.snapshot.create.assert_called_once_with(repository="backups",
snapshot="snapshot-001",
body={
"indices": "logs-*"
},
params={"request_timeout": 7200},
wait_for_completion=True)


class WaitForSnapshotCreateTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_entire_lifecycle(self, es):
es.snapshot.status.side_effect = [
# empty response
as_future({}),
# active snapshot
as_future({
"snapshots": [{
"snapshot": "restore_speed_snapshot",
"repository": "restore_speed",
"uuid": "92efRcQxRCCwJuuC2lb-Ow",
"state": "STARTED",
"include_global_state": True,
"shards_stats": {
"initializing": 0,
"started": 10,
"finalizing": 0,
"done": 3,
"failed": 0,
"total": 13},
"stats": {
"incremental": {
"file_count": 222,
"size_in_bytes": 243468220144
},
"processed": {
"file_count": 18,
"size_in_bytes": 82839346
},
"total": {
"file_count": 222,
"size_in_bytes": 243468220144
},
"start_time_in_millis": 1597319858606,
"time_in_millis": 6606
},
"indices": {
# skipping content as we don"t parse this
}
}]
}
),
# completed
as_future({
"snapshots": [{
"snapshot": "restore_speed_snapshot",
"repository": "restore_speed",
"uuid": "6gDpGbxOTpWKIutWdpWCFw",
"state": "SUCCESS",
"include_global_state": True,
"shards_stats": {
"initializing": 0,
"started": 0,
"finalizing": 0,
"done": 13,
"failed": 0,
"total": 13
},
"stats": {
"incremental": {
"file_count": 204,
"size_in_bytes": 243468188055
},
"total": {
"file_count": 204,
"size_in_bytes": 243468188055
},
"start_time_in_millis": 1597317564956,
"time_in_millis": 1113462
},
"indices": {
# skipping content here as don"t parse this
}
}]
})
]

basic_params = {
"repository": "restore_speed",
"snapshot": "restore_speed_snapshot",
"completion-recheck-wait-period": 0
}

r = runner.WaitForSnapshotCreate()
result = await r(es, basic_params)

es.snapshot.status.assert_called_with(
repository="restore_speed",
snapshot="restore_speed_snapshot",
ignore_unavailable=True
)

self.assertDictEqual({
"weight": 243468188055,
"unit": "byte",
"success": True,
"duration": 1113462,
"file_count": 204,
"throughput": 218658731.10622546,
"start_time_millis": 1597317564956,
"stop_time_millis": 1597317564956 + 1113462
}, result)

self.assertEqual(3, es.snapshot.status.call_count)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_immediate_success(self, es):
es.snapshot.status.return_value = as_future({
"snapshots": [
{
Expand All @@ -2779,36 +2914,56 @@ async def test_create_snapshot_wait_for_completion(self, es):
params = {
"repository": "backups",
"snapshot": "snapshot-001",
"body": {
"indices": "logs-*"
},
"wait-for-completion": True,
"request-params": {
"request_timeout": 7200
}
}

r = runner.CreateSnapshot()
r = runner.WaitForSnapshotCreate()
result = await r(es, params)

self.assertDictEqual({
"weight": 9399505,
"unit": "byte",
"success": True,
"service_time": 0.2,
"time_period": 0.2
"duration": 200,
"file_count": 70,
"throughput": 46997525.0,
"start_time_millis": 1591776481060,
"stop_time_millis": 1591776481060 + 200
}, result)

es.snapshot.create.assert_called_once_with(repository="backups",
es.snapshot.status.assert_called_once_with(repository="backups",
snapshot="snapshot-001",
body={
"indices": "logs-*"
},
params={"request_timeout": 7200},
wait_for_completion=True)
ignore_unavailable=True)

es.snapshot.status.assert_called_once_with(repository="backups",
snapshot="snapshot-001")
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_failure(self, es):
snapshot_status = {
"snapshots": [
{
"snapshot": "snapshot-001",
"repository": "backups",
"state": "FAILED",
"include_global_state": False
}
]
}
es.snapshot.status.return_value = as_future(snapshot_status)

params = {
"repository": "backups",
"snapshot": "snapshot-001",
}

r = runner.WaitForSnapshotCreate()

logger = logging.getLogger("esrally.driver.runner")
with mock.patch.object(logger, "error") as mocked_error_logger:
with self.assertRaises(exceptions.RallyAssertionError) as ctx:
await r(es, params)
self.assertEqual("Snapshot [snapshot-001] failed. Please check logs.", ctx.exception.args[0])
mocked_error_logger.assert_has_calls([
mock.call("Snapshot [%s] failed. Response status:\n%s", "snapshot-001", json.dumps(snapshot_status))
])


class RestoreSnapshotTests(TestCase):
Expand Down

0 comments on commit 293922e

Please sign in to comment.