Skip to content

Commit

Permalink
Fold node-stats telemetry device fields in one doc per sample
Browse files Browse the repository at this point in the history
This is similar to the work done earlier for the ccr-stats telemetry
device in 198ceef

Relates to elastic#517
  • Loading branch information
dliappis authored Jun 5, 2018
1 parent 367b829 commit 2a90727
Show file tree
Hide file tree
Showing 2 changed files with 472 additions and 480 deletions.
158 changes: 68 additions & 90 deletions esrally/mechanic/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import collections
import logging
import os
import re
Expand Down Expand Up @@ -481,107 +482,84 @@ def record(self):
current_sample = self.sample()
for node_stats in current_sample:
node_name = node_stats["name"]
collected_node_stats = collections.OrderedDict()

if self.include_indices:
self.record_indices_stats(node_name, node_stats,
include=["docs", "store", "indexing", "search", "merges", "query_cache", "fielddata",
"segments", "translog", "request_cache"])
collected_node_stats.update(
self.indices_stats(node_name, node_stats,
include=["docs", "store", "indexing", "search", "merges", "query_cache", "fielddata",
"segments", "translog", "request_cache"]))
if self.include_thread_pools:
self.record_thread_pool_stats(node_name, node_stats)
collected_node_stats.update(self.thread_pool_stats(node_name, node_stats))
if self.include_breakers:
self.record_circuit_breaker_stats(node_name, node_stats)
collected_node_stats.update(self.circuit_breaker_stats(node_name, node_stats))
if self.include_buffer_pools:
self.record_jvm_buffer_pool_stats(node_name, node_stats)
collected_node_stats.update(self.jvm_buffer_pool_stats(node_name, node_stats))
if self.include_mem_stats:
self.record_jvm_mem_stats(node_name, node_stats)
collected_node_stats.update(self.jvm_mem_stats(node_name, node_stats))
if self.include_network:
self.record_network_stats(node_name, node_stats)
collected_node_stats.update(self.network_stats(node_name, node_stats))
if self.include_process:
self.record_process_stats(node_name, node_stats)
collected_node_stats.update(self.process_stats(node_name, node_stats))

self.metrics_store.put_doc(dict(collected_node_stats),
level=MetaInfoScope.node,
node_name=node_name,
meta_data=self.metrics_store_meta_data)

time.sleep(self.sample_interval)

def record_indices_stats(self, node_name, node_stats, include):
indices_stats = node_stats["indices"]
def flatten_stats_fields(self, prefix=None, stats=None):
    """
    Flatten the provided (possibly nested) stats dict into a single-level dict.

    Nested keys are joined with ``_`` and prefixed with ``prefix``. Only numeric
    (int/float, excluding bool) leaf values are kept; string leaves such as
    human-readable sizes are dropped because their numeric counterparts (e.g.
    ``breakers_parent_limit_size_in_bytes`` vs ``breakers_parent_limit_size``)
    are recorded anyway.

    :param prefix: Optional prefix for all flattened keys. Defaults to None (no prefix).
    :param stats: Dict with values to be flattened, using ``_`` as a separator. Defaults to None,
                  which yields an empty result.
    :return: Flattened dictionary, separated by ``_`` and prefixed with ``prefix``.
    """
    if not stats:
        return {}

    def iterate():
        for section_name, section_value in stats.items():
            if isinstance(section_value, dict):
                # Only prepend the prefix when one was actually given; the original code
                # produced a literal "None_<key>" prefix for nested dicts when prefix was None.
                new_prefix = "{}_{}".format(prefix, section_name) if prefix else section_name
                # https://www.python.org/dev/peps/pep-0380/
                yield from self.flatten_stats_fields(prefix=new_prefix, stats=section_value).items()
            # Avoid duplication for metric fields that have unit embedded in value as they are also recorded elsewhere
            # example: `breakers_parent_limit_size_in_bytes` vs `breakers_parent_limit_size`
            elif isinstance(section_value, (int, float)) and not isinstance(section_value, bool):
                yield "{}{}".format(prefix + "_" if prefix else "", section_name), section_value

    return dict(iterate())

def indices_stats(self, node_name, node_stats, include):
    """
    Extract and flatten the requested ``indices`` sections of a node-stats response.

    :param node_name: Name of the node the stats belong to (unused here; kept for a signature
                      uniform with the other per-section collectors).
    :param node_stats: A single node's entry from the node-stats API response.
    :param include: Iterable of top-level ``indices`` section names to collect; sections that
                    are absent from the response are silently skipped.
    :return: An ``OrderedDict`` of flattened metrics, keys prefixed with ``indices_<section>``.
    """
    # NOTE: the stale pre-refactor record_* / put_value helpers that previously lived in this
    # span are dead after folding all fields into one document per sample; removed.
    idx_stats = node_stats["indices"]
    ordered_results = collections.OrderedDict()
    for section in include:
        if section in idx_stats:
            ordered_results.update(self.flatten_stats_fields(prefix="indices_" + section, stats=idx_stats[section]))

    return ordered_results

def thread_pool_stats(self, node_name, node_stats):
    """Return the node's thread pool metrics, flattened and prefixed with ``thread_pool``."""
    pools = node_stats["thread_pool"]
    return self.flatten_stats_fields(prefix="thread_pool", stats=pools)

def circuit_breaker_stats(self, node_name, node_stats):
    """Return the node's circuit breaker metrics, flattened and prefixed with ``breakers``."""
    breakers = node_stats["breakers"]
    return self.flatten_stats_fields(prefix="breakers", stats=breakers)

def jvm_buffer_pool_stats(self, node_name, node_stats):
    """Return the node's JVM buffer pool metrics, flattened and prefixed with ``jvm_buffer_pools``."""
    buffer_pools = node_stats["jvm"]["buffer_pools"]
    return self.flatten_stats_fields(prefix="jvm_buffer_pools", stats=buffer_pools)

def jvm_mem_stats(self, node_name, node_stats):
    """Return the node's JVM memory metrics, flattened and prefixed with ``jvm_mem``."""
    mem = node_stats["jvm"]["mem"]
    return self.flatten_stats_fields(prefix="jvm_mem", stats=mem)

def network_stats(self, node_name, node_stats):
    """Return the node's transport-layer metrics, flattened and prefixed with ``transport``.

    Uses ``dict.get`` because the ``transport`` section may be absent from the response.
    """
    transport = node_stats.get("transport")
    return self.flatten_stats_fields(prefix="transport", stats=transport)

def process_stats(self, node_name, node_stats):
    """Return the node's process CPU metrics, flattened and prefixed with ``process_cpu``."""
    cpu = node_stats["process"]["cpu"]
    return self.flatten_stats_fields(prefix="process_cpu", stats=cpu)

def sample(self):
import elasticsearch
Expand Down
Loading

0 comments on commit 2a90727

Please sign in to comment.