Skip to content

Commit

Permalink
Be resilient upon startup
Browse files Browse the repository at this point in the history
With this commit we make Rally more resilient when it is not able to
determine some information upon startup of the cluster.

Relates elastic#730
  • Loading branch information
danielmitterdorfer authored and novosibman committed Aug 12, 2019
1 parent 255f269 commit b1e39fb
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 33 deletions.
29 changes: 20 additions & 9 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def start(self):
all_hosts = self.cfg.opts("client", "hosts").all_hosts
default_hosts = self.cfg.opts("client", "hosts").default
preserve = self.cfg.opts("mechanic", "preserve.install")
skip_rest_api_check = self.cfg.opts("mechanic", "skip.rest.api.check")

es = {}
for cluster_name, cluster_hosts in all_hosts.items():
Expand All @@ -103,17 +104,22 @@ def start(self):
# The list of nodes will be populated by ClusterMetaDataInfo, so no need to do it here
c = cluster.Cluster(default_hosts, [], t, preserve)

self.logger.info("All cluster nodes have successfully started. Checking if REST API is available.")
if wait_for_rest_layer(es_default, max_attempts=40):
self.logger.info("REST API is available. Attaching telemetry devices to cluster.")
if skip_rest_api_check:
self.logger.info("Skipping REST API check and attaching telemetry devices to cluster.")
t.attach_to_cluster(c)
self.logger.info("Telemetry devices are now attached to the cluster.")
else:
# Just stop the cluster here and raise. The caller is responsible for terminating individual nodes.
self.logger.error("REST API layer is not yet available. Forcefully terminating cluster.")
self.stop(c)
raise exceptions.LaunchError(
"Elasticsearch REST API layer is not available. Forcefully terminated cluster.")
self.logger.info("All cluster nodes have successfully started. Checking if REST API is available.")
if wait_for_rest_layer(es_default, max_attempts=40):
self.logger.info("REST API is available. Attaching telemetry devices to cluster.")
t.attach_to_cluster(c)
self.logger.info("Telemetry devices are now attached to the cluster.")
else:
# Just stop the cluster here and raise. The caller is responsible for terminating individual nodes.
self.logger.error("REST API layer is not yet available. Forcefully terminating cluster.")
self.stop(c)
raise exceptions.LaunchError(
"Elasticsearch REST API layer is not available. Forcefully terminated cluster.")
return c

def stop(self, c):
Expand Down Expand Up @@ -226,7 +232,12 @@ def start(self, node_configurations=None):
# upfront.
c = cluster.Cluster(hosts, [], t)
user_defined_version = self.cfg.opts("mechanic", "distribution.version", mandatory=False)
distribution_version = es.info()["version"]["number"]
# noinspection PyBroadException
try:
distribution_version = es.info()["version"]["number"]
except BaseException:
self.logger.exception("Could not retrieve cluster distribution version")
distribution_version = None
if not user_defined_version or user_defined_version.strip() == "":
self.logger.info("Distribution version was not specified by user. Rally-determined version is [%s]",
distribution_version)
Expand Down
8 changes: 7 additions & 1 deletion esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import sys
import json
import logging
from collections import defaultdict

import thespian.actors
Expand Down Expand Up @@ -207,7 +208,12 @@ def cluster_distribution_version(cfg, client_factory=client.EsClientFactory):
hosts = cfg.opts("client", "hosts").default
client_options = cfg.opts("client", "options").default
es = client_factory(hosts, client_options).create()
return es.info()["version"]["number"]
# noinspection PyBroadException
try:
return es.info()["version"]["number"]
except BaseException:
logging.getLogger(__name__).exception("Could not retrieve cluster distribution version")
return None


def to_ip_port(hosts):
Expand Down
59 changes: 44 additions & 15 deletions esrally/mechanic/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,9 @@ def __init__(self, telemetry_params, clients, metrics_store):
self.metrics_store = metrics_store
self.samplers = []

def attach_to_cluster(self, cluster):
# This cluster parameter does not correspond to the cluster names passed in target.hosts, see on_benchmark_start()
super().attach_to_cluster(cluster)

def on_benchmark_start(self):
console.warn(NodeStats.warning, logger=self.logger)

recorder = []
for cluster_name in self.specified_cluster_names:
recorder = NodeStatsRecorder(self.telemetry_params, cluster_name, self.clients[cluster_name], self.metrics_store)
sampler = SamplerThread(recorder)
Expand Down Expand Up @@ -892,7 +887,12 @@ def __init__(self, client, metrics_store):
self.client = client

def attach_to_cluster(self, cluster):
client_info = self.client.info()
# noinspection PyBroadException
try:
client_info = self.client.info()
except BaseException:
self.logger.exception("Could not retrieve cluster version info")
return
revision = client_info["version"]["build_hash"]
distribution_version = client_info["version"]["number"]
# older versions (pre 6.3.0) don't expose a build_flavor property because the only (implicit) flavor was "oss".
Expand Down Expand Up @@ -940,19 +940,26 @@ def __init__(self, client, metrics_store):
super().__init__()
self.metrics_store = metrics_store
self.client = client
self._t = None

# noinspection PyBroadException
def attach_to_cluster(self, cluster):
stats = self.client.nodes.stats(metric="_all")
nodes = stats["nodes"]
for node in nodes.values():
try:
nodes_stats = self.client.nodes.stats(metric="_all")["nodes"].values()
except BaseException:
self.logger.exception("Could not retrieve nodes stats")
nodes_stats = []
try:
nodes_info = self.client.nodes.info(node_id="_all")["nodes"].values()
except BaseException:
self.logger.exception("Could not retrieve nodes info")
nodes_info = []

for node in nodes_stats:
node_name = node["name"]
host = node.get("host", "unknown")
self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "node_name", node_name)
self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "host_name", host)

info = self.client.nodes.info(node_id="_all")
nodes_info = info["nodes"].values()
for node in nodes_info:
node_name = node["name"]
self.store_node_info(node_name, "os_name", node, ["os", "name"])
Expand All @@ -977,7 +984,16 @@ def __init__(self, client):
self.client = client

def attach_to_cluster(self, cluster):
client_info = self.client.info()
self.cluster_metadata(cluster)
self.nodes_metadata(cluster)

def cluster_metadata(self, cluster):
# noinspection PyBroadException
try:
client_info = self.client.info()
except BaseException:
self.logger.exception("Could not retrieve cluster version info")
return
revision = client_info["version"]["build_hash"]
distribution_version = client_info["version"]["number"]
# older versions (pre 6.3.0) don't expose a build_flavor property because the only (implicit) flavor was "oss".
Expand All @@ -987,7 +1003,20 @@ def attach_to_cluster(self, cluster):
cluster.distribution_flavor = distribution_flavor
cluster.source_revision = revision

for node_stats in self.client.nodes.stats(metric="_all")["nodes"].values():
# noinspection PyBroadException
def nodes_metadata(self, cluster):
try:
nodes_stats = self.client.nodes.stats(metric="_all")["nodes"]
except BaseException:
self.logger.exception("Could not retrieve nodes stats")
nodes_stats = {}
try:
nodes_info = self.client.nodes.info(node_id="_all")["nodes"]
except BaseException:
self.logger.exception("Could not retrieve nodes info")
nodes_info = {}

for node_stats in nodes_stats.values():
node_name = node_stats["name"]
if cluster.has_node(node_name):
cluster_node = cluster.node(node_name)
Expand All @@ -996,7 +1025,7 @@ def attach_to_cluster(self, cluster):
cluster_node = cluster.add_node(host, node_name)
self.add_node_stats(cluster_node, node_stats)

for node_info in self.client.nodes.info(node_id="_all")["nodes"].values():
for node_info in nodes_info.values():
self.add_node_info(cluster, node_info)

def add_node_info(self, cluster, node_info):
Expand Down
7 changes: 7 additions & 0 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ def positive_number(v):
help=argparse.SUPPRESS,
action="store_true",
default=False)
# skips checking that the REST API is available before proceeding with the benchmark
p.add_argument(
"--skip-rest-api-check",
help=argparse.SUPPRESS,
action="store_true",
default=False)

for p in [parser, config_parser, list_parser, race_parser, compare_parser, download_parser]:
# This option is needed to support a separate configuration for the integration tests on the same machine
Expand Down Expand Up @@ -599,6 +605,7 @@ def main():
else:
cfg.add(config.Scope.applicationOverride, "mechanic", "keep.running", False)
cfg.add(config.Scope.applicationOverride, "mechanic", "preserve.install", convert.to_bool(args.preserve_install))
cfg.add(config.Scope.applicationOverride, "mechanic", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))
cfg.add(config.Scope.applicationOverride, "mechanic", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "mechanic", "telemetry.devices", opts.csv_to_list(args.telemetry))
cfg.add(config.Scope.applicationOverride, "mechanic", "telemetry.params", opts.to_dict(args.telemetry_params))
Expand Down
17 changes: 17 additions & 0 deletions tests/mechanic/launcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,20 @@ def test_setup_external_cluster_multiple_nodes(self):
# did not change user defined value
self.assertEqual(cfg.opts("mechanic", "distribution.version"), "2.3.3")

def test_setup_external_cluster_cannot_determine_version(self):
client_options = opts.ClientOptions("timeout:60,raise-error-on-info:true")
cfg = config.Config()

cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "client", "hosts", self.test_host)
cfg.add(config.Scope.application, "client", "options", client_options)

m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
m.start()

# automatically determined by launcher on attach
self.assertIsNone(cfg.opts("mechanic", "distribution.version"))


class ClusterLauncherTests(TestCase):
test_host = opts.TargetHosts("10.0.0.10:9200,10.0.0.11:9200")
Expand All @@ -182,6 +196,7 @@ def test_launches_cluster(self):
cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "mechanic", "telemetry.params", {})
cfg.add(config.Scope.application, "mechanic", "preserve.install", False)
cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False)

cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
cluster = cluster_launcher.start()
Expand All @@ -196,6 +211,7 @@ def test_launches_cluster_with_telemetry_client_timeout_enabled(self):
cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "mechanic", "telemetry.params", {})
cfg.add(config.Scope.application, "mechanic", "preserve.install", False)
cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False)

cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
cluster = cluster_launcher.start()
Expand All @@ -217,6 +233,7 @@ def test_error_on_cluster_launch(self, sleep):
cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "mechanic", "telemetry.params", {})
cfg.add(config.Scope.application, "mechanic", "preserve.install", False)
cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False)

cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
with self.assertRaisesRegex(exceptions.LaunchError,
Expand Down
75 changes: 67 additions & 8 deletions tests/mechanic/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,29 +148,49 @@ def test_store_calculated_metrics(self, listdir_mock, open_mock, metrics_store_p
class Client:
def __init__(self, nodes=None, info=None, indices=None, transport_client=None):
self.nodes = nodes
self._info = info
self._info = wrap(info)
self.indices = indices
if transport_client:
self.transport = transport_client

def info(self):
return self._info
return self._info()


class SubClient:
def __init__(self, stats=None, info=None, recovery=None):
self._stats = stats
self._info = info
self._recovery = recovery
self._stats = wrap(stats)
self._info = wrap(info)
self._recovery = wrap(recovery)

def stats(self, *args, **kwargs):
return self._stats
return self._stats()

def info(self, *args, **kwargs):
return self._info
return self._info()

def recovery(self, *args, **kwargs):
return self._recovery
return self._recovery()


def wrap(it):
return it if callable(it) else ResponseSupplier(it)


class ResponseSupplier:
def __init__(self, response):
self.response = response

def __call__(self, *args, **kwargs):
return self.response


class TransportErrorSupplier:
def __call__(self, *args, **kwargs):
raise elasticsearch.TransportError


raiseTransportError = TransportErrorSupplier()


class TransportClient:
Expand Down Expand Up @@ -1807,6 +1827,17 @@ def test_stores_cluster_level_metrics_on_attach(self, metrics_store_add_meta_inf

metrics_store_add_meta_info.assert_has_calls(calls)

@mock.patch("esrally.metrics.EsMetricsStore.add_meta_info")
def test_resilient_if_error_response(self, metrics_store_add_meta_info):
cfg = create_config()
client = Client(nodes=SubClient(stats=raiseTransportError, info=raiseTransportError), info=raiseTransportError)
metrics_store = metrics.EsMetricsStore(cfg)
env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store)
t = telemetry.Telemetry(cfg, devices=[env_device])
t.attach_to_cluster(cluster.Cluster([], [], t))

self.assertEqual(0, metrics_store_add_meta_info.call_count)


class NodeEnvironmentInfoTests(TestCase):
@mock.patch("esrally.metrics.EsMetricsStore.add_meta_info")
Expand Down Expand Up @@ -1966,6 +1997,16 @@ def test_fallback_when_host_not_available(self, metrics_store_add_meta_info):
]
metrics_store_add_meta_info.assert_has_calls(calls)

@mock.patch("esrally.metrics.EsMetricsStore.add_meta_info")
def test_resilient_if_error_response(self, metrics_store_add_meta_info):
client = Client(nodes=SubClient(stats=raiseTransportError, info=raiseTransportError), info=raiseTransportError)
metrics_store = metrics.EsMetricsStore(self.cfg)
env_device = telemetry.ExternalEnvironmentInfo(client, metrics_store)
t = telemetry.Telemetry(self.cfg, devices=[env_device])
t.attach_to_cluster(cluster.Cluster([], [], t))

self.assertEqual(0, metrics_store_add_meta_info.call_count)


class ClusterMetaDataInfoTests(TestCase):
def setUp(self):
Expand Down Expand Up @@ -2080,6 +2121,24 @@ def test_enriches_cluster_nodes(self):
self.assertEqual("unknown", n.fs[1]["spins"])
self.assertEqual(["analysis-icu", "ingest-geoip", "ingest-user-agent"], n.plugins)

def test_resilient_if_error_response(self):
client = Client(nodes=SubClient(stats=raiseTransportError, info=raiseTransportError), info=raiseTransportError)

t = telemetry.Telemetry(devices=[telemetry.ClusterMetaDataInfo(client)])

c = cluster.Cluster(hosts=[{"host": "localhost", "port": 39200}],
nodes=[cluster.Node(pid=None, host_name="local", node_name="rally0", telemetry=None)],
telemetry=t)

t.attach_to_cluster(c)

self.assertIsNone(c.distribution_version)
self.assertIsNone(c.distribution_flavor)
self.assertIsNone(c.source_revision)
self.assertEqual(1, len(c.nodes))
n = c.nodes[0]
self.assertIsNone(n.ip)


class JvmStatsSummaryTests(TestCase):
@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
Expand Down

0 comments on commit b1e39fb

Please sign in to comment.