allow multiple configurable elasticsearch nodes #207

Merged · 1 commit · Feb 9, 2016
collectors/0/elasticsearch.py — 311 changes: 155 additions & 156 deletions
@@ -27,12 +27,11 @@
import re

from collectors.lib import utils
from collectors.etc import elasticsearch_conf


COLLECTION_INTERVAL = 15 # seconds
DEFAULT_TIMEOUT = 10.0 # seconds
ES_HOST = "localhost"
ES_PORT = 9200 # TCP port on which ES listens.

# regexes to separate differences in version numbers
PRE_VER1 = re.compile(r'^0\.')
@@ -85,173 +84,173 @@ def node_stats(server, version):
  url = "/_nodes/_local/stats"
  return request(server, url)

def printmetric(metric, ts, value, **tags):
  if tags:
    tags = " " + " ".join("%s=%s" % (name, value)
                          for name, value in tags.iteritems())
  else:
    tags = ""
  print ("elasticsearch.%s %d %s %s"
         % (metric, ts, value, tags))

def _collect_server(server, version):
  ts = int(time.time())
  nstats = node_stats(server, version)
  cluster_name = nstats["cluster_name"]
  nodeid, nstats = nstats["nodes"].popitem()
  node_name = nstats["name"]

  is_master = nodeid == cluster_state(server)["master_node"]
  printmetric("is_master", ts, int(is_master), node=node_name, cluster=cluster_name)
  if is_master:
    ts = int(time.time())  # In case last call took a while.
    cstats = cluster_health(server)
    for stat, value in cstats.iteritems():
      if stat == "status":
        value = STATUS_MAP.get(value, -1)
      elif not utils.is_numeric(value):
        continue
      printmetric("cluster." + stat, ts, value, cluster=cluster_name)

  if "os" in nstats:
    ts = nstats["os"]["timestamp"] / 1000  # ms -> s
  if "timestamp" in nstats:
    ts = nstats["timestamp"] / 1000  # ms -> s

  if "indices" in nstats:
    indices = nstats["indices"]
    if "docs" in indices:
      printmetric("num_docs", ts, indices["docs"]["count"], node=node_name, cluster=cluster_name)
    if "store" in indices:
      printmetric("indices.size", ts, indices["store"]["size_in_bytes"], node=node_name, cluster=cluster_name)
    if "indexing" in indices:
      d = indices["indexing"]
      printmetric("indexing.index_total", ts, d["index_total"], node=node_name, cluster=cluster_name)
      printmetric("indexing.index_time", ts, d["index_time_in_millis"], node=node_name, cluster=cluster_name)
      printmetric("indexing.index_current", ts, d["index_current"], node=node_name, cluster=cluster_name)
      printmetric("indexing.delete_total", ts, d["delete_total"], node=node_name, cluster=cluster_name)
      printmetric("indexing.delete_time_in_millis", ts, d["delete_time_in_millis"], node=node_name, cluster=cluster_name)
      printmetric("indexing.delete_current", ts, d["delete_current"], node=node_name, cluster=cluster_name)
      del d
    if "get" in indices:
      d = indices["get"]
      printmetric("get.total", ts, d["total"], node=node_name, cluster=cluster_name)
      printmetric("get.time", ts, d["time_in_millis"], node=node_name, cluster=cluster_name)
      printmetric("get.exists_total", ts, d["exists_total"], node=node_name, cluster=cluster_name)
      printmetric("get.exists_time", ts, d["exists_time_in_millis"], node=node_name, cluster=cluster_name)
      printmetric("get.missing_total", ts, d["missing_total"], node=node_name, cluster=cluster_name)
      printmetric("get.missing_time", ts, d["missing_time_in_millis"], node=node_name, cluster=cluster_name)
      del d
    if "search" in indices:
      d = indices["search"]
      printmetric("search.query_total", ts, d["query_total"], node=node_name, cluster=cluster_name)
      printmetric("search.query_time", ts, d["query_time_in_millis"], node=node_name, cluster=cluster_name)
      printmetric("search.query_current", ts, d["query_current"], node=node_name, cluster=cluster_name)
      printmetric("search.fetch_total", ts, d["fetch_total"], node=node_name, cluster=cluster_name)
      printmetric("search.fetch_time", ts, d["fetch_time_in_millis"], node=node_name, cluster=cluster_name)
      printmetric("search.fetch_current", ts, d["fetch_current"], node=node_name, cluster=cluster_name)
      del d
    if "cache" in indices:
      d = indices["cache"]
      printmetric("cache.field.evictions", ts, d["field_evictions"], node=node_name, cluster=cluster_name)
      printmetric("cache.field.size", ts, d["field_size_in_bytes"], node=node_name, cluster=cluster_name)
      printmetric("cache.filter.count", ts, d["filter_count"], node=node_name, cluster=cluster_name)
      printmetric("cache.filter.evictions", ts, d["filter_evictions"], node=node_name, cluster=cluster_name)
      printmetric("cache.filter.size", ts, d["filter_size_in_bytes"], node=node_name, cluster=cluster_name)
      del d
    if "merges" in indices:
      d = indices["merges"]
      printmetric("merges.current", ts, d["current"], node=node_name, cluster=cluster_name)
      printmetric("merges.total", ts, d["total"], node=node_name, cluster=cluster_name)
      printmetric("merges.total_time", ts, d["total_time_in_millis"] / 1000., node=node_name, cluster=cluster_name)
      del d
    del indices
  if "process" in nstats:
    process = nstats["process"]
    ts = process["timestamp"] / 1000  # ms -> s
    open_fds = process.get("open_file_descriptors")  # ES 0.17
    if open_fds is None:
      open_fds = process.get("fd")  # ES 0.16
      if open_fds is not None:
        open_fds = open_fds["total"]
    if open_fds is not None:
      printmetric("process.open_file_descriptors", ts, open_fds, node=node_name, cluster=cluster_name)
    if "cpu" in process:
      d = process["cpu"]
      printmetric("process.cpu.percent", ts, d["percent"], node=node_name, cluster=cluster_name)
      printmetric("process.cpu.sys", ts, d["sys_in_millis"] / 1000., node=node_name, cluster=cluster_name)
      printmetric("process.cpu.user", ts, d["user_in_millis"] / 1000., node=node_name, cluster=cluster_name)
      del d
    if "mem" in process:
      d = process["mem"]
      printmetric("process.mem.resident", ts, d["resident_in_bytes"], node=node_name, cluster=cluster_name)
      printmetric("process.mem.shared", ts, d["share_in_bytes"], node=node_name, cluster=cluster_name)
      printmetric("process.mem.total_virtual", ts, d["total_virtual_in_bytes"], node=node_name, cluster=cluster_name)
      del d
    del process
  if "jvm" in nstats:
    jvm = nstats["jvm"]
    ts = jvm["timestamp"] / 1000  # ms -> s
    if "mem" in jvm:
      d = jvm["mem"]
      printmetric("jvm.mem.heap_used", ts, d["heap_used_in_bytes"], node=node_name, cluster=cluster_name)
      printmetric("jvm.mem.heap_committed", ts, d["heap_committed_in_bytes"], node=node_name, cluster=cluster_name)
      printmetric("jvm.mem.non_heap_used", ts, d["non_heap_used_in_bytes"], node=node_name, cluster=cluster_name)
      printmetric("jvm.mem.non_heap_committed", ts, d["non_heap_committed_in_bytes"], node=node_name, cluster=cluster_name)
      del d
    if "threads" in jvm:
      d = jvm["threads"]
      printmetric("jvm.threads.count", ts, d["count"], node=node_name, cluster=cluster_name)
      printmetric("jvm.threads.peak_count", ts, d["peak_count"], node=node_name, cluster=cluster_name)
      del d
    for gc, d in jvm["gc"]["collectors"].iteritems():
      printmetric("jvm.gc.collection_count", ts, d["collection_count"], gc=gc, node=node_name, cluster=cluster_name)
      printmetric("jvm.gc.collection_time", ts,
                  d["collection_time_in_millis"] / 1000., gc=gc, node=node_name, cluster=cluster_name)
    del jvm
    del d
  if "network" in nstats:
    for stat, value in nstats["network"]["tcp"].iteritems():
      if utils.is_numeric(value):
        printmetric("network.tcp." + stat, ts, value, node=node_name, cluster=cluster_name)
  for stat, value in nstats["transport"].iteritems():
    if utils.is_numeric(value):
      printmetric("transport." + stat, ts, value, node=node_name, cluster=cluster_name)
  # New in ES 0.17:
  for stat, value in nstats.get("http", {}).iteritems():
    if utils.is_numeric(value):
      printmetric("http." + stat, ts, value, node=node_name, cluster=cluster_name)
  del nstats

def main(argv):
  utils.drop_privileges()
  socket.setdefaulttimeout(DEFAULT_TIMEOUT)
  server = httplib.HTTPConnection(ES_HOST, ES_PORT)
  try:
    server.connect()
  except socket.error, (erno, e):
    if erno == errno.ECONNREFUSED:
      return 13  # No ES running, ask tcollector to not respawn us.
    raise
  servers = []

  if json is None:
    utils.err("This collector requires the `json' Python module.")
    return 1

  for conf in elasticsearch_conf.get_servers():
    server = httplib.HTTPConnection( *conf )
    try:
      server.connect()
    except socket.error, (erno, e):
      if erno == errno.ECONNREFUSED:
        continue
      raise
    servers.append( server )

  if len( servers ) == 0:
    return 13  # No ES running, ask tcollector to not respawn us.

  status = node_status(server)
  version = status["version"]["number"]
  nstats = node_stats(server, version)
  cluster_name = nstats["cluster_name"]
  nodeid, nstats = nstats["nodes"].popitem()

  ts = None
  def printmetric(metric, value, **tags):
    if tags:
      tags = " " + " ".join("%s=%s" % (name, value)
                            for name, value in tags.iteritems())
    else:
      tags = ""
    print ("elasticsearch.%s %d %s cluster=%s%s"
           % (metric, ts, value, cluster_name, tags))

  while True:
    ts = int(time.time())
    nstats = node_stats(server, version)
    # Check that the node's identity hasn't changed in the mean time.
    if nstats["cluster_name"] != cluster_name:
      utils.err("cluster_name changed from %r to %r"
                % (cluster_name, nstats["cluster_name"]))
      return 1
    this_nodeid, nstats = nstats["nodes"].popitem()
    if this_nodeid != nodeid:
      utils.err("node ID changed from %r to %r" % (nodeid, this_nodeid))
      return 1

    is_master = nodeid == cluster_state(server)["master_node"]
    printmetric("is_master", int(is_master))
    if is_master:
      ts = int(time.time())  # In case last call took a while.
      cstats = cluster_health(server)
      for stat, value in cstats.iteritems():
        if stat == "status":
          value = STATUS_MAP.get(value, -1)
        elif not utils.is_numeric(value):
          continue
        printmetric("cluster." + stat, value)

    if "os" in nstats:
      ts = nstats["os"]["timestamp"] / 1000  # ms -> s
    if "timestamp" in nstats:
      ts = nstats["timestamp"] / 1000  # ms -> s

    if "indices" in nstats:
      indices = nstats["indices"]
      if "docs" in indices:
        printmetric("num_docs", indices["docs"]["count"])
      if "store" in indices:
        printmetric("indices.size", indices["store"]["size_in_bytes"])
      if "indexing" in indices:
        d = indices["indexing"]
        printmetric("indexing.index_total", d["index_total"])
        printmetric("indexing.index_time", d["index_time_in_millis"])
        printmetric("indexing.index_current", d["index_current"])
        printmetric("indexing.delete_total", d["delete_total"])
        printmetric("indexing.delete_time_in_millis", d["delete_time_in_millis"])
        printmetric("indexing.delete_current", d["delete_current"])
        del d
      if "get" in indices:
        d = indices["get"]
        printmetric("get.total", d["total"])
        printmetric("get.time", d["time_in_millis"])
        printmetric("get.exists_total", d["exists_total"])
        printmetric("get.exists_time", d["exists_time_in_millis"])
        printmetric("get.missing_total", d["missing_total"])
        printmetric("get.missing_time", d["missing_time_in_millis"])
        del d
      if "search" in indices:
        d = indices["search"]
        printmetric("search.query_total", d["query_total"])
        printmetric("search.query_time", d["query_time_in_millis"])
        printmetric("search.query_current", d["query_current"])
        printmetric("search.fetch_total", d["fetch_total"])
        printmetric("search.fetch_time", d["fetch_time_in_millis"])
        printmetric("search.fetch_current", d["fetch_current"])
        del d
      if "cache" in indices:
        d = indices["cache"]
        printmetric("cache.field.evictions", d["field_evictions"])
        printmetric("cache.field.size", d["field_size_in_bytes"])
        printmetric("cache.filter.count", d["filter_count"])
        printmetric("cache.filter.evictions", d["filter_evictions"])
        printmetric("cache.filter.size", d["filter_size_in_bytes"])
        del d
      if "merges" in indices:
        d = indices["merges"]
        printmetric("merges.current", d["current"])
        printmetric("merges.total", d["total"])
        printmetric("merges.total_time", d["total_time_in_millis"] / 1000.)
        del d
      del indices
    if "process" in nstats:
      process = nstats["process"]
      ts = process["timestamp"] / 1000  # ms -> s
      open_fds = process.get("open_file_descriptors")  # ES 0.17
      if open_fds is None:
        open_fds = process.get("fd")  # ES 0.16
        if open_fds is not None:
          open_fds = open_fds["total"]
      if open_fds is not None:
        printmetric("process.open_file_descriptors", open_fds)
      if "cpu" in process:
        d = process["cpu"]
        printmetric("process.cpu.percent", d["percent"])
        printmetric("process.cpu.sys", d["sys_in_millis"] / 1000.)
        printmetric("process.cpu.user", d["user_in_millis"] / 1000.)
        del d
      if "mem" in process:
        d = process["mem"]
        printmetric("process.mem.resident", d["resident_in_bytes"])
        printmetric("process.mem.shared", d["share_in_bytes"])
        printmetric("process.mem.total_virtual", d["total_virtual_in_bytes"])
        del d
      del process
    if "jvm" in nstats:
      jvm = nstats["jvm"]
      ts = jvm["timestamp"] / 1000  # ms -> s
      if "mem" in jvm:
        d = jvm["mem"]
        printmetric("jvm.mem.heap_used", d["heap_used_in_bytes"])
        printmetric("jvm.mem.heap_committed", d["heap_committed_in_bytes"])
        printmetric("jvm.mem.non_heap_used", d["non_heap_used_in_bytes"])
        printmetric("jvm.mem.non_heap_committed", d["non_heap_committed_in_bytes"])
        del d
      if "threads" in jvm:
        d = jvm["threads"]
        printmetric("jvm.threads.count", d["count"])
        printmetric("jvm.threads.peak_count", d["peak_count"])
        del d
      for gc, d in jvm["gc"]["collectors"].iteritems():
        printmetric("jvm.gc.collection_count", d["collection_count"], gc=gc)
        printmetric("jvm.gc.collection_time",
                    d["collection_time_in_millis"] / 1000., gc=gc)
      del jvm
      del d
    if "network" in nstats:
      for stat, value in nstats["network"]["tcp"].iteritems():
        if utils.is_numeric(value):
          printmetric("network.tcp." + stat, value)
    for stat, value in nstats["transport"].iteritems():
      if utils.is_numeric(value):
        printmetric("transport." + stat, value)
    # New in ES 0.17:
    for stat, value in nstats.get("http", {}).iteritems():
      if utils.is_numeric(value):
        printmetric("http." + stat, value)
    del nstats
    for server in servers:
      _collect_server(server, version)
    time.sleep(COLLECTION_INTERVAL)


if __name__ == "__main__":
  sys.exit(main(sys.argv))
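
For reference, each printmetric() call above writes one line to stdout in tcollector's text protocol: a metric name, a Unix timestamp, a value, then tag=value pairs. A sample line with made-up values, using the tags attached by _collect_server:

  elasticsearch.num_docs 1455000000 12345 node=node1 cluster=elasticsearch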
collectors/etc/elasticsearch_conf.py — 22 changes: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
#!/usr/bin/python
# This file is part of tcollector.
# Copyright (C) 2015 The tcollector Authors.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version. This program is distributed in the hope that it
# will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
# General Public License for more details. You should have received a copy
# of the GNU Lesser General Public License along with this program. If not,
# see <http://www.gnu.org/licenses/>.

def get_servers():
  """Get the ElasticSearch servers on this host.

  Returns:
    An iterable of tuples of (host, port)
  """
  return [ ("localhost", 9200) ]
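
With this hook in place, a deployment running more than one node can have a local copy of this file return several tuples; main() connects to each at startup, skips any server that refuses the connection, and returns 13 (asking tcollector not to respawn the collector) only when none are reachable. A minimal sketch of an edited get_servers(), where the second port is an assumption for illustration rather than part of this commit:

def get_servers():
  # Hypothetical local override: two ES instances on this host.
  # Port 9201 is an assumed second instance, not part of this change.
  return [ ("localhost", 9200),
           ("localhost", 9201) ]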