From fb698bb9d28945e725836d23d93df2f22361edec Mon Sep 17 00:00:00 2001 From: Roberto Rodriguez Date: Fri, 7 Jan 2022 03:22:15 -0500 Subject: [PATCH] updated enterprise pre mobile and ics main functions and revoked and deprecated functions --- attackcti/attack_api.py | 216 ++++++++++++++++++++++++++-------------- 1 file changed, 142 insertions(+), 74 deletions(-) diff --git a/attackcti/attack_api.py b/attackcti/attack_api.py index 89d0cb7..d906a6a 100644 --- a/attackcti/attack_api.py +++ b/attackcti/attack_api.py @@ -10,6 +10,7 @@ # https://stackoverflow.com/a/4406521 from stix2 import TAXIICollectionSource, Filter, CompositeDataSource, FileSystemSource +from stix2.datastore.filters import apply_common_filters from stix2.utils import get_type_from_id #from stix2.v20.sdo import * from taxii2client.v20 import Collection @@ -277,37 +278,34 @@ def handle_list(list_object, object_type): stix_objects_list.append(obj_dict) return stix_objects_list - def remove_revoked(self, stix_objects): - non_revoked = list() - for obj in stix_objects: - if 'revoked' in obj.keys() and obj['revoked'] == True: - continue - else: - non_revoked.append(obj) - return non_revoked + # https://github.com/mitre/cti/issues/127 + # https://github.com/mitre/cti/blob/master/USAGE.md#removing-revoked-and-deprecated-objects + def remove_revoked_deprecated(self, stix_objects): + """Remove any revoked or deprecated objects from queries made to the data source""" + return list( + filter( + lambda x: x.get("x_mitre_deprecated", False) is False and x.get("revoked", False) is False, + stix_objects + ) + ) + # https://stix2.readthedocs.io/en/latest/api/datastore/stix2.datastore.filters.html def extract_revoked(self, stix_objects): - revoked = list() - for obj in stix_objects: - if 'revoked' in obj.keys() and obj['revoked'] == True: - revoked.append(obj) - return revoked + """Extract revoked objects from STIX objects""" + return list( + apply_common_filters( + stix_objects, + [Filter('revoked','=',True)] + )) - def remove_deprecated(self, stix_objects): - non_deprecated = list() - for obj in stix_objects: - if 'x_mitre_deprecated' in obj.keys() and obj['x_mitre_deprecated'] == True: - continue - else: - non_deprecated.append(obj) - return non_deprecated - + # https://stix2.readthedocs.io/en/latest/api/datastore/stix2.datastore.filters.html def extract_deprecated(self, stix_objects): - deprecated = list() - for obj in stix_objects: - if 'x_mitre_deprecated' in obj.keys() and obj['x_mitre_deprecated'] == True: - deprecated.append(obj) - return deprecated + """Extract deprecated objects from STIX objects""" + return list( + apply_common_filters( + stix_objects, + [Filter('x_mitre_deprecated','=',True)] + )) # ******** Enterprise ATT&CK Technology Domain ******* def get_enterprise(self, stix_format=True): @@ -321,35 +319,49 @@ def get_enterprise(self, stix_format=True): """ enterprise_filter_objects = { - "techniques": Filter("type", "=", "attack-pattern"), - "mitigations": Filter("type", "=", "course-of-action"), - "groups": Filter("type", "=", "intrusion-set"), - "malware": Filter("type", "=", "malware"), - "tools": Filter("type", "=", "tool"), - "data-component": Filter("type", "=", "x-mitre-data-component"), - "relationships": Filter("type", "=", "relationship"), - "tactics": Filter("type", "=", "x-mitre-tactic"), + "techniques": self.get_enterprise_techniques, + "data-component": self.get_enterprise_data_components, + "mitigations": self.get_enterprise_mitigations, + "groups": self.get_enterprise_groups, + "malware": self.get_enterprise_malware, + "tools": self.get_enterprise_tools, + "data-source": self.get_enterprise_data_sources, + "relationships": self.get_enterprise_relationships, + "tactics": self.get_enterprise_tactics, "matrix": Filter("type", "=", "x-mitre-matrix"), "identity": Filter("type", "=", "identity"), "marking-definition": Filter("type", "=", "marking-definition") } - enterprise_stix_objects = {} + enterprise_stix_objects = dict() for key in enterprise_filter_objects: - enterprise_stix_objects[key] = (self.TC_ENTERPRISE_SOURCE.query(enterprise_filter_objects[key])) + enterprise_stix_objects[key] = self.TC_ENTERPRISE_SOURCE.query(enterprise_filter_objects[key]) if isinstance(enterprise_filter_objects[key], Filter) else enterprise_filter_objects[key]() if not stix_format: enterprise_stix_objects[key] = self.translate_stix_objects(enterprise_stix_objects[key]) return enterprise_stix_objects - def get_enterprise_techniques(self, stix_format=True): + def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True): """ Extracts all the available techniques STIX objects in the Enterprise ATT&CK matrix Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. + include_subtechniques (bool): default True. Include techniques and sub-techniques STIX objects. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: List of STIX objects """ - enterprise_techniques = self.TC_ENTERPRISE_SOURCE.query(Filter("type", "=", "attack-pattern")) + + if include_subtechniques: + enterprise_techniques = self.TC_ENTERPRISE_SOURCE.query(Filter("type", "=", "attack-pattern")) + else: + enterprise_techniques = self.TC_ENTERPRISE_SOURCE.query([ + Filter("type", "=", "attack-pattern"), + Filter('x_mitre_is_subtechnique', '=', False) + ]) + + if skip_revoked_deprecated: + enterprise_techniques = self.remove_revoked_deprecated(enterprise_techniques) + if not stix_format: enterprise_techniques = self.translate_stix_objects(enterprise_techniques) return enterprise_techniques @@ -383,10 +395,11 @@ def get_enterprise_mitigations(self, stix_format=True): enterprise_mitigations = self.translate_stix_objects(enterprise_mitigations) return enterprise_mitigations - def get_enterprise_groups(self, stix_format=True): + def get_enterprise_groups(self, skip_revoked_deprecated=True, stix_format=True): """ Extracts all the available groups STIX objects in the Enterprise ATT&CK matrix Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: @@ -394,6 +407,10 @@ def get_enterprise_groups(self, stix_format=True): """ enterprise_groups = self.TC_ENTERPRISE_SOURCE.query(Filter("type", "=", "intrusion-set")) + + if skip_revoked_deprecated: + enterprise_groups = self.remove_revoked_deprecated(enterprise_groups) + if not stix_format: enterprise_groups = self.translate_stix_objects(enterprise_groups) return enterprise_groups @@ -489,43 +506,55 @@ def get_pre(self, stix_format=True): warnings.warn("PRE ATT&CK is deprecated. It will be removed in future versions. Consider adjusting your application") pre_filter_objects = { - "techniques": Filter("type", "=", "attack-pattern"), - "groups": Filter("type", "=", "intrusion-set"), - "relationships": Filter("type", "=", "relationship"), - "tactics": Filter("type", "=", "x-mitre-tactic"), + "techniques": self.get_pre_techniques, + "groups": self.get_pre_groups, + "relationships": self.get_pre_relationships, + "tactics": self.get_pre_tactics, "matrix": Filter("type", "=", "x-mitre-matrix"), "identity": Filter("type", "=", "identity"), "marking-definition": Filter("type", "=", "marking-definition") } pre_stix_objects = {} for key in pre_filter_objects: - pre_stix_objects[key] = self.TC_PRE_SOURCE.query(pre_filter_objects[key]) + pre_stix_objects[key] = self.TC_PRE_SOURCE.query(pre_filter_objects[key]) if isinstance(pre_filter_objects[key], Filter) else pre_filter_objects[key]() if not stix_format: pre_stix_objects[key] = self.translate_stix_objects(pre_stix_objects[key]) return pre_stix_objects - - def get_pre_techniques(self, stix_format=True): + + def get_pre_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True): """ Extracts all the available techniques STIX objects in the Pre ATT&CK matrix [ DEPRECATED AS OF 11/23/2020 ] Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. + include_subtechniques (bool): default True. Include techniques and sub-techniques STIX objects. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: List of STIX objects - """ - + warnings.warn("PRE ATT&CK is deprecated. It will be removed in future versions. Consider adjusting your application") - pre_techniques = self.TC_PRE_SOURCE.query(Filter("type", "=", "attack-pattern")) + if include_subtechniques: + pre_techniques = self.TC_PRE_SOURCE.query(Filter("type", "=", "attack-pattern")) + else: + pre_techniques = self.TC_ENTERPRISE_SOURCE.query([ + Filter("type", "=", "attack-pattern"), + Filter('x_mitre_is_subtechnique', '=', False) + ]) + + if skip_revoked_deprecated: + pre_techniques = self.remove_revoked_deprecated(pre_techniques) + if not stix_format: pre_techniques = self.translate_stix_objects(pre_techniques) return pre_techniques - def get_pre_groups(self, stix_format=True): + def get_pre_groups(self, skip_revoked_deprecated=True, stix_format=True): """ Extracts all the available groups STIX objects in the Pre ATT&CK matrix [ DEPRECATED AS OF 11/23/2020 ] Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: @@ -536,6 +565,10 @@ def get_pre_groups(self, stix_format=True): warnings.warn("PRE ATT&CK is deprecated. It will be removed in future versions. Consider adjusting your application") pre_groups = self.TC_PRE_SOURCE.query(Filter("type", "=", "intrusion-set")) + + if skip_revoked_deprecated: + pre_groups = self.remove_revoked_deprecated(pre_groups) + if not stix_format: pre_groups = self.translate_stix_objects(pre_groups) return pre_groups @@ -589,35 +622,47 @@ def get_mobile(self, stix_format=True): """ mobile_filter_objects = { - "techniques": Filter("type", "=", "attack-pattern"), - "mitigations": Filter("type", "=", "course-of-action"), - "groups": Filter("type", "=", "intrusion-set"), - "malware": Filter("type", "=", "malware"), - "tools": Filter("type", "=", "tool"), - "relationships": Filter("type", "=", "relationship"), - "tactics": Filter("type", "=", "x-mitre-tactic"), + "techniques": self.get_mobile_techniques, + "mitigations": self.get_mobile_mitigations, + "groups": self.get_mobile_groups, + "malware": self.get_mobile_malware, + "tools": self.get_mobile_tools, + "relationships": self.get_mobile_relationships, + "tactics": self.get_mobile_tactics, "matrix": Filter("type", "=", "x-mitre-matrix"), "identity": Filter("type", "=", "identity"), "marking-definition": Filter("type", "=", "marking-definition") } mobile_stix_objects = {} for key in mobile_filter_objects: - mobile_stix_objects[key] = self.TC_MOBILE_SOURCE.query(mobile_filter_objects[key]) + mobile_stix_objects[key] = self.TC_MOBILE_SOURCE.query(mobile_filter_objects[key]) if isinstance(mobile_filter_objects[key], Filter) else mobile_filter_objects[key]() if not stix_format: mobile_stix_objects[key] = self.translate_stix_objects(mobile_stix_objects[key]) return mobile_stix_objects - def get_mobile_techniques(self, stix_format=True): - """ Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix + def get_mobile_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True): + """ Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. + include_subtechniques (bool): default True. Include techniques and sub-techniques STIX objects. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: List of STIX objects - """ - mobile_techniques = self.TC_MOBILE_SOURCE.query(Filter("type", "=", "attack-pattern")) + + if include_subtechniques: + mobile_techniques = self.TC_MOBILE_SOURCE.query(Filter("type", "=", "attack-pattern")) + else: + mobile_techniques = self.TC_MOBILE_SOURCE.query([ + Filter("type", "=", "attack-pattern"), + Filter('x_mitre_is_subtechnique', '=', False) + ]) + + if skip_revoked_deprecated: + mobile_techniques = self.remove_revoked_deprecated(mobile_techniques) + if not stix_format: mobile_techniques = self.translate_stix_objects(mobile_techniques) return mobile_techniques @@ -637,10 +682,11 @@ def get_mobile_mitigations(self, stix_format=True): mobile_mitigations = self.translate_stix_objects(mobile_mitigations) return mobile_mitigations - def get_mobile_groups(self, stix_format=True): + def get_mobile_groups(self, skip_revoked_deprecated=True, stix_format=True): """ Extracts all the available groups STIX objects in the Mobile ATT&CK matrix Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: @@ -648,6 +694,10 @@ def get_mobile_groups(self, stix_format=True): """ mobile_groups = self.TC_MOBILE_SOURCE.query(Filter("type", "=", "intrusion-set")) + + if skip_revoked_deprecated: + mobile_groups = self.remove_revoked_deprecated(mobile_groups) + if not stix_format: mobile_groups = self.translate_stix_objects(mobile_groups) return mobile_groups @@ -724,32 +774,45 @@ def get_ics(self, stix_format=True): """ ics_filter_objects = { - "techniques": Filter("type", "=", "attack-pattern"), - "mitigations": Filter("type", "=", "course-of-action"), - "groups": Filter("type", "=", "intrusion-set"), - "malware": Filter("type", "=", "malware"), - "relationships": Filter("type", "=", "relationship"), - "tactics": Filter("type", "=", "x-mitre-tactic"), + "techniques": self.get_ics_techniques, + "mitigations": self.get_ics_mitigations, + "groups": self.get_ics_groups, + "malware": self.get_ics_malware, + "relationships": self.get_ics_relationships, + "tactics": self.get_ics_tactics, "matrix": Filter("type", "=", "x-mitre-matrix") } ics_stix_objects = {} for key in ics_filter_objects: - ics_stix_objects[key] = self.TC_ICS_SOURCE.query(ics_filter_objects[key]) + ics_stix_objects[key] = self.TC_ICS_SOURCE.query(ics_filter_objects[key]) if isinstance(ics_filter_objects[key], Filter) else ics_filter_objects[key]() if not stix_format: ics_stix_objects[key] = self.translate_stix_objects(ics_stix_objects[key]) return ics_stix_objects - def get_ics_techniques(self, stix_format=True): + def get_ics_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True): """ Extracts all the available techniques STIX objects in the ICS ATT&CK matrix Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. + include_subtechniques (bool): default True. Include techniques and sub-techniques STIX objects. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: List of STIX objects """ - ics_techniques = self.TC_ICS_SOURCE.query(Filter("type", "=", "attack-pattern")) + + if include_subtechniques: + ics_techniques = self.TC_ICS_SOURCE.query(Filter("type", "=", "attack-pattern")) + else: + ics_techniques = self.TC_ICS_SOURCE.query([ + Filter("type", "=", "attack-pattern"), + Filter('x_mitre_is_subtechnique', '=', False) + ]) + + if skip_revoked_deprecated: + ics_techniques = self.remove_revoked_deprecated(ics_techniques) + if not stix_format: ics_techniques = self.translate_stix_objects(ics_techniques) return ics_techniques @@ -769,10 +832,11 @@ def get_ics_mitigations(self, stix_format=True): ics_mitigations = self.translate_stix_objects(ics_mitigations) return ics_mitigations - def get_ics_groups(self, stix_format=True): + def get_ics_groups(self, skip_revoked_deprecated=True, stix_format=True): """ Extracts all the available groups STIX objects in the ICS ATT&CK matrix Args: + skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects. stix_format (bool): Returns results in original STIX format or friendly syntax (i.e. 'attack-pattern' or 'technique') Returns: @@ -780,6 +844,10 @@ def get_ics_groups(self, stix_format=True): """ ics_groups = self.TC_ICS_SOURCE.query(Filter("type", "=", "intrusion-set")) + + if skip_revoked_deprecated: + ics_groups = self.remove_revoked_deprecated(ics_groups) + if not stix_format: ics_groups = self.translate_stix_objects(ics_groups) return ics_groups