Skip to content

Commit

Permalink
Update force merge polling (#489)
Browse files Browse the repository at this point in the history
Signed-off-by: Vijayan Balasubramanian <balasvij@amazon.com>
  • Loading branch information
VijayanB authored Apr 23, 2024
1 parent 794e11e commit c42a787
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 113 deletions.
41 changes: 25 additions & 16 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,30 +688,39 @@ class ForceMerge(Runner):
Runs a force merge operation against OpenSearch.
"""

PARAM_WAIT_FOR_COMPLETION = "wait_for_completion"

async def __call__(self, opensearch, params):
# pylint: disable=import-outside-toplevel
import opensearchpy
max_num_segments = params.get("max-num-segments")
mode = params.get("mode")
merge_params = self._default_kw_params(params)
if max_num_segments:
merge_params["max_num_segments"] = max_num_segments
# Request end time will not be 100% accurate, since we are using polling
# to check whether task status is completed or not.
if mode == "polling":
complete = False
try:
request_context_holder.on_client_request_start()
await opensearch.indices.forcemerge(**merge_params)
request_context_holder.on_client_request_end()
complete = True
except opensearchpy.ConnectionTimeout:
pass
while not complete:
await asyncio.sleep(params.get("poll-period"))
tasks = await opensearch.tasks.list(params={"actions": "indices:admin/forcemerge"})
if len(tasks["nodes"]) == 0:
# empty nodes response indicates no tasks
self.logger.warning(
"%s will be updated to false to run force merge in asynchronous way", self.PARAM_WAIT_FOR_COMPLETION)
merge_params[self.PARAM_WAIT_FOR_COMPLETION] = "false"
request_context_holder.on_client_request_start()
response_task = await opensearch.indices.forcemerge(**merge_params)
while True:
force_merge_task_id = response_task['task']
task = await opensearch.tasks.get(task_id=force_merge_task_id)
if not task:
self.logger.error("Failed to get task for task id: [%s]", force_merge_task_id)
request_context_holder.on_client_request_end()
raise exceptions.BenchmarkAssertionError(
"Force merge request failure: task was expected but not found in the get tasks api response.")
if 'completed' not in task:
request_context_holder.on_client_request_end()
complete = True
raise exceptions.BenchmarkAssertionError(
"Force merge request failure: 'completed' was expected but not found "
"in the get task api response.")
if task['completed']:
request_context_holder.on_client_request_end()
break
await asyncio.sleep(params.get("poll-period"))
else:
request_context_holder.on_client_request_start()
await opensearch.indices.forcemerge(**merge_params)
Expand Down
167 changes: 70 additions & 97 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1154,122 +1154,95 @@ async def test_force_merge_with_params(self, opensearch, on_client_request_start

opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000)

@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_force_merge_with_polling_no_timeout(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.indices.forcemerge.return_value = as_future()

force_merge = runner.ForceMerge()
await force_merge(opensearch, params={"index" : "_all", "mode": "polling", 'poll-period': 0})
opensearch.indices.forcemerge.assert_called_once_with(index="_all")

@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_force_merge_with_polling(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout())
opensearch.tasks.list.side_effect = [
as_future({
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": [
"data",
"ingest",
"master",
"remote_cluster_client",
"transform"
],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region"
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {}
}
}
}
}
}),
as_future({
"nodes": {}
})
]
opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"})
opensearch.tasks.get.return_value = as_future({
"completed": True,
"task": {
"node": "7PtzISisT5SiwlBGUi2GzQ",
"id": 2820798,
"type": "transport",
"action": "indices:admin/forcemerge",
"description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]",
"start_time_in_millis": 1711389911601,
"running_time_in_nanos": 2806258,
"cancellable": False,
"cancelled": False,
"headers": {}
},
"response": {
"_shards": {
"total": 10,
"successful": 10,
"failed": 0
}
}
})
force_merge = runner.ForceMerge()
await force_merge(opensearch, params={"index": "_all", "mode": "polling", "poll-period": 0})
opensearch.indices.forcemerge.assert_called_once_with(index="_all")
await force_merge(opensearch, params={
"index": "_all", "mode": "polling", 'poll-period': 10, "wait_for_completion": True})
opensearch.indices.forcemerge.assert_called_once_with(index="_all", wait_for_completion='false')
opensearch.tasks.get.assert_called_once_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798")

@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_force_merge_with_polling_and_params(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout())
opensearch.tasks.list.side_effect = [
opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"})
opensearch.tasks.get.side_effect = [
as_future({
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": [
"data",
"ingest",
"master",
"remote_cluster_client",
"transform"
],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region"
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {}
}
}
}
}
"completed": False,
"task": {
"node": "7PtzISisT5SiwlBGUi2GzQ",
"id": 2820798,
"type": "transport",
"action": "indices:admin/forcemerge",
"description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]",
"start_time_in_millis": 1711389911601,
"running_time_in_nanos": 2806258,
"cancellable": False,
"cancelled": False,
"headers": {}
},
"response": {}
}),
as_future({
"nodes": {}
"completed": True,
"task": {
"node": "7PtzISisT5SiwlBGUi2GzQ",
"id": 2820798,
"type": "transport",
"action": "indices:admin/forcemerge",
"description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]",
"start_time_in_millis": 1711389911601,
"running_time_in_nanos": 2806258,
"cancellable": "false",
"cancelled": "false",
"headers": {}
},
"response": {
"_shards": {
"total": 10,
"successful": 10,
"failed": 0
}
}
})
]
force_merge = runner.ForceMerge()
# request-timeout should be ignored as mode:polling
await force_merge(opensearch, params={"index" : "_all", "mode": "polling", "max-num-segments": 1,
"request-timeout": 50000, "poll-period": 0})
opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000)
await force_merge(opensearch, params={
"index": "_all", "mode": "polling", "max-num-segments": 1, "request-timeout": 50000, "poll-period": 10
})
opensearch.indices.forcemerge.assert_called_once_with(
index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion='false')
opensearch.tasks.get.assert_called_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798")
self.assertEqual(opensearch.tasks.get.call_count, 2)


class IndicesStatsRunnerTests(TestCase):
Expand Down

0 comments on commit c42a787

Please sign in to comment.