Skip to content

Commit

Permalink
Add an optional namespace prefix to all metric tags, and fix a Python 3 problem (#434)
Browse files Browse the repository at this point in the history

* Add support for namespace prefix.

* Changelog entry.

* Make namespace prefix configurable.

* A simpler implementation of namespace prefixing, and a test.

* Better defaults.

* Fix Python 3 problem.
  • Loading branch information
itamarst authored Sep 17, 2020
1 parent 2744820 commit b5c50d0
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
## [1.3.3](https://github.com/OpenTSDB/tcollector/issues?q=is%3Aopen+is%3Aissue+milestone%3A1.3.3)

### Improvements
- A namespace prefix can be added to all metrics.
- An optional status monitoring API, serving JSON over HTTP

## [1.3.1](https://github.com/OpenTSDB/tcollector/issues?utf8=%E2%9C%93&q=milestone%3A1.3.1+)
Expand Down
1 change: 1 addition & 0 deletions collectors/etc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def get_defaults():
'hosts': False,
"monitoring_interface": None,
"monitoring_port": 13280,
"namespace_prefix": "",
}

return defaults
38 changes: 30 additions & 8 deletions tcollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class ReaderThread(threading.Thread):
All data read is put into the self.readerq Queue, which is
consumed by the SenderThread."""

def __init__(self, dedupinterval, evictinterval, deduponlyzero):
def __init__(self, dedupinterval, evictinterval, deduponlyzero, ns_prefix=""):
"""Constructor.
Args:
dedupinterval: If a metric sends the same value over successive
Expand All @@ -331,6 +331,7 @@ def __init__(self, dedupinterval, evictinterval, deduponlyzero):
evictinterval will be removed from the cache to save RAM.
Invariant: evictinterval > dedupinterval
deduponlyzero: do the above only for 0 values.
ns_prefix: Prefix to add to metric tags.
"""
assert evictinterval > dedupinterval, "%r <= %r" % (evictinterval,
dedupinterval)
Expand All @@ -342,6 +343,7 @@ def __init__(self, dedupinterval, evictinterval, deduponlyzero):
self.dedupinterval = dedupinterval
self.evictinterval = evictinterval
self.deduponlyzero = deduponlyzero
self.ns_prefix = ns_prefix

def run(self):
"""Main loop for this thread. Just reads from collectors,
Expand Down Expand Up @@ -386,6 +388,9 @@ def process_line(self, col, line):
LOG.warning('%s line too long: %s', col.name, line)
col.lines_invalid += 1
return

line = self.ns_prefix + line

parsed = re.match('^([-_./a-zA-Z0-9]+)\s+' # Metric name.
'(\d+\.?\d+)\s+' # Timestamp.
'(\S+?)' # Value (int or float).
Expand Down Expand Up @@ -576,6 +581,7 @@ def run(self):

if ALIVE:
self.send_data()

errors = 0 # We managed to do a successful iteration.
except (ArithmeticError, EOFError, EnvironmentError, LookupError,
ValueError) as e:
Expand Down Expand Up @@ -842,6 +848,7 @@ def send_data_via_http(self):
try:
response = urlopen(req, json.dumps(metrics))
LOG.debug("Received response %s %s", response.getcode(), response.read().rstrip('\n'))

# clear out the sendq
self.sendq = []
# print "Got response code: %s" % response.getcode()
Expand Down Expand Up @@ -909,6 +916,7 @@ def parse_cmdline(argv):
'hosts': False,
"monitoring_interface": None,
"monitoring_port": 13280,
"namespace_prefix": "",
}
except Exception as e:
sys.stderr.write("Unexpected error: %s\n" % e)
Expand Down Expand Up @@ -1003,6 +1011,8 @@ def parse_cmdline(argv):
help='Password to use for HTTP Basic Auth when sending the data via HTTP')
parser.add_option('--ssl', dest='ssl', action='store_true', default=defaults['ssl'],
help='Enable SSL - used in conjunction with http')
parser.add_option('--namespace-prefix', dest='namespace_prefix', default=defaults["namespace_prefix"],
help='Prefix to prepend to all metric names collected', type=str)
parser.add_option('--monitoring-interface', dest='monitoring_interface', action='store',
# Old installs may not have this config option:
default=defaults.get("monitoring_interface", None),
Expand All @@ -1012,6 +1022,7 @@ def parse_cmdline(argv):
parser.add_option('--monitoring-port', dest='monitoring_port', action='store',
default=defaults.get("monitoring_port", 13280),
help="Port for status API to listen on.")

(options, args) = parser.parse_args(args=argv[1:])
if options.dedupinterval < 0:
parser.error('--dedup-interval must be at least 0 seconds')
Expand All @@ -1023,8 +1034,13 @@ def parse_cmdline(argv):
# We cannot write to stdout when we're a daemon.
if (options.daemonize or options.max_bytes) and not options.backup_count:
options.backup_count = 1
return (options, args)

prefix = options.namespace_prefix
if prefix and not prefix.endswith('.'):
prefix += '.'
options.namespace_prefix = prefix

return (options, args)

def daemonize():
"""Performs the necessary dance to become a background daemon."""
Expand Down Expand Up @@ -1123,7 +1139,7 @@ def main(argv):

# at this point we're ready to start processing, so start the ReaderThread
# so we can have it running and pulling in data for us
reader = ReaderThread(options.dedupinterval, options.evictinterval, options.deduponlyzero)
reader = ReaderThread(options.dedupinterval, options.evictinterval, options.deduponlyzero, options.namespace_prefix)
reader.start()

# prepare list of (host, port) of TSDs given on CLI
Expand Down Expand Up @@ -1431,19 +1447,25 @@ def spawn_collector(col):
# FIXME: do custom integration of Python scripts into memory/threads
# if re.search('\.py$', col.name) is not None:
# ... load the py module directly instead of using a subprocess ...
kwargs = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"close_fds": True,
"preexec_fn": os.setsid,
}
if PY3:
# Make sure we get back a string, not bytes
kwargs["encoding"] = "utf-8"
try:
col.proc = subprocess.Popen(col.filename, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
preexec_fn=os.setsid)
col.proc = subprocess.Popen(col.filename, **kwargs)
except OSError as e:
LOG.error('Failed to spawn collector %s: %s' % (col.filename, e))
return
# The following line needs to move below this line because it is used in
# other logic and it makes no sense to update the last spawn time if the
# collector didn't actually start.
col.lastspawn = int(time.time())
# Without setting last_datapoint here, a long running check (>15s) will be
# Without setting last_datapoint here, a long running check (>15s) will be
# killed by check_children() the first time check_children is called.
col.last_datapoint = col.lastspawn
set_nonblocking(col.proc.stdout.fileno())
Expand Down
14 changes: 14 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ def test_endtoend(self):
self.assertEqual(json.loads(r), [c.to_json() for c in collectors.values()])


class NamespacePrefixTests(unittest.TestCase):
    """Tests for the optional metric namespace prefix."""

    def test_prefix_added(self):
        """A ReaderThread built with a prefix prepends it to processed lines."""
        prefix = "my.namespace."
        reader = tcollector.ReaderThread(1, 10, True, prefix)
        col = tcollector.Collector("c", 1, "c")
        datapoint = "mymetric 123 12 a=b"
        reader.process_line(col, datapoint)
        # The queued line carries the prefix, and the collector's counters
        # record exactly one valid line and no invalid ones.
        self.assertEqual(reader.readerq.get(), prefix + datapoint)
        self.assertEqual(col.lines_received, 1)
        self.assertEqual(col.lines_invalid, 0)


class TSDBlacklistingTests(unittest.TestCase):
"""
Tests of TSD blacklisting logic
Expand Down

0 comments on commit b5c50d0

Please sign in to comment.