Skip to content

Commit

Permalink
Remove deprecated usage of parameter source
Browse files Browse the repository at this point in the history
With this commit we remove the backwards-compatibility layer for several
parameters that have been deprecated for a while now. We also remove the
backwards-compabibility layer for deprecated signatures in custom
parameter sources. Finally we document this also in the migration docs.

Relates #521
  • Loading branch information
danielmitterdorfer authored Jun 14, 2018
1 parent 8858f94 commit 54edbcc
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 122 deletions.
37 changes: 37 additions & 0 deletions docs/migrate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,43 @@ You can also set version-specific environment variables, e.g. ``JAVA7_HOME``, ``

Rally will choose the highest appropriate JDK per Elasticsearch version. You can use ``--runtime-jdk`` to force a specific JDK version but the path will still be resolved according to the logic above.

Custom Parameter Sources
^^^^^^^^^^^^^^^^^^^^^^^^

In Rally 0.10.0 we have deprecated some parameter names in custom parameter sources. In Rally 1.0.0, these deprecated names have been removed. Therefore you need to replace the following parameter names if you use them in custom parameter sources:

============== ======================= =======================
Operation type Old name New name
============== ======================= =======================
search use_request_cache cache
search request_params request-params
search items_per_page results-per-page
bulk action_metadata_present action-metadata-present
force-merge max_num_segments max-num-segments
============== ======================= =======================

In Rally 0.9.0 the signature of custom parameter sources has also changed. In Rally 1.0.0 we have removed the backwards compatibility layer so you need to change the signatures.

Old::

# for parameter sources implemented as functions
def custom_param_source(indices, params):

# for parameter sources implemented as classes
class CustomParamSource:
def __init__(self, indices, params):


New::

# for parameter sources implemented as functions
def custom_param_source(track, params, **kwargs):

# for parameter sources implemented as classes
class CustomParamSource:
def __init__(self, track, params, **kwargs):

You can use the property ``track.indices`` to access indices.

Migrating to Rally 0.11.0
-------------------------
Expand Down
68 changes: 8 additions & 60 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,7 @@ def __call__(self, es, params):
if "pipeline" in params:
bulk_params["pipeline"] = params["pipeline"]

# TODO: Remove this fallback logic with Rally 1.0
if "action-metadata-present" in params:
action_meta_data_key = "action-metadata-present"
else:
if "action_metadata_present" in params:
self.logger.warning("Your parameter source uses the deprecated name [action_metadata_present]. Please change it to "
"[action-metadata-present].")
action_meta_data_key = "action_metadata_present"

with_action_metadata = mandatory(params, action_meta_data_key, self)
with_action_metadata = mandatory(params, "action-metadata-present", self)
bulk_size = mandatory(params, "bulk-size", self)

if with_action_metadata:
Expand Down Expand Up @@ -372,21 +363,11 @@ def detailed_stats(self, params, bulk_size, response):
error_details = set()
bulk_request_size_bytes = 0
total_document_size_bytes = 0
with_action_metadata = mandatory(params, "action-metadata-present", self)

for line_number, data in enumerate(params["body"]):

line_size = len(data.encode('utf-8'))

# TODO: Remove this fallback logic with Rally 1.0
if "action-metadata-present" in params:
action_meta_data_key = "action-metadata-present"
else:
if "action_metadata_present" in params:
self.logger.warning("Your parameter source uses the deprecated name [action_metadata_present]. Please change it to "
"[action-metadata-present].")
action_meta_data_key = "action_metadata_present"

if params[action_meta_data_key]:
if with_action_metadata:
if line_number % 2 == 1:
total_document_size_bytes += line_size
else:
Expand Down Expand Up @@ -566,27 +547,15 @@ def __init__(self):
self.es = None

def __call__(self, es, params):
# TODO: Remove items_per_page with Rally 1.0.
if "pages" in params and ("results-per-page" in params or "items_per_page" in params):
if "pages" in params and "results-per-page" in params:
return self.scroll_query(es, params)
else:
return self.request_body_query(es, params)

def request_body_query(self, es, params):
if "request-params" in params:
request_params = params["request-params"]
elif "request_params" in params:
# TODO: Remove with Rally 1.0.
self.logger.warning("Your parameter source uses the deprecated name [request_params]. Please change it to [request-params].")
request_params = params["request_params"]
else:
request_params = {}
request_params = params.get("request-params", {})
if "cache" in params:
request_params["request_cache"] = params["cache"]
elif "use_request_cache" in params:
# TODO: Remove with Rally 1.0.
self.logger.warning("Your parameter source uses the deprecated name [use_request_cache]. Please change it to [cache].")
request_params["request_cache"] = params["use_request_cache"]
r = es.search(
index=params.get("index", "_all"),
doc_type=params.get("type"),
Expand All @@ -602,37 +571,16 @@ def request_body_query(self, es, params):
}

def scroll_query(self, es, params):
if "request-params" in params:
request_params = params["request-params"]
elif "request_params" in params:
# TODO: Remove with Rally 1.0.
self.logger.warning("Your parameter source uses the deprecated name [request_params]. Please change it to [request-params].")
request_params = params["request_params"]
else:
request_params = {}
request_params = params.get("request-params", {})
cache = params.get("cache")
hits = 0
retrieved_pages = 0
timed_out = False
took = 0
self.es = es
# explicitly convert to int to provoke an error otherwise
total_pages = sys.maxsize if params["pages"] == "all" else int(params["pages"])
if "cache" in params:
cache = params["cache"]
elif "use_request_cache" in params:
# TODO: Remove with Rally 1.0.
self.logger.warning("Your parameter source uses the deprecated name [use_request_cache]. Please change it to [cache].")
cache = params["use_request_cache"]
else:
cache = None
if "results-per-page" in params:
size = params["results-per-page"]
elif "items_per_page" in params:
# TODO: Remove with Rally 1.0.
self.logger.warning("Your parameter source uses the deprecated name [items_per_page]. Please change it to [results-per-page].")
size = params["items_per_page"]
else:
size = None
size = params.get("results-per-page")

for page in range(total_pages):
if page == 0:
Expand Down
48 changes: 3 additions & 45 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import time
import math
import types
import inspect
import operator
from enum import Enum

from esrally import exceptions
from esrally.track import track
from esrally.utils import io, console
from esrally.utils import io

__PARAM_SOURCES_BY_OP = {}
__PARAM_SOURCES_BY_NAME = {}
Expand All @@ -28,31 +27,9 @@ def param_source_for_name(name, track, params):

# we'd rather use callable() but this will erroneously also classify a class as callable...
if isinstance(param_source, types.FunctionType):
# TODO: Remove me after some grace period
try:
s = inspect.signature(param_source, follow_wrapped=False)
except TypeError:
# follow_wrapped has been introduced in Python 3.5
s = inspect.signature(param_source)
if len(s.parameters) == 2 and s.parameters.get("indices"):
console.warn("Parameter source '%s' is using deprecated method signature (indices, params). Please change it "
"to (track, params, **kwargs). For details please see the migration guide in the docs." % name)
return LegacyDelegatingParamSource(track, params, param_source)
else:
return DelegatingParamSource(track, params, param_source)
return DelegatingParamSource(track, params, param_source)
else:
try:
s = inspect.signature(param_source.__init__, follow_wrapped=False)
except TypeError:
# follow_wrapped has been introduced in Python 3.5
s = inspect.signature(param_source)
# self, indices, params
if len(s.parameters) == 3 and s.parameters.get("indices"):
console.warn("Parameter source '%s' is using deprecated method signature (indices, params). Please change it "
"to (track, params, **kwargs). For details please see the migration guide in the docs." % name)
return param_source(track.indices, params)
else:
return param_source(track, params)
return param_source(track, params)


def register_param_source_for_operation(op_type, param_source_class):
Expand Down Expand Up @@ -136,15 +113,6 @@ def params(self):
return self.delegate(self.track, self._params, **self.kwargs)


class LegacyDelegatingParamSource(ParamSource):
def __init__(self, track, params, delegate, **kwargs):
super().__init__(track, params, **kwargs)
self.delegate = delegate

def params(self):
return self.delegate(self.track.indices, self._params)


class CreateIndexParamSource(ParamSource):
def __init__(self, track, params, **kwargs):
super().__init__(track, params, **kwargs)
Expand Down Expand Up @@ -244,7 +212,6 @@ def __init__(self, track, params, **kwargs):

self.template_definitions.append((template.name, body))
else:
# TODO: Should we allow to create multiple index templates at once?
try:
self.template_definitions.append((params["template"], params["body"]))
except KeyError:
Expand Down Expand Up @@ -273,7 +240,6 @@ def __init__(self, track, params, **kwargs):
if not filter_template or template.name == filter_template:
self.template_definitions.append((template.name, template.delete_matching_indices, template.pattern))
else:
# TODO: Should we allow to delete multiple templates at once?
try:
template = params["template"]
except KeyError:
Expand Down Expand Up @@ -353,11 +319,7 @@ def __init__(self, track, params, **kwargs):
"index": index_name,
"type": type_name,
"cache": request_cache,
# TODO: This is the old name, remove with Rally 1.0
"use_request_cache": request_cache,
"request-params": request_params,
# TODO: This is the old name, remove with Rally 1.0
"request_params": request_params,
"body": query_body
}

Expand All @@ -368,8 +330,6 @@ def __init__(self, track, params, **kwargs):
self.query_params["pages"] = pages
if results_per_page:
self.query_params["results-per-page"] = results_per_page
# TODO: This is the old name, remove with Rally 1.0
self.query_params["items_per_page"] = results_per_page

self.query_body_params = []
if query_body_params:
Expand Down Expand Up @@ -674,8 +634,6 @@ def bulk_generator(readers, client_index, pipeline, original_params):
# For our implementation it's always present. Either the original source file already contains this line or the generator
# has added it.
"action-metadata-present": True,
# TODO: This is the old name, remove with Rally 1.0
"action_metadata_present": True,
"body": bulk,
# This is not always equal to the bulk_size we get as parameter. The last bulk may be less than the bulk size.
"bulk-size": docs_in_bulk,
Expand Down
2 changes: 1 addition & 1 deletion tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def test_bulk_index_missing_params(self, es):

with self.assertRaises(exceptions.DataError) as ctx:
bulk(es, bulk_params)
self.assertEqual("Parameter source for operation 'bulk-index' did not provide the mandatory parameter 'action_metadata_present'. "
self.assertEqual("Parameter source for operation 'bulk-index' did not provide the mandatory parameter 'action-metadata-present'. "
"Please add it to your parameter source.", ctx.exception.args[0])

@mock.patch("elasticsearch.Elasticsearch")
Expand Down
18 changes: 2 additions & 16 deletions tests/track/params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,6 @@ def test_generate_two_bulks(self):
self.assertEqual(2, len(all_bulks))
self.assertEqual({
"action-metadata-present": True,
"action_metadata_present": True,
"body": ["1", "2", "3", "4", "5"],
"bulk-id": "0-1",
"bulk-size": 5,
Expand All @@ -899,7 +898,6 @@ def test_generate_two_bulks(self):

self.assertEqual({
"action-metadata-present": True,
"action_metadata_present": True,
"body": ["6", "7", "8"],
"bulk-id": "0-2",
"bulk-size": 3,
Expand Down Expand Up @@ -947,7 +945,6 @@ def test_generate_bulks_from_multiple_corpora(self):
self.assertEqual(3, len(all_bulks))
self.assertEqual({
"action-metadata-present": True,
"action_metadata_present": True,
"body": ["1", "2", "3", "4", "5"],
"bulk-id": "0-1",
"bulk-size": 5,
Expand All @@ -959,7 +956,6 @@ def test_generate_bulks_from_multiple_corpora(self):

self.assertEqual({
"action-metadata-present": True,
"action_metadata_present": True,
"body": ["1", "2", "3", "4", "5"],
"bulk-id": "0-2",
"bulk-size": 5,
Expand All @@ -971,7 +967,6 @@ def test_generate_bulks_from_multiple_corpora(self):

self.assertEqual({
"action-metadata-present": True,
"action_metadata_present": True,
"body": ["1", "2", "3", "4", "5"],
"bulk-id": "0-3",
"bulk-size": 5,
Expand Down Expand Up @@ -1003,7 +998,6 @@ def test_internal_params_take_precedence(self):
# body must not contain 'foo'!
self.assertEqual({
"action-metadata-present": True,
"action_metadata_present": True,
"body": ["1", "2", "3"],
"bulk-id": "0-1",
"bulk-size": 3,
Expand Down Expand Up @@ -1461,7 +1455,7 @@ def test_passes_request_parameters(self):
})
p = source.params()

self.assertEqual(7, len(p))
self.assertEqual(5, len(p))
self.assertEqual("index1", p["index"])
self.assertIsNone(p["type"])
self.assertEqual({
Expand All @@ -1473,11 +1467,6 @@ def test_passes_request_parameters(self):
"match_all": {}
}
}, p["body"])
# backwards-compatibility options
self.assertFalse(p["use_request_cache"])
self.assertEqual({
"_source_include": "some_field"
}, p["request_params"])

def test_user_specified_overrides_defaults(self):
index1 = track.Index(name="index1", types=["type1"])
Expand All @@ -1493,7 +1482,7 @@ def test_user_specified_overrides_defaults(self):
})
p = source.params()

self.assertEqual(7, len(p))
self.assertEqual(5, len(p))
self.assertEqual("_all", p["index"])
self.assertEqual("type1", p["type"])
self.assertDictEqual({}, p["request-params"])
Expand All @@ -1503,9 +1492,6 @@ def test_user_specified_overrides_defaults(self):
"match_all": {}
}
}, p["body"])
# backwards-compatibility options
self.assertFalse(p["use_request_cache"])
self.assertDictEqual({}, p["request_params"])

def test_replaces_body_params(self):
import copy
Expand Down

0 comments on commit 54edbcc

Please sign in to comment.