diff --git a/tcollector.py b/tcollector.py
index 2fb5edbf..9cc725b4 100755
--- a/tcollector.py
+++ b/tcollector.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # This file is part of tcollector.
-# Copyright (C) 2010 The tcollector Authors.
+# Copyright (C) 2010-2024 The tcollector Authors.
 #
 # This program is free software: you can redistribute it and/or modify it
 # under the terms of the GNU Lesser General Public License as published by
@@ -38,21 +38,12 @@
 from optparse import OptionParser
 
-import collections
-
-PY3 = sys.version_info[0] > 2
-if PY3:
-    import importlib
-    from queue import Queue, Empty, Full  # pylint: disable=import-error
-    from urllib.request import Request, urlopen  # pylint: disable=maybe-no-member,no-name-in-module,import-error
-    from urllib.error import HTTPError, URLError  # pylint: disable=maybe-no-member,no-name-in-module,import-error
-    from http.server import HTTPServer, BaseHTTPRequestHandler  # pylint: disable=maybe-no-member,no-name-in-module,import-error
-    from collections.abc import Callable  # pylint: disable=maybe-no-member,no-name-in-module,import-error
-else:
-    from Queue import Queue, Empty, Full  # pylint: disable=maybe-no-member,no-name-in-module,import-error
-    from urllib2 import Request, urlopen, HTTPError, URLError  # pylint: disable=maybe-no-member,no-name-in-module,import-error
-    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler  # pylint: disable=maybe-no-member,no-name-in-module,import-error
-    from collections import Callable  # pylint: disable=maybe-no-member,no-name-in-module,import-error
+import importlib
+from queue import Queue, Empty, Full
+from urllib.request import Request, urlopen
+from urllib.error import HTTPError, URLError
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from collections.abc import Callable
 
 # global variables.
 COLLECTORS = {}
@@ -198,7 +189,7 @@ def read(self):
                 if line:
                     self.datalines.append(line)
                     self.last_datapoint = int(time.time())
-                self.buffer = self.buffer[idx+1:]
+                self.buffer = self.buffer[idx + 1:]
 
     def collect(self):
         """Reads input from the collector and returns the lines up to whomever
@@ -269,8 +260,7 @@ def do_GET(self):
         self.send_header("content-type", "text/json")
         self.send_header("content-length", str(len(result)))
         self.end_headers()
-        if PY3:
-            result = result.encode("utf-8")
+        result = result.encode("utf-8")
         self.wfile.write(result)
 
 
@@ -314,14 +304,11 @@ def read(self):
             else:
                 ALIVE = False
 
-
     def shutdown(self):
 
         pass
 
-
-
 class ReaderThread(threading.Thread):
     """The main ReaderThread is responsible for reading from the collectors
        and assuring that we always read from the input no matter what.
@@ -401,10 +388,10 @@ def process_line(self, col, line):
 
         line = self.ns_prefix + line
 
-        parsed = re.match('^([-_./a-zA-Z0-9]+)\s+' # Metric name.
-                          '(\d+\.?\d+)\s+' # Timestamp.
-                          '(\S+?)' # Value (int or float).
-                          '((?:\s+[-_./a-zA-Z0-9]+=[-_./a-zA-Z0-9]+)*)$', # Tags
+        parsed = re.match(r'^([-_./a-zA-Z0-9]+)\s+'  # Metric name.
+                          r'(\d+\.?\d+)\s+'  # Timestamp.
+                          r'(\S+?)'  # Value (int or float).
+                          r'((?:\s+[-_./a-zA-Z0-9]+=[-_./a-zA-Z0-9]+)*)$',  # Tags
                           line)
         if parsed is None:
             LOG.warning('%s sent invalid data: %s', col.name, line)
@@ -413,21 +400,18 @@ def process_line(self, col, line):
         metric, timestamp, value, tags = parsed.groups()
 
         if isinstance(value, bool):
-           LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
-           value = int(value)
+            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
+            value = int(value)
 
-        if PY3:
-            string_types = str
-        else:
-            string_types = basestring  # pylint:disable=undefined-variable
+        string_types = str
 
         if isinstance(value, string_types) and value.lower() == 'true':
-           LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
-           value = 1
+            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
+            value = 1
 
         if isinstance(value, string_types) and value.lower() == 'false':
-           LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
-           value = 0
+            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
+            value = 0
 
         try:
             # The regex above is fairly open, and would leave values like 'True' through
@@ -480,8 +464,8 @@ def process_line(self, col, line):
                 # value instead of the last.  Fall through if we reach
                 # the dedup interval so we can print the value.
                 if ((not self.deduponlyzero or (self.deduponlyzero and float(value) == 0.0)) and
-                    col.values[key][0] == value and
-                    (timestamp - col.values[key][3] < local_dedupinterval)):
+                        col.values[key][0] == value and
+                        (timestamp - col.values[key][3] < local_dedupinterval)):
                     col.values[key] = (value, True, line, col.values[key][3])
                     return
 
@@ -490,8 +474,8 @@ def process_line(self, col, line):
             # replay the last value we skipped (if changed) so the jumps in
             # our graph are accurate,
             if ((col.values[key][1] or
-                (timestamp - col.values[key][3] >= local_dedupinterval))
-                and col.values[key][0] != value):
+                    (timestamp - col.values[key][3] >= local_dedupinterval))
+                    and col.values[key][0] != value):
                 col.lines_sent += 1
                 if not self.readerq.nput(col.values[key][2]):
                     self.lines_dropped += 1
@@ -538,7 +522,7 @@ def __init__(self, reader, dryrun, hosts, self_report_stats, tags,
 
         self.dryrun = dryrun
         self.reader = reader
-        self.tags = sorted(tags.items()) # dictionary transformed to list
+        self.tags = sorted(tags.items())  # dictionary transformed to list
         self.http = http
         self.http_api_path = http_api_path
         self.http_username = http_username
@@ -551,13 +535,13 @@ def __init__(self, reader, dryrun, hosts, self_report_stats, tags,
         self.current_tsd = -1  # Index in self.hosts where we're at.
         self.host = None  # The current TSD host we've selected.
         self.port = None  # The port of the current TSD.
-        self.tsd = None   # The socket connected to the aforementioned TSD.
+        self.tsd = None  # The socket connected to the aforementioned TSD.
         self.last_verify = 0
-        self.reconnectinterval = reconnectinterval # in seconds.
+        self.reconnectinterval = reconnectinterval  # in seconds.
         self.time_reconnect = 0  # if reconnectinterval > 0, used to track the time.
         self.sendq = []
         self.self_report_stats = self_report_stats
-        self.maxtags = maxtags # The maximum number of tags TSD will accept.
+        self.maxtags = maxtags  # The maximum number of tags TSD will accept.
 
     def pick_connection(self):
         """Picks up a random host/port connection."""
@@ -654,7 +638,7 @@ def verify_conn(self):
             try:
                 self.tsd.close()
             except socket.error as msg:
-                pass # not handling that
+                pass  # not handling that
             self.time_reconnect = time.time()
             return False
 
@@ -697,23 +681,17 @@ def verify_conn(self):
         # TODO need to fix this for http
         if self.self_report_stats:
             strs = [
-                ('reader.lines_collected',
-                 '', self.reader.lines_collected),
-                ('reader.lines_dropped',
-                 '', self.reader.lines_dropped)
-            ]
+                ('reader.lines_collected', '', self.reader.lines_collected),
+                ('reader.lines_dropped', '', self.reader.lines_dropped)
+            ]
 
             for col in all_living_collectors():
-                strs.append(('collector.lines_sent', 'collector='
-                             + col.name, col.lines_sent))
-                strs.append(('collector.lines_received', 'collector='
-                             + col.name, col.lines_received))
-                strs.append(('collector.lines_invalid', 'collector='
-                             + col.name, col.lines_invalid))
+                strs.append(('collector.lines_sent', 'collector=' + col.name, col.lines_sent))
+                strs.append(('collector.lines_received', 'collector=' + col.name, col.lines_received))
+                strs.append(('collector.lines_invalid', 'collector=' + col.name, col.lines_invalid))
 
             ts = int(time.time())
-            strout = ["tcollector.%s %d %d %s"
-                      % (x[0], ts, x[2], x[1]) for x in strs]
+            strout = ["tcollector.%s %d %d %s" % (x[0], ts, x[2], x[1]) for x in strs]
             for string in strout:
                 self.sendq.append(string)
@@ -767,7 +745,7 @@ def maintain_conn(self):
                     self.tsd.settimeout(15)
                     self.tsd.connect(sockaddr)
                     # if we get here it connected
-                    LOG.debug('Connection to %s was successful'%(str(sockaddr)))
+                    LOG.debug('Connection to %s was successful' % (str(sockaddr)))
                     break
                 except socket.error as msg:
                     LOG.warning('Connection attempt failed to %s:%d: %s',
@@ -830,9 +808,9 @@ def build_http_url(self):
             protocol = "https"
         else:
             protocol = "http"
-        details=""
+        details = ""
         if LOG.level == logging.DEBUG:
-            details="?details"
+            details = "?details"
         return "%s://%s:%s/%s%s" % (protocol, self.host, self.port, self.http_api_path, details)
 
@@ -843,10 +821,10 @@ def send_data_via_http(self):
             parts = line.split(None, 3)
             # not all metrics have metric-specific tags
             if len(parts) == 4:
-               (metric, timestamp, value, raw_tags) = parts
+                (metric, timestamp, value, raw_tags) = parts
             else:
-               (metric, timestamp, value) = parts
-               raw_tags = ""
+                (metric, timestamp, value) = parts
+                raw_tags = ""
             # process the tags
             metric_tags = {}
             for tag in raw_tags.strip().split():
@@ -858,36 +836,35 @@ def send_data_via_http(self):
             metric_entry["value"] = float(value)
             metric_entry["tags"] = dict(self.tags).copy()
             if len(metric_tags) + len(metric_entry["tags"]) > self.maxtags:
-               metric_tags_orig = set(metric_tags)
-               subset_metric_keys = frozenset(metric_tags[:len(metric_tags[:self.maxtags-len(metric_entry["tags"])])])
-               metric_tags = dict((k, v) for k, v in metric_tags.items() if k in subset_metric_keys)
-               LOG.error("Exceeding maximum permitted metric tags - removing %s for metric %s",
-                         str(metric_tags_orig - set(metric_tags)), metric)
+                metric_tags_orig = set(metric_tags)
+                subset_metric_keys = frozenset(
+                    list(metric_tags)[:self.maxtags - len(metric_entry["tags"])])
+                metric_tags = dict((k, v) for k, v in metric_tags.items() if k in subset_metric_keys)
+                LOG.error("Exceeding maximum permitted metric tags - removing %s for metric %s",
+                          str(metric_tags_orig - set(metric_tags)), metric)
             metric_entry["tags"].update(metric_tags)
             metrics.append(metric_entry)
 
         if self.dryrun:
-            print("Would have sent:\n%s" % json.dumps(metrics,
-                                                      sort_keys=True,
-                                                      indent=4))
+            print("Would have sent:\n%s" % json.dumps(metrics, sort_keys=True, indent=4))
             return
 
-        if((self.current_tsd == -1) or (len(self.hosts) > 1)):
+        if ((self.current_tsd == -1) or (len(self.hosts) > 1)):
             self.pick_connection()
 
         url = self.build_http_url()
         LOG.debug("Sending metrics to url: %s", url)
         req = Request(url)
         if self.http_username and self.http_password:
-            req.add_header("Authorization", "Basic %s"
-                           % base64.b64encode("%s:%s" % (self.http_username, self.http_password)))
+            req.add_header("Authorization", "Basic %s" % base64.b64encode(
+                ("%s:%s" % (self.http_username, self.http_password)).encode("utf-8")).decode("ascii"))
         req.add_header("Content-Type", "application/json")
         try:
             body = json.dumps(metrics)
             if not isinstance(body, bytes):
                 body = body.encode("utf-8")
             response = urlopen(req, body)
-            LOG.debug("Received response %s %s", response.getcode(), response.read().rstrip('\n'))
+            LOG.debug("Received response %s %s", response.getcode(), response.read().rstrip(b'\n'))
 
             # clear out the sendq
             self.sendq = []
@@ -922,6 +899,7 @@ def setup_logging(logfile=DEFAULT_LOG, logstdout=False):
                                        '%(levelname)s: %(message)s'))
         LOG.addHandler(ch)
 
+
 def parse_cmdline(argv):
     """Parses the command-line."""
 
@@ -970,82 +948,82 @@ def parse_cmdline(argv):
     # get arguments
     parser = OptionParser(description='Manages collectors which gather '
-                            'data and report back.')
+                          'data and report back.')
     parser.add_option('-c', '--collector-dir', dest='cdir', metavar='DIR',
-                        default=defaults['cdir'],
-                        help='Directory where the collectors are located.')
+                      default=defaults['cdir'],
+                      help='Directory where the collectors are located.')
     parser.add_option('-d', '--dry-run', dest='dryrun', action='store_true',
-                        default=defaults['dryrun'],
-                        help='Don\'t actually send anything to the TSD, '
+                      default=defaults['dryrun'],
+                      help='Don\'t actually send anything to the TSD, '
                            'just print the datapoints.')
     parser.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
-                        default=defaults['daemonize'],
-                        help='Run as a background daemon.')
+                      default=defaults['daemonize'],
+                      help='Run as a background daemon.')
     parser.add_option('-H', '--host', dest='host',
-                        metavar='HOST',
-                        default=defaults['host'],
-                        help='Hostname to use to connect to the TSD.')
+                      metavar='HOST',
+                      default=defaults['host'],
+                      help='Hostname to use to connect to the TSD.')
     parser.add_option('-L', '--hosts-list', dest='hosts',
-                        metavar='HOSTS',
-                        default=defaults['hosts'],
-                        help='List of host:port to connect to tsd\'s (comma separated).')
+                      metavar='HOSTS',
+                      default=defaults['hosts'],
+                      help='List of host:port to connect to tsd\'s (comma separated).')
     parser.add_option('--no-tcollector-stats', dest='no_tcollector_stats',
-                        action='store_true',
-                        default=defaults['no_tcollector_stats'],
-                        help='Prevent tcollector from reporting its own stats to TSD')
+                      action='store_true',
+                      default=defaults['no_tcollector_stats'],
+                      help='Prevent tcollector from reporting its own stats to TSD')
     parser.add_option('-s', '--stdin', dest='stdin', action='store_true',
-                        default=defaults['stdin'],
-                        help='Run once, read and dedup data points from stdin.')
+                      default=defaults['stdin'],
+                      help='Run once, read and dedup data points from stdin.')
     parser.add_option('-p', '--port', dest='port', type='int',
-                        default=defaults['port'], metavar='PORT',
-                        help='Port to connect to the TSD instance on. '
-                        'default=%default')
+                      default=defaults['port'], metavar='PORT',
+                      help='Port to connect to the TSD instance on. '
+                           'default=%default')
     parser.add_option('-v', dest='verbose', action='store_true',
-                        default=defaults['verbose'],
-                        help='Verbose mode (log debug messages).')
+                      default=defaults['verbose'],
+                      help='Verbose mode (log debug messages).')
     parser.add_option('-t', '--tag', dest='tags', action='append',
-                        default=defaults['tags'], metavar='TAG',
-                        help='Tags to append to all timeseries we send, '
-                        'e.g.: -t TAG=VALUE -t TAG2=VALUE')
+                      default=defaults['tags'], metavar='TAG',
+                      help='Tags to append to all timeseries we send, '
+                           'e.g.: -t TAG=VALUE -t TAG2=VALUE')
     parser.add_option('-P', '--pidfile', dest='pidfile',
-                        default=defaults['pidfile'],
-                        metavar='FILE', help='Write our pidfile')
+                      default=defaults['pidfile'],
+                      metavar='FILE', help='Write our pidfile')
     parser.add_option('--dedup-interval', dest='dedupinterval', type='int',
-                        default=defaults['dedupinterval'], metavar='DEDUPINTERVAL',
-                        help='Number of seconds in which successive duplicate '
+                      default=defaults['dedupinterval'], metavar='DEDUPINTERVAL',
+                      help='Number of seconds in which successive duplicate '
                            'datapoints are suppressed before sending to the TSD. '
                            'Use zero to disable. '
                            'default=%default')
     parser.add_option('--dedup-only-zero', dest='deduponlyzero', action='store_true',
-                        default=defaults['deduponlyzero'],
-                        help='Only dedup 0 values.')
+                      default=defaults['deduponlyzero'],
+                      help='Only dedup 0 values.')
     parser.add_option('--evict-interval', dest='evictinterval', type='int',
-                        default=defaults['evictinterval'], metavar='EVICTINTERVAL',
-                        help='Number of seconds after which to remove cached '
+                      default=defaults['evictinterval'], metavar='EVICTINTERVAL',
+                      help='Number of seconds after which to remove cached '
                            'values of old data points to save memory. '
                            'default=%default')
     parser.add_option('--allowed-inactivity-time', dest='allowed_inactivity_time', type='int',
-                        default=ALLOWED_INACTIVITY_TIME, metavar='ALLOWEDINACTIVITYTIME',
-                        help='How long to wait for datapoints before assuming '
-                        'a collector is dead and restart it. '
-                        'default=%default')
+                      default=ALLOWED_INACTIVITY_TIME, metavar='ALLOWEDINACTIVITYTIME',
+                      help='How long to wait for datapoints before assuming '
+                           'a collector is dead and restart it. '
+                           'default=%default')
     parser.add_option('--remove-inactive-collectors', dest='remove_inactive_collectors', action='store_true',
-                        default=defaults['remove_inactive_collectors'], help='Remove collectors not sending data '
-                        'in the max allowed inactivity interval')
+                      default=defaults['remove_inactive_collectors'], help='Remove collectors not sending data '
+                           'in the max allowed inactivity interval')
     parser.add_option('--log-stdout', dest='logstdout', action='store_true',
-                        help='Send log message to stdout.')
+                      help='Send log message to stdout.')
     parser.add_option('--logfile', dest='logfile', type='str',
-                        default=DEFAULT_LOG,
-                        help='Filename where logs are written to.')
-    parser.add_option('--reconnect-interval',dest='reconnectinterval', type='int',
-                        default=defaults['reconnectinterval'], metavar='RECONNECTINTERVAL',
-                        help='Number of seconds after which the connection to'
+                      default=DEFAULT_LOG,
+                      help='Filename where logs are written to.')
+    parser.add_option('--reconnect-interval', dest='reconnectinterval', type='int',
+                      default=defaults['reconnectinterval'], metavar='RECONNECTINTERVAL',
+                      help='Number of seconds after which the connection to'
                            'the TSD hostname reconnects itself. This is useful'
                            'when the hostname is a multiple A record (RRDNS).')
     parser.add_option('--max-tags', dest='maxtags', type=int, default=defaults['maxtags'],
-                        help='The maximum number of tags to send to our TSD Instances')
+                      help='The maximum number of tags to send to our TSD Instances')
     parser.add_option('--http', dest='http', action='store_true', default=defaults['http'],
-                        help='Send the data via the http interface')
+                      help='Send the data via the http interface')
     parser.add_option('--http-api-path', dest='http_api_path', type='str', default=defaults['http_api_path'],
                       help='URL path to use for HTTP requests to TSD.')
     parser.add_option('--http-username', dest='http_username', default=defaults['http_username'],
@@ -1085,6 +1063,7 @@ def parse_cmdline(argv):
 
     return (options, args)
 
+
 def daemonize():
     """Performs the necessary dance to become a background daemon."""
     if os.fork():
@@ -1107,7 +1086,7 @@ def daemonize():
         try:
             os.close(fd)
         except OSError:  # This FD wasn't opened...
-            pass # ... ignore the exception.
+            pass  # ... ignore the exception.
 
 
 def setup_python_path(collector_dir):
@@ -1198,6 +1177,7 @@ def splitHost(hostport):
             host, port = hostport.split(":")
             return (host, int(port))
         return (hostport, DEFAULT_PORT)
+
     options.hosts = [splitHost(host) for host in options.hosts.split(",")]
     if options.host != "localhost" or options.port != DEFAULT_PORT:
         options.hosts.append((options.host, options.port))
@@ -1221,12 +1201,13 @@ def splitHost(hostport):
 
     # We're exiting, make sure we don't leave any collector behind.
     for col in all_living_collectors():
-       col.shutdown()
+        col.shutdown()
     LOG.debug('Shutting down -- joining the reader thread.')
     reader.join()
     LOG.debug('Shutting down -- joining the sender thread.')
     sender.join()
 
+
 def stdin_loop(options, modules, sender, tags):
     """The main loop of the program that runs when we are in stdin mode."""
 
@@ -1241,6 +1222,7 @@ def stdin_loop(options, modules, sender, tags):
                      % sum(1 for col in all_living_collectors()))
         next_heartbeat = now + 600
 
+
 def main_loop(options, modules, sender, tags):
     """The main loop of the program that runs when we're not in stdin mode."""
 
@@ -1296,15 +1278,13 @@ def load_config_module(name, options, tags):
     """
 
     if isinstance(name, str):
-       LOG.info('Loading %s', name)
-       d = {}
-       # Strip the trailing .py
-       module = __import__(name[:-3], d, d)
+        LOG.info('Loading %s', name)
+        d = {}
+        # Strip the trailing .py
+        module = __import__(name[:-3], d, d)
     else:
-        if PY3:
-            module = importlib.reload(name)  # pylint: disable=no-member,undefined-variable
-        else:
-            module = reload(name)  # pylint: disable=undefined-variable
+        module = importlib.reload(name)  # pylint: disable=no-member,undefined-variable
+
     onload = module.__dict__.get('onload')
     if isinstance(onload, Callable):
         try:
@@ -1401,10 +1381,10 @@ def shutdown_signal(signum, frame):
 
 
 def kill(proc, signum=signal.SIGTERM):
-  try:
-    os.killpg(proc.pid, signum)
-  except:  # pylint: disable=bare-except
-    LOG.info('already killed: %s', proc.pid)
+    try:
+        os.killpg(proc.pid, signum)
+    except:  # pylint: disable=bare-except
+        LOG.info('already killed: %s', proc.pid)
 
 
 def shutdown():
@@ -1447,7 +1427,7 @@ def reap_children():
         # any other status code is an error and is logged.
         if status == 13:
             LOG.info('removing %s from the list of collectors (by request)',
-                      col.name)
+                     col.name)
             col.dead = True
         elif status != 0:
             LOG.warning('collector %s terminated after %d seconds with '
@@ -1458,6 +1438,7 @@ def reap_children():
             register_collector(Collector(col.name, col.interval, col.filename,
                                          col.mtime, col.lastspawn))
 
+
 def check_children(options):
     """When a child process hasn't received a datapoint in a while,
        assume it's died in some fashion and restart it."""
@@ -1465,7 +1446,7 @@ def check_children(options):
 
     for col in all_living_collectors():
         now = int(time.time())
-        if ((col.interval == 0) and (col.last_datapoint < (now - options.allowed_inactivity_time))):
+        if (col.interval == 0) and (col.last_datapoint < (now - options.allowed_inactivity_time)):
            # It's too old, kill it
            LOG.warning('Terminating collector %s after %d seconds of inactivity',
                        col.name, now - col.last_datapoint)
@@ -1495,9 +1476,8 @@ def spawn_collector(col):
         "close_fds": True,
         "preexec_fn": os.setsid,
     }
-    if PY3:
-        # Make sure we get back a string, not bytes
-        kwargs["encoding"] = "utf-8"
+    # Make sure we get back a string, not bytes
+    kwargs["encoding"] = "utf-8"
     try:
         col.proc = subprocess.Popen(col.filename, **kwargs)
     except OSError as e:
@@ -1553,15 +1533,15 @@ def spawn_children():
                 col.killstate = 1
             elif col.killstate == 1:
                 LOG.error('error: %s (interval=%d, pid=%d) still not dead, '
-                           'SIGKILL sent',
-                           col.name, col.interval, col.proc.pid)
+                          'SIGKILL sent',
+                          col.name, col.interval, col.proc.pid)
                 kill(col.proc, signal.SIGKILL)
                 col.nextkill = now + 5
                 col.killstate = 2
             else:
                 LOG.error('error: %s (interval=%d, pid=%d) needs manual '
-                           'intervention to kill it',
-                           col.name, col.interval, col.proc.pid)
+                          'intervention to kill it',
+                          col.name, col.interval, col.proc.pid)
                 col.nextkill = now + 300
@@ -1602,8 +1582,8 @@ def populate_collectors(coldir):
                 # this...
                 if col.interval != interval:
                     LOG.error('two collectors with the same name %s and '
-                               'different intervals %d and %d',
-                               colname, interval, col.interval)
+                              'different intervals %d and %d',
+                              colname, interval, col.interval)
                     continue
 
                 # we have to increase the generation or we will kill
@@ -1626,7 +1606,7 @@ def populate_collectors(coldir):
     for col in all_collectors():
         if col.generation < GENERATION:
             LOG.info('collector %s removed from the filesystem, forgetting',
-                      col.name)
+                     col.name)
             col.shutdown()
             to_delete.append(col.name)
     for name in to_delete:
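
The hunks above lean on Python 3's strict str/bytes split at every I/O
boundary: json.dumps() returns str while urlopen() wants bytes for the POST
body, response.read() returns bytes (hence rstrip(b'\n')), base64.b64encode()
accepts and returns bytes, and Popen(..., encoding="utf-8") turns the
collector pipes back into text. A minimal standalone sketch of those
boundaries follows; the localhost:4242/api/put endpoint and the user:pass
credentials are placeholders for illustration, not values taken from this
patch:

    import base64
    import json
    from urllib.request import Request, urlopen

    # One datapoint in the JSON shape send_data_via_http() builds.
    payload = [{"metric": "sys.cpu.user", "timestamp": 1700000000,
                "value": 42.0, "tags": {"host": "example"}}]

    body = json.dumps(payload)       # str in Python 3
    if not isinstance(body, bytes):
        body = body.encode("utf-8")  # urlopen() requires bytes as POST data

    req = Request("http://localhost:4242/api/put")  # placeholder TSD endpoint
    # b64encode() takes bytes and returns bytes: encode first, decode after.
    creds = base64.b64encode(b"user:pass").decode("ascii")
    req.add_header("Authorization", "Basic %s" % creds)
    req.add_header("Content-Type", "application/json")

    response = urlopen(req, body)  # raises URLError if nothing is listening
    print(response.getcode(), response.read().rstrip(b"\n"))  # read() gives bytes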