diff --git a/README.md b/README.md index 950a7fc..960906e 100644 --- a/README.md +++ b/README.md @@ -2,60 +2,4 @@ collectd-haproxy ================ This is a collectd plugin to pull HAProxy () stats from the HAProxy management socket. It is written in Python and as such, runs under the collectd Python plugin. - -Requirements ------------- - -*HAProxy* -To use this plugin, HAProxy must be configured to create a management socket with the `stats socket` -configuration option. collectd must have read/write access to the socket. - -*collectd* -collectd must have the Python plugin installed. See () - -Options -------- -* `ProxyMonitor` -Proxy to monitor. If unset, defaults to ['server', 'frontend', 'backend']. -Specify multiple times to specify additional proxies -* `Socket` -File location of the HAProxy management socket - -Install -------- -1. Copy this repository to `/usr/share/collectd/collectd-haproxy` -2. Create a collectd configuration for the plugin file in If you installed collectd using the signalfx installer, you should place your the haproxy configuration file in the `/etc/collectd/managed_config/` directory. See the example configuration below. -3. `SELINUX ONLY` Create a SELinux policy package using the supplied type enforcement file. Enter the commands below to create and install the policy package. -```bash -$ cd /usr/share/collectd/collectd-haproxy/selinux -$ checkmodule -M -m -o collectd-haproxy.mod collectd-haproxy.te -checkmodule: loading policy configuration from collectd-haproxy.te -checkmodule: policy configuration loaded -checkmodule: writing binary representation (version 17) to collectd-haproxy.mod -$ semodule_package -o collectd-haproxy.pp -m collectd-haproxy.mod -$ sudo semodule -i collectd-haproxy.pp -$ sudo reboot -``` -4. Restart collectd - -Example -------- -```apache - - Globals true - - - - # haproxy.py is at /usr/share/collectd/collectd-haproxy/haproxy.py - ModulePath "/usr/share/collectd/collectd-haproxy" - - Import "haproxy" - - - # Some versions of haproxy expose the socket in "/var/lib/haproxy/stats" - Socket "/var/run/haproxy.sock" - ProxyMonitor "server" - ProxyMonitor "backend" - - -``` \ No newline at end of file +For usage guide and configuration, see the [documentation](https://github.com/signalfx/integrations/tree/master/collectd-haproxy). diff --git a/haproxy.py b/haproxy.py index c5414cd..6cea05c 100644 --- a/haproxy.py +++ b/haproxy.py @@ -18,20 +18,45 @@ PLUGIN_NAME = 'haproxy' RECV_SIZE = 1024 -METRIC_TYPES = { - #Metrics that are collected for the whole haproxy instance. +DEFAULT_METRICS = { + 'ConnRate': ('connection_rate', 'gauge'), + 'CumReq': ('requests', 'derive'), + 'Idle_pct': ('idle_pct', 'gauge'), + 'scur': ('session_current', 'gauge'), + 'SessRate': ('session_rate_all', 'gauge'), + 'lbtot': ('server_selected_total', 'counter'), + 'bout': ('bytes_out', 'derive'), + 'bin': ('bytes_in', 'derive'), + 'ttime': ('session_time_avg', 'gauge'), + 'req_rate': ('request_rate', 'gauge'), + 'rate': ('session_rate', 'gauge'), + 'hrsp_2xx': ('response_2xx', 'derive'), + 'hrsp_4xx': ('response_4xx', 'derive'), + 'hrsp_5xx': ('response_5xx', 'derive'), + 'ereq': ('error_request', 'derive'), + 'dreq': ('denied_request', 'derive'), + 'econ': ('error_connection', 'derive'), + 'dresp': ('denied_response', 'derive'), + 'qcur': ('queue_current', 'gauge'), + 'qtime': ('queue_time_avg', 'gauge'), + 'rtime': ('response_time_avg', 'gauge'), + 'eresp': ('error_response', 'derive'), + 'wretr': ('retries', 'derive'), + 'wredis': ('redispatched', 'derive'), +} + +ENHANCED_METRICS = { + # Metrics that are collected for the whole haproxy instance. # The format is haproxy_metricname : {'signalfx_corresponding_metric': 'collectd_type'} # Currently signalfx_corresponding_metric match haproxy_metricname - #Correspond to 'show info' socket command + # Correspond to 'show info' socket command 'MaxConn': ('max_connections', 'gauge'), 'CumConns': ('connections', 'derive'), - 'CumReq': ('requests', 'derive'), 'MaxConnRate': ('max_connection_rate', 'gauge'), 'MaxSessRate': ('max_session_rate', 'gauge'), 'MaxSslConns': ('max_ssl_connections', 'gauge'), 'CumSslConns': ('ssl_connections', 'derive'), 'MaxPipes': ('max_pipes', 'gauge'), - 'Idle_pct': ('idle_pct', 'gauge'), 'Tasks': ('tasks', 'gauge'), 'Run_queue': ('run_queue', 'gauge'), 'PipesUsed': ('pipes_used', 'gauge'), @@ -39,8 +64,6 @@ 'Uptime_sec': ('uptime_seconds', 'derive'), 'CurrConns': ('current_connections', 'gauge'), 'CurrSslConns': ('current_ssl_connections', 'gauge'), - 'ConnRate': ('connection_rate', 'gauge'), - 'SessRate': ('session_rate', 'gauge'), 'SslRate': ('ssl_rate', 'gauge'), 'SslFrontendKeyRate': ('ssl_frontend_key_rate', 'gauge'), 'SslBackendKeyRate': ('ssl_backend_key_rate', 'gauge'), @@ -49,67 +72,88 @@ 'CompressBpsIn': ('compress_bps_in', 'derive'), 'CompressBpsOut': ('compress_bps_out', 'derive'), 'ZlibMemUsage': ('zlib_mem_usage', 'gauge'), - 'Idle_pct': ('idle_pct', 'gauge'), - #Metrics that are collected per each proxy separately. Proxy name would be the dimension as well as service_name - #Correspond to 'show stats' socket command - 'bin': ('bytes_in', 'derive'), - 'bout': ('bytes_out', 'derive'), + # Metrics that are collected per each proxy separately. + # Proxy name would be the dimension as well as service_name + # Correspond to 'show stats' socket command 'chkfail': ('failed_checks', 'derive'), 'downtime': ('downtime', 'derive'), - 'dresp': ('denied_response', 'derive'), - 'dreq': ('denied_request', 'derive'), - 'econ': ('error_connection', 'derive'), - 'ereq': ('error_request', 'derive'), - 'eresp': ('error_response', 'derive'), 'hrsp_1xx': ('response_1xx', 'derive'), - 'hrsp_2xx': ('response_2xx', 'derive'), 'hrsp_3xx': ('response_3xx', 'derive'), - 'hrsp_4xx': ('response_4xx', 'derive'), - 'hrsp_5xx': ('response_5xx', 'derive'), 'hrsp_other': ('response_other', 'derive'), - 'qcur': ('queue_current', 'gauge'), - 'rate': ('session_rate', 'gauge'), - 'req_rate': ('request_rate', 'gauge'), + 'qmax': ('queue_max', 'gauge'), + 'qlimit': ('queue_limit', 'gauge'), + 'rate_lim': ('session_rate_limit', 'gauge'), + 'rate_max': ('session_rate_max', 'gauge'), + 'req_rate_max': ('request_rate_max', 'gauge'), 'stot': ('session_total', 'derive'), - 'scur': ('session_current', 'gauge'), - 'wredis': ('redistributed', 'derive'), - 'wretr': ('retries', 'derive'), + 'slim': ('session_limit', 'gauge'), + 'smax': ('session_max', 'gauge'), 'throttle': ('throttle', 'gauge'), - 'req_tot': ('req_tot', 'derive'), 'cli_abrt': ('cli_abrt', 'derive'), 'srv_abrt': ('srv_abrt', 'derive'), 'comp_in': ('comp_in', 'derive'), 'comp_out': ('comp_out', 'derive'), 'comp_byp': ('comp_byp', 'derive'), 'comp_rsp': ('comp_rsp', 'derive'), - 'qtime': ('queue_time_avg', 'gauge'), 'ctime': ('connect_time_avg', 'gauge'), - 'rtime': ('response_time_avg', 'gauge'), + 'act': ('active_servers', 'gauge'), + 'bck': ('backup_servers', 'gauge'), + 'check_duration': ('health_check_duration', 'gauge'), + 'lastsess': ('last_session', 'gauge'), + 'conn_rate': ('conn_rate', 'gauge'), + 'conn_rate_max': ('conn_rate_max', 'gauge'), + 'conn_tot': ('conn_total', 'counter'), + 'intercepted': ('intercepted', 'gauge'), + 'dcon': ('denied_tcp_conn', 'gauge'), + 'dses': ('denied_tcp_sess', 'gauge'), } -#Making sure that metrics names are case insensitive. -#It helps with backward compatibility -METRIC_TYPES = dict((k.lower(), v) for k, v in METRIC_TYPES.items()) +DIMENSIONS_LIST = [ + 'pxname', + 'svname', + 'pid', + 'sid', + 'iid', + 'type', + 'addr', + 'cookie', + 'mode', + 'algo', +] + +DEFAULT_METRICS = dict((k.lower(), v) for k, v in DEFAULT_METRICS.items()) +ENHANCED_METRICS = dict((k.lower(), v) for k, v in ENHANCED_METRICS.items()) METRIC_DELIM = '.' # for the frontend/backend stats -DEFAULT_SOCKET = '/var/lib/haproxy/stats' -DEFAULT_PROXY_MONITORS = [ 'server', 'frontend', 'backend' ] -HAPROXY_SOCKET = None +DEFAULT_SOCKET = '/var/run/haproxy.sock' +DEFAULT_PROXY_MONITORS = ['server', 'frontend', 'backend'] class HAProxySocket(object): """ Encapsulates communication with HAProxy via the socket interface - """ + """ def __init__(self, socket_file=DEFAULT_SOCKET): self.socket_file = socket_file def connect(self): - stat_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - stat_sock.connect(self.socket_file) - return stat_sock + # unix sockets all start with '/', use tcp otherwise + is_unix = self.socket_file.startswith('/') + if is_unix: + stat_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + stat_sock.connect(self.socket_file) + return stat_sock + else: + socket_host, separator, port = self.socket_file.rpartition(':') + if socket_host is not '' and port is not '' and separator is ':': + stat_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + stat_sock.connect((socket_host, int(port))) + return stat_sock + else: + collectd.error('Could not connect to socket with host %s. Check HAProxy config.' % self.socket_file) + return def communicate(self, command): '''Get response from single command. @@ -123,12 +167,15 @@ def communicate(self, command): if not command.endswith('\n'): command += '\n' stat_sock = self.connect() + if stat_sock is None: + return '' stat_sock.sendall(command) result_buf = StringIO.StringIO() buf = stat_sock.recv(RECV_SIZE) while buf: result_buf.write(buf) buf = stat_sock.recv(RECV_SIZE) + stat_sock.close() return result_buf.getvalue() @@ -141,11 +188,12 @@ def get_server_info(self): except ValueError: continue result[key.strip()] = val.strip() + return result def get_server_stats(self): output = self.communicate('show stat') - #sanitize and make a list of lines + # sanitize and make a list of lines output = output.lstrip('# ').strip() output = [l.strip(',') for l in output.splitlines()] csvreader = csv.DictReader(output) @@ -153,42 +201,78 @@ def get_server_stats(self): return result -def get_stats(): +def get_stats(module_config): """ Makes two calls to haproxy to fetch server info and server stats. Returns the dict containing metric name as the key and a tuple of metric value and the dict of dimensions if any """ - if HAPROXY_SOCKET is None: + if module_config['socket'] is None: collectd.error("Socket configuration parameter is undefined. Couldn't get the stats") return stats = [] - haproxy = HAProxySocket(HAPROXY_SOCKET) + haproxy = HAProxySocket(module_config['socket']) try: server_info = haproxy.get_server_info() server_stats = haproxy.get_server_stats() except socket.error: - collectd.warning( - 'status err Unable to connect to HAProxy socket at %s' % - HAPROXY_SOCKET) + collectd.warning('status err Unable to connect to HAProxy socket at %s' % module_config['socket']) return stats + # server wide stats for key, val in server_info.iteritems(): try: stats.append((key, int(val), None)) except (TypeError, ValueError): pass + + # proxy specific stats for statdict in server_stats: - if not (statdict['svname'].lower() in PROXY_MONITORS or statdict['pxname'].lower() in PROXY_MONITORS): - continue + dimensions = _build_dimension_dict(statdict) + if not (statdict['svname'].lower() in module_config['proxy_monitors'] or + statdict['pxname'].lower() in module_config['proxy_monitors']): + continue for metricname, val in statdict.items(): try: - stats.append((metricname, int(val), {'proxy_name': statdict['pxname'], 'service_name': statdict['svname']})) + stats.append((metricname, int(val), dimensions)) except (TypeError, ValueError): pass + return stats +def _build_dimension_dict(statdict): + """ + Builds dimensions dict to send back with metrics with readable metric names + Args: + statdict dictionary of metrics from HAProxy to be filtered for dimensions + """ + + dimensions = {} + + for key in DIMENSIONS_LIST: + if key in statdict and key == 'pxname': + dimensions['proxy_name'] = statdict['pxname'] + elif key in statdict and key == 'svname': + dimensions['service_name'] = statdict['svname'] + elif key in statdict and key == 'pid': + dimensions['process_id'] = statdict['pid'] + elif key in statdict and key == 'sid': + dimensions['server_id'] = statdict['sid'] + elif key in statdict and key == 'iid': + dimensions['unique_proxy_id'] = statdict['iid'] + elif key in statdict and key == 'type': + dimensions['type'] = _get_proxy_type(statdict['type']) + elif key in statdict and key == 'addr': + dimensions['address'] = statdict['addr'] + elif key in statdict and key == 'algo': + dimensions['algorithm'] = statdict['algo'] + elif key in statdict: + dimensions[key] = statdict[key] + + return dimensions + + def config(config_values): """ A callback method that loads information from the HaProxy collectd plugin config file. @@ -196,19 +280,52 @@ def config(config_values): config_values (collectd.Config): Object containing config values """ - global PROXY_MONITORS, HAPROXY_SOCKET - PROXY_MONITORS = [ ] - HAPROXY_SOCKET = DEFAULT_SOCKET + module_config = {} + socket = DEFAULT_SOCKET + proxy_monitors = [] + excluded_metrics = set() + enhanced_metrics = False + interval = None + testing = False + for node in config_values.children: - if node.key == "ProxyMonitor": - PROXY_MONITORS.append(node.values[0].lower()) - elif node.key == "Socket": - HAPROXY_SOCKET = node.values[0] + if node.key == "ProxyMonitor" and node.values[0]: + proxy_monitors.append(node.values[0]) + elif node.key == "Socket" and node.values[0]: + socket = node.values[0] + elif node.key == "Interval" and node.values[0]: + interval = node.values[0] + elif node.key == "EnhancedMetrics" and node.values[0]: + enhanced_metrics = _str_to_bool(node.values[0]) + elif node.key == "ExcludeMetric" and node.values[0]: + excluded_metrics.add(node.values[0]) + elif node.key == "Testing" and node.values[0]: + testing = _str_to_bool(node.values[0]) else: collectd.warning('Unknown config key: %s' % node.key) - if not PROXY_MONITORS: - PROXY_MONITORS += DEFAULT_PROXY_MONITORS - PROXY_MONITORS = [ p.lower() for p in PROXY_MONITORS ] + + if not proxy_monitors: + proxy_monitors += DEFAULT_PROXY_MONITORS + + module_config = { + 'socket': socket, + 'proxy_monitors': proxy_monitors, + 'interval': interval, + 'enhanced_metrics': enhanced_metrics, + 'excluded_metrics': excluded_metrics, + 'testing': testing, + } + proxys = "_".join(proxy_monitors) + + if testing: + return module_config + + if interval is not None: + collectd.register_read(collect_metrics, interval, data=module_config, + name='node_' + module_config['socket'] + '_' + proxys) + else: + collectd.register_read(collect_metrics, data=module_config, + name='node_' + module_config['socket'] + '_' + proxys) def _format_dimensions(dimensions): @@ -228,26 +345,68 @@ def _format_dimensions(dimensions): return "[%s]" % (",".join(dim_pairs)) -def collect_metrics(): +def _get_proxy_type(type_id): + """ + Return human readable proxy type + Args: + type_id: 0=frontend, 1=backend, 2=server, 3=socket/listener + """ + proxy_types = { + 0: 'frontend', + 1: 'backend', + 2: 'server', + 3: 'socket/listener', + } + return proxy_types.get(int(type_id)) + + +def _str_to_bool(val): + ''' + Converts a true/false string to a boolean + ''' + val = str(val).strip().lower() + if val == 'true': + return True + elif val != 'false': + collectd.warning('Warning: String (%s) could not be converted to a boolean. Returning false.' % val) + + return False + + +def collect_metrics(module_config): collectd.debug('beginning collect_metrics') """ A callback method that gets metrics from HAProxy and records them to collectd. """ - info = get_stats() + info = get_stats(module_config) if not info: collectd.warning('%s: No data received' % PLUGIN_NAME) return for metric_name, metric_value, dimensions in info: - if not metric_name.lower() in METRIC_TYPES: - collectd.debug("Metric %s is not in the metric types" % metric_name) + # assert metric is in valid metrics lists + if not metric_name.lower() in DEFAULT_METRICS and not metric_name.lower() in ENHANCED_METRICS: + collectd.debug("metric %s is not in either metric list" % metric_name.lower()) continue - translated_metric_name, val_type = METRIC_TYPES[metric_name.lower()] + # skip metrics in enhanced metrics mode if not enabled + if not module_config['enhanced_metrics'] and metric_name.lower() in ENHANCED_METRICS: + continue + + # pull metric name & type from respective metrics list + if metric_name.lower() in DEFAULT_METRICS: + translated_metric_name, val_type = DEFAULT_METRICS[metric_name.lower()] + else: + translated_metric_name, val_type = ENHANCED_METRICS[metric_name.lower()] + + # skip over any exlcluded metrics + if translated_metric_name in module_config['excluded_metrics']: + collectd.debug("excluding metric %s" % translated_metric_name) + continue - collectd.debug('Collecting {0}: {1}'.format(translated_metric_name, metric_value)) + # create datapoint and dispatch datapoint = collectd.Values() datapoint.type = val_type datapoint.type_instance = translated_metric_name @@ -266,4 +425,3 @@ def collect_metrics(): datapoint.dispatch() collectd.register_config(config) -collectd.register_read(collect_metrics) diff --git a/test_haproxy.py b/test_haproxy.py new file mode 100644 index 0000000..e25bc4b --- /dev/null +++ b/test_haproxy.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python +import collections +from mock import MagicMock +from mock import Mock +from mock import patch +import sys +import pytest +import csv + + +class MockCollectd(MagicMock): + """ + Mocks the functions and objects provided by the collectd module + """ + + @staticmethod + def log(log_str): + print log_str + + debug = log + info = log + warning = log + error = log + + +class MockHAProxySocket(object): + def __init__(self, socket_file="whatever"): + self.socket_file = socket_file + + def get_server_info(self): + sample_data = {'ConnRate': '3', 'CumReq': '5', 'idle_pct': '78'} + return sample_data + + def get_server_stats(self): + sample_data = [{'bin': '3120628', 'lastchg': '', 'lbt': '', 'weight': '', + 'wretr': '', 'slim': '50', 'pid': '1', 'wredis': '', 'dresp': '0', + 'ereq': '0', 'pxname': 'sample_proxy', 'stot': '39728', + 'sid': '0', 'bout': '188112702395', 'qlimit': '', 'status': 'OPEN', + 'smax': '2', 'dreq': '0', 'econ': '', 'iid': '2', 'chkfail': '', + 'downtime': '', 'qcur': '', 'eresp': '', 'throttle': '', 'scur': '0', + 'bck': '', 'qmax': '', 'act': '', 'chkdown': '', 'svname': 'FRONTEND'}] + return sample_data + +sys.modules['collectd'] = MockCollectd() + +import haproxy + +ConfigOption = collections.namedtuple('ConfigOption', ('key', 'values')) + +mock_config_default_values = Mock() +mock_config_default_values.children = [ + ConfigOption('Testing', ('True',)) +] + + +def test_default_config(): + module_config = haproxy.config(mock_config_default_values) + assert module_config['socket'] == '/var/run/haproxy.sock' + assert not module_config['enhanced_metrics'] + assert module_config['proxy_monitors'] == ['server', 'frontend', 'backend'] + assert module_config['testing'] + assert module_config['excluded_metrics'] == set() + + +mock_config_enhanced_metrics_off = Mock() +mock_config_enhanced_metrics_off.children = [ + ConfigOption('Socket', ('/var/run/haproxy.sock',)), + ConfigOption('EnhancedMetrics', ('False',)), + ConfigOption('Testing', ('True',)) +] + + +def test_enhanced_metrics_off_config(): + module_config = haproxy.config(mock_config_enhanced_metrics_off) + assert module_config['socket'] == '/var/run/haproxy.sock' + assert not module_config['enhanced_metrics'] + assert module_config['proxy_monitors'] == ['server', 'frontend', 'backend'] + assert module_config['testing'] + assert module_config['excluded_metrics'] == set() + + +mock_config_enhanced_metrics_on = Mock() +mock_config_enhanced_metrics_on.children = [ + ConfigOption('Socket', ('/var/run/haproxy.sock',)), + ConfigOption('EnhancedMetrics', ('True',)), + ConfigOption('Testing', ('True',)) +] + + +def test_enhanced_metrics_on_config(): + module_config = haproxy.config(mock_config_enhanced_metrics_on) + assert module_config['socket'] == '/var/run/haproxy.sock' + assert module_config['enhanced_metrics'] + assert module_config['proxy_monitors'] == ['server', 'frontend', 'backend'] + assert module_config['testing'] + assert module_config['excluded_metrics'] == set() + +mock_config_exclude_idle_pct = Mock() +mock_config_exclude_idle_pct.children = [ + ConfigOption('Socket', ('/var/run/haproxy.sock',)), + ConfigOption('EnhancedMetrics', ('False',)), + ConfigOption('ExcludeMetric', ('idle_pct',)), + ConfigOption('Testing', ('True',)) +] + + +def test_exclude_metrics_config(): + module_config = haproxy.config(mock_config_exclude_idle_pct) + assert module_config['socket'] == '/var/run/haproxy.sock' + assert not module_config['enhanced_metrics'] + assert module_config['proxy_monitors'] == ['server', 'frontend', 'backend'] + assert module_config['testing'] + assert module_config['excluded_metrics'] == set(['idle_pct']) + +mock_config = Mock() +mock_config.children = [ + ConfigOption('Testing', ('True',)) +] + + +@patch('haproxy.HAProxySocket', MockHAProxySocket) +def test_read(): + haproxy.collect_metrics(haproxy.config(mock_config)) + +mock_config_exclude_bytes_out = Mock() +mock_config_exclude_bytes_out.children = [ + ConfigOption('ExcludeMetric', ('bytes_out',)), + ConfigOption('Testing', ('True',)) +] + + +@patch('haproxy.HAProxySocket', MockHAProxySocket) +def test_exclude_metric(): + haproxy.collect_metrics(haproxy.config(mock_config_exclude_bytes_out)) + +mock_config_enhanced_sample = Mock() +mock_config_enhanced_sample.children = [ + ConfigOption('ProxyMonitor', ('sample_proxy',)), + ConfigOption('EnhancedMetrics', ('True',)), + ConfigOption('Testing', ('True',)) +] + + +@patch('haproxy.HAProxySocket', MockHAProxySocket) +def test_enhanced_metrics(): + haproxy.collect_metrics(haproxy.config(mock_config_enhanced_sample))