diff --git a/docs/command_line_reference.rst b/docs/command_line_reference.rst index 7aa45f439..2393ce831 100644 --- a/docs/command_line_reference.rst +++ b/docs/command_line_reference.rst @@ -641,6 +641,10 @@ Default value: ``timeout:60`` .. warning:: If you provide your own client options, the default value will not be magically merged. You have to specify all client options explicitly. The only exceptions to this rule is ``ca_cert`` (see below). +Rally recognizes the following client options in addition: + +* ``max_connections``: By default, Rally will choose the maximum allowed number of connections automatically (equal to the number of simulated clients but at least 256 connections). With this property it is possible to override that logic but a minimum of 256 is enforced internally. + **Examples** Here are a few common examples: diff --git a/esrally/async_connection.py b/esrally/async_connection.py index 53306ddfc..a436a4b54 100644 --- a/esrally/async_connection.py +++ b/esrally/async_connection.py @@ -82,7 +82,7 @@ def __init__(self, host='localhost', port=9200, http_auth=None, use_ssl = True trace_configs = [trace_config] if trace_config else None - + max_connections = max(256, kwargs.get("max_connections", 0)) self.session = aiohttp.ClientSession( auth=http_auth, timeout=self.timeout, @@ -91,8 +91,7 @@ def __init__(self, host='localhost', port=9200, http_auth=None, verify_ssl=verify_certs, use_dns_cache=use_dns_cache, ssl_context=ssl_context, - # this has been changed from the default (100) - limit=100000 + limit=max_connections ), headers=headers, trace_configs=trace_configs, diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 3611516eb..5794f63e1 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1184,7 +1184,11 @@ def es_clients(all_hosts, all_client_options): es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async() return es - es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options").all_client_options) + # Properly size the internal connection pool to match the number of expected clients but allow the user + # to override it if needed. + client_count = len(self.task_allocations) + es = es_clients(self.cfg.opts("client", "hosts").all_hosts, + self.cfg.opts("client", "options").with_max_connections(client_count)) aws = [] # A parameter source should only be created once per task - it is partitioned later on per client. diff --git a/esrally/utils/opts.py b/esrally/utils/opts.py index c4d288a10..e8cd9bd03 100644 --- a/esrally/utils/opts.py +++ b/esrally/utils/opts.py @@ -202,3 +202,12 @@ def normalize_to_dict(arg): def all_client_options(self): """Return a dict with all client options""" return self.all_options + + def with_max_connections(self, max_connections): + final_client_options = {} + for cluster, original_opts in self.all_client_options.items(): + amended_opts = dict(original_opts) + if "max_connections" not in amended_opts: + amended_opts["max_connections"] = max_connections + final_client_options[cluster] = amended_opts + return final_client_options diff --git a/tests/utils/opts_test.py b/tests/utils/opts_test.py index 7187593e0..67cd59830 100644 --- a/tests/utils/opts_test.py +++ b/tests/utils/opts_test.py @@ -208,7 +208,6 @@ def test_csv_client_options_parses(self): opts.ClientOptions(client_options_string).all_client_options ) - def test_jsonstring_client_options_parses(self): client_options_string = '{"default": {"timeout": 60},' \ '"remote_1": {"use_ssl":true,"verify_certs":true,"basic_auth_user": "elastic", "basic_auth_password": "changeme"},'\ @@ -228,7 +227,6 @@ def test_jsonstring_client_options_parses(self): 'remote_2': {'use_ssl': True,'verify_certs': True, 'ca_certs':'/path/to/cacert.pem'}}, opts.ClientOptions(client_options_string).all_client_options) - def test_json_file_parameter_parses(self): self.assertEqual( {'default': {'timeout':60}, @@ -240,7 +238,6 @@ def test_json_file_parameter_parses(self): {'default': {'timeout':60}}, opts.ClientOptions(os.path.join(os.path.dirname(__file__), "resources", "client_options_2.json")).all_client_options) - def test_no_client_option_parses_to_default(self): client_options_string = opts.ClientOptions.DEFAULT_CLIENT_OPTIONS target_hosts = None @@ -260,7 +257,6 @@ def test_no_client_option_parses_to_default(self): opts.ClientOptions(client_options_string, target_hosts=target_hosts).default) - def test_no_client_option_parses_to_default_with_multicluster(self): client_options_string = opts.ClientOptions.DEFAULT_CLIENT_OPTIONS target_hosts = opts.TargetHosts('{"default": ["127.0.0.1:9200,10.17.0.5:19200"], "remote": ["88.33.22.15:19200"]}') @@ -279,3 +275,17 @@ def test_no_client_option_parses_to_default_with_multicluster(self): {"timeout": 60}, opts.ClientOptions(client_options_string, target_hosts=target_hosts).default) + + def test_amends_with_max_connections(self): + client_options_string = opts.ClientOptions.DEFAULT_CLIENT_OPTIONS + target_hosts = opts.TargetHosts('{"default": ["10.17.0.5:9200"], "remote": ["88.33.22.15:9200"]}') + self.assertEqual( + {"default": {"timeout": 60, "max_connections": 128}, "remote": {"timeout": 60, "max_connections": 128}}, + opts.ClientOptions(client_options_string, target_hosts=target_hosts).with_max_connections(128)) + + def test_keeps_already_specified_max_connections(self): + client_options_string = '{"default": {"timeout":60,"max_connections":5}, "remote": {"timeout":60}}' + target_hosts = opts.TargetHosts('{"default": ["10.17.0.5:9200"], "remote": ["88.33.22.15:9200"]}') + self.assertEqual( + {"default": {"timeout": 60, "max_connections": 5}, "remote": {"timeout": 60, "max_connections": 32}}, + opts.ClientOptions(client_options_string, target_hosts=target_hosts).with_max_connections(32))