Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runner to wait until snapshot has been created #1047

Merged
merged 4 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,24 @@ With the operation ``create-snapshot`` you can `create a snapshot <https://www.e
* ``request-params`` (optional): A structure containing HTTP request parameters.

.. note::
When ``wait-for-completion`` is set to ``true`` Rally will report the achieved throughput in byte/s.
It's not recommended to rely on ``wait-for-completion=true``. Instead, keep the default value (``False``) and use an additional ``wait-for-snapshot-create`` operation in the next step.
This is mandatory on `Elastic Cloud <https://www.elastic.co/cloud>`_ or environments where Elasticsearch is connected via intermediate network components, such as proxies, that may terminate the blocking connection after a timeout.

wait-for-snapshot-create
~~~~~~~~~~~~~~~~~~~~~~~~

With the operation ``wait-for-snapshot-create`` you can wait until a `snapshot has finished successfully <https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html>`_.
Typically you'll use this operation directly after a ``create-snapshot`` operation.

It supports the following parameters:

* ``repository`` (mandatory): The name of the snapshot repository to use.
* ``snapshot`` (mandatory): The name of the snapshot to wait for until it has completed successfully.
* ``completion-recheck-wait-period`` (optional, defaults to 1 second): Time in seconds to wait in between consecutive attempts.

Rally will report the achieved throughput in byte/s.

This operation is :ref:`retryable <track_operations>`.

restore-snapshot
~~~~~~~~~~~~~~~~
Expand Down
76 changes: 54 additions & 22 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def register_default_runners():
register_runner(track.OperationType.CloseMlJob.name, Retry(CloseMlJob()), async_runner=True)
register_runner(track.OperationType.DeleteSnapshotRepository.name, Retry(DeleteSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.CreateSnapshotRepository.name, Retry(CreateSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.WaitForSnapshotCreate.name, Retry(WaitForSnapshotCreate()), async_runner=True)
register_runner(track.OperationType.WaitForRecovery.name, Retry(IndicesRecovery()), async_runner=True)
register_runner(track.OperationType.PutSettings.name, Retry(PutSettings()), async_runner=True)
register_runner(track.OperationType.CreateTransform.name, Retry(CreateTransform()), async_runner=True)
Expand Down Expand Up @@ -1464,33 +1465,64 @@ async def __call__(self, es, params):
repository = mandatory(params, "repository", repr(self))
snapshot = mandatory(params, "snapshot", repr(self))
body = mandatory(params, "body", repr(self))
response = await es.snapshot.create(repository=repository,
snapshot=snapshot,
body=body,
params=request_params,
wait_for_completion=wait_for_completion)

# We can derive a more useful throughput metric if the snapshot has successfully completed
if wait_for_completion and response.get("snapshot", {}).get("state") == "SUCCESS":
stats_response = await es.snapshot.status(repository=repository,
snapshot=snapshot)
size = stats_response["snapshots"][0]["stats"]["total"]["size_in_bytes"]
# while the actual service time as determined by Rally should be pretty accurate, the actual time it took
# to restore allows for a correct calculation of achieved throughput.
time_in_millis = stats_response["snapshots"][0]["stats"]["time_in_millis"]
time_in_seconds = time_in_millis / 1000
return {
"weight": size,
"unit": "byte",
"success": True,
"service_time": time_in_seconds,
"time_period": time_in_seconds
}
await es.snapshot.create(repository=repository,
snapshot=snapshot,
body=body,
params=request_params,
wait_for_completion=wait_for_completion)

def __repr__(self, *args, **kwargs):
return "create-snapshot"


class WaitForSnapshotCreate(Runner):
    """
    Polls the snapshot status API until the requested snapshot reports the state ``SUCCESS``.

    Expected parameters:

    * ``repository`` (mandatory): name of the snapshot repository.
    * ``snapshot`` (mandatory): name of the snapshot to wait for.
    * ``completion-recheck-wait-period`` (optional, defaults to 1): seconds to sleep between two status checks.

    Returns a dict with ``weight`` (snapshot size in bytes), ``unit``, ``success``, ``throughput`` (bytes per second),
    ``start_time_millis``, ``stop_time_millis``, ``duration`` (in milliseconds) and ``file_count``, all derived
    from the ``stats`` section of the last status response.

    Raises ``exceptions.RallyAssertionError`` if the snapshot reports the state ``FAILED``.
    """

    async def __call__(self, es, params):
        repository = mandatory(params, "repository", repr(self))
        snapshot = mandatory(params, "snapshot", repr(self))
        wait_period = params.get("completion-recheck-wait-period", 1)

        snapshot_done = False
        stats = {}

        while not snapshot_done:
            response = await es.snapshot.status(repository=repository,
                                                snapshot=snapshot,
                                                ignore_unavailable=True)

            # The status call is issued with ignore_unavailable=True, so the response may carry no "snapshots" key
            # or an empty list while the snapshot is not visible yet. Both cases mean "not done, check again";
            # indexing an empty list would otherwise raise an IndexError.
            snapshots = response.get("snapshots")
            if snapshots:
                current = snapshots[0]
                response_state = current["state"]
                # Possible states:
                # https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-status-api.html#get-snapshot-status-api-response-body
                if response_state == "FAILED":
                    self.logger.error("Snapshot [%s] failed. Response:\n%s", snapshot, json.dumps(response, indent=2))
                    raise exceptions.RallyAssertionError(f"Snapshot [{snapshot}] failed. Please check logs.")
                snapshot_done = response_state == "SUCCESS"
                stats = current["stats"]

            if not snapshot_done:
                await asyncio.sleep(wait_period)

        size = stats["total"]["size_in_bytes"]
        file_count = stats["total"]["file_count"]
        start_time_in_millis = stats["start_time_in_millis"]
        duration_in_millis = stats["time_in_millis"]
        duration_in_seconds = duration_in_millis / 1000

        # NOTE(review): a reported duration of 0 ms would raise a ZeroDivisionError below - confirm whether
        # Elasticsearch can report that for a successful snapshot.
        return {
            "weight": size,
            "unit": "byte",
            "success": True,
            "throughput": size / duration_in_seconds,
            "start_time_millis": start_time_in_millis,
            "stop_time_millis": start_time_in_millis + duration_in_millis,
            "duration": duration_in_millis,
            "file_count": file_count
        }

    def __repr__(self, *args, **kwargs):
        return "wait-for-snapshot-create"


class RestoreSnapshot(Runner):
"""
Restores a snapshot from an already registered repository
Expand Down
18 changes: 10 additions & 8 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,7 @@ class OperationType(Enum):
Bulk = 4
RawRequest = 5
WaitForRecovery = 6
CreateSnapshot = 7

WaitForSnapshotCreate = 7

# administrative actions
ForceMerge = 1001
Expand All @@ -443,12 +442,13 @@ class OperationType(Enum):
Sleep = 1018
DeleteSnapshotRepository = 1019
CreateSnapshotRepository = 1020
RestoreSnapshot = 1021
PutSettings = 1022
CreateTransform = 1023
StartTransform = 1024
WaitForTransform = 1025
DeleteTransform = 1026
CreateSnapshot = 1021
RestoreSnapshot = 1022
PutSettings = 1023
CreateTransform = 1024
StartTransform = 1025
WaitForTransform = 1026
DeleteTransform = 1027

@property
def admin_op(self):
Expand Down Expand Up @@ -510,6 +510,8 @@ def from_hyphenated_string(cls, v):
return OperationType.CreateSnapshotRepository
elif v == "create-snapshot":
return OperationType.CreateSnapshot
elif v == "wait-for-snapshot-create":
return OperationType.WaitForSnapshotCreate
elif v == "restore-snapshot":
return OperationType.RestoreSnapshot
elif v == "wait-for-recovery":
Expand Down
192 changes: 171 additions & 21 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2701,8 +2701,6 @@ class CreateSnapshotTests(TestCase):
@run_async
async def test_create_snapshot_no_wait(self, es):
es.snapshot.create.return_value = as_future({})
# should not be called
es.snapshot.status.return_value = as_future({})

params = {
"repository": "backups",
Expand All @@ -2726,7 +2724,6 @@ async def test_create_snapshot_no_wait(self, es):
},
params={"request_timeout": 7200},
wait_for_completion=False)
self.assertEqual(0, es.snapshot.status.call_count)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
Expand Down Expand Up @@ -2756,6 +2753,140 @@ async def test_create_snapshot_wait_for_completion(self, es):
}
})

params = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately I cannot comment at the specific line but in test_create_snapshot_no_wait we still mock es.snapshot.status and assert later on that it is not called. IMHO we should remove this now because there is no chance we'd ever call it anymore in the new runner implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 32ee4ab + 2b242b2 + 6523ec9

"repository": "backups",
"snapshot": "snapshot-001",
"body": {
"indices": "logs-*"
},
"wait-for-completion": True,
"request-params": {
"request_timeout": 7200
}
}

r = runner.CreateSnapshot()
await r(es, params)

es.snapshot.create.assert_called_once_with(repository="backups",
snapshot="snapshot-001",
body={
"indices": "logs-*"
},
params={"request_timeout": 7200},
wait_for_completion=True)


class WaitForSnapshotCreateTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_entire_lifecycle(self, es):
es.snapshot.status.side_effect = [
# empty response
as_future({}),
# active snapshot
as_future({
"snapshots": [{
"snapshot": "restore_speed_snapshot",
"repository": "restore_speed",
"uuid": "92efRcQxRCCwJuuC2lb-Ow",
"state": "STARTED",
"include_global_state": True,
"shards_stats": {
"initializing": 0,
"started": 10,
"finalizing": 0,
"done": 3,
"failed": 0,
"total": 13},
"stats": {
"incremental": {
"file_count": 222,
"size_in_bytes": 243468220144
},
"processed": {
"file_count": 18,
"size_in_bytes": 82839346
},
"total": {
"file_count": 222,
"size_in_bytes": 243468220144
},
"start_time_in_millis": 1597319858606,
"time_in_millis": 6606
},
"indices": {
# skipping content as we don't parse this
}
}]
}
),
# completed
as_future({
"snapshots": [{
"snapshot": "restore_speed_snapshot",
"repository": "restore_speed",
"uuid": "6gDpGbxOTpWKIutWdpWCFw",
"state": "SUCCESS",
"include_global_state": True,
"shards_stats": {
"initializing": 0,
"started": 0,
"finalizing": 0,
"done": 13,
"failed": 0,
"total": 13
},
"stats": {
"incremental": {
"file_count": 204,
"size_in_bytes": 243468188055
},
"total": {
"file_count": 204,
"size_in_bytes": 243468188055
},
"start_time_in_millis": 1597317564956,
"time_in_millis": 1113462
},
"indices": {
# skipping content here as we don't parse this
}
}]
})
]

basic_params = {
"repository": "restore_speed",
"snapshot": "restore_speed_snapshot",
"completion-recheck-wait-period": 0
}

r = runner.WaitForSnapshotCreate()
result = await r(es, basic_params)

es.snapshot.status.assert_called_with(
repository="restore_speed",
snapshot="restore_speed_snapshot",
ignore_unavailable=True
)

self.assertDictEqual({
"weight": 243468188055,
"unit": "byte",
"success": True,
"duration": 1113462,
"file_count": 204,
"throughput": 218658731.10622546,
"start_time_millis": 1597317564956,
"stop_time_millis": 1597317564956 + 1113462
}, result)

self.assertEqual(3, es.snapshot.status.call_count)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_immediate_success(self, es):
es.snapshot.status.return_value = as_future({
"snapshots": [
{
Expand All @@ -2779,36 +2910,55 @@ async def test_create_snapshot_wait_for_completion(self, es):
params = {
"repository": "backups",
"snapshot": "snapshot-001",
"body": {
"indices": "logs-*"
},
"wait-for-completion": True,
"request-params": {
"request_timeout": 7200
}
}

r = runner.CreateSnapshot()
r = runner.WaitForSnapshotCreate()
result = await r(es, params)

self.assertDictEqual({
"weight": 9399505,
"unit": "byte",
"success": True,
"service_time": 0.2,
"time_period": 0.2
"duration": 200,
"file_count": 70,
"throughput": 46997525.0,
"start_time_millis": 1591776481060,
"stop_time_millis": 1591776481060 + 200
}, result)

es.snapshot.create.assert_called_once_with(repository="backups",
es.snapshot.status.assert_called_once_with(repository="backups",
snapshot="snapshot-001",
body={
"indices": "logs-*"
},
params={"request_timeout": 7200},
wait_for_completion=True)
ignore_unavailable=True)

es.snapshot.status.assert_called_once_with(repository="backups",
snapshot="snapshot-001")
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_failure(self, es):
snapshot_status = {
"snapshots": [
{
"snapshot": "snapshot-001",
"repository": "backups",
"state": "FAILED",
"include_global_state": False
}
]
}
es.snapshot.status.return_value = as_future(snapshot_status)

params = {
"repository": "backups",
"snapshot": "snapshot-001",
}

r = runner.WaitForSnapshotCreate()

with mock.patch.object(r.logger, "error") as mocked_error_logger:
with self.assertRaises(exceptions.RallyAssertionError) as ctx:
await r(es, params)
self.assertEqual("Snapshot [snapshot-001] failed. Please check logs.", ctx.exception.args[0])
mocked_error_logger.assert_has_calls([
mock.call("Snapshot [%s] failed. Response:\n%s", "snapshot-001", json.dumps(snapshot_status, indent=2))
])


class RestoreSnapshotTests(TestCase):
Expand Down