Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change telemetry devices to rely on jvm.config instead of ES_JAVA_OPTS #711

Merged
merged 8 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,6 @@ def _start_node(self, node_configuration, node_count_on_host):
enabled_devices = self.cfg.opts("mechanic", "telemetry.devices")
telemetry_params = self.cfg.opts("mechanic", "telemetry.params")
node_telemetry = [
telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
telemetry.JitCompiler(node_telemetry_dir),
telemetry.Gc(node_telemetry_dir, java_major_version),
telemetry.DiskIo(self.metrics_store, node_count_on_host),
telemetry.NodeEnvironmentInfo(self.metrics_store),
telemetry.IndexSize(data_paths, self.metrics_store),
Expand All @@ -350,17 +347,6 @@ def _prepare_env(self, car, node_name, java_home, t):
# Don't merge here!
env["JAVA_HOME"] = java_home

# we just blindly trust telemetry here...
for k, v in t.instrument_candidate_env(car, node_name).items():
self._set_env(env, k, v)

exit_on_oome_flag = "-XX:+ExitOnOutOfMemoryError"
if jvm.supports_option(java_home, exit_on_oome_flag):
self.logger.info("Setting [%s] to detect out of memory errors during the benchmark.", exit_on_oome_flag)
self._set_env(env, "ES_JAVA_OPTS", exit_on_oome_flag)
else:
self.logger.info("JVM does not support [%s]. A JDK upgrade is recommended.", exit_on_oome_flag)

self.logger.debug("env for [%s]: %s", node_name, str(env))
return env

Expand Down
44 changes: 37 additions & 7 deletions esrally/mechanic/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import jinja2

from esrally import exceptions
from esrally.mechanic import team, java_resolver
from esrally.utils import io, process, versions
from esrally.mechanic import team, java_resolver, telemetry
from esrally.utils import io, process, versions, jvm


def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_root, node_id):
Expand All @@ -38,11 +38,22 @@ def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_
node_root_dir = "%s/%s" % (target_root, node_name)

_, java_home = java_resolver.java_home(car, cfg)

node_telemetry_dir = os.path.join(node_root_dir, "telemetry")
java_major_version, java_home = java_resolver.java_home(car, cfg)
enabled_devices = cfg.opts("mechanic", "telemetry.devices")
telemetry_params = cfg.opts("mechanic", "telemetry.params")
node_telemetry = [
telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
telemetry.JitCompiler(node_telemetry_dir),
telemetry.Gc(node_telemetry_dir, java_major_version)
]
t = telemetry.Telemetry(enabled_devices, devices=node_telemetry)

es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port)
plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins]

return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, distribution_version=distribution_version)
return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, t, distribution_version=distribution_version)


def no_op_provisioner():
Expand Down Expand Up @@ -147,25 +158,26 @@ class BareProvisioner:
of the benchmark candidate to the appropriate place.
"""

def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, distribution_version=None, apply_config=_apply_config):
def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, telemetry=None, distribution_version=None, apply_config=_apply_config):
self.preserve = preserve
self._cluster_settings = cluster_settings
self.es_installer = es_installer
self.plugin_installers = plugin_installers
self.distribution_version = distribution_version
self.apply_config = apply_config
self.telemetry = telemetry
self.logger = logging.getLogger(__name__)

def prepare(self, binary):
if not self.preserve:
logging.getLogger(__name__).info("Rally will delete the benchmark candidate after the benchmark")
self.logger.info("Rally will delete the benchmark candidate after the benchmark")
self.es_installer.install(binary["elasticsearch"])
# we need to immediately delete it as plugins may copy their configuration during installation.
self.es_installer.delete_pre_bundled_configuration()

# determine after installation because some variables will depend on the install directory
target_root_path = self.es_installer.es_home_path
provisioner_vars = self._provisioner_variables()

for p in self.es_installer.config_source_paths:
self.apply_config(p, target_root_path, provisioner_vars)

Expand All @@ -185,6 +197,20 @@ def prepare(self, binary):

def cleanup(self):
self.es_installer.cleanup(self.preserve)

def _prepare_java_opts(self):
java_opts = []
if self.telemetry is not None:
java_opts.extend(self.telemetry.instrument_candidate_java_opts(self.es_installer.car, self.es_installer.node_name))

exit_on_oome_flag = "-XX:+ExitOnOutOfMemoryError"
if jvm.supports_option(self.es_installer.java_home, exit_on_oome_flag):
self.logger.info("Setting [%s] to detect out of memory errors during the benchmark.", exit_on_oome_flag)
java_opts.append(exit_on_oome_flag)
else:
self.logger.info("JVM does not support [%s]. A JDK upgrade is recommended.", exit_on_oome_flag)

return java_opts

def _provisioner_variables(self):
plugin_variables = {}
Expand Down Expand Up @@ -217,7 +243,11 @@ def _provisioner_variables(self):
provisioner_vars.update(self.es_installer.variables)
provisioner_vars.update(plugin_variables)
provisioner_vars["cluster_settings"] = cluster_settings


java_opts = self._prepare_java_opts()
if java_opts:
provisioner_vars["additional_java_settings"] = java_opts

return provisioner_vars


Expand Down
53 changes: 25 additions & 28 deletions esrally/mechanic/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,13 @@ def __init__(self, enabled_devices=None, devices=None):
self.enabled_devices = enabled_devices
self.devices = devices

def instrument_candidate_env(self, car, candidate_id):
opts = {}
def instrument_candidate_java_opts(self, car, candidate_id):
opts = []
for device in self.devices:
if self._enabled(device):
additional_opts = device.instrument_env(car, candidate_id)
additional_opts = device.instrument_java_opts(car, candidate_id)
# properly merge values with the same key
for k, v in additional_opts.items():
if k in opts:
opts[k] = "%s %s" % (opts[k], v)
else:
opts[k] = v
opts.extend(additional_opts)
return opts

def attach_to_cluster(self, cluster):
Expand Down Expand Up @@ -108,7 +104,7 @@ class TelemetryDevice:
def __init__(self):
self.logger = logging.getLogger(__name__)

def instrument_env(self, car, candidate_id):
def instrument_java_opts(self, car, candidate_id):
return {}

def attach_to_cluster(self, cluster):
Expand Down Expand Up @@ -169,7 +165,7 @@ def __init__(self, telemetry_params, log_root, java_major_version):
self.log_root = log_root
self.java_major_version = java_major_version

def instrument_env(self, car, candidate_id):
def instrument_java_opts(self, car, candidate_id):
io.ensure_dir(self.log_root)
log_file = "%s/%s-%s.jfr" % (self.log_root, car.safe_name, candidate_id)

Expand All @@ -190,31 +186,32 @@ def instrument_env(self, car, candidate_id):
java_opts = self.java_opts(log_file)

self.logger.info("jfr: Adding JVM arguments: [%s].", java_opts)
return {"ES_JAVA_OPTS": java_opts}
return java_opts

def java_opts(self, log_file):
recording_template = self.telemetry_params.get("recording-template")
java_opts = "-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints "

java_opts = ["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints"]
jfr_cmd = ""
if self.java_major_version < 11:
java_opts += "-XX:+UnlockCommercialFeatures "
java_opts.append("-XX:+UnlockCommercialFeatures")

if self.java_major_version < 9:
java_opts += "-XX:+FlightRecorder "
java_opts += "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={} ".format(log_file)
java_opts += "-XX:StartFlightRecording=defaultrecording=true"
java_opts.append("-XX:+FlightRecorder")
java_opts.append("-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={}".format(log_file))
jfr_cmd = "-XX:StartFlightRecording=defaultrecording=true"
if recording_template:
self.logger.info("jfr: Using recording template [%s].", recording_template)
java_opts += ",settings={}".format(recording_template)
jfr_cmd += ",settings={}".format(recording_template)
else:
self.logger.info("jfr: Using default recording template.")
else:
java_opts += "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format(log_file)
jfr_cmd += "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format(log_file)
if recording_template:
self.logger.info("jfr: Using recording template [%s].", recording_template)
java_opts += ",settings={}".format(recording_template)
jfr_cmd += ",settings={}".format(recording_template)
else:
self.logger.info("jfr: Using default recording template.")
java_opts.append(jfr_cmd)
return java_opts


Expand All @@ -228,12 +225,12 @@ def __init__(self, log_root):
super().__init__()
self.log_root = log_root

def instrument_env(self, car, candidate_id):
def instrument_java_opts(self, car, candidate_id):
io.ensure_dir(self.log_root)
log_file = "%s/%s-%s.jit.log" % (self.log_root, car.safe_name, candidate_id)
console.info("%s: Writing JIT compiler log to [%s]" % (self.human_name, log_file), logger=self.logger)
return {"ES_JAVA_OPTS": "-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation "
"-XX:LogFile=%s -XX:+PrintAssembly" % log_file}
return ["-XX:+UnlockDiagnosticVMOptions", "-XX:+TraceClassLoading", "-XX:+LogCompilation",
"-XX:LogFile={}".format(log_file), "-XX:+PrintAssembly"]


class Gc(TelemetryDevice):
Expand All @@ -247,20 +244,20 @@ def __init__(self, log_root, java_major_version):
self.log_root = log_root
self.java_major_version = java_major_version

def instrument_env(self, car, candidate_id):
def instrument_java_opts(self, car, candidate_id):
io.ensure_dir(self.log_root)
log_file = "%s/%s-%s.gc.log" % (self.log_root, car.safe_name, candidate_id)
console.info("%s: Writing GC log to [%s]" % (self.human_name, log_file), logger=self.logger)
return self.java_opts(log_file)

def java_opts(self, log_file):
if self.java_major_version < 9:
return {"ES_JAVA_OPTS": "-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
"-XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime "
"-XX:+PrintTenuringDistribution" % log_file}
return ["-Xloggc:{}".format(log_file), "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps", "-XX:+PrintGCTimeStamps",
"-XX:+PrintGCApplicationStoppedTime", "-XX:+PrintGCApplicationConcurrentTime",
"-XX:+PrintTenuringDistribution"]
else:
# see https://docs.oracle.com/javase/9/tools/java.htm#JSWOR-GUID-BE93ABDC-999C-4CB5-A88B-1994AAAC74D5
return {"ES_JAVA_OPTS": "-Xlog:gc*=info,safepoint=info,age*=trace:file=%s:utctime,uptimemillis,level,tags:filecount=0" % log_file}
return ["-Xlog:gc*=info,safepoint=info,age*=trace:file={}:utctime,uptimemillis,level,tags:filecount=0".format(log_file)]


class CcrStats(TelemetryDevice):
Expand Down
Loading