Skip to content

Commit

Permalink
Scale connection pool automatically (#1015)
Browse files Browse the repository at this point in the history
With this commit we scale Rally's connection pool automatically between
(at least) 256 and (at most) the number of simulated clients.
Additionally, we allow users to override this value with the client
option `max_connections`. Previously this value has been initialized to
a constant but very high value.
  • Loading branch information
danielmitterdorfer authored Jun 12, 2020
1 parent 8d328de commit 42df7d5
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 8 deletions.
4 changes: 4 additions & 0 deletions docs/command_line_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,10 @@ Default value: ``timeout:60``
.. warning::
If you provide your own client options, the default value will not be magically merged. You have to specify all client options explicitly. The only exceptions to this rule is ``ca_cert`` (see below).

Rally recognizes the following client options in addition:

* ``max_connections``: By default, Rally will choose the maximum allowed number of connections automatically (equal to the number of simulated clients but at least 256 connections). With this property it is possible to override that logic but a minimum of 256 is enforced internally.

**Examples**

Here are a few common examples:
Expand Down
5 changes: 2 additions & 3 deletions esrally/async_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self, host='localhost', port=9200, http_auth=None,
use_ssl = True

trace_configs = [trace_config] if trace_config else None

max_connections = max(256, kwargs.get("max_connections", 0))
self.session = aiohttp.ClientSession(
auth=http_auth,
timeout=self.timeout,
Expand All @@ -91,8 +91,7 @@ def __init__(self, host='localhost', port=9200, http_auth=None,
verify_ssl=verify_certs,
use_dns_cache=use_dns_cache,
ssl_context=ssl_context,
# this has been changed from the default (100)
limit=100000
limit=max_connections
),
headers=headers,
trace_configs=trace_configs,
Expand Down
6 changes: 5 additions & 1 deletion esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,11 @@ def es_clients(all_hosts, all_client_options):
es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async()
return es

es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options").all_client_options)
# Properly size the internal connection pool to match the number of expected clients but allow the user
# to override it if needed.
client_count = len(self.task_allocations)
es = es_clients(self.cfg.opts("client", "hosts").all_hosts,
self.cfg.opts("client", "options").with_max_connections(client_count))

aws = []
# A parameter source should only be created once per task - it is partitioned later on per client.
Expand Down
9 changes: 9 additions & 0 deletions esrally/utils/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,12 @@ def normalize_to_dict(arg):
def all_client_options(self):
"""Return a dict with all client options"""
return self.all_options

def with_max_connections(self, max_connections):
final_client_options = {}
for cluster, original_opts in self.all_client_options.items():
amended_opts = dict(original_opts)
if "max_connections" not in amended_opts:
amended_opts["max_connections"] = max_connections
final_client_options[cluster] = amended_opts
return final_client_options
18 changes: 14 additions & 4 deletions tests/utils/opts_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ def test_csv_client_options_parses(self):
opts.ClientOptions(client_options_string).all_client_options
)


def test_jsonstring_client_options_parses(self):
client_options_string = '{"default": {"timeout": 60},' \
'"remote_1": {"use_ssl":true,"verify_certs":true,"basic_auth_user": "elastic", "basic_auth_password": "changeme"},'\
Expand All @@ -228,7 +227,6 @@ def test_jsonstring_client_options_parses(self):
'remote_2': {'use_ssl': True,'verify_certs': True, 'ca_certs':'/path/to/cacert.pem'}},
opts.ClientOptions(client_options_string).all_client_options)


def test_json_file_parameter_parses(self):
self.assertEqual(
{'default': {'timeout':60},
Expand All @@ -240,7 +238,6 @@ def test_json_file_parameter_parses(self):
{'default': {'timeout':60}},
opts.ClientOptions(os.path.join(os.path.dirname(__file__), "resources", "client_options_2.json")).all_client_options)


def test_no_client_option_parses_to_default(self):
client_options_string = opts.ClientOptions.DEFAULT_CLIENT_OPTIONS
target_hosts = None
Expand All @@ -260,7 +257,6 @@ def test_no_client_option_parses_to_default(self):
opts.ClientOptions(client_options_string,
target_hosts=target_hosts).default)


def test_no_client_option_parses_to_default_with_multicluster(self):
client_options_string = opts.ClientOptions.DEFAULT_CLIENT_OPTIONS
target_hosts = opts.TargetHosts('{"default": ["127.0.0.1:9200,10.17.0.5:19200"], "remote": ["88.33.22.15:19200"]}')
Expand All @@ -279,3 +275,17 @@ def test_no_client_option_parses_to_default_with_multicluster(self):
{"timeout": 60},
opts.ClientOptions(client_options_string,
target_hosts=target_hosts).default)

def test_amends_with_max_connections(self):
client_options_string = opts.ClientOptions.DEFAULT_CLIENT_OPTIONS
target_hosts = opts.TargetHosts('{"default": ["10.17.0.5:9200"], "remote": ["88.33.22.15:9200"]}')
self.assertEqual(
{"default": {"timeout": 60, "max_connections": 128}, "remote": {"timeout": 60, "max_connections": 128}},
opts.ClientOptions(client_options_string, target_hosts=target_hosts).with_max_connections(128))

def test_keeps_already_specified_max_connections(self):
client_options_string = '{"default": {"timeout":60,"max_connections":5}, "remote": {"timeout":60}}'
target_hosts = opts.TargetHosts('{"default": ["10.17.0.5:9200"], "remote": ["88.33.22.15:9200"]}')
self.assertEqual(
{"default": {"timeout": 60, "max_connections": 5}, "remote": {"timeout": 60, "max_connections": 32}},
opts.ClientOptions(client_options_string, target_hosts=target_hosts).with_max_connections(32))

0 comments on commit 42df7d5

Please sign in to comment.