From fadc0650ab6b819a4623737d89fff1e6657ced3a Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Mon, 15 Jun 2020 14:52:22 +0200 Subject: [PATCH] Check for recovery details when finished (#1018) With this commit we defer checking any details about indices recovery until recovery has finished. This avoids any issues with properties that are not yet returned by the API in earlier stages (e.g. `stop_time_in_millis`). --- esrally/driver/runner.py | 14 +++++++----- tests/driver/runner_test.py | 43 +++++++++++++++++++++++++++++++++++-- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 325240b98..247a8c5e0 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1514,6 +1514,8 @@ async def __call__(self, es, params): total_end_millis = 0 # wait until recovery is done + # The nesting level is ok here given the structure of the API response + # pylint: disable=too-many-nested-blocks while not all_shards_done: response = await es.indices.recovery(index=index) # This might happen if we happen to call the API before the next recovery is scheduled. @@ -1528,11 +1530,13 @@ async def __call__(self, es, params): for _, idx_data in response.items(): for _, shard_data in idx_data.items(): for shard in shard_data: - all_shards_done = all_shards_done and (shard["stage"] == "DONE") - total_start_millis = min(total_start_millis, shard["start_time_in_millis"]) - total_end_millis = max(total_end_millis, shard["stop_time_in_millis"]) - idx_size = shard["index"]["size"] - total_recovered += idx_size["recovered_in_bytes"] + current_shard_done = shard["stage"] == "DONE" + all_shards_done = all_shards_done and current_shard_done + if current_shard_done: + total_start_millis = min(total_start_millis, shard["start_time_in_millis"]) + total_end_millis = max(total_end_millis, shard["stop_time_in_millis"]) + idx_size = shard["index"]["size"] + total_recovered += idx_size["recovered_in_bytes"] self.logger.debug("All shards done for [%s]: [%s].", index, all_shards_done) if not all_shards_done: diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 71a2e10f3..979d86e83 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2879,6 +2879,45 @@ async def test_waits_for_ongoing_indices_recovery(self, es): es.indices.recovery.side_effect = [ # recovery did not yet start as_future({}), + # recovery about to be started + as_future({ + "index1": { + "shards": [ + { + "id": 0, + "type": "SNAPSHOT", + "stage": "INIT", + "primary": True, + "start_time_in_millis": 1393244159716, + "index": { + "size": { + "total": "75.4mb", + "total_in_bytes": 79063092, + "recovered": "0mb", + "recovered_in_bytes": 0, + } + } + }, + { + "id": 1, + "type": "SNAPSHOT", + "stage": "DONE", + "primary": True, + "start_time_in_millis": 1393244155000, + "stop_time_in_millis": 1393244158000, + "index": { + "size": { + "total": "175.4mb", + "total_in_bytes": 179063092, + "recovered": "165.7mb", + "recovered_in_bytes": 168891939, + } + } + } + ] + } + }), + # active recovery - one shard is not yet finished as_future({ "index1": { @@ -2975,8 +3014,8 @@ async def test_waits_for_ongoing_indices_recovery(self, es): self.assertEqual(5, result["time_period"]) es.indices.recovery.assert_called_with(index="index1") - # retries three times - self.assertEqual(3, es.indices.recovery.call_count) + # retries four times + self.assertEqual(4, es.indices.recovery.call_count) class ShrinkIndexTests(TestCase):