diff --git a/docs/command_line_reference.rst b/docs/command_line_reference.rst index 0494349ff..5e9923570 100644 --- a/docs/command_line_reference.rst +++ b/docs/command_line_reference.rst @@ -655,6 +655,7 @@ Rally recognizes the following client options in addition: * ``max_connections``: By default, Rally will choose the maximum allowed number of connections automatically (equal to the number of simulated clients but at least 256 connections). With this property it is possible to override that logic but a minimum of 256 is enforced internally. * ``enable_cleanup_closed`` (default: ``false``): In some cases, SSL connections might not be properly closed and the number of open connections increases as a result. When this client option is set to ``true``, the Elasticsearch client will check and forcefully close these connections. +* ``static_responses``: The path to a JSON file containing path patterns and the corresponding responses. When this value is set to ``true``, Rally will not send requests to Elasticsearch but return static responses as specified by the file. This is useful to diagnose performance issues in Rally itself. See below for a specific example. **Examples** @@ -682,6 +683,63 @@ Client certificates can be presented regardless of the ``verify_certs`` setting, * Enable SSL, verify server certificates using private CA: ``--client-options="use_ssl:true,verify_certs:true,ca_certs:'/path/to/cacert.pem'"`` * Enable SSL, verify server certificates using private CA, present client certificates: ``--client-options="use_ssl:true,verify_certs:true,ca_certs:'/path/to/cacert.pem',client_cert:'/path/to/client_cert.pem',client_key:'/path/to/client_key.pem'"`` +**Static Responses** + +Define a JSON file containing a list of objects with the following properties: + +* ``path``: A path or path pattern that should be matched. Only leading and trailing wildcards (``*``) are supported. A path containing only a wildcard acts matches any path. +* ``body``: The respective response body. +* ``body-encoding``: Either ``raw`` or ``json``. Use ``json`` by default and ``raw`` for the operation-type ``bulk`` and ``search``. + +Here we define the necessary responses for a track that bulk-indexes data:: + + [ + { + "path": "*/_bulk", + "body": { + "errors": false, + "took": 1 + }, + "body-encoding": "raw" + }, + { + "path": "/_cluster/health*", + "body": { + "status": "green", + "relocating_shards": 0 + }, + "body-encoding": "json" + }, + { + "path": "/_all/_stats/_all", + "body": { + "_all": { + "total": { + "merges": { + "current": 0 + } + } + } + }, + "body-encoding": "json" + }, + { + "path": "*", + "body": {}, + "body-encoding": "json" + } + ] + +.. note:: + Paths are evaluated from top to bottom. Therefore, place more restrictive paths at the top of the file. + +Save the above responses as ``responses.json`` and execute a benchmark as follows:: + + esrally race --track=geonames --challenge=append-no-conflicts-index-only --pipeline=benchmark-only --distribution-version=8.0.0 --client-options="static_responses:'responses.json'" + +.. note:: + Use ``--pipeline=benchmark-only`` as Rally should not start any cluster when static responses are used. + .. _command_line_reference_on_error: ``on-error`` diff --git a/esrally/async_connection.py b/esrally/async_connection.py index b97579837..3609dce99 100644 --- a/esrally/async_connection.py +++ b/esrally/async_connection.py @@ -1,7 +1,77 @@ import asyncio +import json +import logging +from typing import Optional, List import aiohttp import elasticsearch +from aiohttp import RequestInfo, BaseConnector +from aiohttp.client_proto import ResponseHandler +from aiohttp.helpers import BaseTimerContext +from multidict import CIMultiDictProxy, CIMultiDict +from yarl import URL + +from esrally.utils import io + + +class StaticTransport: + def __init__(self): + self.closed = False + + def is_closing(self): + return False + + def close(self): + self.closed = True + + +class StaticConnector(BaseConnector): + async def _create_connection(self, req: "ClientRequest", traces: List["Trace"], + timeout: "ClientTimeout") -> ResponseHandler: + handler = ResponseHandler(self._loop) + handler.transport = StaticTransport() + handler.protocol = "" + return handler + + +class StaticRequest(aiohttp.ClientRequest): + RESPONSES = None + + async def send(self, conn: "Connection") -> "ClientResponse": + self.response = self.response_class( + self.method, + self.original_url, + writer=self._writer, + continue100=self._continue, + timer=self._timer, + request_info=self.request_info, + traces=self._traces, + loop=self.loop, + session=self._session, + ) + path = self.original_url.path + self.response.static_body = StaticRequest.RESPONSES.response(path) + return self.response + + +class StaticResponse(aiohttp.ClientResponse): + def __init__(self, method: str, url: URL, *, writer: "asyncio.Task[None]", + continue100: Optional["asyncio.Future[bool]"], timer: BaseTimerContext, request_info: RequestInfo, + traces: List["Trace"], loop: asyncio.AbstractEventLoop, session: "ClientSession") -> None: + super().__init__(method, url, writer=writer, continue100=continue100, timer=timer, request_info=request_info, + traces=traces, loop=loop, session=session) + self.static_body = None + + async def start(self, connection: "Connection") -> "ClientResponse": + self._closed = False + self._protocol = connection.protocol + self._connection = connection + self._headers = CIMultiDictProxy(CIMultiDict()) + self.status = 200 + return self + + async def text(self, encoding=None, errors="strict"): + return self.static_body class RawClientResponse(aiohttp.ClientResponse): @@ -16,6 +86,65 @@ async def text(self, encoding=None, errors="strict"): return self._body +class ResponseMatcher: + def __init__(self, responses): + self.logger = logging.getLogger(__name__) + self.responses = [] + + for response in responses: + path = response["path"] + if path == "*": + matcher = ResponseMatcher.always() + elif path.startswith("*"): + matcher = ResponseMatcher.endswith(path[1:]) + elif path.endswith("*"): + matcher = ResponseMatcher.startswith(path[:-1]) + else: + matcher = ResponseMatcher.equals(path) + + body = response["body"] + body_encoding = response.get("body-encoding", "json") + if body_encoding == "raw": + body = json.dumps(body).encode("utf-8") + elif body_encoding == "json": + body = json.dumps(body) + else: + raise ValueError(f"Unknown body encoding [{body_encoding}] for path [{path}]") + + self.responses.append((path, matcher, body)) + + @staticmethod + def always(): + # pylint: disable=unused-variable + def f(p): + return True + return f + + @staticmethod + def startswith(path_pattern): + def f(p): + return p.startswith(path_pattern) + return f + + @staticmethod + def endswith(path_pattern): + def f(p): + return p.endswith(path_pattern) + return f + + @staticmethod + def equals(path_pattern): + def f(p): + return p == path_pattern + return f + + def response(self, path): + for path_pattern, matcher, body in self.responses: + if matcher(path): + self.logger.debug("Path pattern [%s] matches path [%s].", path_pattern, path) + return body + + class AIOHttpConnection(elasticsearch.AIOHttpConnection): def __init__(self, host="localhost", @@ -52,20 +181,42 @@ def __init__(self, self._trace_configs = [trace_config] if trace_config else None self._enable_cleanup_closed = kwargs.get("enable_cleanup_closed", False) + static_responses = kwargs.get("static_responses") + self.use_static_responses = static_responses is not None + + if self.use_static_responses: + # read static responses once and reuse them + if not StaticRequest.RESPONSES: + with open(io.normalize_path(static_responses)) as f: + StaticRequest.RESPONSES = ResponseMatcher(json.load(f)) + + self._request_class = StaticRequest + self._response_class = StaticResponse + else: + self._request_class = aiohttp.ClientRequest + self._response_class = RawClientResponse + async def _create_aiohttp_session(self): if self.loop is None: self.loop = asyncio.get_running_loop() + + if self.use_static_responses: + connector = StaticConnector(limit=self._limit, enable_cleanup_closed=self._enable_cleanup_closed) + else: + connector = aiohttp.TCPConnector( + limit=self._limit, + use_dns_cache=True, + ssl_context=self._ssl_context, + enable_cleanup_closed=self._enable_cleanup_closed + ) + self.session = aiohttp.ClientSession( headers=self.headers, auto_decompress=True, loop=self.loop, cookie_jar=aiohttp.DummyCookieJar(), - response_class=RawClientResponse, - connector=aiohttp.TCPConnector( - limit=self._limit, - use_dns_cache=True, - ssl_context=self._ssl_context, - enable_cleanup_closed=self._enable_cleanup_closed - ), + request_class=self._request_class, + response_class=self._response_class, + connector=connector, trace_configs=self._trace_configs, ) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index db5e470ae..5b6f1dc3d 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -399,38 +399,39 @@ def create_es_clients(self): es[cluster_name] = self.es_client_factory(cluster_hosts, cluster_client_options).create() return es - def prepare_telemetry(self, es): + def prepare_telemetry(self, es, enable): enabled_devices = self.config.opts("telemetry", "devices") telemetry_params = self.config.opts("telemetry", "params") log_root = paths.race_root(self.config) es_default = es["default"] - self.telemetry = telemetry.Telemetry(enabled_devices, devices=[ - telemetry.NodeStats(telemetry_params, es, self.metrics_store), - telemetry.ExternalEnvironmentInfo(es_default, self.metrics_store), - telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store), - telemetry.JvmStatsSummary(es_default, self.metrics_store), - telemetry.IndexStats(es_default, self.metrics_store), - telemetry.MlBucketProcessingTime(es_default, self.metrics_store), - telemetry.SegmentStats(log_root, es_default), - telemetry.CcrStats(telemetry_params, es, self.metrics_store), - telemetry.RecoveryStats(telemetry_params, es, self.metrics_store), - telemetry.TransformStats(telemetry_params, es, self.metrics_store), - telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store) - ]) + + if enable: + devices = [ + telemetry.NodeStats(telemetry_params, es, self.metrics_store), + telemetry.ExternalEnvironmentInfo(es_default, self.metrics_store), + telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store), + telemetry.JvmStatsSummary(es_default, self.metrics_store), + telemetry.IndexStats(es_default, self.metrics_store), + telemetry.MlBucketProcessingTime(es_default, self.metrics_store), + telemetry.SegmentStats(log_root, es_default), + telemetry.CcrStats(telemetry_params, es, self.metrics_store), + telemetry.RecoveryStats(telemetry_params, es, self.metrics_store), + telemetry.TransformStats(telemetry_params, es, self.metrics_store), + telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store) + ] + else: + devices = [] + self.telemetry = telemetry.Telemetry(enabled_devices, devices=devices) def wait_for_rest_api(self, es): - skip_rest_api_check = self.config.opts("mechanic", "skip.rest.api.check") - if skip_rest_api_check: - self.logger.info("Skipping REST API check.") + es_default = es["default"] + self.logger.info("Checking if REST API is available.") + if client.wait_for_rest_layer(es_default, max_attempts=40): + self.logger.info("REST API is available.") else: - es_default = es["default"] - self.logger.info("Checking if REST API is available.") - if client.wait_for_rest_layer(es_default, max_attempts=40): - self.logger.info("REST API is available.") - else: - self.logger.error("REST API layer is not yet available. Stopping benchmark.") - raise exceptions.SystemSetupError("Elasticsearch REST API layer is not available.") + self.logger.error("REST API layer is not yet available. Stopping benchmark.") + raise exceptions.SystemSetupError("Elasticsearch REST API layer is not available.") def retrieve_cluster_info(self, es): try: @@ -455,9 +456,21 @@ def prepare_benchmark(self, t): self.challenge.meta_data) es_clients = self.create_es_clients() - self.wait_for_rest_api(es_clients) - self.prepare_telemetry(es_clients) - self.target.on_cluster_details_retrieved(self.retrieve_cluster_info(es_clients)) + + skip_rest_api_check = self.config.opts("mechanic", "skip.rest.api.check") + uses_static_responses = self.config.opts("client", "options").uses_static_responses + if skip_rest_api_check: + self.logger.info("Skipping REST API check as requested explicitly.") + elif uses_static_responses: + self.logger.info("Skipping REST API check as static responses are used.") + else: + self.wait_for_rest_api(es_clients) + self.target.on_cluster_details_retrieved(self.retrieve_cluster_info(es_clients)) + + # Avoid issuing any requests to the target cluster when static responses are enabled. The results + # are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs. + self.prepare_telemetry(es_clients, enable=not uses_static_responses) + for host in self.config.opts("driver", "load_driver_hosts"): host_config = { # for simplicity we assume that all benchmark machines have the same specs diff --git a/esrally/rally.py b/esrally/rally.py index 2cb8232a8..7577d6cb3 100644 --- a/esrally/rally.py +++ b/esrally/rally.py @@ -534,7 +534,7 @@ def add_track_source(subparser): help=argparse.SUPPRESS, type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"), default=None) - # skips checking that the REST API is available before proceeding with the benchmark + # Skips checking that the REST API is available before proceeding with the benchmark race_parser.add_argument( "--skip-rest-api-check", help=argparse.SUPPRESS, diff --git a/esrally/utils/opts.py b/esrally/utils/opts.py index 65782ac57..a24bc1fbf 100644 --- a/esrally/utils/opts.py +++ b/esrally/utils/opts.py @@ -203,6 +203,10 @@ def all_client_options(self): """Return a dict with all client options""" return self.all_options + @property + def uses_static_responses(self): + return self.default.get("static_responses", False) + def with_max_connections(self, max_connections): final_client_options = {} for cluster, original_opts in self.all_client_options.items(): diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index c90f601c2..7d6ca42fd 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -63,6 +63,7 @@ class Holder: def __init__(self, all_hosts=None, all_client_options=None): self.all_hosts = all_hosts self.all_client_options = all_client_options + self.uses_static_responses = False def __init__(self, methodName='runTest'): super().__init__(methodName) diff --git a/tests/test_async_connection.py b/tests/test_async_connection.py new file mode 100644 index 000000000..13fec9a58 --- /dev/null +++ b/tests/test_async_connection.py @@ -0,0 +1,55 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from unittest import TestCase + +from esrally.async_connection import ResponseMatcher + + +class ResponseMatcherTests(TestCase): + def test_matches(self): + matcher = ResponseMatcher(responses=[ + { + "path": "*/_bulk", + "body": { + "response-type": "bulk", + } + }, + { + "path": "/_cluster/health*", + "body": { + "response-type": "cluster-health", + } + }, + { + "path": "*", + "body": { + "response-type": "default" + } + } + ]) + + self.assert_response_type(matcher, "/_cluster/health", "cluster-health") + self.assert_response_type(matcher, "/_cluster/health/geonames", "cluster-health") + self.assert_response_type(matcher, "/geonames/_bulk", "bulk") + self.assert_response_type(matcher, "/geonames", "default") + self.assert_response_type(matcher, "/geonames/force_merge", "default") + + def assert_response_type(self, matcher, path, expected_response_type): + response = json.loads(matcher.response(path)) + self.assertEqual(response["response-type"], expected_response_type)