Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fine-tune logging #522

Merged
merged 1 commit on Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions esrally/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ def transition_when_all_children_responded(self, sender, msg, expected_status, n
response_count = len(self.received_responses)
expected_count = len(self.children)

self.logger.info("[%d] of [%d] child actors have responded for transition from [%s] to [%s].",
response_count, expected_count, self.status, new_status)
self.logger.debug("[%d] of [%d] child actors have responded for transition from [%s] to [%s].",
response_count, expected_count, self.status, new_status)
if response_count == expected_count:
self.logger.info("All [%d] child actors have responded. Transitioning now from [%s] to [%s].", expected_count, self.status, new_status)
self.logger.debug("All [%d] child actors have responded. Transitioning now from [%s] to [%s].",
expected_count, self.status, new_status)
# all nodes have responded, change status
self.status = new_status
self.received_responses = []
Expand Down
1 change: 0 additions & 1 deletion esrally/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,6 @@ def warn_if_plugin_build_task_is_in_use(config):
" {}.\n".format(plugin_match.group(1), config_file.location, k, v, new_key,
console.format.link("%selasticsearch_plugins.html#running-a-benchmark-with-plugins" % DOC_LINK)))

logger.info("Migrating configuration version from 14 to 15.")
if "build" in config:
logger.info("Removing Gradle configuration as Rally now uses the Gradle Wrapper to build Elasticsearch.")
config.pop("build", None)
Expand Down
28 changes: 14 additions & 14 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,17 +385,17 @@ def joinpoint_reached(self, client_id, client_local_timestamp, task):
self.most_recent_sample_per_client = {}
self.current_step += 1

self.logger.info("Postprocessing samples...")
self.logger.debug("Postprocessing samples...")
self.post_process_samples()
m = self.metrics_store.to_externalizable(clear=True)

if self.finished():
self.logger.info("All steps completed.")
self.logger.info("Closing metrics store...")
self.logger.debug("Closing metrics store...")
self.metrics_store.close()
# immediately clear as we don't need it anymore and it can consume a significant amount of memory
del self.metrics_store
self.logger.info("Sending benchmark results...")
self.logger.debug("Sending benchmark results...")
self.target.on_benchmark_complete(m)
else:
if self.config.opts("track", "test.mode.enabled"):
Expand Down Expand Up @@ -439,7 +439,7 @@ def joinpoint_reached(self, client_id, client_local_timestamp, task):
self.target.complete_current_task(driver)

def reset_relative_time(self):
self.logger.info("Resetting relative time of request metrics store.")
self.logger.debug("Resetting relative time of request metrics store.")
self.metrics_store.reset_relative_time()

def finished(self):
Expand Down Expand Up @@ -503,11 +503,11 @@ def post_process_samples(self):
relative_time=sample.relative_time, meta_data=meta_data)

end = time.perf_counter()
self.logger.info("Storing latency and service time took [%f] seconds.", (end - start))
self.logger.debug("Storing latency and service time took [%f] seconds.", (end - start))
start = end
aggregates = self.throughput_calculator.calculate(raw_samples)
end = time.perf_counter()
self.logger.info("Calculating throughput took [%f] seconds.", (end - start))
self.logger.debug("Calculating throughput took [%f] seconds.", (end - start))
start = end
for task, samples in aggregates.items():
meta_data = self.merge(
Expand All @@ -522,7 +522,7 @@ def post_process_samples(self):
sample_type=sample_type, absolute_time=absolute_time,
relative_time=relative_time, meta_data=meta_data)
end = time.perf_counter()
self.logger.info("Storing throughput took [%f] seconds.", (end - start))
self.logger.debug("Storing throughput took [%f] seconds.", (end - start))
start = end
# this will be a noop for the in-memory metrics store. If we use an ES metrics store however, this will ensure that we already send
# the data and also clear the in-memory buffer. This allows users to see data already while running the benchmark. In cases where
Expand All @@ -532,8 +532,8 @@ def post_process_samples(self):
# no need for frequent refreshes.
self.metrics_store.flush(refresh=False)
end = time.perf_counter()
self.logger.info("Flushing the metrics store took [%f] seconds.", (end - start))
self.logger.info("Postprocessing [%d] raw samples took [%f] seconds in total.", len(raw_samples), (end - total_start))
self.logger.debug("Flushing the metrics store took [%f] seconds.", (end - start))
self.logger.debug("Postprocessing [%d] raw samples took [%f] seconds in total.", len(raw_samples), (end - total_start))

def merge(self, *args):
result = {}
Expand Down Expand Up @@ -650,13 +650,13 @@ def receiveMsg_WakeupMessage(self, msg, sender):
if current_samples and len(current_samples) > 0:
most_recent_sample = current_samples[-1]
if most_recent_sample.percent_completed is not None:
self.logger.info("LoadGenerator[%s] is executing [%s] (%.2f%% complete).",
str(self.client_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0)
self.logger.debug("LoadGenerator[%s] is executing [%s] (%.2f%% complete).",
str(self.client_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0)
else:
self.logger.info("LoadGenerator[%s] is executing [%s] (dependent eternal task).",
str(self.client_id), most_recent_sample.task)
self.logger.debug("LoadGenerator[%s] is executing [%s] (dependent eternal task).",
str(self.client_id), most_recent_sample.task)
else:
self.logger.info("LoadGenerator[%s] is executing (no samples).", str(self.client_id))
self.logger.debug("LoadGenerator[%s] is executing (no samples).", str(self.client_id))
self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval))

def receiveMsg_ActorExitRequest(self, msg, sender):
Expand Down
1 change: 0 additions & 1 deletion esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,6 @@ class ForceMerge(Runner):
"""

def __call__(self, es, params):
self.logger.info("Force merging all indices.")
import elasticsearch
try:
if "max-num-segments" in params:
Expand Down
7 changes: 3 additions & 4 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,8 @@ def _start_node(self, node_configuration, node_count_on_host, java_major_version
t.on_pre_node_start(node_name)
node_process = self._start_process(env, node_name, binary_path)
node = cluster.Node(node_process, host_name, node_name, t)
self.logger.info("Node [%s] has successfully started. Attaching telemetry devices.", node_name)
self.logger.info("Attaching telemetry devices to node [%s].", node_name)
t.attach_to_node(node)
self.logger.info("Telemetry devices are now attached to node [%s].", node_name)

return node

Expand All @@ -303,7 +302,7 @@ def _prepare_env(self, car, node_name, t):
else:
self.logger.info("JVM does not support [%s]. A JDK upgrade is recommended.", exit_on_oome_flag)

self.logger.info("env for [%s]: %s", node_name, str(env))
self.logger.debug("env for [%s]: %s", node_name, str(env))
return env

def _set_env(self, env, k, v, separator=' '):
Expand Down Expand Up @@ -331,7 +330,7 @@ def _start_process(self, env, node_name, binary_path):
self.logger.error(msg)
raise exceptions.LaunchError(msg)
else:
self.logger.info("Started node [%s] with PID [%s]", node_name, process.pid)
self.logger.info("Started node [%s] with PID [%s].", node_name, process.pid)
return process
else:
msg = "Could not start node [%s] within timeout period of [%s] seconds." % (
Expand Down
4 changes: 2 additions & 2 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def receiveMsg_WakeupMessage(self, msg, sender):
if msg.payload == MechanicActor.WAKEUP_RESET_RELATIVE_TIME:
self.reset_relative_time()
elif msg.payload == MechanicActor.WAKEUP_FLUSH_METRICS:
self.logger.info("Flushing cluster-wide system metrics store.")
self.logger.debug("Flushing cluster-wide system metrics store.")
self.metrics_store.flush(refresh=False)
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS, payload=MechanicActor.WAKEUP_FLUSH_METRICS)
else:
Expand Down Expand Up @@ -572,7 +572,7 @@ def receiveUnrecognizedMessage(self, msg, sender):
self.send(sender, BenchmarkStarted())
elif isinstance(msg, thespian.actors.WakeupMessage):
if self.running:
self.logger.info("Flushing system metrics store on host [%s].", self.host)
self.logger.debug("Flushing system metrics store on host [%s].", self.host)
self.metrics_store.flush(refresh=False)
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
elif isinstance(msg, OnBenchmarkStop):
Expand Down
4 changes: 2 additions & 2 deletions esrally/mechanic/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,10 +1027,10 @@ def on_benchmark_start(self):
self.first_time = False

def on_benchmark_stop(self):
import json
self.logger.info("Gathering indices stats for all primaries on benchmark stop.")
index_stats = self.index_stats()
self.logger.info("Returned indices stats:\n%s", json.dumps(index_stats, indent=2))
# import json
# self.logger.debug("Returned indices stats:\n%s", json.dumps(index_stats, indent=2))
if "_all" not in index_stats or "primaries" not in index_stats["_all"]:
return
p = index_stats["_all"]["primaries"]
Expand Down
4 changes: 2 additions & 2 deletions esrally/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ def bulk_add(self, memento):
:param memento: The external representation as returned by #to_externalizable().
"""
if memento:
self.logger.info("Restoring in-memory representation of metrics store.")
self.logger.debug("Restoring in-memory representation of metrics store.")
for doc in pickle.loads(zlib.decompress(memento)):
self._add(doc)

Expand Down Expand Up @@ -1000,7 +1000,7 @@ def to_externalizable(self, clear=False):
if clear:
self.docs = []
compressed = zlib.compress(pickle.dumps(docs))
self.logger.info("Compression changed size of metric store from [%d] bytes to [%d] bytes",
self.logger.debug("Compression changed size of metric store from [%d] bytes to [%d] bytes",
sys.getsizeof(docs, -1), sys.getsizeof(compressed, -1))
return compressed

Expand Down
7 changes: 4 additions & 3 deletions esrally/racecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ def receiveMsg_BenchmarkComplete(self, msg, sender):
def receiveMsg_BenchmarkStopped(self, msg, sender):
self.logger.info("Bulk adding system metrics to metrics store.")
self.metrics_store.bulk_add(msg.system_metrics)
self.logger.info("Flushing metrics data...")
self.logger.debug("Flushing metrics data...")
self.metrics_store.flush()
self.logger.info("Flushing done")
self.logger.debug("Flushing done")
self.lap_counter.after_lap()
if self.lap_counter.has_more_laps():
self.run()
Expand Down Expand Up @@ -292,13 +292,14 @@ def race(cfg, sources=False, build=False, distribution=False, external=False, do
def set_default_hosts(cfg, host="127.0.0.1", port=9200):
logger = logging.getLogger(__name__)
configured_hosts = cfg.opts("client", "hosts")
if len(configured_hosts.default) !=0 :
if len(configured_hosts.default) != 0:
logger.info("Using configured hosts %s", configured_hosts.default)
else:
logger.info("Setting default host to [%s:%d]", host, port)
default_host_object = opts.TargetHosts("{}:{}".format(host,port))
cfg.add(config.Scope.benchmark, "client", "hosts", default_host_object)


# Poor man's curry
def from_sources_complete(cfg):
port = cfg.opts("provisioning", "node.http.port")
Expand Down