Skip to content

Commit

Permalink
Revert PR#711 to use ES_JAVA_OPTS for telemetry devices
Browse files Browse the repository at this point in the history
By using ES_JAVA_OPTS we can provision a node, run a benchmark, and then
“dynamically” (i.e. without reprovisioning) start the node again with
telemetry attached.

Relates to elastic#697
Relates to elastic#711
  • Loading branch information
ebadyano committed Jul 31, 2019
1 parent fe1ff28 commit 3661748
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 33 deletions.
16 changes: 14 additions & 2 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,21 @@ def _start_node(self, node_configuration, node_count_on_host):
car = node_configuration.car
binary_path = node_configuration.binary_path
data_paths = node_configuration.data_paths
node_telemetry_dir = "%s/telemetry" % node_configuration.node_root_path
#node_telemetry_dir = "%s/telemetry" % node_configuration.node_root_path
node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")

print("here in oy %s" % node_telemetry_dir)

java_major_version, java_home = java_resolver.java_home(car, self.cfg)

self.logger.info("Starting node [%s] based on car [%s].", node_name, car)

enabled_devices = self.cfg.opts("mechanic", "telemetry.devices")
telemetry_params = self.cfg.opts("mechanic", "telemetry.params")
node_telemetry = [
telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
telemetry.JitCompiler(node_telemetry_dir),
telemetry.Gc(node_telemetry_dir, java_major_version),
telemetry.DiskIo(self.metrics_store, node_count_on_host, node_telemetry_dir, node_name),
telemetry.NodeEnvironmentInfo(self.metrics_store),
telemetry.IndexSize(data_paths, self.metrics_store),
Expand All @@ -331,6 +338,11 @@ def _prepare_env(self, car, node_name, java_home, t):
self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep)
# Don't merge here!
env["JAVA_HOME"] = java_home
env["ES_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"

# we just blindly trust telemetry here...
for v in t.instrument_candidate_java_opts(car, node_name):
self._set_env(env, "ES_JAVA_OPTS", v)

self.logger.debug("env for [%s]: %s", node_name, str(env))
return env
Expand All @@ -340,7 +352,7 @@ def _set_env(self, env, k, v, separator=' '):
if k not in env:
env[k] = v
else: # merge
env[k] = v + separator + env[k]
env[k] = env[k] + separator + v

@staticmethod
def _start_process(binary_path, env):
Expand Down
32 changes: 5 additions & 27 deletions esrally/mechanic/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import jinja2

from esrally import exceptions
from esrally.mechanic import team, java_resolver, telemetry
from esrally.utils import io, process, versions, jvm
from esrally.mechanic import team, java_resolver
from esrally.utils import io, process, versions


def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_root, node_id):
Expand All @@ -39,21 +39,10 @@ def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_

_, java_home = java_resolver.java_home(car, cfg)

node_telemetry_dir = os.path.join(node_root_dir, "telemetry")
java_major_version, java_home = java_resolver.java_home(car, cfg)
enabled_devices = cfg.opts("mechanic", "telemetry.devices")
telemetry_params = cfg.opts("mechanic", "telemetry.params")
node_telemetry = [
telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
telemetry.JitCompiler(node_telemetry_dir),
telemetry.Gc(node_telemetry_dir, java_major_version)
]
t = telemetry.Telemetry(enabled_devices, devices=node_telemetry)

es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port)
plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins]

return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, t, distribution_version=distribution_version)
return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, distribution_version=distribution_version)


def no_op_provisioner():
Expand Down Expand Up @@ -158,14 +147,13 @@ class BareProvisioner:
of the benchmark candidate to the appropriate place.
"""

def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, telemetry=None, distribution_version=None, apply_config=_apply_config):
def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, distribution_version=None, apply_config=_apply_config):
self.preserve = preserve
self._cluster_settings = cluster_settings
self.es_installer = es_installer
self.plugin_installers = plugin_installers
self.distribution_version = distribution_version
self.apply_config = apply_config
self.telemetry = telemetry
self.logger = logging.getLogger(__name__)

def prepare(self, binary):
Expand Down Expand Up @@ -197,13 +185,7 @@ def prepare(self, binary):

def cleanup(self):
self.es_installer.cleanup(self.preserve)

def _prepare_java_opts(self):
# To detect out of memory errors during the benchmark
java_opts = ["-XX:+ExitOnOutOfMemoryError"]
if self.telemetry is not None:
java_opts.extend(self.telemetry.instrument_candidate_java_opts(self.es_installer.car, self.es_installer.node_name))
return java_opts


def _provisioner_variables(self):
plugin_variables = {}
Expand Down Expand Up @@ -237,10 +219,6 @@ def _provisioner_variables(self):
provisioner_vars.update(plugin_variables)
provisioner_vars["cluster_settings"] = cluster_settings

java_opts = self._prepare_java_opts()
if java_opts:
provisioner_vars["additional_java_settings"] = java_opts

return provisioner_vars


Expand Down
3 changes: 2 additions & 1 deletion tests/mechanic/launcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def wait(self):


class ProcessLauncherTests(TestCase):
@mock.patch('os.path.join', return_value="/telemetry")
@mock.patch('os.kill')
@mock.patch('subprocess.Popen',new=MockPopen)
@mock.patch('esrally.mechanic.java_resolver.java_home', return_value=(12, "/java_home/"))
Expand All @@ -129,7 +130,7 @@ class ProcessLauncherTests(TestCase):
@mock.patch('esrally.mechanic.provisioner.NodeConfiguration')
@mock.patch('esrally.mechanic.launcher.wait_for_pidfile', return_value=MOCK_PID_VALUE)
@mock.patch('psutil.Process')
def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill):
def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill, path):
proc_launcher = launcher.ProcessLauncher(cfg, ms, paths.races_root(cfg))

nodes = proc_launcher.start([node_config])
Expand Down
3 changes: 0 additions & 3 deletions tests/mechanic/provisioner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"cluster_settings": {
"indices.query.bool.max_clause_count": 50000,
},
"additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"],
"heap": "4g",
"cluster_name": "rally-benchmark",
"node_name": "rally-node-0",
Expand Down Expand Up @@ -185,7 +184,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"indices.query.bool.max_clause_count": 50000,
"plugin.mandatory": ["x-pack-security"]
},
"additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"],
"heap": "4g",
"cluster_name": "rally-benchmark",
"node_name": "rally-node-0",
Expand Down Expand Up @@ -263,7 +261,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"indices.query.bool.max_clause_count": 50000,
"plugin.mandatory": ["x-pack"]
},
"additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"],
"heap": "4g",
"cluster_name": "rally-benchmark",
"node_name": "rally-node-0",
Expand Down

0 comments on commit 3661748

Please sign in to comment.