From 4e7d6ccb8a4a40bf52c0267b7b8e5ab4cc075d4e Mon Sep 17 00:00:00 2001 From: nsano-rururu Date: Sat, 12 Jun 2021 19:48:01 +0900 Subject: [PATCH] Added error handling for unsupported operand type --- docs/source/ruletypes.rst | 5 ++++ elastalert/elastalert.py | 23 +++++++++++++------ tests/base_test.py | 48 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/docs/source/ruletypes.rst b/docs/source/ruletypes.rst index 67ca34fc..ac23307d 100644 --- a/docs/source/ruletypes.rst +++ b/docs/source/ruletypes.rst @@ -456,6 +456,11 @@ query_delay ``query_delay``: This option will cause ElastAlert to subtract a time delta from every query, causing the rule to run with a delay. This is useful if the data is Elasticsearch doesn't get indexed immediately. (Optional, time) +For example:: + + query_delay: + hours: 2 + owner ^^^^^ diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index 74a3d9fe..8e268910 100755 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -613,7 +613,10 @@ def remove_old_events(self, rule): remove = [] buffer_time = rule.get('buffer_time', self.buffer_time) if rule.get('query_delay'): - buffer_time += rule['query_delay'] + try: + buffer_time += rule['query_delay'] + except Exception as e: + self.handle_error("[remove_old_events]Error parsing query_delay send time format %s" % e) for _id, timestamp in rule['processed_hits'].items(): if now - timestamp > buffer_time: remove.append(_id) @@ -1271,7 +1274,10 @@ def handle_rule_execution(self, rule): if hasattr(self.args, 'end') and self.args.end: endtime = ts_to_dt(self.args.end) elif delay: - endtime = ts_now() - delay + try: + endtime = ts_now() - delay + except Exception as e: + self.handle_error("[handle_rule_execution]Error parsing query_delay send time format %s" % e) else: endtime = ts_now() @@ -1847,11 +1853,14 @@ def add_aggregated_alert(self, match, rule): except Exception as e: self.handle_error("Error parsing aggregate send 
time Cron format %s" % (e), rule['aggregation']['schedule']) else: - if rule.get('aggregate_by_match_time', False): - match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field'])) - alert_time = match_time + rule['aggregation'] - else: - alert_time = ts_now() + rule['aggregation'] + try: + if rule.get('aggregate_by_match_time', False): + match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field'])) + alert_time = match_time + rule['aggregation'] + else: + alert_time = ts_now() + rule['aggregation'] + except Exception as e: + self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation']) rule['aggregate_alert_time'][aggregation_key_value] = alert_time agg_id = None diff --git a/tests/base_test.py b/tests/base_test.py index ff17e3ce..7bcf3f48 100644 --- a/tests/base_test.py +++ b/tests/base_test.py @@ -1378,3 +1378,51 @@ def test_query_with_blacklist_filter_es_five(ea_sixsix): ea_sixsix.init_rule(new_rule, True) assert 'username:"xudan1" OR username:"xudan12" OR username:"aa1"' in new_rule['filter'][-1]['query_string'][ 'query'] + + +def test_handle_rule_execution_error(ea, caplog): + with mock.patch('elastalert.elastalert.elasticsearch_client'): + ea.rules[0]['aggregate_by_match_time'] = True + ea.rules[0]['summary_table_fields'] = ['@timestamp'] + ea.rules[0]['aggregation_key'] = ['service.name'] + ea.rules[0]['alert_text_type'] = 'aggregation_summary_only' + ea.rules[0]['query_delay'] = 'a' + new_rule = copy.copy(ea.rules[0]) + ea.init_rule(new_rule, True) + + ea.handle_rule_execution(ea.rules[0]) + user, level, message = caplog.record_tuples[0] + assert '[handle_rule_execution]Error parsing query_delay send time format' in message + + +def test_remove_old_events_error(ea, caplog): + with mock.patch('elastalert.elastalert.elasticsearch_client'): + ea.rules[0]['aggregate_by_match_time'] = True + ea.rules[0]['summary_table_fields'] = ['@timestamp'] + ea.rules[0]['aggregation_key'] = ['service.name'] + 
ea.rules[0]['alert_text_type'] = 'aggregation_summary_only' + ea.rules[0]['query_delay'] = 'a' + new_rule = copy.copy(ea.rules[0]) + ea.init_rule(new_rule, True) + + ea.remove_old_events(ea.rules[0]) + user, level, message = caplog.record_tuples[0] + assert '[remove_old_events]Error parsing query_delay send time format' in message + + +def test_add_aggregated_alert_error(ea, caplog): + mod = BaseEnhancement(ea.rules[0]) + mod.process = mock.Mock() + ea.rules[0]['match_enhancements'] = [mod] + ea.rules[0]['aggregation'] = {"hour": 5} + ea.rules[0]['run_enhancements_first'] = True + ea.rules[0]['aggregate_by_match_time'] = True + hits = generate_hits([START_TIMESTAMP, END_TIMESTAMP]) + ea.thread_data.current_es.search.return_value = hits + ea.rules[0]['type'].matches = [{'@timestamp': END}] + with mock.patch('elastalert.elastalert.elasticsearch_client'): + ea.run_rule(ea.rules[0], END, START) + user, level, message = caplog.record_tuples[0] + expected = "[add_aggregated_alert]" + expected += "Error parsing aggregate send time format unsupported operand type(s) for +: 'datetime.datetime' and 'dict'" + assert expected in message