diff --git a/esrally/metrics.py b/esrally/metrics.py index 33b0bca15..07b03da94 100644 --- a/esrally/metrics.py +++ b/esrally/metrics.py @@ -321,7 +321,7 @@ def __init__(self, cfg, clock=time.Clock, meta_info=None): self._trial_id = None self._trial_timestamp = None self._track = None - self._track_params = cfg.opts("track", "params") + self._track_params = cfg.opts("track", "params", default_value={}, mandatory=False) self._challenge = None self._car = None self._car_name = None diff --git a/tests/mechanic/launcher_test.py b/tests/mechanic/launcher_test.py index d12f5f6ca..0318cdfcf 100644 --- a/tests/mechanic/launcher_test.py +++ b/tests/mechanic/launcher_test.py @@ -14,19 +14,22 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import tempfile +from datetime import datetime import io, os +import uuid from unittest import TestCase, mock +import psutil + from esrally import config, exceptions, paths from esrally.mechanic import launcher, telemetry, team +from esrally.mechanic.provisioner import NodeConfiguration +from esrally.mechanic.team import Car +from esrally.metrics import InMemoryMetricsStore from esrally.utils import opts -class MockMetricsStore: - def add_meta_info(self, scope, scope_key, key, value): - pass - - class MockClientFactory: def __init__(self, hosts, client_options): self.client_options = client_options @@ -115,29 +118,62 @@ def wait(self): return 0 +class MockProcess: + def __init__(self, pid): + self.pid = pid + self.killed = False + + def name(self): + return "p{pid}".format(pid=self.pid) + + def wait(self, timeout=None): + raise psutil.TimeoutExpired(timeout) + + def kill(self): + self.killed = True + + +def get_metrics_store(cfg): + ms = InMemoryMetricsStore(cfg) + ms.open(trial_id=str(uuid.uuid4()), + trial_timestamp=datetime.now(), + track_name="test", + challenge_name="test", + car_name="test") + return ms + + MOCK_PID_VALUE = 1234 class ProcessLauncherTests(TestCase): @mock.patch('os.path.join', return_value="/telemetry") @mock.patch('os.kill') - @mock.patch('subprocess.Popen',new=MockPopen) + @mock.patch('subprocess.Popen', new=MockPopen) @mock.patch('esrally.mechanic.java_resolver.java_home', return_value=(12, "/java_home/")) @mock.patch('esrally.utils.jvm.supports_option', return_value=True) + @mock.patch('esrally.utils.io.get_size') @mock.patch('os.chdir') - @mock.patch('esrally.config.Config') - @mock.patch('esrally.metrics.MetricsStore') - @mock.patch('esrally.mechanic.provisioner.NodeConfiguration') @mock.patch('esrally.mechanic.launcher.wait_for_pidfile', return_value=MOCK_PID_VALUE) - @mock.patch('psutil.Process') - def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill, path): + @mock.patch('psutil.Process', new=MockProcess) + def test_daemon_start_stop(self, wait_for_pidfile, chdir, get_size, supports, java_home, kill): + cfg = config.Config() + cfg.add(config.Scope.application, "node", "root.dir", "test") + cfg.add(config.Scope.application, "mechanic", "keep.running", False) + cfg.add(config.Scope.application, "mechanic", "telemetry.devices", []) + cfg.add(config.Scope.application, "mechanic", "telemetry.params", None) + cfg.add(config.Scope.application, "system", "env.name", "test") + + ms = get_metrics_store(cfg) proc_launcher = launcher.ProcessLauncher(cfg, ms, paths.races_root(cfg)) + node_config = NodeConfiguration(car=Car("default", root_path=None, config_paths=[]), ip="127.0.0.1", node_name="testnode", + node_root_path="/tmp", binary_path="/tmp", log_path="/tmp", data_paths="/tmp") + nodes = proc_launcher.start([node_config]) self.assertEqual(len(nodes), 1) self.assertEqual(nodes[0].pid, MOCK_PID_VALUE) - proc_launcher.keep_running = False proc_launcher.stop(nodes) self.assertTrue(kill.called) @@ -170,8 +206,11 @@ def test_setup_external_cluster_single_node(self): cfg.add(config.Scope.application, "mechanic", "telemetry.devices", []) cfg.add(config.Scope.application, "client", "hosts", self.test_host) cfg.add(config.Scope.application, "client", "options", self.client_options) + cfg.add(config.Scope.application, "system", "env.name", "test") + + ms = get_metrics_store(cfg) - m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory) + m = launcher.ExternalLauncher(cfg, ms, client_factory_class=MockClientFactory) m.start() # automatically determined by launcher on attach @@ -183,8 +222,11 @@ def test_setup_external_cluster_multiple_nodes(self): cfg.add(config.Scope.application, "client", "hosts", self.test_host) cfg.add(config.Scope.application, "client", "options", self.client_options) cfg.add(config.Scope.application, "mechanic", "distribution.version", "2.3.3") + cfg.add(config.Scope.application, "system", "env.name", "test") - m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory) + ms = get_metrics_store(cfg) + + m = launcher.ExternalLauncher(cfg, ms, client_factory_class=MockClientFactory) m.start() # did not change user defined value self.assertEqual(cfg.opts("mechanic", "distribution.version"), "2.3.3") @@ -196,8 +238,11 @@ def test_setup_external_cluster_cannot_determine_version(self): cfg.add(config.Scope.application, "mechanic", "telemetry.devices", []) cfg.add(config.Scope.application, "client", "hosts", self.test_host) cfg.add(config.Scope.application, "client", "options", client_options) + cfg.add(config.Scope.application, "system", "env.name", "test") + + ms = get_metrics_store(cfg) - m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory) + m = launcher.ExternalLauncher(cfg, ms, client_factory_class=MockClientFactory) m.start() # automatically determined by launcher on attach @@ -216,8 +261,11 @@ def test_launches_cluster(self): cfg.add(config.Scope.application, "mechanic", "telemetry.params", {}) cfg.add(config.Scope.application, "mechanic", "preserve.install", False) cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False) + cfg.add(config.Scope.application, "system", "env.name", "test") + + ms = get_metrics_store(cfg) - cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory) + cluster_launcher = launcher.ClusterLauncher(cfg, ms, client_factory_class=MockClientFactory) cluster = cluster_launcher.start() self.assertEqual([{"host": "10.0.0.10", "port": 9200}, {"host": "10.0.0.11", "port": 9200}], cluster.hosts) @@ -231,8 +279,11 @@ def test_launches_cluster_with_telemetry_client_timeout_enabled(self): cfg.add(config.Scope.application, "mechanic", "telemetry.params", {}) cfg.add(config.Scope.application, "mechanic", "preserve.install", False) cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False) + cfg.add(config.Scope.application, "system", "env.name", "test") - cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory) + ms = get_metrics_store(cfg) + + cluster_launcher = launcher.ClusterLauncher(cfg, ms, client_factory_class=MockClientFactory) cluster = cluster_launcher.start() for telemetry_device in cluster.telemetry.devices: @@ -253,8 +304,11 @@ def test_error_on_cluster_launch(self, sleep): cfg.add(config.Scope.application, "mechanic", "telemetry.params", {}) cfg.add(config.Scope.application, "mechanic", "preserve.install", False) cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False) + cfg.add(config.Scope.application, "system", "env.name", "test") + + ms = get_metrics_store(cfg) - cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory) + cluster_launcher = launcher.ClusterLauncher(cfg, ms, client_factory_class=MockClientFactory) with self.assertRaisesRegex(exceptions.LaunchError, "Elasticsearch REST API layer is not available. Forcefully terminated cluster."): cluster_launcher.start()