Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to specify static responses #1234

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions docs/command_line_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ Rally recognizes the following client options in addition:

* ``max_connections``: By default, Rally will choose the maximum allowed number of connections automatically (equal to the number of simulated clients but at least 256 connections). With this property it is possible to override that logic but a minimum of 256 is enforced internally.
* ``enable_cleanup_closed`` (default: ``false``): In some cases, SSL connections might not be properly closed and the number of open connections increases as a result. When this client option is set to ``true``, the Elasticsearch client will check and forcefully close these connections.
* ``static_responses``: The path to a JSON file containing path patterns and the corresponding responses. When this value is set to ``true``, Rally will not send requests to Elasticsearch but return static responses as specified by the file. This is useful to diagnose performance issues in Rally itself. See below for a specific example.

**Examples**

Expand Down Expand Up @@ -682,6 +683,60 @@ Client certificates can be presented regardless of the ``verify_certs`` setting,
* Enable SSL, verify server certificates using private CA: ``--client-options="use_ssl:true,verify_certs:true,ca_certs:'/path/to/cacert.pem'"``
* Enable SSL, verify server certificates using private CA, present client certificates: ``--client-options="use_ssl:true,verify_certs:true,ca_certs:'/path/to/cacert.pem',client_cert:'/path/to/client_cert.pem',client_key:'/path/to/client_key.pem'"``

**Static Responses**

Define a JSON file containing a list of objects with the following properties:

* ``path``: A path or path pattern that should be matched. Only leading and trailing wildcards (``*``) are supported. A path containing only a wildcard acts matches any path.
* ``body``: The respective response body.
* ``body-encoding``: Either ``raw`` or ``json``. Use ``json`` by default and ``raw`` for the operation-type ``bulk`` and ``search``.

Here we define the necessary responses for a track that bulk-indexes data::

[
{
"path": "*/_bulk",
"body": {
"errors": false,
"took": 1
},
"body-encoding": "raw"
},
{
"path": "/_cluster/health*",
"body": {
"status": "green",
"relocating_shards": 0
},
"body-encoding": "json"
},
{
"path": "/_all/_stats/_all",
"body": {
"_all": {
"total": {
"merges": {
"current": 0
}
}
}
},
"body-encoding": "json"
},
{
"path": "*",
"body": {},
"body-encoding": "json"
}
]

.. note::
Paths are evaluated from top to bottom. Therefore, place more restrictive paths at the top of the file.

Save the above responses as ``responses.json`` and execute a benchmark as follows::

esrally race --track=geonames --challenge=append-no-conflicts-index-only --pipeline=benchmark-only --distribution-version=8.0.0 --client-options="static_responses:'responses.json'"
Copy link
Contributor

@dliappis dliappis Apr 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a few seconds to realize that while specifying a pipeline it won't really target any ES. One might even use the default pipeline, ES will get launched but won't be used.

Should we clarify with a note here or above in the client-options section that when static_responses:'file' is used, the use should also specify the benchmark-only pipeline and the distribution-version as Rally doesn't have a way to derive the version automatically as it'd normally do.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I've added a note in 37c0900.


.. _command_line_reference_on_error:

``on-error``
Expand Down
165 changes: 158 additions & 7 deletions esrally/async_connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,77 @@
import asyncio
import json
import logging
from typing import Optional, List

import aiohttp
import elasticsearch
from aiohttp import RequestInfo, BaseConnector
from aiohttp.client_proto import ResponseHandler
from aiohttp.helpers import BaseTimerContext
from multidict import CIMultiDictProxy, CIMultiDict
from yarl import URL

from esrally.utils import io


class StaticTransport:
def __init__(self):
self.closed = False

def is_closing(self):
return False

def close(self):
self.closed = True


class StaticConnector(BaseConnector):
async def _create_connection(self, req: "ClientRequest", traces: List["Trace"],
timeout: "ClientTimeout") -> ResponseHandler:
handler = ResponseHandler(self._loop)
handler.transport = StaticTransport()
handler.protocol = ""
return handler


class StaticRequest(aiohttp.ClientRequest):
RESPONSES = None

async def send(self, conn: "Connection") -> "ClientResponse":
self.response = self.response_class(
self.method,
self.original_url,
writer=self._writer,
continue100=self._continue,
timer=self._timer,
request_info=self.request_info,
traces=self._traces,
loop=self.loop,
session=self._session,
)
path = self.original_url.path
self.response.static_body = StaticRequest.RESPONSES.response(path)
return self.response


class StaticResponse(aiohttp.ClientResponse):
def __init__(self, method: str, url: URL, *, writer: "asyncio.Task[None]",
continue100: Optional["asyncio.Future[bool]"], timer: BaseTimerContext, request_info: RequestInfo,
traces: List["Trace"], loop: asyncio.AbstractEventLoop, session: "ClientSession") -> None:
super().__init__(method, url, writer=writer, continue100=continue100, timer=timer, request_info=request_info,
traces=traces, loop=loop, session=session)
self.static_body = None

async def start(self, connection: "Connection") -> "ClientResponse":
self._closed = False
self._protocol = connection.protocol
self._connection = connection
self._headers = CIMultiDictProxy(CIMultiDict())
self.status = 200
return self

async def text(self, encoding=None, errors="strict"):
return self.static_body


class RawClientResponse(aiohttp.ClientResponse):
Expand All @@ -16,6 +86,65 @@ async def text(self, encoding=None, errors="strict"):
return self._body


class ResponseMatcher:
def __init__(self, responses):
self.logger = logging.getLogger(__name__)
self.responses = []

for response in responses:
path = response["path"]
if path == "*":
matcher = ResponseMatcher.always()
elif path.startswith("*"):
matcher = ResponseMatcher.endswith(path[1:])
elif path.endswith("*"):
matcher = ResponseMatcher.startswith(path[:-1])
else:
matcher = ResponseMatcher.equals(path)

body = response["body"]
body_encoding = response.get("body-encoding", "json")
if body_encoding == "raw":
body = json.dumps(body).encode("utf-8")
elif body_encoding == "json":
body = json.dumps(body)
else:
raise ValueError(f"Unknown body encoding [{body_encoding}] for path [{path}]")

self.responses.append((path, matcher, body))

@staticmethod
def always():
# pylint: disable=unused-variable
def f(p):
return True
return f
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternatively we could replace the closures with more Pythonic (IMHO? :) ) lambdas e.g.

return lambda f: True

or for startswith:

def startswith(path_pattern):
    return lambda f: f.startswith(path_pattern)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented this with lambdas earlier but then got a PEP-8 violation warning. :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOL whaaaaaat?


@staticmethod
def startswith(path_pattern):
def f(p):
return p.startswith(path_pattern)
return f

@staticmethod
def endswith(path_pattern):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thought, we could enhance it in the future and support simple shell like patterns like *_snapshot/frozen* easily with https://docs.python.org/3/library/fnmatch.html. This could be quite useful.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Before we enhance it, we should probably run microbenchmarks to get a better grasp of the overhead though because we're on the hot code path.

def f(p):
return p.endswith(path_pattern)
return f

@staticmethod
def equals(path_pattern):
def f(p):
return p == path_pattern
return f

def response(self, path):
for path_pattern, matcher, body in self.responses:
if matcher(path):
self.logger.debug("Path pattern [%s] matches path [%s].", path_pattern, path)
return body


class AIOHttpConnection(elasticsearch.AIOHttpConnection):
def __init__(self,
host="localhost",
Expand Down Expand Up @@ -52,20 +181,42 @@ def __init__(self,
self._trace_configs = [trace_config] if trace_config else None
self._enable_cleanup_closed = kwargs.get("enable_cleanup_closed", False)

static_responses = kwargs.get("static_responses")
self.use_static_responses = static_responses is not None

if self.use_static_responses:
# read static responses once and reuse them
if not StaticRequest.RESPONSES:
with open(io.normalize_path(static_responses)) as f:
StaticRequest.RESPONSES = ResponseMatcher(json.load(f))

self._request_class = StaticRequest
self._response_class = StaticResponse
else:
self._request_class = aiohttp.ClientRequest
self._response_class = RawClientResponse

async def _create_aiohttp_session(self):
if self.loop is None:
self.loop = asyncio.get_running_loop()

if self.use_static_responses:
connector = StaticConnector(limit=self._limit, enable_cleanup_closed=self._enable_cleanup_closed)
else:
connector = aiohttp.TCPConnector(
limit=self._limit,
use_dns_cache=True,
ssl_context=self._ssl_context,
enable_cleanup_closed=self._enable_cleanup_closed
)

self.session = aiohttp.ClientSession(
headers=self.headers,
auto_decompress=True,
loop=self.loop,
cookie_jar=aiohttp.DummyCookieJar(),
response_class=RawClientResponse,
connector=aiohttp.TCPConnector(
limit=self._limit,
use_dns_cache=True,
ssl_context=self._ssl_context,
enable_cleanup_closed=self._enable_cleanup_closed
),
request_class=self._request_class,
response_class=self._response_class,
connector=connector,
trace_configs=self._trace_configs,
)
67 changes: 40 additions & 27 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,38 +399,39 @@ def create_es_clients(self):
es[cluster_name] = self.es_client_factory(cluster_hosts, cluster_client_options).create()
return es

def prepare_telemetry(self, es):
def prepare_telemetry(self, es, enable):
enabled_devices = self.config.opts("telemetry", "devices")
telemetry_params = self.config.opts("telemetry", "params")
log_root = paths.race_root(self.config)

es_default = es["default"]
self.telemetry = telemetry.Telemetry(enabled_devices, devices=[
telemetry.NodeStats(telemetry_params, es, self.metrics_store),
telemetry.ExternalEnvironmentInfo(es_default, self.metrics_store),
telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store),
telemetry.JvmStatsSummary(es_default, self.metrics_store),
telemetry.IndexStats(es_default, self.metrics_store),
telemetry.MlBucketProcessingTime(es_default, self.metrics_store),
telemetry.SegmentStats(log_root, es_default),
telemetry.CcrStats(telemetry_params, es, self.metrics_store),
telemetry.RecoveryStats(telemetry_params, es, self.metrics_store),
telemetry.TransformStats(telemetry_params, es, self.metrics_store),
telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store)
])

if enable:
devices = [
telemetry.NodeStats(telemetry_params, es, self.metrics_store),
telemetry.ExternalEnvironmentInfo(es_default, self.metrics_store),
telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store),
telemetry.JvmStatsSummary(es_default, self.metrics_store),
telemetry.IndexStats(es_default, self.metrics_store),
telemetry.MlBucketProcessingTime(es_default, self.metrics_store),
telemetry.SegmentStats(log_root, es_default),
telemetry.CcrStats(telemetry_params, es, self.metrics_store),
telemetry.RecoveryStats(telemetry_params, es, self.metrics_store),
telemetry.TransformStats(telemetry_params, es, self.metrics_store),
telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store)
]
else:
devices = []
self.telemetry = telemetry.Telemetry(enabled_devices, devices=devices)

def wait_for_rest_api(self, es):
skip_rest_api_check = self.config.opts("mechanic", "skip.rest.api.check")
if skip_rest_api_check:
self.logger.info("Skipping REST API check.")
es_default = es["default"]
self.logger.info("Checking if REST API is available.")
if client.wait_for_rest_layer(es_default, max_attempts=40):
self.logger.info("REST API is available.")
else:
es_default = es["default"]
self.logger.info("Checking if REST API is available.")
if client.wait_for_rest_layer(es_default, max_attempts=40):
self.logger.info("REST API is available.")
else:
self.logger.error("REST API layer is not yet available. Stopping benchmark.")
raise exceptions.SystemSetupError("Elasticsearch REST API layer is not available.")
self.logger.error("REST API layer is not yet available. Stopping benchmark.")
raise exceptions.SystemSetupError("Elasticsearch REST API layer is not available.")

def retrieve_cluster_info(self, es):
try:
Expand All @@ -455,9 +456,21 @@ def prepare_benchmark(self, t):
self.challenge.meta_data)

es_clients = self.create_es_clients()
self.wait_for_rest_api(es_clients)
self.prepare_telemetry(es_clients)
self.target.on_cluster_details_retrieved(self.retrieve_cluster_info(es_clients))

skip_rest_api_check = self.config.opts("mechanic", "skip.rest.api.check")
uses_static_responses = self.config.opts("client", "options").uses_static_responses
if skip_rest_api_check:
self.logger.info("Skipping REST API check as requested explicitly.")
elif uses_static_responses:
self.logger.info("Skipping REST API check as static responses are used.")
else:
self.wait_for_rest_api(es_clients)
self.target.on_cluster_details_retrieved(self.retrieve_cluster_info(es_clients))

# Avoid issuing any requests to the target cluster when static responses are enabled. The results
# are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
self.prepare_telemetry(es_clients, enable=not uses_static_responses)

for host in self.config.opts("driver", "load_driver_hosts"):
host_config = {
# for simplicity we assume that all benchmark machines have the same specs
Expand Down
2 changes: 1 addition & 1 deletion esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ def add_track_source(subparser):
help=argparse.SUPPRESS,
type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"),
default=None)
# skips checking that the REST API is available before proceeding with the benchmark
# Skips checking that the REST API is available before proceeding with the benchmark
race_parser.add_argument(
"--skip-rest-api-check",
help=argparse.SUPPRESS,
Expand Down
4 changes: 4 additions & 0 deletions esrally/utils/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ def all_client_options(self):
"""Return a dict with all client options"""
return self.all_options

@property
def uses_static_responses(self):
return self.default.get("static_responses", False)

def with_max_connections(self, max_connections):
final_client_options = {}
for cluster, original_opts in self.all_client_options.items():
Expand Down
1 change: 1 addition & 0 deletions tests/driver/driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class Holder:
def __init__(self, all_hosts=None, all_client_options=None):
self.all_hosts = all_hosts
self.all_client_options = all_client_options
self.uses_static_responses = False

def __init__(self, methodName='runTest'):
super().__init__(methodName)
Expand Down
Loading