diff --git a/esrally/mechanic/launcher.py b/esrally/mechanic/launcher.py index 49b4d3fd7..1f7b03655 100644 --- a/esrally/mechanic/launcher.py +++ b/esrally/mechanic/launcher.py @@ -299,7 +299,11 @@ def _start_node(self, node_configuration, node_count_on_host): car = node_configuration.car binary_path = node_configuration.binary_path data_paths = node_configuration.data_paths - node_telemetry_dir = "%s/telemetry" % node_configuration.node_root_path + #node_telemetry_dir = "%s/telemetry" % node_configuration.node_root_path + node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry") + + print("here in oy %s" % node_telemetry_dir) + java_major_version, java_home = java_resolver.java_home(car, self.cfg) self.logger.info("Starting node [%s] based on car [%s].", node_name, car) @@ -307,6 +311,9 @@ def _start_node(self, node_configuration, node_count_on_host): enabled_devices = self.cfg.opts("mechanic", "telemetry.devices") telemetry_params = self.cfg.opts("mechanic", "telemetry.params") node_telemetry = [ + telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version), + telemetry.JitCompiler(node_telemetry_dir), + telemetry.Gc(node_telemetry_dir, java_major_version), telemetry.DiskIo(self.metrics_store, node_count_on_host, node_telemetry_dir, node_name), telemetry.NodeEnvironmentInfo(self.metrics_store), telemetry.IndexSize(data_paths, self.metrics_store), @@ -331,6 +338,11 @@ def _prepare_env(self, car, node_name, java_home, t): self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep) # Don't merge here! env["JAVA_HOME"] = java_home + env["ES_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError" + + # we just blindly trust telemetry here... + for v in t.instrument_candidate_java_opts(car, node_name): + self._set_env(env, "ES_JAVA_OPTS", v) self.logger.debug("env for [%s]: %s", node_name, str(env)) return env @@ -340,7 +352,7 @@ def _set_env(self, env, k, v, separator=' '): if k not in env: env[k] = v else: # merge - env[k] = v + separator + env[k] + env[k] = env[k] + separator + v @staticmethod def _start_process(binary_path, env): diff --git a/esrally/mechanic/provisioner.py b/esrally/mechanic/provisioner.py index 0c767533d..e927c4513 100644 --- a/esrally/mechanic/provisioner.py +++ b/esrally/mechanic/provisioner.py @@ -23,8 +23,8 @@ import jinja2 from esrally import exceptions -from esrally.mechanic import team, java_resolver, telemetry -from esrally.utils import io, process, versions, jvm +from esrally.mechanic import team, java_resolver +from esrally.utils import io, process, versions def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_root, node_id): @@ -39,21 +39,10 @@ def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_ _, java_home = java_resolver.java_home(car, cfg) - node_telemetry_dir = os.path.join(node_root_dir, "telemetry") - java_major_version, java_home = java_resolver.java_home(car, cfg) - enabled_devices = cfg.opts("mechanic", "telemetry.devices") - telemetry_params = cfg.opts("mechanic", "telemetry.params") - node_telemetry = [ - telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version), - telemetry.JitCompiler(node_telemetry_dir), - telemetry.Gc(node_telemetry_dir, java_major_version) - ] - t = telemetry.Telemetry(enabled_devices, devices=node_telemetry) - es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port) plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins] - return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, t, distribution_version=distribution_version) + return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, distribution_version=distribution_version) def no_op_provisioner(): @@ -158,14 +147,13 @@ class BareProvisioner: of the benchmark candidate to the appropriate place. """ - def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, telemetry=None, distribution_version=None, apply_config=_apply_config): + def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, distribution_version=None, apply_config=_apply_config): self.preserve = preserve self._cluster_settings = cluster_settings self.es_installer = es_installer self.plugin_installers = plugin_installers self.distribution_version = distribution_version self.apply_config = apply_config - self.telemetry = telemetry self.logger = logging.getLogger(__name__) def prepare(self, binary): @@ -197,13 +185,7 @@ def prepare(self, binary): def cleanup(self): self.es_installer.cleanup(self.preserve) - - def _prepare_java_opts(self): - # To detect out of memory errors during the benchmark - java_opts = ["-XX:+ExitOnOutOfMemoryError"] - if self.telemetry is not None: - java_opts.extend(self.telemetry.instrument_candidate_java_opts(self.es_installer.car, self.es_installer.node_name)) - return java_opts + def _provisioner_variables(self): plugin_variables = {} @@ -237,10 +219,6 @@ def _provisioner_variables(self): provisioner_vars.update(plugin_variables) provisioner_vars["cluster_settings"] = cluster_settings - java_opts = self._prepare_java_opts() - if java_opts: - provisioner_vars["additional_java_settings"] = java_opts - return provisioner_vars diff --git a/tests/mechanic/launcher_test.py b/tests/mechanic/launcher_test.py index 1634a1f09..207ae969f 100644 --- a/tests/mechanic/launcher_test.py +++ b/tests/mechanic/launcher_test.py @@ -119,6 +119,7 @@ def wait(self): class ProcessLauncherTests(TestCase): + @mock.patch('os.path.join', return_value="/telemetry") @mock.patch('os.kill') @mock.patch('subprocess.Popen',new=MockPopen) @mock.patch('esrally.mechanic.java_resolver.java_home', return_value=(12, "/java_home/")) @@ -129,7 +130,7 @@ class ProcessLauncherTests(TestCase): @mock.patch('esrally.mechanic.provisioner.NodeConfiguration') @mock.patch('esrally.mechanic.launcher.wait_for_pidfile', return_value=MOCK_PID_VALUE) @mock.patch('psutil.Process') - def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill): + def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill, path): proc_launcher = launcher.ProcessLauncher(cfg, ms, paths.races_root(cfg)) nodes = proc_launcher.start([node_config]) diff --git a/tests/mechanic/provisioner_test.py b/tests/mechanic/provisioner_test.py index e1a8616bb..2efe9394c 100644 --- a/tests/mechanic/provisioner_test.py +++ b/tests/mechanic/provisioner_test.py @@ -70,7 +70,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars): "cluster_settings": { "indices.query.bool.max_clause_count": 50000, }, - "additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"], "heap": "4g", "cluster_name": "rally-benchmark", "node_name": "rally-node-0", @@ -185,7 +184,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars): "indices.query.bool.max_clause_count": 50000, "plugin.mandatory": ["x-pack-security"] }, - "additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"], "heap": "4g", "cluster_name": "rally-benchmark", "node_name": "rally-node-0", @@ -263,7 +261,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars): "indices.query.bool.max_clause_count": 50000, "plugin.mandatory": ["x-pack"] }, - "additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"], "heap": "4g", "cluster_name": "rally-benchmark", "node_name": "rally-node-0",