Skip to content

Commit

Permalink
Merge pull request #855 from luffynextgen/master
Browse files Browse the repository at this point in the history
Allow the possibility to use rule and match filed in thehive alert description
  • Loading branch information
jertel authored May 24, 2022
2 parents 6c56fdb + 1085436 commit 80fb1d0
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
## Other changes
- Upgrade stomp 8.0.0 to 8.0.1 - [#832](https://github.com/jertel/elastalert2/pull/832) - @jertel
- Add support for Kibana 8.2 for Kibana Discover, Upgrade Pytest 7.1.1 to 7.1.2, Upgrade pylint 2.13.5 to 2.13.8, Upgrade Jinja2 3.1.1 to 3.1.2 - [#840](https://github.com/jertel/elastalert2/pull/840) - @nsano-rururu
- Add the possibility to use rule and match fields in the description of TheHive alerts - [#855](https://github.com/jertel/elastalert2/pull/855) - @luffynextgen

# 2.5.0

Expand Down
6 changes: 5 additions & 1 deletion docs/source/ruletypes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3232,6 +3232,9 @@ the observable value is also the same, including the behaviour for aggregated al

``hive_verify``: Whether or not to enable SSL certificate validation. Defaults to False.

``description_args``: can be used to call rule and match fileds in the description of the alert in TheHive
``description_missing_value``: Text to replace any match field not found when formating the ``description``. Defaults to ``<MISSING VALUE>``.

Example usage::

alert: hivealerter
Expand All @@ -3253,7 +3256,8 @@ Example usage::
severity: 2
status: 'New'
source: 'elastalert'
description: 'Sample description'
description_args: [ name, description]
description: '{0} : {1}'
tags: ['tag1', 'tag2']
title: 'Title'
tlp: 3
Expand Down
26 changes: 23 additions & 3 deletions elastalert/alerters/thehive.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ def load_tags(self, tag_names: list, match: dict):

return tag_values

def load_description(self, description_raw, match: dict):
missing = self.rule['hive_alert_config'].get('description_missing_value', '<MISSING VALUE>')
if 'description_args' in self.rule.get('hive_alert_config'):
description_args = self.rule['hive_alert_config'].get('description_args')
description_values=[]
for arg in description_args:
description_values.append(self.lookup_field(match, arg, missing))
for i, text_value in enumerate(description_values):
if text_value is None:
description_value = self.rule.get(description_args[i])
if description_value:
description_values[i] = description_value
description_values = [missing if val is None else val for val in description_values]
description_raw = description_raw.format(*description_values)
return description_raw
else:
return description_raw
def alert(self, matches):
# Build TheHive alert object, starting with some defaults, updating with any
# user-specified config
Expand All @@ -85,7 +102,7 @@ def alert(self, matches):
'title': self.create_title(matches),
}
alert_config.update(self.rule.get('hive_alert_config', {}))

# Iterate through each match found, populating the alert tags and observables as required
tags = set()
artifacts = []
Expand All @@ -96,8 +113,11 @@ def alert(self, matches):
alert_config['artifacts'] = artifacts
alert_config['tags'] = list(tags)

# Populate the customFields
alert_config['customFields'] = self.load_custom_fields(alert_config['customFields'],
# Populate the customFields
if len(matches) > 0:
#Populate description field
alert_config['description']=self.load_description(alert_config['description'], matches[0])
alert_config['customFields'] = self.load_custom_fields(alert_config['customFields'],
matches[0])

# POST the alert to TheHive
Expand Down
169 changes: 169 additions & 0 deletions tests/alerters/thehive_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,172 @@ def test_load_tags(tags, expect):
}
actual = alert.load_tags(tags, match)
assert expect == actual


def test_load_description_default():
rule = {'alert': [],
'alert_text': '',
'alert_text_type': 'alert_text_only',
'description': 'test',
'hive_alert_config': {'customFields': [{'name': 'test',
'type': 'string',
'value': 2}],
'follow': True,
'severity': 2,
'source': 'elastalert',
'status': 'New',
'tags': ['test.port'],
'tlp': 3,
'type': 'external'},
'hive_connection': {'hive_apikey': '',
'hive_host': 'https://localhost',
'hive_port': 9000},
'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}],
'name': 'test-thehive',
'tags': ['a', 'b'],
'type': 'any'}

rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = HiveAlerter(rule)
match = {
"test": {
"ip": "127.0.0.1",
"port": 9876,
"as_number": 1234
},
"@timestamp": "2021-05-09T14:43:30",
}

actual = alert.load_description(alert.create_alert_body(match), match)
expected = alert.create_alert_body(match)
assert actual == expected


# Test when description is submitted under hive_alert_config but description_args is not
def test_load_description_no_args():
rule = {'alert': [],
'alert_text': '',
'alert_text_type': 'alert_text_only',
'description': 'test',
'hive_alert_config': {'customFields': [{'name': 'test',
'type': 'string',
'value': 2}],
'follow': True,
'severity': 2,
'source': 'elastalert',
'description': 'TheHive description test',
'status': 'New',
'tags': ['test.port'],
'tlp': 3,
'type': 'external'},
'hive_connection': {'hive_apikey': '',
'hive_host': 'https://localhost',
'hive_port': 9000},
'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}],
'name': 'test-thehive',
'tags': ['a', 'b'],
'type': 'any'}

rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = HiveAlerter(rule)
match = {
"test": {
"ip": "127.0.0.1",
"port": 9876,
"as_number": 1234
},
"@timestamp": "2021-05-09T14:43:30",
}
actual = alert.load_description(rule['hive_alert_config']['description'], match)
expected = rule['hive_alert_config']['description']
assert actual == expected


# Test with description_missing_value

def test_load_description_args():
rule = {'alert': [],
'alert_text': '',
'alert_text_type': 'alert_text_only',
'title': 'Unit test',
'description': 'test',
'hive_alert_config': {'customFields': [{'name': 'test',
'type': 'string',
'value': 2}],
'follow': True,
'severity': 2,
'source': 'elastalert',
'description_missing_value': '<Value not found in logs>',
'description_args': ['title', 'test.ip', 'host'],
'description': '{0} from host:{2} to {1}',
'status': 'New',
'tags': ['test.port'],
'tlp': 3,
'type': 'external'},
'hive_connection': {'hive_apikey': '',
'hive_host': 'https://localhost',
'hive_port': 9000},
'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}],
'name': 'test-thehive',
'tags': ['a', 'b'],
'type': 'any'}

rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = HiveAlerter(rule)
match = {
"test": {
"ip": "127.0.0.1",
"port": 9876,
"as_number": 1234
},
"@timestamp": "2021-05-09T14:43:30",
}
actual = alert.load_description(rule['hive_alert_config']['description'], match)
expected = "Unit test from host:<Value not found in logs> to 127.0.0.1"
assert actual == expected


# Test without description_missing_value, missing values a replaced by a default value <MISSING VALUE>
def test_load_description_missing_value_default():
rule = {'alert': [],
'alert_text': '',
'alert_text_type': 'alert_text_only',
'title': 'Unit test',
'description': 'test',
'hive_alert_config': {'customFields': [{'name': 'test',
'type': 'string',
'value': 2}],
'follow': True,
'severity': 2,
'source': 'elastalert',
'description_args': ['title', 'test.ip', 'host'],
'description': '{0} from host:{2} to {1}',
'status': 'New',
'tags': ['test.port'],
'tlp': 3,
'type': 'external'},
'hive_connection': {'hive_apikey': '',
'hive_host': 'https://localhost',
'hive_port': 9000},
'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}],
'name': 'test-thehive',
'tags': ['a', 'b'],
'type': 'any'}

rules_loader = FileRulesLoader({})
rules_loader.load_modules(rule)
alert = HiveAlerter(rule)
match = {
"test": {
"ip": "127.0.0.1",
"port": 9876,
"as_number": 1234
},
"@timestamp": "2021-05-09T14:43:30",
}
actual = alert.load_description(rule['hive_alert_config']['description'], match)
expected = "Unit test from host:<MISSING VALUE> to 127.0.0.1"
assert actual == expected

0 comments on commit 80fb1d0

Please sign in to comment.