diff --git a/aggregator.py b/aggregator.py index 9bbbefd5aa..9a7da19cf5 100644 --- a/aggregator.py +++ b/aggregator.py @@ -232,17 +232,28 @@ def __init__(self, formatter, name, tags, hostname, device_name, extra_config=No self.name = name self.count = 0 self.samples = [] - self.aggregates = extra_config['aggregates'] if\ - extra_config is not None and extra_config.get('aggregates') is not None\ - else DEFAULT_HISTOGRAM_AGGREGATES - self.percentiles = extra_config['percentiles'] if\ - extra_config is not None and extra_config.get('percentiles') is not None\ - else DEFAULT_HISTOGRAM_PERCENTILES + self.aggregates = self._extract_relevant_config(extra_config, 'aggregates', + DEFAULT_HISTOGRAM_AGGREGATES) + self.percentiles = self._extract_relevant_config(extra_config, 'percentiles', + DEFAULT_HISTOGRAM_PERCENTILES) self.tags = tags self.hostname = hostname self.device_name = device_name self.last_sample_time = None + def _extract_relevant_config(self, extra_config, key, default): + if extra_config is None or extra_config.get(key) is None: + return default + + # let's try and see if we have a regex matching our name + for regex, config in extra_config[key]: + if regex is None: + default = config + elif regex.search(self.name): + return config + + return default + def sample(self, value, sample_rate, timestamp=None): self.count += int(1 / sample_rate) self.samples.append(value) diff --git a/config.py b/config.py index 8c50a9779f..421677870a 100644 --- a/config.py +++ b/config.py @@ -17,6 +17,7 @@ import platform import re from socket import gaierror, gethostbyname +import sre_constants import string import sys import traceback @@ -244,52 +245,78 @@ def get_default_bind_host(): def get_histogram_aggregates(configstr=None): - if configstr is None: - return None + return _get_histogram_config(configstr, 'aggregate', _parse_histogram_aggregates_values) - try: - vals = configstr.split(',') - valid_values = ['min', 'max', 'median', 'avg', 'sum', 'count'] - result = [] +def _parse_histogram_aggregates_values(vals): + result = [] + valid_values = ['min', 'max', 'median', 'avg', 'sum', 'count'] - for val in vals: - val = val.strip() - if val not in valid_values: - log.warning("Ignored histogram aggregate {0}, invalid".format(val)) - continue - else: - result.append(val) - except Exception: - log.exception("Error when parsing histogram aggregates, skipping") - return None + for val in vals: + val = val.strip() + if val not in valid_values: + log.warning("Ignored histogram aggregate {0}, invalid".format(val)) + continue + else: + result.append(val) return result - def get_histogram_percentiles(configstr=None): + return _get_histogram_config(configstr, 'percentile', _parse_histogram_percentiles_values) + +def _parse_histogram_percentiles_values(vals): + result = [] + + for val in vals: + try: + val = val.strip() + floatval = float(val) + if floatval <= 0 or floatval >= 1: + raise ValueError + if len(val) > 4: + log.warning("Histogram percentiles are rounded to 2 digits: {0} rounded" + .format(floatval)) + result.append(float(val[0:4])) + except ValueError: + log.warning("Bad histogram percentile value {0}, must be float in ]0;1[, skipping" + .format(val)) + + return result + +def _get_histogram_config(configstr, type_str, parse_callback): if configstr is None: return None - result = [] try: - vals = configstr.split(',') - for val in vals: - try: - val = val.strip() - floatval = float(val) - if floatval <= 0 or floatval >= 1: - raise ValueError - if len(val) > 4: - log.warning("Histogram percentiles are rounded to 2 digits: {0} rounded" - .format(floatval)) - result.append(float(val[0:4])) - except ValueError: - log.warning("Bad histogram percentile value {0}, must be float in ]0;1[, skipping" - .format(val)) + return _parse_histogram_config(configstr, type_str, parse_callback) except Exception: - log.exception("Error when parsing histogram percentiles, skipping") + log.exception('Error when parsing histogram {0}s, skipping'.format(type_str)) return None +def _parse_histogram_config(configstr, type_str, parse_callback): + groups = configstr.split(';') + + result = [] + for group in groups: + group_config = group.rsplit(':', 1) + if len(group_config) == 1: + # general config + result.append((None, parse_callback(group.split(',')))) + elif len(group_config) == 2: + # only applies to metrics matching a given regex + regex = group_config[0].strip() + if not regex: + log.warning('Ignoring empty regex for histogram {0}s'.format(type_str)) + continue + try: + compiled_regex = re.compile(regex) + except sre_constants.error as exception: + log.warning('Ignoring invalid regex {0} for histogram {1}s: {2}' + .format(regex, type_str, exception.message)) + continue + + result.append((compiled_regex, parse_callback(group_config[1].split(',')))) + return result diff --git a/tests/core/test_aggregator.py b/tests/core/test_aggregator.py index b2d58d9808..49095feb6f 100644 --- a/tests/core/test_aggregator.py +++ b/tests/core/test_aggregator.py @@ -89,7 +89,7 @@ def test_histogram_normalization(self): stats = MetricsAggregator( 'myhost', interval=10, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) for i in range(5): stats.submit_packets('h1:1|h') @@ -342,7 +342,7 @@ def test_histogram(self): # The min is not enabled by default stats = MetricsAggregator( 'myhost', - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) # Sample all numbers between 1-100 many times. This @@ -377,7 +377,7 @@ def test_sampled_histogram(self): # The min is not enabled by default stats = MetricsAggregator( 'myhost', - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) stats.submit_packets('sampled.hist:5|h|@0.5') @@ -410,13 +410,13 @@ def test_monokey_batching_notags(self): # The min is not enabled by default stats = MetricsAggregator( 'host', - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) stats.submit_packets('test_hist:0.3|ms:2.5|ms|@0.5:3|ms') stats_ref = MetricsAggregator( 'host', - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) packets = [ 'test_hist:0.3|ms', @@ -458,13 +458,13 @@ def test_monokey_batching_withtags_with_sampling(self): # The min is not enabled by default stats = MetricsAggregator( 'host', - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) stats.submit_packets('test_metric:1.5|c|#tag1:one,tag2:two:2.3|g|#tag3:three:3|g:42|h|#tag1:12,tag42:42|@0.22') stats_ref = MetricsAggregator( 'host', - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) packets = [ 'test_metric:1.5|c|#tag1:one,tag2:two', @@ -517,7 +517,7 @@ def test_metrics_expiry(self): 'myhost', interval=ag_interval, expiry_seconds=expiry, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) stats.submit_packets('test.counter:123|c') stats.submit_packets('test.gauge:55|g') @@ -779,7 +779,7 @@ def test_recent_point_threshold(self): stats = MetricsAggregator( 'myhost', recent_point_threshold=threshold, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) timestamp_beyond_threshold = time.time() - threshold*2 timestamp_within_threshold = time.time() - threshold/2 diff --git a/tests/core/test_bucket_aggregator.py b/tests/core/test_bucket_aggregator.py index cc3b334ed9..8aa3180a2a 100644 --- a/tests/core/test_bucket_aggregator.py +++ b/tests/core/test_bucket_aggregator.py @@ -78,7 +78,7 @@ def test_histogram_normalization(self): ag_interval = 10 # The min is not enabled by default stats = MetricsBucketAggregator('myhost', interval=ag_interval, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min']) + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])]) for i in range(5): stats.submit_packets('h1:1|h') for i in range(20): @@ -551,7 +551,7 @@ def test_histogram(self): ag_interval = self.interval # The min is not enabled by default stats = MetricsBucketAggregator('myhost', interval=ag_interval, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min']) + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])]) self.wait_for_bucket_boundary(ag_interval) # Sample all numbers between 1-100 many times. This @@ -589,7 +589,7 @@ def test_sampled_histogram(self): stats = MetricsBucketAggregator( 'myhost', interval=self.interval, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) stats.submit_packets('sampled.hist:5|h|@0.5') @@ -607,7 +607,7 @@ def test_histogram_buckets(self): ag_interval = 1 # The min is not enabled by default stats = MetricsBucketAggregator('myhost', interval=ag_interval, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min']) + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])]) # Sample all numbers between 1-100 many times. This # means our percentiles should be relatively close to themselves. @@ -665,7 +665,7 @@ def test_histogram_flush_during_bucket(self): ag_interval = 1 # The min is not enabled by default stats = MetricsBucketAggregator('myhost', interval=ag_interval, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min']) + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])]) # Sample all numbers between 1-100 many times. This # means our percentiles should be relatively close to themselves. @@ -768,7 +768,7 @@ def test_metrics_expiry(self): # The min is not enabled by default stats = MetricsBucketAggregator('myhost', interval=ag_interval, expiry_seconds=expiry, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min']) + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])]) stats.submit_packets('test.counter:123|c') stats.submit_packets('test.gauge:55|g') stats.submit_packets('test.set:44|s') @@ -974,7 +974,7 @@ def test_recent_point_threshold(self): 'myhost', recent_point_threshold=threshold, interval=ag_interval, - histogram_aggregates=DEFAULT_HISTOGRAM_AGGREGATES+['min'] + histogram_aggregates=[(None, DEFAULT_HISTOGRAM_AGGREGATES+['min'])] ) timestamp_beyond_threshold = time.time() - threshold*2 diff --git a/tests/core/test_histogram.py b/tests/core/test_histogram.py index 2853f5f914..d834e422be 100644 --- a/tests/core/test_histogram.py +++ b/tests/core/test_histogram.py @@ -1,4 +1,5 @@ # stdlib +import re import unittest # project @@ -22,7 +23,8 @@ def test_default(self): self.assertEquals( sorted(value_by_type.keys()), - ['95percentile', 'avg', 'count', 'max', 'median'], value_by_type + ['95percentile', 'avg', 'count', 'max', 'median'], + value_by_type ) self.assertEquals(value_by_type['max'], 19, value_by_type) @@ -40,7 +42,7 @@ def test_custom_single_percentile(self): self.assertEquals( stats.metric_config[Histogram]['percentiles'], - [0.40], + [(None, [0.40])], stats.metric_config[Histogram] ) @@ -66,7 +68,7 @@ def test_custom_multiple_percentile(self): self.assertEquals( stats.metric_config[Histogram]['percentiles'], - [0.4, 0.65, 0.99], + [(None, [0.4, 0.65, 0.99])], stats.metric_config[Histogram] ) @@ -94,7 +96,7 @@ def test_custom_invalid_percentile(self): self.assertEquals( stats.metric_config[Histogram]['percentiles'], - [], + [(None, [])], stats.metric_config[Histogram] ) @@ -107,7 +109,7 @@ def test_custom_invalid_percentile2(self): self.assertEquals( stats.metric_config[Histogram]['percentiles'], - [], + [(None, [])], stats.metric_config[Histogram] ) @@ -120,7 +122,7 @@ def test_custom_invalid_percentile3skip(self): self.assertEquals( stats.metric_config[Histogram]['percentiles'], - [0.8], + [(None, [0.8])], stats.metric_config[Histogram] ) @@ -132,8 +134,8 @@ def test_custom_aggregate(self): ) self.assertEquals( - sorted(stats.metric_config[Histogram]['aggregates']), - ['max', 'median', 'sum'], + stats.metric_config[Histogram]['aggregates'], + [(None, ['median', 'max', 'sum'])], stats.metric_config[Histogram] ) @@ -152,3 +154,41 @@ def test_custom_aggregate(self): self.assertEquals(value_by_type['max'], 19, value_by_type) self.assertEquals(value_by_type['sum'], 190, value_by_type) self.assertEquals(value_by_type['95percentile'], 18, value_by_type) + + def test_custom_aggregate_with_regexes(self): + configstr = '^my_pre\.fix: median, max, sum; max, median' + stats = MetricsAggregator( + 'myhost', + histogram_aggregates=get_histogram_aggregates(configstr) + ) + + self.assertEquals( + stats.metric_config[Histogram]['aggregates'], + [(re.compile(r'^my_pre\.fix'), ['median', 'max', 'sum']), (None, ['max', 'median'])], + stats.metric_config[Histogram] + ) + + for i in xrange(20): + stats.submit_packets('my_pre.fix_metric:{0}|h'.format(i)) + stats.submit_packets('not_my_pre.fix_metric:{0}|h'.format(-i)) + + metrics = stats.flush() + + self.assertEquals(len(metrics), 7, metrics) + + value_by_type_for_my_prefix = {} + value_by_type_for_not_my_prefix = {} + for k in metrics: + if k['metric'].startswith('my_pre.fix_metric'): + value_by_type_for_my_prefix[k['metric'][len('my_pre.fix_metric')+1:]] = k['points'][0][1] + else: + value_by_type_for_not_my_prefix[k['metric'][len('not_my_pre.fix_metric')+1:]] = k['points'][0][1] + + self.assertEquals(value_by_type_for_my_prefix['median'], 9, value_by_type_for_my_prefix) + self.assertEquals(value_by_type_for_my_prefix['max'], 19, value_by_type_for_my_prefix) + self.assertEquals(value_by_type_for_my_prefix['sum'], 190, value_by_type_for_my_prefix) + self.assertEquals(value_by_type_for_my_prefix['95percentile'], 18, value_by_type_for_my_prefix) + + self.assertEquals(value_by_type_for_not_my_prefix['median'], -10, value_by_type_for_not_my_prefix) + self.assertEquals(value_by_type_for_not_my_prefix['max'], 0, value_by_type_for_not_my_prefix) + self.assertEquals(value_by_type_for_not_my_prefix['95percentile'], -1, value_by_type_for_not_my_prefix)