Skip to content

Commit

Permalink
Merge pull request #266 from nsano-rururu/fix_elastalert_py
Browse files Browse the repository at this point in the history
Added error handling for unsupported operand type
  • Loading branch information
jertel authored Jun 12, 2021
2 parents 0769c87 + 0f676f9 commit 68b2d0a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 7 deletions.
5 changes: 5 additions & 0 deletions docs/source/ruletypes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ query_delay
``query_delay``: This option will cause ElastAlert to subtract a time delta from every query, causing the rule to run with a delay.
This is useful if the data in Elasticsearch doesn't get indexed immediately. (Optional, time)

For example::

query_delay:
hours: 2

owner
^^^^^

Expand Down
23 changes: 16 additions & 7 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,10 @@ def remove_old_events(self, rule):
remove = []
buffer_time = rule.get('buffer_time', self.buffer_time)
if rule.get('query_delay'):
buffer_time += rule['query_delay']
try:
buffer_time += rule['query_delay']
except Exception as e:
self.handle_error("[remove_old_events]Error parsing query_delay send time format %s" % e)
for _id, timestamp in rule['processed_hits'].items():
if now - timestamp > buffer_time:
remove.append(_id)
Expand Down Expand Up @@ -1271,7 +1274,10 @@ def handle_rule_execution(self, rule):
if hasattr(self.args, 'end') and self.args.end:
endtime = ts_to_dt(self.args.end)
elif delay:
endtime = ts_now() - delay
try:
endtime = ts_now() - delay
except Exception as e:
self.handle_error("[handle_rule_execution]Error parsing query_delay send time format %s" % e)
else:
endtime = ts_now()

Expand Down Expand Up @@ -1847,11 +1853,14 @@ def add_aggregated_alert(self, match, rule):
except Exception as e:
self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
else:
if rule.get('aggregate_by_match_time', False):
match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
alert_time = match_time + rule['aggregation']
else:
alert_time = ts_now() + rule['aggregation']
try:
if rule.get('aggregate_by_match_time', False):
match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
alert_time = match_time + rule['aggregation']
else:
alert_time = ts_now() + rule['aggregation']
except Exception as e:
self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])

rule['aggregate_alert_time'][aggregation_key_value] = alert_time
agg_id = None
Expand Down
48 changes: 48 additions & 0 deletions tests/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,3 +1378,51 @@ def test_query_with_blacklist_filter_es_five(ea_sixsix):
ea_sixsix.init_rule(new_rule, True)
assert 'username:"xudan1" OR username:"xudan12" OR username:"aa1"' in new_rule['filter'][-1]['query_string'][
'query']


def test_handle_rule_execution_error(ea, caplog):
    """An unparseable query_delay should be logged, not crash rule execution."""
    with mock.patch('elastalert.elastalert.elasticsearch_client'):
        rule = ea.rules[0]
        rule['aggregate_by_match_time'] = True
        rule['summary_table_fields'] = ['@timestamp']
        rule['aggregation_key'] = ['service.name']
        rule['alert_text_type'] = 'aggregation_summary_only'
        # 'a' is not a valid time delta, so endtime computation must fail gracefully
        rule['query_delay'] = 'a'
        ea.init_rule(copy.copy(rule), True)

        ea.handle_rule_execution(rule)
        _, _, message = caplog.record_tuples[0]
        assert '[handle_rule_execution]Error parsing query_delay send time format' in message


def test_remove_old_events_error(ea, caplog):
    """An unparseable query_delay should be logged, not crash remove_old_events."""
    with mock.patch('elastalert.elastalert.elasticsearch_client'):
        rule = ea.rules[0]
        rule['aggregate_by_match_time'] = True
        rule['summary_table_fields'] = ['@timestamp']
        rule['aggregation_key'] = ['service.name']
        rule['alert_text_type'] = 'aggregation_summary_only'
        # 'a' cannot be added to buffer_time, triggering the error branch
        rule['query_delay'] = 'a'
        ea.init_rule(copy.copy(rule), True)

        ea.remove_old_events(rule)
        _, _, message = caplog.record_tuples[0]
        assert '[remove_old_events]Error parsing query_delay send time format' in message


def test_add_aggregated_alert_error(ea, caplog):
    """A raw dict aggregation (not a timedelta) should be logged, not crash alerting."""
    enhancement = BaseEnhancement(ea.rules[0])
    enhancement.process = mock.Mock()
    rule = ea.rules[0]
    rule['match_enhancements'] = [enhancement]
    # Invalid: a plain dict cannot be added to a datetime, forcing the error path
    rule['aggregation'] = {"hour": 5}
    rule['run_enhancements_first'] = True
    rule['aggregate_by_match_time'] = True
    ea.thread_data.current_es.search.return_value = generate_hits([START_TIMESTAMP, END_TIMESTAMP])
    rule['type'].matches = [{'@timestamp': END}]
    with mock.patch('elastalert.elastalert.elasticsearch_client'):
        ea.run_rule(rule, END, START)
    _, _, message = caplog.record_tuples[0]
    expected = (
        "[add_aggregated_alert]"
        "Error parsing aggregate send time format unsupported operand type(s) for +: 'datetime.datetime' and 'dict'"
    )
    assert expected in message

0 comments on commit 68b2d0a

Please sign in to comment.