Skip to content

Commit

Permalink
Allow index pattern for source-index in shrink-index operation (#1118)
Browse files Browse the repository at this point in the history
This commit adds support for specifying several source indices using the
standard Elasticsearch Multi-target syntax for the shrink-index
runner.

This helps automate shrink operations for multiple indices.
  • Loading branch information
dliappis authored Nov 20, 2020
1 parent 9f86c76 commit b8e8e6b
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 45 deletions.
22 changes: 20 additions & 2 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1302,8 +1302,8 @@ shrink-index

With the operation ``shrink-index`` you can execute the `shrink index API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html>`_. Note that this does not correspond directly to the shrink index API call in Elasticsearch but it is a high-level operation that executes all the necessary low-level operations under the hood to shrink an index. It supports the following parameters:

* ``source-index`` (mandatory): The name of the index that should be shrinked.
* ``target-index`` (mandatory): The name of the index that contains the shrinked shards.
* ``source-index`` (mandatory): The name of the index that should be shrinked. Multiple indices can be defined using the `Multi-target syntax <https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-index.html>`_.
* ``target-index`` (mandatory): The name of the index that contains the shrinked shards. If multiple indices match ``source-index``, one shrink operation will execute for every matching index. Each shrink operation will use a modified ``target-index``: the unique suffix of the source index (derived by removing the common prefix of all matching source indices) will be appended to ``target-index``. See also the example below.
* ``target-body`` (mandatory): The body containing settings and aliases for ``target-index``.
* ``shrink-node`` (optional, defaults to a random data node): As a first step, the source index needs to be fully relocated to a single node. Rally will automatically choose a random data node in the cluster but you can choose one explicitly if needed.

Expand All @@ -1325,6 +1325,24 @@ Example::

This will shrink the index ``src`` to ``target``. The target index will consist of one shard and have one replica. With ``shrink-node`` we also explicitly specify the name of the node where we want the source index to be relocated to.

The following example ``src*`` matches a list of indices ``src-a,src-b``::

{
"operation-type": "shrink-index",
"shrink-node": "rally-node-0",
"source-index": "src*",
"target-index": "target",
"target-body": {
"settings": {
"index.number_of_replicas": 1,
"index.number_of_shards": 1,
"index.codec": "best_compression"
}
}
}

and will reindex ``src-a`` as ``target-a`` and ``src-b`` as ``target-b``.

This operation is :ref:`retryable <track_operations>`.

delete-ml-datafeed
Expand Down
83 changes: 53 additions & 30 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import types
from collections import Counter, OrderedDict
from copy import deepcopy
from os.path import commonprefix

import ijson

from esrally import exceptions, track
Expand Down Expand Up @@ -305,8 +307,17 @@ def mandatory(params, key, op):
try:
return params[key]
except KeyError:
raise exceptions.DataError("Parameter source for operation '%s' did not provide the mandatory parameter '%s'. Please add it to your"
" parameter source." % (str(op), key))
raise exceptions.DataError(
f"Parameter source for operation '{str(op)}' did not provide the mandatory parameter '{key}'. "
f"Add it to your parameter source and try again.")


# TODO: remove and use https://docs.python.org/3/library/stdtypes.html#str.removeprefix
# once Python 3.9 becomes the minimum version
def remove_prefix(string, prefix):
if string.startswith(prefix):
return string[len(prefix):]
return string


def escape(v):
Expand Down Expand Up @@ -1324,48 +1335,60 @@ async def _wait_for(self, es, idx, description):

async def __call__(self, es, params):
source_index = mandatory(params, "source-index", self)
source_indices_get = await es.indices.get(source_index)
source_indices = list(source_indices_get.keys())
source_indices_stem = commonprefix(source_indices)

target_index = mandatory(params, "target-index", self)

# we need to inject additional settings so we better copy the body
target_body = deepcopy(mandatory(params, "target-body", self))
shrink_node = params.get("shrink-node")
# Choose a random data node if none is specified
if not shrink_node:
if shrink_node:
node_names = [shrink_node]
else:
node_names = []
# choose a random data node
node_info = await es.nodes.info()
for node in node_info["nodes"].values():
if "data" in node["roles"]:
node_names.append(node["name"])
if not node_names:
raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. Please specify it explicitly.")
raise exceptions.RallyAssertionError("Could not choose a suitable shrink-node automatically. Specify it explicitly.")

for source_index in source_indices:
shrink_node = random.choice(node_names)
self.logger.info("Using [%s] as shrink node.", shrink_node)
self.logger.info("Preparing [%s] for shrinking.", source_index)
# prepare index for shrinking
await es.indices.put_settings(index=source_index,
body={
"settings": {
"index.routing.allocation.require._name": shrink_node,
"index.blocks.write": "true"
}
},
preserve_existing=True)

self.logger.info("Waiting for relocation to finish for index [%s]...", source_index)
await self._wait_for(es, source_index, "shard relocation for index [{}]".format(source_index))
self.logger.info("Shrinking [%s] to [%s].", source_index, target_index)
if "settings" not in target_body:
target_body["settings"] = {}
target_body["settings"]["index.routing.allocation.require._name"] = None
target_body["settings"]["index.blocks.write"] = None
# kick off the shrink operation
await es.indices.shrink(index=source_index, target=target_index, body=target_body)

self.logger.info("Waiting for shrink to finish for index [%s]...", source_index)
await self._wait_for(es, target_index, "shrink for index [{}]".format(target_index))
self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, target_index)
self.logger.info("Using [%s] as shrink node.", shrink_node)
self.logger.info("Preparing [%s] for shrinking.", source_index)

# prepare index for shrinking
await es.indices.put_settings(index=source_index,
body={
"settings": {
"index.routing.allocation.require._name": shrink_node,
"index.blocks.write": "true"
}
},
preserve_existing=True)

self.logger.info("Waiting for relocation to finish for index [%s] ...", source_index)
await self._wait_for(es, source_index, f"shard relocation for index [{source_index}]")
self.logger.info("Shrinking [%s] to [%s].", source_index, target_index)
if "settings" not in target_body:
target_body["settings"] = {}
target_body["settings"]["index.routing.allocation.require._name"] = None
target_body["settings"]["index.blocks.write"] = None
# kick off the shrink operation
index_suffix = remove_prefix(source_index, source_indices_stem)
final_target_index = target_index if len(index_suffix) == 0 else target_index+index_suffix
await es.indices.shrink(index=source_index, target=final_target_index, body=target_body)

self.logger.info("Waiting for shrink to finish for index [%s] ...", source_index)
await self._wait_for(es, final_target_index, f"shrink for index [{final_target_index}]")
self.logger.info("Shrinking [%s] to [%s] has finished.", source_index, final_target_index)
# ops_count is not really important for this operation...
return 1, "ops"
return len(source_indices), "ops"

def __repr__(self, *args, **kwargs):
return "shrink-index"
Expand Down
Loading

0 comments on commit b8e8e6b

Please sign in to comment.