Skip to content

Commit

Permalink
Add runner to wait until snapshot has been created (#1047)
Browse files Browse the repository at this point in the history
For snapshot creation (and metrics) so far we've relied only on the
`create-snapshot` runner with a blocking call.

In this commit we are introducing a new runner,
`wait-for-snapshot-create`, to complement `create-snapshot`. This is
similar to `restore-snapshot` and `wait-for-recovery` and helps in cases
where network connections may be terminated, making a blocking call
unsuitable.
  • Loading branch information
dliappis authored Aug 14, 2020
1 parent 77ec10b commit be8622a
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 52 deletions.
19 changes: 18 additions & 1 deletion docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,24 @@ With the operation ``create-snapshot`` you can `create a snapshot <https://www.e
* ``request-params`` (optional): A structure containing HTTP request parameters.

.. note::
When ``wait-for-completion`` is set to ``true`` Rally will report the achieved throughput in byte/s.
It's not recommended to rely on ``wait-for-completion=true``. Instead you should keep the default value (``False``) and use an additional ``wait-for-snapshot-create`` operation in the next step.
This is mandatory on `Elastic Cloud <https://www.elastic.co/cloud>`_ or environments where Elasticsearch is connected via intermediate network components, such as proxies, that may terminate the blocking connection after a timeout.

wait-for-snapshot-create
~~~~~~~~~~~~~~~~~~~~~~~~

With the operation ``wait-for-snapshot-create`` you can wait until a `snapshot has finished successfully <https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html>`_.
Typically you'll use this operation directly after a ``create-snapshot`` operation.

It supports the following parameters:

* ``repository`` (mandatory): The name of the snapshot repository to use.
* ``snapshot`` (mandatory): The name of the snapshot that this operation waits for until it has been created successfully.
* ``completion-recheck-wait-period`` (optional, defaults to 1 second): Time in seconds to wait in between consecutive attempts.

Rally will report the achieved throughput in byte/s.

This operation is :ref:`retryable <track_operations>`.

restore-snapshot
~~~~~~~~~~~~~~~~
Expand Down
76 changes: 54 additions & 22 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def register_default_runners():
register_runner(track.OperationType.CloseMlJob.name, Retry(CloseMlJob()), async_runner=True)
register_runner(track.OperationType.DeleteSnapshotRepository.name, Retry(DeleteSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.CreateSnapshotRepository.name, Retry(CreateSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.WaitForSnapshotCreate.name, Retry(WaitForSnapshotCreate()), async_runner=True)
register_runner(track.OperationType.WaitForRecovery.name, Retry(IndicesRecovery()), async_runner=True)
register_runner(track.OperationType.PutSettings.name, Retry(PutSettings()), async_runner=True)
register_runner(track.OperationType.CreateTransform.name, Retry(CreateTransform()), async_runner=True)
Expand Down Expand Up @@ -1464,33 +1465,64 @@ async def __call__(self, es, params):
repository = mandatory(params, "repository", repr(self))
snapshot = mandatory(params, "snapshot", repr(self))
body = mandatory(params, "body", repr(self))
response = await es.snapshot.create(repository=repository,
snapshot=snapshot,
body=body,
params=request_params,
wait_for_completion=wait_for_completion)

# We can derive a more useful throughput metric if the snapshot has successfully completed
if wait_for_completion and response.get("snapshot", {}).get("state") == "SUCCESS":
stats_response = await es.snapshot.status(repository=repository,
snapshot=snapshot)
size = stats_response["snapshots"][0]["stats"]["total"]["size_in_bytes"]
# while the actual service time as determined by Rally should be pretty accurate, the actual time it took
# to restore allows for a correct calculation of achieved throughput.
time_in_millis = stats_response["snapshots"][0]["stats"]["time_in_millis"]
time_in_seconds = time_in_millis / 1000
return {
"weight": size,
"unit": "byte",
"success": True,
"service_time": time_in_seconds,
"time_period": time_in_seconds
}
await es.snapshot.create(repository=repository,
snapshot=snapshot,
body=body,
params=request_params,
wait_for_completion=wait_for_completion)

def __repr__(self, *args, **kwargs):
return "create-snapshot"


class WaitForSnapshotCreate(Runner):
    """
    Polls the snapshot status API until the given snapshot reports the state ``SUCCESS``.

    Expected parameters:

    * ``repository`` (mandatory): name of the snapshot repository.
    * ``snapshot`` (mandatory): name of the snapshot to wait for.
    * ``completion-recheck-wait-period`` (optional, defaults to 1): time passed to
      ``asyncio.sleep`` between two consecutive status checks.

    Returns a dict with the keys ``weight`` (total snapshot size in bytes), ``unit``, ``success``,
    ``throughput``, ``start_time_millis``, ``stop_time_millis``, ``duration`` and ``file_count``,
    all derived from the ``stats`` section of the last status response.

    Raises ``exceptions.RallyAssertionError`` if the snapshot reports the state ``FAILED``.
    """

    async def __call__(self, es, params):
        repository = mandatory(params, "repository", repr(self))
        snapshot = mandatory(params, "snapshot", repr(self))
        wait_period = params.get("completion-recheck-wait-period", 1)

        snapshot_done = False
        stats = {}

        while not snapshot_done:
            response = await es.snapshot.status(repository=repository,
                                                snapshot=snapshot,
                                                ignore_unavailable=True)

            # The request is issued with ignore_unavailable=True, so the response may carry no
            # snapshot entry at all (missing key or empty list). Treat both cases as "not yet
            # available" and keep polling instead of failing with an IndexError.
            if "snapshots" in response and response["snapshots"]:
                current = response["snapshots"][0]
                response_state = current["state"]
                # Possible states:
                # https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html#get-snapshot-status-api-response-body
                if response_state == "FAILED":
                    self.logger.error("Snapshot [%s] failed. Response:\n%s", snapshot, json.dumps(response, indent=2))
                    raise exceptions.RallyAssertionError(f"Snapshot [{snapshot}] failed. Please check logs.")
                snapshot_done = response_state == "SUCCESS"
                stats = current["stats"]

            if not snapshot_done:
                await asyncio.sleep(wait_period)

        size = stats["total"]["size_in_bytes"]
        file_count = stats["total"]["file_count"]
        start_time_in_millis = stats["start_time_in_millis"]
        duration_in_millis = stats["time_in_millis"]
        duration_in_seconds = duration_in_millis / 1000

        # NOTE(review): a reported ``time_in_millis`` of 0 would raise ZeroDivisionError below;
        # confirm whether Elasticsearch can report that for a successful snapshot.
        return {
            "weight": size,
            "unit": "byte",
            "success": True,
            "throughput": size / duration_in_seconds,
            "start_time_millis": start_time_in_millis,
            "stop_time_millis": start_time_in_millis + duration_in_millis,
            "duration": duration_in_millis,
            "file_count": file_count
        }

    def __repr__(self, *args, **kwargs):
        return "wait-for-snapshot-create"


class RestoreSnapshot(Runner):
"""
Restores a snapshot from an already registered repository
Expand Down
18 changes: 10 additions & 8 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,7 @@ class OperationType(Enum):
Bulk = 4
RawRequest = 5
WaitForRecovery = 6
CreateSnapshot = 7

WaitForSnapshotCreate = 7

# administrative actions
ForceMerge = 1001
Expand All @@ -443,12 +442,13 @@ class OperationType(Enum):
Sleep = 1018
DeleteSnapshotRepository = 1019
CreateSnapshotRepository = 1020
RestoreSnapshot = 1021
PutSettings = 1022
CreateTransform = 1023
StartTransform = 1024
WaitForTransform = 1025
DeleteTransform = 1026
CreateSnapshot = 1021
RestoreSnapshot = 1022
PutSettings = 1023
CreateTransform = 1024
StartTransform = 1025
WaitForTransform = 1026
DeleteTransform = 1027

@property
def admin_op(self):
Expand Down Expand Up @@ -510,6 +510,8 @@ def from_hyphenated_string(cls, v):
return OperationType.CreateSnapshotRepository
elif v == "create-snapshot":
return OperationType.CreateSnapshot
elif v == "wait-for-snapshot-create":
return OperationType.WaitForSnapshotCreate
elif v == "restore-snapshot":
return OperationType.RestoreSnapshot
elif v == "wait-for-recovery":
Expand Down
192 changes: 171 additions & 21 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2701,8 +2701,6 @@ class CreateSnapshotTests(TestCase):
@run_async
async def test_create_snapshot_no_wait(self, es):
es.snapshot.create.return_value = as_future({})
# should not be called
es.snapshot.status.return_value = as_future({})

params = {
"repository": "backups",
Expand All @@ -2726,7 +2724,6 @@ async def test_create_snapshot_no_wait(self, es):
},
params={"request_timeout": 7200},
wait_for_completion=False)
self.assertEqual(0, es.snapshot.status.call_count)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
Expand Down Expand Up @@ -2756,6 +2753,140 @@ async def test_create_snapshot_wait_for_completion(self, es):
}
})

params = {
"repository": "backups",
"snapshot": "snapshot-001",
"body": {
"indices": "logs-*"
},
"wait-for-completion": True,
"request-params": {
"request_timeout": 7200
}
}

r = runner.CreateSnapshot()
await r(es, params)

es.snapshot.create.assert_called_once_with(repository="backups",
snapshot="snapshot-001",
body={
"indices": "logs-*"
},
params={"request_timeout": 7200},
wait_for_completion=True)


class WaitForSnapshotCreateTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_entire_lifecycle(self, es):
es.snapshot.status.side_effect = [
# empty response
as_future({}),
# active snapshot
as_future({
"snapshots": [{
"snapshot": "restore_speed_snapshot",
"repository": "restore_speed",
"uuid": "92efRcQxRCCwJuuC2lb-Ow",
"state": "STARTED",
"include_global_state": True,
"shards_stats": {
"initializing": 0,
"started": 10,
"finalizing": 0,
"done": 3,
"failed": 0,
"total": 13},
"stats": {
"incremental": {
"file_count": 222,
"size_in_bytes": 243468220144
},
"processed": {
"file_count": 18,
"size_in_bytes": 82839346
},
"total": {
"file_count": 222,
"size_in_bytes": 243468220144
},
"start_time_in_millis": 1597319858606,
"time_in_millis": 6606
},
"indices": {
# skipping content as we don't parse this
}
}]
}
),
# completed
as_future({
"snapshots": [{
"snapshot": "restore_speed_snapshot",
"repository": "restore_speed",
"uuid": "6gDpGbxOTpWKIutWdpWCFw",
"state": "SUCCESS",
"include_global_state": True,
"shards_stats": {
"initializing": 0,
"started": 0,
"finalizing": 0,
"done": 13,
"failed": 0,
"total": 13
},
"stats": {
"incremental": {
"file_count": 204,
"size_in_bytes": 243468188055
},
"total": {
"file_count": 204,
"size_in_bytes": 243468188055
},
"start_time_in_millis": 1597317564956,
"time_in_millis": 1113462
},
"indices": {
# skipping content here as we don't parse this
}
}]
})
]

basic_params = {
"repository": "restore_speed",
"snapshot": "restore_speed_snapshot",
"completion-recheck-wait-period": 0
}

r = runner.WaitForSnapshotCreate()
result = await r(es, basic_params)

es.snapshot.status.assert_called_with(
repository="restore_speed",
snapshot="restore_speed_snapshot",
ignore_unavailable=True
)

self.assertDictEqual({
"weight": 243468188055,
"unit": "byte",
"success": True,
"duration": 1113462,
"file_count": 204,
"throughput": 218658731.10622546,
"start_time_millis": 1597317564956,
"stop_time_millis": 1597317564956 + 1113462
}, result)

self.assertEqual(3, es.snapshot.status.call_count)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_immediate_success(self, es):
es.snapshot.status.return_value = as_future({
"snapshots": [
{
Expand All @@ -2779,36 +2910,55 @@ async def test_create_snapshot_wait_for_completion(self, es):
params = {
"repository": "backups",
"snapshot": "snapshot-001",
"body": {
"indices": "logs-*"
},
"wait-for-completion": True,
"request-params": {
"request_timeout": 7200
}
}

r = runner.CreateSnapshot()
r = runner.WaitForSnapshotCreate()
result = await r(es, params)

self.assertDictEqual({
"weight": 9399505,
"unit": "byte",
"success": True,
"service_time": 0.2,
"time_period": 0.2
"duration": 200,
"file_count": 70,
"throughput": 46997525.0,
"start_time_millis": 1591776481060,
"stop_time_millis": 1591776481060 + 200
}, result)

es.snapshot.create.assert_called_once_with(repository="backups",
es.snapshot.status.assert_called_once_with(repository="backups",
snapshot="snapshot-001",
body={
"indices": "logs-*"
},
params={"request_timeout": 7200},
wait_for_completion=True)
ignore_unavailable=True)

es.snapshot.status.assert_called_once_with(repository="backups",
snapshot="snapshot-001")
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_failure(self, es):
snapshot_status = {
"snapshots": [
{
"snapshot": "snapshot-001",
"repository": "backups",
"state": "FAILED",
"include_global_state": False
}
]
}
es.snapshot.status.return_value = as_future(snapshot_status)

params = {
"repository": "backups",
"snapshot": "snapshot-001",
}

r = runner.WaitForSnapshotCreate()

with mock.patch.object(r.logger, "error") as mocked_error_logger:
with self.assertRaises(exceptions.RallyAssertionError) as ctx:
await r(es, params)
self.assertEqual("Snapshot [snapshot-001] failed. Please check logs.", ctx.exception.args[0])
mocked_error_logger.assert_has_calls([
mock.call("Snapshot [%s] failed. Response:\n%s", "snapshot-001", json.dumps(snapshot_status, indent=2))
])


class RestoreSnapshotTests(TestCase):
Expand Down

0 comments on commit be8622a

Please sign in to comment.