diff --git a/CHANGELOG.md b/CHANGELOG.md index 12b668f3..e7fbaf9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ ## Other changes - Upgrade stomp 8.0.0 to 8.0.1 - [#832](https://github.com/jertel/elastalert2/pull/832) - @jertel - Add support for Kibana 8.2 for Kibana Discover, Upgrade Pytest 7.1.1 to 7.1.2, Upgrade pylint 2.13.5 to 2.13.8, Upgrade Jinja2 3.1.1 to 3.1.2 - [#840](https://github.com/jertel/elastalert2/pull/840) - @nsano-rururu +- Add the possibility to use rule and match fields in the description of TheHive alerts - [#855](https://github.com/jertel/elastalert2/pull/855) - @luffynextgen # 2.5.0 diff --git a/docs/source/ruletypes.rst b/docs/source/ruletypes.rst index 85788954..c6b2739b 100644 --- a/docs/source/ruletypes.rst +++ b/docs/source/ruletypes.rst @@ -3232,6 +3232,9 @@ the observable value is also the same, including the behaviour for aggregated al ``hive_verify``: Whether or not to enable SSL certificate validation. Defaults to False. +``description_args``: can be used to call rule and match fileds in the description of the alert in TheHive +``description_missing_value``: Text to replace any match field not found when formating the ``description``. Defaults to ````. + Example usage:: alert: hivealerter @@ -3253,7 +3256,8 @@ Example usage:: severity: 2 status: 'New' source: 'elastalert' - description: 'Sample description' + description_args: [ name, description] + description: '{0} : {1}' tags: ['tag1', 'tag2'] title: 'Title' tlp: 3 diff --git a/elastalert/alerters/thehive.py b/elastalert/alerters/thehive.py index e7dbf9a3..e50ad201 100644 --- a/elastalert/alerters/thehive.py +++ b/elastalert/alerters/thehive.py @@ -72,6 +72,23 @@ def load_tags(self, tag_names: list, match: dict): return tag_values + def load_description(self, description_raw, match: dict): + missing = self.rule['hive_alert_config'].get('description_missing_value', '') + if 'description_args' in self.rule.get('hive_alert_config'): + description_args = self.rule['hive_alert_config'].get('description_args') + description_values=[] + for arg in description_args: + description_values.append(self.lookup_field(match, arg, missing)) + for i, text_value in enumerate(description_values): + if text_value is None: + description_value = self.rule.get(description_args[i]) + if description_value: + description_values[i] = description_value + description_values = [missing if val is None else val for val in description_values] + description_raw = description_raw.format(*description_values) + return description_raw + else: + return description_raw def alert(self, matches): # Build TheHive alert object, starting with some defaults, updating with any # user-specified config @@ -85,7 +102,7 @@ def alert(self, matches): 'title': self.create_title(matches), } alert_config.update(self.rule.get('hive_alert_config', {})) - + # Iterate through each match found, populating the alert tags and observables as required tags = set() artifacts = [] @@ -96,8 +113,11 @@ def alert(self, matches): alert_config['artifacts'] = artifacts alert_config['tags'] = list(tags) - # Populate the customFields - alert_config['customFields'] = self.load_custom_fields(alert_config['customFields'], + # Populate the customFields + if len(matches) > 0: + #Populate description field + alert_config['description']=self.load_description(alert_config['description'], matches[0]) + alert_config['customFields'] = self.load_custom_fields(alert_config['customFields'], matches[0]) # POST the alert to TheHive diff --git a/tests/alerters/thehive_test.py b/tests/alerters/thehive_test.py index e954588d..7bd1dc89 100644 --- a/tests/alerters/thehive_test.py +++ b/tests/alerters/thehive_test.py @@ -304,3 +304,172 @@ def test_load_tags(tags, expect): } actual = alert.load_tags(tags, match) assert expect == actual + + +def test_load_description_default(): + rule = {'alert': [], + 'alert_text': '', + 'alert_text_type': 'alert_text_only', + 'description': 'test', + 'hive_alert_config': {'customFields': [{'name': 'test', + 'type': 'string', + 'value': 2}], + 'follow': True, + 'severity': 2, + 'source': 'elastalert', + 'status': 'New', + 'tags': ['test.port'], + 'tlp': 3, + 'type': 'external'}, + 'hive_connection': {'hive_apikey': '', + 'hive_host': 'https://localhost', + 'hive_port': 9000}, + 'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}], + 'name': 'test-thehive', + 'tags': ['a', 'b'], + 'type': 'any'} + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = HiveAlerter(rule) + match = { + "test": { + "ip": "127.0.0.1", + "port": 9876, + "as_number": 1234 + }, + "@timestamp": "2021-05-09T14:43:30", + } + + actual = alert.load_description(alert.create_alert_body(match), match) + expected = alert.create_alert_body(match) + assert actual == expected + + +# Test when description is submitted under hive_alert_config but description_args is not +def test_load_description_no_args(): + rule = {'alert': [], + 'alert_text': '', + 'alert_text_type': 'alert_text_only', + 'description': 'test', + 'hive_alert_config': {'customFields': [{'name': 'test', + 'type': 'string', + 'value': 2}], + 'follow': True, + 'severity': 2, + 'source': 'elastalert', + 'description': 'TheHive description test', + 'status': 'New', + 'tags': ['test.port'], + 'tlp': 3, + 'type': 'external'}, + 'hive_connection': {'hive_apikey': '', + 'hive_host': 'https://localhost', + 'hive_port': 9000}, + 'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}], + 'name': 'test-thehive', + 'tags': ['a', 'b'], + 'type': 'any'} + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = HiveAlerter(rule) + match = { + "test": { + "ip": "127.0.0.1", + "port": 9876, + "as_number": 1234 + }, + "@timestamp": "2021-05-09T14:43:30", + } + actual = alert.load_description(rule['hive_alert_config']['description'], match) + expected = rule['hive_alert_config']['description'] + assert actual == expected + + +# Test with description_missing_value + +def test_load_description_args(): + rule = {'alert': [], + 'alert_text': '', + 'alert_text_type': 'alert_text_only', + 'title': 'Unit test', + 'description': 'test', + 'hive_alert_config': {'customFields': [{'name': 'test', + 'type': 'string', + 'value': 2}], + 'follow': True, + 'severity': 2, + 'source': 'elastalert', + 'description_missing_value': '', + 'description_args': ['title', 'test.ip', 'host'], + 'description': '{0} from host:{2} to {1}', + 'status': 'New', + 'tags': ['test.port'], + 'tlp': 3, + 'type': 'external'}, + 'hive_connection': {'hive_apikey': '', + 'hive_host': 'https://localhost', + 'hive_port': 9000}, + 'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}], + 'name': 'test-thehive', + 'tags': ['a', 'b'], + 'type': 'any'} + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = HiveAlerter(rule) + match = { + "test": { + "ip": "127.0.0.1", + "port": 9876, + "as_number": 1234 + }, + "@timestamp": "2021-05-09T14:43:30", + } + actual = alert.load_description(rule['hive_alert_config']['description'], match) + expected = "Unit test from host: to 127.0.0.1" + assert actual == expected + + +# Test without description_missing_value, missing values a replaced by a default value +def test_load_description_missing_value_default(): + rule = {'alert': [], + 'alert_text': '', + 'alert_text_type': 'alert_text_only', + 'title': 'Unit test', + 'description': 'test', + 'hive_alert_config': {'customFields': [{'name': 'test', + 'type': 'string', + 'value': 2}], + 'follow': True, + 'severity': 2, + 'source': 'elastalert', + 'description_args': ['title', 'test.ip', 'host'], + 'description': '{0} from host:{2} to {1}', + 'status': 'New', + 'tags': ['test.port'], + 'tlp': 3, + 'type': 'external'}, + 'hive_connection': {'hive_apikey': '', + 'hive_host': 'https://localhost', + 'hive_port': 9000}, + 'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}], + 'name': 'test-thehive', + 'tags': ['a', 'b'], + 'type': 'any'} + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = HiveAlerter(rule) + match = { + "test": { + "ip": "127.0.0.1", + "port": 9876, + "as_number": 1234 + }, + "@timestamp": "2021-05-09T14:43:30", + } + actual = alert.load_description(rule['hive_alert_config']['description'], match) + expected = "Unit test from host: to 127.0.0.1" + assert actual == expected