From 094e0f0f65e36e9853bc63e490db28ac0bec1cd7 Mon Sep 17 00:00:00 2001
From: Daniel Mitterdorfer
Date: Wed, 13 Jun 2018 11:04:19 +0200
Subject: [PATCH] Fine-tune logging

With this commit we reduce the logging level (e.g. info -> debug) in
several places so Rally is a bit less verbose by default.
---
 esrally/actor.py              |  7 ++++---
 esrally/config.py             |  1 -
 esrally/driver/driver.py      | 28 ++++++++++++++--------------
 esrally/driver/runner.py      |  1 -
 esrally/mechanic/launcher.py  |  7 +++----
 esrally/mechanic/mechanic.py  |  4 ++--
 esrally/mechanic/telemetry.py |  4 ++--
 esrally/metrics.py            |  4 ++--
 esrally/racecontrol.py        |  7 ++++---
 9 files changed, 31 insertions(+), 32 deletions(-)

diff --git a/esrally/actor.py b/esrally/actor.py
index 493927cb0..c4eb36977 100644
--- a/esrally/actor.py
+++ b/esrally/actor.py
@@ -112,10 +112,11 @@ def transition_when_all_children_responded(self, sender, msg, expected_status, n
         response_count = len(self.received_responses)
         expected_count = len(self.children)

-        self.logger.info("[%d] of [%d] child actors have responded for transition from [%s] to [%s].",
-                         response_count, expected_count, self.status, new_status)
+        self.logger.debug("[%d] of [%d] child actors have responded for transition from [%s] to [%s].",
+                          response_count, expected_count, self.status, new_status)
         if response_count == expected_count:
-            self.logger.info("All [%d] child actors have responded. Transitioning now from [%s] to [%s].", expected_count, self.status, new_status)
+            self.logger.debug("All [%d] child actors have responded. Transitioning now from [%s] to [%s].",
+                              expected_count, self.status, new_status)
             # all nodes have responded, change status
             self.status = new_status
             self.received_responses = []
diff --git a/esrally/config.py b/esrally/config.py
index aba997e71..e6349dc8e 100644
--- a/esrally/config.py
+++ b/esrally/config.py
@@ -694,7 +694,6 @@ def warn_if_plugin_build_task_is_in_use(config):
                         " {}.\n".format(plugin_match.group(1), config_file.location, k, v, new_key,
                                         console.format.link("%selasticsearch_plugins.html#running-a-benchmark-with-plugins" % DOC_LINK)))

-    logger.info("Migrating configuration version from 14 to 15.")
     if "build" in config:
         logger.info("Removing Gradle configuration as Rally now uses the Gradle Wrapper to build Elasticsearch.")
         config.pop("build", None)
diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py
index 88a693b68..8a4cdc35b 100644
--- a/esrally/driver/driver.py
+++ b/esrally/driver/driver.py
@@ -385,17 +385,17 @@ def joinpoint_reached(self, client_id, client_local_timestamp, task):
             self.most_recent_sample_per_client = {}
             self.current_step += 1

-            self.logger.info("Postprocessing samples...")
+            self.logger.debug("Postprocessing samples...")
             self.post_process_samples()
             m = self.metrics_store.to_externalizable(clear=True)

             if self.finished():
                 self.logger.info("All steps completed.")
-                self.logger.info("Closing metrics store...")
+                self.logger.debug("Closing metrics store...")
                 self.metrics_store.close()
                 # immediately clear as we don't need it anymore and it can consume a significant amount of memory
                 del self.metrics_store
-                self.logger.info("Sending benchmark results...")
+                self.logger.debug("Sending benchmark results...")
                 self.target.on_benchmark_complete(m)
             else:
                 if self.config.opts("track", "test.mode.enabled"):
@@ -439,7 +439,7 @@ def joinpoint_reached(self, client_id, client_local_timestamp, task):
                 self.target.complete_current_task(driver)

     def reset_relative_time(self):
-        self.logger.info("Resetting relative time of request metrics store.")
+        self.logger.debug("Resetting relative time of request metrics store.")
self.logger.debug("Resetting relative time of request metrics store.") self.metrics_store.reset_relative_time() def finished(self): @@ -503,11 +503,11 @@ def post_process_samples(self): relative_time=sample.relative_time, meta_data=meta_data) end = time.perf_counter() - self.logger.info("Storing latency and service time took [%f] seconds.", (end - start)) + self.logger.debug("Storing latency and service time took [%f] seconds.", (end - start)) start = end aggregates = self.throughput_calculator.calculate(raw_samples) end = time.perf_counter() - self.logger.info("Calculating throughput took [%f] seconds.", (end - start)) + self.logger.debug("Calculating throughput took [%f] seconds.", (end - start)) start = end for task, samples in aggregates.items(): meta_data = self.merge( @@ -522,7 +522,7 @@ def post_process_samples(self): sample_type=sample_type, absolute_time=absolute_time, relative_time=relative_time, meta_data=meta_data) end = time.perf_counter() - self.logger.info("Storing throughput took [%f] seconds.", (end - start)) + self.logger.debug("Storing throughput took [%f] seconds.", (end - start)) start = end # this will be a noop for the in-memory metrics store. If we use an ES metrics store however, this will ensure that we already send # the data and also clear the in-memory buffer. This allows users to see data already while running the benchmark. In cases where @@ -532,8 +532,8 @@ def post_process_samples(self): # no need for frequent refreshes. self.metrics_store.flush(refresh=False) end = time.perf_counter() - self.logger.info("Flushing the metrics store took [%f] seconds.", (end - start)) - self.logger.info("Postprocessing [%d] raw samples took [%f] seconds in total.", len(raw_samples), (end - total_start)) + self.logger.debug("Flushing the metrics store took [%f] seconds.", (end - start)) + self.logger.debug("Postprocessing [%d] raw samples took [%f] seconds in total.", len(raw_samples), (end - total_start)) def merge(self, *args): result = {} @@ -650,13 +650,13 @@ def receiveMsg_WakeupMessage(self, msg, sender): if current_samples and len(current_samples) > 0: most_recent_sample = current_samples[-1] if most_recent_sample.percent_completed is not None: - self.logger.info("LoadGenerator[%s] is executing [%s] (%.2f%% complete).", - str(self.client_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0) + self.logger.debug("LoadGenerator[%s] is executing [%s] (%.2f%% complete).", + str(self.client_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0) else: - self.logger.info("LoadGenerator[%s] is executing [%s] (dependent eternal task).", - str(self.client_id), most_recent_sample.task) + self.logger.debug("LoadGenerator[%s] is executing [%s] (dependent eternal task).", + str(self.client_id), most_recent_sample.task) else: - self.logger.info("LoadGenerator[%s] is executing (no samples).", str(self.client_id)) + self.logger.debug("LoadGenerator[%s] is executing (no samples).", str(self.client_id)) self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval)) def receiveMsg_ActorExitRequest(self, msg, sender): diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 17eaa6c55..83fc43ad1 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -475,7 +475,6 @@ class ForceMerge(Runner): """ def __call__(self, es, params): - self.logger.info("Force merging all indices.") import elasticsearch try: if "max-num-segments" in params: diff --git a/esrally/mechanic/launcher.py b/esrally/mechanic/launcher.py index 
index c91426c5a..99bacdecf 100644
--- a/esrally/mechanic/launcher.py
+++ b/esrally/mechanic/launcher.py
@@ -277,9 +277,8 @@ def _start_node(self, node_configuration, node_count_on_host, java_major_version
         t.on_pre_node_start(node_name)
         node_process = self._start_process(env, node_name, binary_path)
         node = cluster.Node(node_process, host_name, node_name, t)
-        self.logger.info("Node [%s] has successfully started. Attaching telemetry devices.", node_name)
+        self.logger.info("Attaching telemetry devices to node [%s].", node_name)
         t.attach_to_node(node)
-        self.logger.info("Telemetry devices are now attached to node [%s].", node_name)

         return node

@@ -303,7 +302,7 @@ def _prepare_env(self, car, node_name, t):
         else:
             self.logger.info("JVM does not support [%s]. A JDK upgrade is recommended.", exit_on_oome_flag)

-        self.logger.info("env for [%s]: %s", node_name, str(env))
+        self.logger.debug("env for [%s]: %s", node_name, str(env))
         return env

     def _set_env(self, env, k, v, separator=' '):
@@ -331,7 +330,7 @@ def _start_process(self, env, node_name, binary_path):
                     self.logger.error(msg)
                     raise exceptions.LaunchError(msg)
                 else:
-                    self.logger.info("Started node [%s] with PID [%s]", node_name, process.pid)
+                    self.logger.info("Started node [%s] with PID [%s].", node_name, process.pid)
                     return process
             else:
                 msg = "Could not start node [%s] within timeout period of [%s] seconds." % (
diff --git a/esrally/mechanic/mechanic.py b/esrally/mechanic/mechanic.py
index e07d6dac9..c561b70ab 100644
--- a/esrally/mechanic/mechanic.py
+++ b/esrally/mechanic/mechanic.py
@@ -325,7 +325,7 @@ def receiveMsg_WakeupMessage(self, msg, sender):
         if msg.payload == MechanicActor.WAKEUP_RESET_RELATIVE_TIME:
             self.reset_relative_time()
         elif msg.payload == MechanicActor.WAKEUP_FLUSH_METRICS:
-            self.logger.info("Flushing cluster-wide system metrics store.")
+            self.logger.debug("Flushing cluster-wide system metrics store.")
             self.metrics_store.flush(refresh=False)
             self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS, payload=MechanicActor.WAKEUP_FLUSH_METRICS)
         else:
@@ -572,7 +572,7 @@ def receiveUnrecognizedMessage(self, msg, sender):
             self.send(sender, BenchmarkStarted())
         elif isinstance(msg, thespian.actors.WakeupMessage):
             if self.running:
-                self.logger.info("Flushing system metrics store on host [%s].", self.host)
+                self.logger.debug("Flushing system metrics store on host [%s].", self.host)
                 self.metrics_store.flush(refresh=False)
                 self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
         elif isinstance(msg, OnBenchmarkStop):
diff --git a/esrally/mechanic/telemetry.py b/esrally/mechanic/telemetry.py
index 50efe28c6..70166f134 100644
--- a/esrally/mechanic/telemetry.py
+++ b/esrally/mechanic/telemetry.py
@@ -1027,10 +1027,10 @@ def on_benchmark_start(self):
             self.first_time = False

     def on_benchmark_stop(self):
-        import json
         self.logger.info("Gathering indices stats for all primaries on benchmark stop.")
         index_stats = self.index_stats()
-        self.logger.info("Returned indices stats:\n%s", json.dumps(index_stats, indent=2))
+        # import json
+        # self.logger.debug("Returned indices stats:\n%s", json.dumps(index_stats, indent=2))
         if "_all" not in index_stats or "primaries" not in index_stats["_all"]:
             return
         p = index_stats["_all"]["primaries"]
diff --git a/esrally/metrics.py b/esrally/metrics.py
index 6017c4b51..458aaea78 100644
--- a/esrally/metrics.py
+++ b/esrally/metrics.py
@@ -619,7 +619,7 @@ def bulk_add(self, memento):
         :param memento: The external representation as returned by #to_externalizable().
""" if memento: - self.logger.info("Restoring in-memory representation of metrics store.") + self.logger.debug("Restoring in-memory representation of metrics store.") for doc in pickle.loads(zlib.decompress(memento)): self._add(doc) @@ -1000,7 +1000,7 @@ def to_externalizable(self, clear=False): if clear: self.docs = [] compressed = zlib.compress(pickle.dumps(docs)) - self.logger.info("Compression changed size of metric store from [%d] bytes to [%d] bytes", + self.logger.debug("Compression changed size of metric store from [%d] bytes to [%d] bytes", sys.getsizeof(docs, -1), sys.getsizeof(compressed, -1)) return compressed diff --git a/esrally/racecontrol.py b/esrally/racecontrol.py index bf76e16b2..9fafc6023 100644 --- a/esrally/racecontrol.py +++ b/esrally/racecontrol.py @@ -144,9 +144,9 @@ def receiveMsg_BenchmarkComplete(self, msg, sender): def receiveMsg_BenchmarkStopped(self, msg, sender): self.logger.info("Bulk adding system metrics to metrics store.") self.metrics_store.bulk_add(msg.system_metrics) - self.logger.info("Flushing metrics data...") + self.logger.debug("Flushing metrics data...") self.metrics_store.flush() - self.logger.info("Flushing done") + self.logger.debug("Flushing done") self.lap_counter.after_lap() if self.lap_counter.has_more_laps(): self.run() @@ -292,13 +292,14 @@ def race(cfg, sources=False, build=False, distribution=False, external=False, do def set_default_hosts(cfg, host="127.0.0.1", port=9200): logger = logging.getLogger(__name__) configured_hosts = cfg.opts("client", "hosts") - if len(configured_hosts.default) !=0 : + if len(configured_hosts.default) != 0: logger.info("Using configured hosts %s", configured_hosts.default) else: logger.info("Setting default host to [%s:%d]", host, port) default_host_object = opts.TargetHosts("{}:{}".format(host,port)) cfg.add(config.Scope.benchmark, "client", "hosts", default_host_object) + # Poor man's curry def from_sources_complete(cfg): port = cfg.opts("provisioning", "node.http.port")