diff --git a/esrally/mechanic/launcher.py b/esrally/mechanic/launcher.py index eef30613b..ed183e44d 100644 --- a/esrally/mechanic/launcher.py +++ b/esrally/mechanic/launcher.py @@ -322,9 +322,6 @@ def _start_node(self, node_configuration, node_count_on_host): enabled_devices = self.cfg.opts("mechanic", "telemetry.devices") telemetry_params = self.cfg.opts("mechanic", "telemetry.params") node_telemetry = [ - telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version), - telemetry.JitCompiler(node_telemetry_dir), - telemetry.Gc(node_telemetry_dir, java_major_version), telemetry.DiskIo(self.metrics_store, node_count_on_host), telemetry.NodeEnvironmentInfo(self.metrics_store), telemetry.IndexSize(data_paths, self.metrics_store), @@ -350,17 +347,6 @@ def _prepare_env(self, car, node_name, java_home, t): # Don't merge here! env["JAVA_HOME"] = java_home - # we just blindly trust telemetry here... - for k, v in t.instrument_candidate_env(car, node_name).items(): - self._set_env(env, k, v) - - exit_on_oome_flag = "-XX:+ExitOnOutOfMemoryError" - if jvm.supports_option(java_home, exit_on_oome_flag): - self.logger.info("Setting [%s] to detect out of memory errors during the benchmark.", exit_on_oome_flag) - self._set_env(env, "ES_JAVA_OPTS", exit_on_oome_flag) - else: - self.logger.info("JVM does not support [%s]. A JDK upgrade is recommended.", exit_on_oome_flag) - self.logger.debug("env for [%s]: %s", node_name, str(env)) return env diff --git a/esrally/mechanic/provisioner.py b/esrally/mechanic/provisioner.py index 63b350d17..8f25dd1f5 100644 --- a/esrally/mechanic/provisioner.py +++ b/esrally/mechanic/provisioner.py @@ -23,8 +23,8 @@ import jinja2 from esrally import exceptions -from esrally.mechanic import team, java_resolver -from esrally.utils import io, process, versions +from esrally.mechanic import team, java_resolver, telemetry +from esrally.utils import io, process, versions, jvm def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_root, node_id): @@ -38,11 +38,22 @@ def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_ node_root_dir = "%s/%s" % (target_root, node_name) _, java_home = java_resolver.java_home(car, cfg) + + node_telemetry_dir = os.path.join(node_root_dir, "telemetry") + java_major_version, java_home = java_resolver.java_home(car, cfg) + enabled_devices = cfg.opts("mechanic", "telemetry.devices") + telemetry_params = cfg.opts("mechanic", "telemetry.params") + node_telemetry = [ + telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version), + telemetry.JitCompiler(node_telemetry_dir), + telemetry.Gc(node_telemetry_dir, java_major_version) + ] + t = telemetry.Telemetry(enabled_devices, devices=node_telemetry) es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port) plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins] - return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, distribution_version=distribution_version) + return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, t, distribution_version=distribution_version) def no_op_provisioner(): @@ -147,17 +158,19 @@ class BareProvisioner: of the benchmark candidate to the appropriate place. """ - def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, distribution_version=None, apply_config=_apply_config): + def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, telemetry=None, distribution_version=None, apply_config=_apply_config): self.preserve = preserve self._cluster_settings = cluster_settings self.es_installer = es_installer self.plugin_installers = plugin_installers self.distribution_version = distribution_version self.apply_config = apply_config + self.telemetry = telemetry + self.logger = logging.getLogger(__name__) def prepare(self, binary): if not self.preserve: - logging.getLogger(__name__).info("Rally will delete the benchmark candidate after the benchmark") + self.logger.info("Rally will delete the benchmark candidate after the benchmark") self.es_installer.install(binary["elasticsearch"]) # we need to immediately delete it as plugins may copy their configuration during installation. self.es_installer.delete_pre_bundled_configuration() @@ -165,7 +178,6 @@ def prepare(self, binary): # determine after installation because some variables will depend on the install directory target_root_path = self.es_installer.es_home_path provisioner_vars = self._provisioner_variables() - for p in self.es_installer.config_source_paths: self.apply_config(p, target_root_path, provisioner_vars) @@ -185,6 +197,20 @@ def prepare(self, binary): def cleanup(self): self.es_installer.cleanup(self.preserve) + + def _prepare_java_opts(self): + java_opts = [] + if self.telemetry is not None: + java_opts.extend(self.telemetry.instrument_candidate_java_opts(self.es_installer.car, self.es_installer.node_name)) + + exit_on_oome_flag = "-XX:+ExitOnOutOfMemoryError" + if jvm.supports_option(self.es_installer.java_home, exit_on_oome_flag): + self.logger.info("Setting [%s] to detect out of memory errors during the benchmark.", exit_on_oome_flag) + java_opts.append(exit_on_oome_flag) + else: + self.logger.info("JVM does not support [%s]. A JDK upgrade is recommended.", exit_on_oome_flag) + + return java_opts def _provisioner_variables(self): plugin_variables = {} @@ -217,7 +243,11 @@ def _provisioner_variables(self): provisioner_vars.update(self.es_installer.variables) provisioner_vars.update(plugin_variables) provisioner_vars["cluster_settings"] = cluster_settings - + + java_opts = self._prepare_java_opts() + if java_opts: + provisioner_vars["additional_java_settings"] = java_opts + return provisioner_vars diff --git a/esrally/mechanic/telemetry.py b/esrally/mechanic/telemetry.py index d75c2875f..746fd80f5 100644 --- a/esrally/mechanic/telemetry.py +++ b/esrally/mechanic/telemetry.py @@ -46,17 +46,13 @@ def __init__(self, enabled_devices=None, devices=None): self.enabled_devices = enabled_devices self.devices = devices - def instrument_candidate_env(self, car, candidate_id): - opts = {} + def instrument_candidate_java_opts(self, car, candidate_id): + opts = [] for device in self.devices: if self._enabled(device): - additional_opts = device.instrument_env(car, candidate_id) + additional_opts = device.instrument_java_opts(car, candidate_id) # properly merge values with the same key - for k, v in additional_opts.items(): - if k in opts: - opts[k] = "%s %s" % (opts[k], v) - else: - opts[k] = v + opts.extend(additional_opts) return opts def attach_to_cluster(self, cluster): @@ -108,7 +104,7 @@ class TelemetryDevice: def __init__(self): self.logger = logging.getLogger(__name__) - def instrument_env(self, car, candidate_id): + def instrument_java_opts(self, car, candidate_id): return {} def attach_to_cluster(self, cluster): @@ -169,7 +165,7 @@ def __init__(self, telemetry_params, log_root, java_major_version): self.log_root = log_root self.java_major_version = java_major_version - def instrument_env(self, car, candidate_id): + def instrument_java_opts(self, car, candidate_id): io.ensure_dir(self.log_root) log_file = "%s/%s-%s.jfr" % (self.log_root, car.safe_name, candidate_id) @@ -190,31 +186,32 @@ def instrument_env(self, car, candidate_id): java_opts = self.java_opts(log_file) self.logger.info("jfr: Adding JVM arguments: [%s].", java_opts) - return {"ES_JAVA_OPTS": java_opts} + return java_opts def java_opts(self, log_file): recording_template = self.telemetry_params.get("recording-template") - java_opts = "-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints " - + java_opts = ["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints"] + jfr_cmd = "" if self.java_major_version < 11: - java_opts += "-XX:+UnlockCommercialFeatures " + java_opts.append("-XX:+UnlockCommercialFeatures") if self.java_major_version < 9: - java_opts += "-XX:+FlightRecorder " - java_opts += "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={} ".format(log_file) - java_opts += "-XX:StartFlightRecording=defaultrecording=true" + java_opts.append("-XX:+FlightRecorder") + java_opts.append("-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={}".format(log_file)) + jfr_cmd = "-XX:StartFlightRecording=defaultrecording=true" if recording_template: self.logger.info("jfr: Using recording template [%s].", recording_template) - java_opts += ",settings={}".format(recording_template) + jfr_cmd += ",settings={}".format(recording_template) else: self.logger.info("jfr: Using default recording template.") else: - java_opts += "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format(log_file) + jfr_cmd += "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format(log_file) if recording_template: self.logger.info("jfr: Using recording template [%s].", recording_template) - java_opts += ",settings={}".format(recording_template) + jfr_cmd += ",settings={}".format(recording_template) else: self.logger.info("jfr: Using default recording template.") + java_opts.append(jfr_cmd) return java_opts @@ -228,12 +225,12 @@ def __init__(self, log_root): super().__init__() self.log_root = log_root - def instrument_env(self, car, candidate_id): + def instrument_java_opts(self, car, candidate_id): io.ensure_dir(self.log_root) log_file = "%s/%s-%s.jit.log" % (self.log_root, car.safe_name, candidate_id) console.info("%s: Writing JIT compiler log to [%s]" % (self.human_name, log_file), logger=self.logger) - return {"ES_JAVA_OPTS": "-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation " - "-XX:LogFile=%s -XX:+PrintAssembly" % log_file} + return ["-XX:+UnlockDiagnosticVMOptions", "-XX:+TraceClassLoading", "-XX:+LogCompilation", + "-XX:LogFile={}".format(log_file), "-XX:+PrintAssembly"] class Gc(TelemetryDevice): @@ -247,7 +244,7 @@ def __init__(self, log_root, java_major_version): self.log_root = log_root self.java_major_version = java_major_version - def instrument_env(self, car, candidate_id): + def instrument_java_opts(self, car, candidate_id): io.ensure_dir(self.log_root) log_file = "%s/%s-%s.gc.log" % (self.log_root, car.safe_name, candidate_id) console.info("%s: Writing GC log to [%s]" % (self.human_name, log_file), logger=self.logger) @@ -255,12 +252,12 @@ def instrument_env(self, car, candidate_id): def java_opts(self, log_file): if self.java_major_version < 9: - return {"ES_JAVA_OPTS": "-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps " - "-XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime " - "-XX:+PrintTenuringDistribution" % log_file} + return ["-Xloggc:{}".format(log_file), "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps", "-XX:+PrintGCTimeStamps", + "-XX:+PrintGCApplicationStoppedTime", "-XX:+PrintGCApplicationConcurrentTime", + "-XX:+PrintTenuringDistribution"] else: # see https://docs.oracle.com/javase/9/tools/java.htm#JSWOR-GUID-BE93ABDC-999C-4CB5-A88B-1994AAAC74D5 - return {"ES_JAVA_OPTS": "-Xlog:gc*=info,safepoint=info,age*=trace:file=%s:utctime,uptimemillis,level,tags:filecount=0" % log_file} + return ["-Xlog:gc*=info,safepoint=info,age*=trace:file={}:utctime,uptimemillis,level,tags:filecount=0".format(log_file)] class CcrStats(TelemetryDevice): diff --git a/tests/mechanic/telemetry_test.py b/tests/mechanic/telemetry_test.py index 676bd8201..788f15721 100644 --- a/tests/mechanic/telemetry_test.py +++ b/tests/mechanic/telemetry_test.py @@ -47,12 +47,12 @@ def create_config(): class MockTelemetryDevice(telemetry.InternalTelemetryDevice): - def __init__(self, mock_env): + def __init__(self, mock_java_opts): super().__init__() - self.mock_env = mock_env + self.mock_java_opts = mock_java_opts - def instrument_env(self, car, candidate_id): - return self.mock_env + def instrument_java_opts(self, car, candidate_id): + return self.mock_java_opts class TelemetryTests(TestCase): @@ -63,20 +63,19 @@ def test_merges_options_set_by_different_devices(self): cfg.add(config.Scope.application, "benchmarks", "metrics.log.dir", "telemetry") devices = [ - MockTelemetryDevice({"ES_JAVA_OPTS": "-Xms256M"}), - MockTelemetryDevice({"ES_JAVA_OPTS": "-Xmx512M"}), - MockTelemetryDevice({"ES_NET_HOST": "127.0.0.1"}) + MockTelemetryDevice(["-Xms256M"]), + MockTelemetryDevice(["-Xmx512M"]), + MockTelemetryDevice(["-Des.network.host=127.0.0.1"]) ] t = telemetry.Telemetry(enabled_devices=None, devices=devices) default_car = team.Car(names="default-car", root_path=None, config_paths=["/tmp/rally-config"]) - opts = t.instrument_candidate_env(default_car, "default-node") + opts = t.instrument_candidate_java_opts(default_car, "default-node") - self.assertTrue(opts) - self.assertEqual(len(opts), 2) - self.assertEqual("-Xms256M -Xmx512M", opts["ES_JAVA_OPTS"]) - self.assertEqual("127.0.0.1", opts["ES_NET_HOST"]) + self.assertIsNotNone(opts) + self.assertEqual(len(opts), 3) + self.assertEqual(["-Xms256M", "-Xmx512M", "-Des.network.host=127.0.0.1"], opts) class StartupTimeTests(TestCase): @@ -190,71 +189,68 @@ class JfrTests(TestCase): def test_sets_options_for_pre_java_9_default_recording_template(self): jfr = telemetry.FlightRecorder(telemetry_params={}, log_root="/var/log", java_major_version=random.randint(0, 8)) java_opts = jfr.java_opts("/var/log/test-recording.jfr") - self.assertEqual("-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+UnlockCommercialFeatures -XX:+FlightRecorder " - "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true," - "dumponexitpath=/var/log/test-recording.jfr -XX:StartFlightRecording=defaultrecording=true", java_opts) + self.assertEqual(["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", "-XX:+UnlockCommercialFeatures", "-XX:+FlightRecorder", + "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true," + "dumponexitpath=/var/log/test-recording.jfr", "-XX:StartFlightRecording=defaultrecording=true"], java_opts) def test_sets_options_for_java_9_or_10_default_recording_template(self): jfr = telemetry.FlightRecorder(telemetry_params={}, log_root="/var/log", java_major_version=random.randint(9, 10)) java_opts = jfr.java_opts("/var/log/test-recording.jfr") - self.assertEqual("-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+UnlockCommercialFeatures " - "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename=/var/log/test-recording.jfr", - java_opts) + self.assertEqual(["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", "-XX:+UnlockCommercialFeatures", + "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true," + "dumponexit=true,filename=/var/log/test-recording.jfr"], java_opts) def test_sets_options_for_java_11_or_above_default_recording_template(self): jfr = telemetry.FlightRecorder(telemetry_params={}, log_root="/var/log", java_major_version=random.randint(11, 999)) java_opts = jfr.java_opts("/var/log/test-recording.jfr") - self.assertEqual("-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints " - "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename=/var/log/test-recording.jfr", - java_opts) + self.assertEqual(["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", + "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true," + "dumponexit=true,filename=/var/log/test-recording.jfr"], java_opts) def test_sets_options_for_pre_java_9_custom_recording_template(self): jfr = telemetry.FlightRecorder(telemetry_params={"recording-template": "profile"}, log_root="/var/log", java_major_version=random.randint(0, 8)) java_opts = jfr.java_opts("/var/log/test-recording.jfr") - self.assertEqual("-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+UnlockCommercialFeatures -XX:+FlightRecorder " - "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true," - "dumponexitpath=/var/log/test-recording.jfr -XX:StartFlightRecording=defaultrecording=true,settings=profile", - java_opts) + self.assertEqual(["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", "-XX:+UnlockCommercialFeatures", "-XX:+FlightRecorder", + "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true," + "dumponexitpath=/var/log/test-recording.jfr", "-XX:StartFlightRecording=defaultrecording=true,settings=profile"], java_opts) def test_sets_options_for_java_9_or_10_custom_recording_template(self): jfr = telemetry.FlightRecorder(telemetry_params={"recording-template": "profile"}, log_root="/var/log", java_major_version=random.randint(9, 10)) java_opts = jfr.java_opts("/var/log/test-recording.jfr") - self.assertEqual("-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+UnlockCommercialFeatures " - "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true," - "filename=/var/log/test-recording.jfr,settings=profile", - java_opts) + self.assertEqual(["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", "-XX:+UnlockCommercialFeatures", + "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true," + "filename=/var/log/test-recording.jfr,settings=profile"], java_opts) def test_sets_options_for_java_11_or_above_custom_recording_template(self): jfr = telemetry.FlightRecorder(telemetry_params={"recording-template": "profile"}, log_root="/var/log", java_major_version=random.randint(11, 999)) java_opts = jfr.java_opts("/var/log/test-recording.jfr") - self.assertEqual("-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints " - "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true," - "filename=/var/log/test-recording.jfr,settings=profile", - java_opts) + self.assertEqual(["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints", + "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true," + "filename=/var/log/test-recording.jfr,settings=profile"], java_opts) class GcTests(TestCase): def test_sets_options_for_pre_java_9(self): gc = telemetry.Gc("/var/log", java_major_version=random.randint(0, 8)) - env = gc.java_opts("/var/log/defaults-node-0.gc.log") - self.assertEqual(1, len(env)) - self.assertEqual("-Xloggc:/var/log/defaults-node-0.gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps " - "-XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution", - env["ES_JAVA_OPTS"]) + gc_java_opts = gc.java_opts("/var/log/defaults-node-0.gc.log") + self.assertEqual(7, len(gc_java_opts)) + self.assertEqual(["-Xloggc:/var/log/defaults-node-0.gc.log", "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps", "-XX:+PrintGCTimeStamps", + "-XX:+PrintGCApplicationStoppedTime", "-XX:+PrintGCApplicationConcurrentTime", + "-XX:+PrintTenuringDistribution"], gc_java_opts) def test_sets_options_for_java_9_or_above(self): gc = telemetry.Gc("/var/log", java_major_version=random.randint(9, 999)) - env = gc.java_opts("/var/log/defaults-node-0.gc.log") - self.assertEqual(1, len(env)) + gc_java_opts = gc.java_opts("/var/log/defaults-node-0.gc.log") + self.assertEqual(1, len(gc_java_opts)) self.assertEqual( - "-Xlog:gc*=info,safepoint=info,age*=trace:file=/var/log/defaults-node-0.gc.log:utctime,uptimemillis,level,tags:filecount=0", - env["ES_JAVA_OPTS"]) + ["-Xlog:gc*=info,safepoint=info,age*=trace:file=/var/log/defaults-node-0.gc.log:utctime,uptimemillis,level,tags:filecount=0"], + gc_java_opts) class CcrStatsTests(TestCase):