Add an asyncio-based load generator #916

Merged: 33 commits, merged on Mar 8, 2020
a7e8ec2
Add async load generator (WIP)
danielmitterdorfer Feb 5, 2020
25ab4d7
Enable async driver profiling
danielmitterdorfer Feb 6, 2020
936f232
Add uvloop support
danielmitterdorfer Feb 6, 2020
5fc0042
Fine-tune profile log format
danielmitterdorfer Feb 6, 2020
a7780d3
Use uvloop only if async
danielmitterdorfer Feb 6, 2020
7077bb3
Merge remote-tracking branch 'origin/master' into async
danielmitterdorfer Feb 17, 2020
7a4e5e5
Implement readlines for StaticSource
danielmitterdorfer Feb 17, 2020
fe3f7ab
Remove uvloop
danielmitterdorfer Feb 17, 2020
dcba70d
Run sub-tasks in parallel
danielmitterdorfer Feb 17, 2020
fd7a3d9
Require at least Python 3.6
danielmitterdorfer Feb 17, 2020
596f242
Merge remote-tracking branch 'origin/master' into drop-py-35
danielmitterdorfer Feb 18, 2020
fd359a9
Merge remote-tracking branch 'origin/master' into async
danielmitterdorfer Feb 18, 2020
ec32cf7
Merge branch 'drop-py-35' into async
danielmitterdorfer Feb 18, 2020
7f90c72
Implement async load generator
danielmitterdorfer Feb 20, 2020
139d143
Merge remote-tracking branch 'origin/master' into async
danielmitterdorfer Feb 20, 2020
65925e7
Support completion of parallel task structures in async mode
danielmitterdorfer Feb 20, 2020
440c5a3
Expose meta-data that async-runner is required
danielmitterdorfer Feb 21, 2020
7568fae
async fallback for queries
danielmitterdorfer Feb 21, 2020
d1023c6
Properly integrate async driver with racecontrol
danielmitterdorfer Feb 21, 2020
6fd6bf0
Merge remote-tracking branch 'origin/master' into async
danielmitterdorfer Feb 21, 2020
0c9ddd9
Temporarily use elasticsearch-py-async master
danielmitterdorfer Feb 21, 2020
3cc3861
Properly shutdown async components in adapter layer
danielmitterdorfer Feb 24, 2020
4e2c951
Expose kwargs in runner registry
danielmitterdorfer Feb 24, 2020
78a4fc2
Add docs
danielmitterdorfer Feb 24, 2020
a7da504
Simplifications and more tests
danielmitterdorfer Feb 24, 2020
a0bca34
Properly close transport
danielmitterdorfer Feb 24, 2020
135b685
Improve error handling
danielmitterdorfer Feb 25, 2020
419d0d6
More cleanups
danielmitterdorfer Feb 25, 2020
513ccc8
Merge remote-tracking branch 'origin/master' into async
danielmitterdorfer Feb 25, 2020
8a666d1
Merge remote-tracking branch 'origin/master' into async
danielmitterdorfer Mar 3, 2020
92bee31
Special case handling for connection timeouts
danielmitterdorfer Mar 4, 2020
d0d5041
Merge remote-tracking branch 'origin/master' into async
danielmitterdorfer Mar 8, 2020
25fe49a
Make new test async
danielmitterdorfer Mar 8, 2020
6 changes: 6 additions & 0 deletions create-notice.sh
@@ -43,19 +43,25 @@ function main {
printf "The source code can be obtained at https://github.com/certifi/python-certifi\n" >> "${OUTPUT_FILE}"
add_license "certifi" "https://raw.githubusercontent.com/certifi/python-certifi/master/LICENSE"
add_license "elasticsearch" "https://raw.githubusercontent.com/elastic/elasticsearch-py/master/LICENSE"
add_license "elasticsearch-async" "https://raw.githubusercontent.com/elastic/elasticsearch-py-async/master/LICENSE"
add_license "jinja2" "https://raw.githubusercontent.com/pallets/jinja/master/LICENSE.rst"
add_license "jsonschema" "https://raw.githubusercontent.com/Julian/jsonschema/master/COPYING"
add_license "psutil" "https://raw.githubusercontent.com/giampaolo/psutil/master/LICENSE"
add_license "py-cpuinfo" "https://raw.githubusercontent.com/workhorsy/py-cpuinfo/master/LICENSE"
add_license "tabulate" "https://bitbucket.org/astanin/python-tabulate/raw/03182bf9b8a2becbc54d17aa7e3e7dfed072c5f5/LICENSE"
add_license "thespian" "https://raw.githubusercontent.com/kquick/Thespian/master/LICENSE.txt"
add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"
add_license "yappi" "https://raw.githubusercontent.com/sumerc/yappi/master/LICENSE"

# transitive dependencies
# Jinja2 -> Markupsafe
add_license "Markupsafe" "https://raw.githubusercontent.com/pallets/markupsafe/master/LICENSE.rst"
# elasticsearch -> urllib3
add_license "urllib3" "https://raw.githubusercontent.com/shazow/urllib3/master/LICENSE.txt"
# elasticsearch_async -> aiohttp
add_license "aiohttp" "https://raw.githubusercontent.com/aio-libs/aiohttp/master/LICENSE.txt"
# elasticsearch_async -> async_timeout
add_license "async_timeout" "https://raw.githubusercontent.com/aio-libs/async-timeout/master/LICENSE"
# boto3 -> s3transfer
add_license "s3transfer" "https://raw.githubusercontent.com/boto/s3transfer/develop/LICENSE.txt"
# boto3 -> jmespath
46 changes: 30 additions & 16 deletions docs/adding_tracks.rst
@@ -881,17 +881,15 @@ In ``track.json`` set the ``operation-type`` to "percolate" (you can choose this

Then create a file ``track.py`` next to ``track.json`` and implement the following two functions::

def percolate(es, params):
es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

async def percolate(es, params):
await es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

def register(registry):
registry.register_runner("percolate", percolate)

registry.register_runner("percolate", percolate, async_runner=True)

The function ``percolate`` is the actual runner and takes the following parameters:

@@ -906,11 +904,25 @@ This function can return:

Similar to a parameter source, you also need to bind the name of your operation type to the function within ``register``.

To illustrate how to use custom return values, suppose we want to implement a custom runner that calls the `pending tasks API <https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-pending.html>`_ and returns the number of pending tasks as additional meta-data::

async def pending_tasks(es, params):
response = await es.cluster.pending_tasks()
return {
"weight": 1,
"unit": "ops",
"pending-tasks-count": len(response["tasks"])
}

def register(registry):
registry.register_runner("pending-tasks", pending_tasks, async_runner=True)


If you need more control, you can also implement a runner class. The example above, implemented as a class, looks as follows::

class PercolateRunner:
def __call__(self, es, params):
es.percolate(
async def __call__(self, es, params):
await es.percolate(
index="queries",
doc_type="content",
body=params["body"]
@@ -920,10 +932,12 @@
return "percolate"

def register(registry):
registry.register_runner("percolate", PercolateRunner())
registry.register_runner("percolate", PercolateRunner(), async_runner=True)


The actual runner is implemented in the method ``__call__`` and the same return value conventions apply as for functions. For debugging purposes you should also implement ``__repr__`` and provide a human-readable name for your runner. Finally, you need to register your runner in the ``register`` function.

The actual runner is implemented in the method ``__call__`` and the same return value conventions apply as for functions. For debugging purposes you should also implement ``__repr__`` and provide a human-readable name for your runner. Finally, you need to register your runner in the ``register`` function. Runners also support Python's `context manager <https://docs.python.org/3/library/stdtypes.html#typecontextmanager>`_ interface. Rally uses a new context for each request. Implementing the context manager interface can be handy for cleanup of resources after executing an operation. Rally uses it, for example, to clear open scrolls.
Runners also support Python's `asynchronous context manager <https://docs.python.org/3/reference/datamodel.html#async-context-managers>`_ interface. Rally uses a new context for each request. Implementing the asynchronous context manager interface can be handy for cleanup of resources after executing an operation. Rally uses it, for example, to clear open scrolls.
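To make the cleanup hook concrete, here is a minimal, runnable sketch of a runner that implements the asynchronous context manager interface. The ``StubClient`` and the scroll bookkeeping are illustrative assumptions for this sketch, not Rally internals or the real Elasticsearch client:

```python
import asyncio

class StubClient:
    """Stand-in for the async Elasticsearch client (an assumption, for illustration only)."""
    def __init__(self):
        self.cleared = []

    async def search(self, index, scroll):
        return {"_scroll_id": "scroll-1", "hits": {"hits": []}}

    async def clear_scroll(self, scroll_id):
        self.cleared.append(scroll_id)

class ScrollingRunner:
    """Hypothetical runner that opens a scroll and relies on __aexit__ for cleanup."""
    def __init__(self):
        self.es = None
        self.scroll_id = None

    async def __aenter__(self):
        return self

    async def __call__(self, es, params):
        self.es = es
        response = await es.search(index=params["index"], scroll="10m")
        self.scroll_id = response["_scroll_id"]
        return 1, "ops"

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # cleanup runs once per request context, even if the call above raised
        if self.scroll_id is not None:
            await self.es.clear_scroll(scroll_id=self.scroll_id)
        return False

    def __repr__(self, *args, **kwargs):
        return "scrolling-runner"

async def run_once(es, params):
    # Rally enters a fresh context for each request
    async with ScrollingRunner() as runner:
        return await runner(es, params)

es = StubClient()
result = asyncio.run(run_once(es, {"index": "logs"}))
```

Because a fresh context is entered per request, state such as ``scroll_id`` does not leak between operations.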

If you have specified multiple Elasticsearch clusters using :ref:`target-hosts <command_line_reference_advanced_topics>` you can make Rally pass a dictionary of client connections instead of one for the ``default`` cluster in the ``es`` parameter.

@@ -938,14 +952,14 @@ Example (assuming Rally has been invoked specifying ``default`` and ``remote`` i
class CreateIndexInRemoteCluster:
multi_cluster = True

def __call__(self, es, params):
es['remote'].indices.create(index='remote-index')
async def __call__(self, es, params):
await es["remote"].indices.create(index="remote-index")

def __repr__(self, *args, **kwargs):
return "create-index-in-remote-cluster"

def register(registry):
registry.register_runner("create-index-in-remote-cluster", CreateIndexInRemoteCluster())
registry.register_runner("create-index-in-remote-cluster", CreateIndexInRemoteCluster(), async_runner=True)
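The multi-cluster calling convention can be exercised with stub clients (the ``StubClient`` below is an assumption for illustration; in Rally the values of ``es`` are real Elasticsearch client instances):

```python
import asyncio

class StubClient:
    """Stand-in for one cluster's async Elasticsearch client."""
    def __init__(self):
        self.created = []
        self.indices = self  # collapse the .indices namespace for this sketch

    async def create(self, index):
        self.created.append(index)
        return {"acknowledged": True}

class CreateIndexInRemoteCluster:
    multi_cluster = True

    async def __call__(self, es, params):
        # with multi_cluster = True, es is a dict keyed by cluster name
        await es["remote"].indices.create(index="remote-index")

    def __repr__(self, *args, **kwargs):
        return "create-index-in-remote-cluster"

es = {"default": StubClient(), "remote": StubClient()}
asyncio.run(CreateIndexInRemoteCluster()(es, {}))
```

Only the ``remote`` client is touched here; the ``default`` client stays untouched, which the dict-based API makes explicit.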


.. note::
41 changes: 41 additions & 0 deletions docs/migrate.rst
@@ -1,6 +1,47 @@
Migration Guide
===============

Migrating to Rally 1.5.0
------------------------

Runner API uses asyncio
^^^^^^^^^^^^^^^^^^^^^^^

In order to support more concurrent clients in the future, Rally is moving internally from a synchronous model to an asynchronous model. With Rally 1.5.0, all custom runners need to be implemented using async APIs, and a new boolean argument ``async_runner=True`` needs to be provided upon registration. Below is an example of how to migrate a custom runner function.

A custom runner prior to Rally 1.5.0::

def percolate(es, params):
es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

def register(registry):
registry.register_runner("percolate", percolate)

With Rally 1.5.0, the implementation changes as follows::

async def percolate(es, params):
await es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

def register(registry):
registry.register_runner("percolate", percolate, async_runner=True)

Apply the following changes to each custom runner:

* Prefix the function signature with ``async``.
* Add an ``await`` keyword before each Elasticsearch API call.
* Add ``async_runner=True`` as the last argument to the ``register_runner`` function.

For more details please refer to the updated documentation on :ref:`custom runners <adding_tracks_custom_runners>`.
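The reason the ``async_runner=True`` flag is needed at registration time can be seen in a small sketch (the ``StubClient`` here is an illustrative stand-in, not part of Rally): calling an ``async def`` function merely creates a coroutine object, so the load generator has to know whether to call a runner directly or to await it.

```python
import asyncio
import inspect

class StubClient:
    """Stand-in for the async Elasticsearch client (illustrative only)."""
    async def percolate(self, index, doc_type, body):
        return {"matches": []}

async def percolate(es, params):
    await es.percolate(index="queries", doc_type="content", body=params["body"])

# A plain call does not run the body; it only creates a coroutine object.
coro = percolate(StubClient(), {"body": {}})
assert inspect.iscoroutine(coro)
asyncio.run(coro)  # only here does the body actually execute
```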


Migrating to Rally 1.4.1
------------------------

131 changes: 131 additions & 0 deletions esrally/async_connection.py
@@ -0,0 +1,131 @@
import asyncio
import ssl
import warnings

import aiohttp
from aiohttp.client_exceptions import ServerFingerprintMismatch
import async_timeout

from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, ImproperlyConfigured, SSLError
from elasticsearch.connection import Connection
from elasticsearch.compat import urlencode
from elasticsearch.connection.http_urllib3 import create_ssl_context


# This is only needed because https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet
# In addition we have raised the connection limit in TCPConnector from 100 to 100000.

# We want to keep the diff as small as possible thus suppressing pylint warnings that we would not allow in Rally
# pylint: disable=W0706
class AIOHttpConnection(Connection):
def __init__(self, host='localhost', port=9200, http_auth=None,
use_ssl=False, verify_certs=False, ca_certs=None, client_cert=None,
client_key=None, loop=None, use_dns_cache=True, headers=None,
ssl_context=None, **kwargs):
super().__init__(host=host, port=port, **kwargs)

self.loop = asyncio.get_event_loop() if loop is None else loop

if http_auth is not None:
if isinstance(http_auth, str):
http_auth = tuple(http_auth.split(':', 1))

if isinstance(http_auth, (tuple, list)):
http_auth = aiohttp.BasicAuth(*http_auth)

headers = headers or {}
headers.setdefault('content-type', 'application/json')

# if providing an SSL context, raise error if any other SSL related flag is used
if ssl_context and (verify_certs or ca_certs):
raise ImproperlyConfigured("When using `ssl_context`, `use_ssl`, `verify_certs`, `ca_certs` are not permitted")

if use_ssl or ssl_context:
cafile = ca_certs
if not cafile and not ssl_context and verify_certs:
# If no ca_certs and no sslcontext passed and asking to verify certs
# raise error
raise ImproperlyConfigured("Root certificates are missing for certificate "
"validation. Either pass them in using the ca_certs parameter or "
"install certifi to use it automatically.")
if verify_certs or ca_certs:
warnings.warn('Use of `verify_certs`, `ca_certs` have been deprecated in favor of using SSLContext`', DeprecationWarning)

if not ssl_context:
# if SSLContext hasn't been passed in, create one.
# need to skip if sslContext isn't avail
try:
ssl_context = create_ssl_context(cafile=cafile)
except AttributeError:
ssl_context = None

if not verify_certs and ssl_context is not None:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
warnings.warn(
'Connecting to %s using SSL with verify_certs=False is insecure.' % host)
if ssl_context:
verify_certs = True
use_ssl = True

self.session = aiohttp.ClientSession(
auth=http_auth,
timeout=self.timeout,
connector=aiohttp.TCPConnector(
loop=self.loop,
verify_ssl=verify_certs,
use_dns_cache=use_dns_cache,
ssl_context=ssl_context,
# this has been changed from the default (100)
limit=100000
),
headers=headers
)

self.base_url = 'http%s://%s:%d%s' % (
's' if use_ssl else '',
host, port, self.url_prefix
)

@asyncio.coroutine
def close(self):
yield from self.session.close()

@asyncio.coroutine
def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None):
url_path = url
if params:
url_path = '%s?%s' % (url, urlencode(params or {}))
url = self.base_url + url_path

start = self.loop.time()
response = None
try:
with async_timeout.timeout(timeout or self.timeout.total, loop=self.loop):
response = yield from self.session.request(method, url, data=body, headers=headers)
raw_data = yield from response.text()
duration = self.loop.time() - start

except asyncio.CancelledError:
raise

except Exception as e:
self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=e)
if isinstance(e, ServerFingerprintMismatch):
raise SSLError('N/A', str(e), e)
if isinstance(e, asyncio.TimeoutError):
raise ConnectionTimeout('TIMEOUT', str(e), e)
raise ConnectionError('N/A', str(e), e)

finally:
if response is not None:
yield from response.release()

# raise errors based on http status codes, let the client handle those if needed
if not (200 <= response.status < 300) and response.status not in ignore:
self.log_request_fail(method, url, url_path, body, duration, status_code=response.status, response=raw_data)
self._raise_error(response.status, raw_data)

self.log_request_success(method, url, url_path, body, response.status, raw_data, duration)

return response.status, response.headers, raw_data
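The except-clause in ``perform_request`` funnels every transport failure into one of the elasticsearch-py exception types, while letting cancellation escape untouched. As a sketch, that mapping can be isolated into a plain function; the exception classes below are stand-ins for the real ones imported at the top of the module, and ``map_transport_error`` is an illustrative name, not part of this PR:

```python
import asyncio

# Stand-ins for the types imported from elasticsearch.exceptions and aiohttp
# (assumption: plain classes suffice to demonstrate the mapping).
class ConnectionError(Exception): pass
class ConnectionTimeout(ConnectionError): pass
class SSLError(ConnectionError): pass
class ServerFingerprintMismatch(Exception): pass

def map_transport_error(e):
    """Mirrors the except-clause of perform_request above."""
    if isinstance(e, asyncio.CancelledError):
        raise e  # cancellation is re-raised, never wrapped
    if isinstance(e, ServerFingerprintMismatch):
        return SSLError("N/A", str(e), e)
    if isinstance(e, asyncio.TimeoutError):
        return ConnectionTimeout("TIMEOUT", str(e), e)
    return ConnectionError("N/A", str(e), e)
```

Ordering matters: cancellation is checked first so a benchmark shutdown is never misreported as a connection error.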
37 changes: 37 additions & 0 deletions esrally/client.py
@@ -127,6 +127,43 @@ def create(self):
import elasticsearch
return elasticsearch.Elasticsearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options)

def create_async(self):
# keep imports confined as we do some temporary patching to work around unsolved issues in the async ES connector
import elasticsearch
import elasticsearch_async
from aiohttp.client import ClientTimeout
import esrally.async_connection

# needs patching as https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet
class RallyAsyncTransport(elasticsearch_async.transport.AsyncTransport):
def __init__(self, hosts, connection_class=esrally.async_connection.AIOHttpConnection, loop=None,
connection_pool_class=elasticsearch_async.connection_pool.AsyncConnectionPool,
sniff_on_start=False, raise_on_sniff_error=True, **kwargs):
super().__init__(hosts, connection_class, loop, connection_pool_class, sniff_on_start, raise_on_sniff_error, **kwargs)

if "timeout" in self.client_options and not isinstance(self.client_options["timeout"], ClientTimeout):
self.client_options["timeout"] = ClientTimeout(total=self.client_options["timeout"])
else:
# 10 seconds is the Elasticsearch default, ensure we always set a ClientTimeout object here
self.client_options["timeout"] = ClientTimeout(total=10)

# copy of AsyncElasticsearch as https://github.com/elastic/elasticsearch-py-async/pull/49 is not yet released.
# That PR (also) fixes the behavior reported in https://github.com/elastic/elasticsearch-py-async/issues/43.
class RallyAsyncElasticsearch(elasticsearch.Elasticsearch):
def __init__(self, hosts=None, transport_class=RallyAsyncTransport, **kwargs):
super().__init__(hosts, transport_class=transport_class, **kwargs)

async def __aenter__(self):
return self

async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
await self.transport.close()

return RallyAsyncElasticsearch(hosts=self.hosts,
transport_class=RallyAsyncTransport,
ssl_context=self.ssl_context,
**self.client_options)


def wait_for_rest_layer(es, max_attempts=40):
"""
3 changes: 3 additions & 0 deletions esrally/driver/__init__.py
@@ -17,3 +17,6 @@

# expose only the minimum API
from .driver import DriverActor, PrepareBenchmark, PreparationComplete, StartBenchmark, BenchmarkComplete, TaskFinished

# async API
from .async_driver import AsyncDriver, Timer