Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for campaings entity added in MITRE v12 #62

Merged
merged 2 commits into from
Nov 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,5 @@ venv.bak/
# intellij
.idea/

.DS_Store
.DS_Store
.history
148 changes: 141 additions & 7 deletions attackcti/attack_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,20 @@ def translate_stix_objects(self, stix_objects):
"x_mitre_collection_layers": "collection_layers",
"x_mitre_contributors": "contributors"
}
campaign_stix_mapping = {
"type": "type",
"id": "id",
"created_by_ref": "created_by_ref",
"created": "created",
"modified": "modified",
"name": "name",
"description": "campaign_description",
"aliases": "campaign_aliases",
"object_marking_refs": "object_marking_refs",
"external_references": "external_references",
"x_mitre_first_seen_citation": "first_seen_citation",
"x_mitre_last_seen_citation": "last_seen_citation"
}

# ******** Helper Functions ********
def handle_list(list_object, object_type):
Expand All @@ -230,6 +244,8 @@ def handle_list(list_object, object_type):
obj_dict['tactic_id'] = list_object[0]['external_id']
elif obj_dict['type'] == 'matrix':
obj_dict['matrix_id'] = list_object[0]['external_id']
elif obj_dict['type'] == 'campaign':
obj_dict['campaign_id'] = list_object[0]['external_id']
elif object_type == "kill_chain_phases":
tactic_list = list()
for phase in list_object:
Expand Down Expand Up @@ -266,6 +282,8 @@ def handle_list(list_object, object_type):
stix_mapping = marking_stix_mapping
elif obj['type'] == "x-mitre-data-source":
stix_mapping = data_source_stix_mapping
elif obj['type'] == "campaign":
stix_mapping = campaign_stix_mapping
else:
return stix_objects_list

Expand Down Expand Up @@ -330,7 +348,8 @@ def get_enterprise(self, stix_format=True):
"tactics": self.get_enterprise_tactics,
"matrix": Filter("type", "=", "x-mitre-matrix"),
"identity": Filter("type", "=", "identity"),
"marking-definition": Filter("type", "=", "marking-definition")
"marking-definition": Filter("type", "=", "marking-definition"),
"campaign": self.get_enterprise_campaigns
}
enterprise_stix_objects = dict()
for key in enterprise_filter_objects:
Expand All @@ -339,6 +358,25 @@ def get_enterprise(self, stix_format=True):
enterprise_stix_objects[key] = self.translate_stix_objects(enterprise_stix_objects[key])
return enterprise_stix_objects

def get_enterprise_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
""" Extracts all the available campaigns STIX objects in the Enterprise ATT&CK matrix

Args:
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')

Returns:
List of STIX objects
"""
enterprise_campaigns = self.TC_ENTERPRISE_SOURCE.query([Filter("type", "=", "campaign")])

if skip_revoked_deprecated:
enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns)

if not stix_format:
enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns)
return enterprise_campaigns

def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, enrich_data_sources = False, stix_format=True):
""" Extracts all the available techniques STIX objects in the Enterprise ATT&CK matrix

Expand Down Expand Up @@ -636,15 +674,36 @@ def get_mobile(self, stix_format=True):
"tactics": self.get_mobile_tactics,
"matrix": Filter("type", "=", "x-mitre-matrix"),
"identity": Filter("type", "=", "identity"),
"marking-definition": Filter("type", "=", "marking-definition")
"marking-definition": Filter("type", "=", "marking-definition"),
"campaigns": self.get_mobile_campaigns
}
mobile_stix_objects = {}
for key in mobile_filter_objects:
mobile_stix_objects[key] = self.TC_MOBILE_SOURCE.query(mobile_filter_objects[key]) if isinstance(mobile_filter_objects[key], Filter) else mobile_filter_objects[key]()
if not stix_format:
mobile_stix_objects[key] = self.translate_stix_objects(mobile_stix_objects[key])
return mobile_stix_objects


def get_mobile_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
""" Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix

Args:
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')

Returns:
List of STIX objects
"""

mobile_campaigns = self.TC_MOBILE_SOURCE.query(Filter("type", "=", "campaign"))

if skip_revoked_deprecated:
mobile_campaigns = self.remove_revoked_deprecated(mobile_campaigns)

if not stix_format:
mobile_campaigns = self.translate_stix_objects(mobile_campaigns)
return mobile_campaigns

def get_mobile_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True):
""" Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix

Expand Down Expand Up @@ -945,7 +1004,32 @@ def get_stix_objects(self, stix_format=True):
for resource_type in attack_stix_objects[matrix].keys():
attack_stix_objects[matrix][resource_type] = self.translate_stix_objects(attack_stix_objects[matrix][resource_type])
return attack_stix_objects


def get_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
""" Extracts all the available campaigns STIX objects across all ATT&CK matrices

Args:
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')

Returns:
List of STIX objects
"""

enterprise_campaigns = self.get_enterprise_campaigns()
mobile_campaigns = self.get_mobile_campaigns()
for mc in mobile_campaigns:
if mc not in enterprise_campaigns:
enterprise_campaigns.append(mc)

if skip_revoked_deprecated:
enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns)

if not stix_format:
enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns)

return enterprise_campaigns

def get_techniques(self, include_subtechniques=True, skip_revoked_deprecated=True, enrich_data_sources=False, stix_format=True):
""" Extracts all the available techniques STIX objects across all ATT&CK matrices

Expand Down Expand Up @@ -1269,7 +1353,7 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True):
List of STIX objects

"""
valid_objects = {'attack-pattern','course-of-action','intrusion-set','malware','tool','x-mitre-data-source', 'x-mitre-data-component'}
valid_objects = {'attack-pattern','course-of-action','intrusion-set','malware','tool','x-mitre-data-source', 'x-mitre-data-component', 'campaign'}
if object_type not in valid_objects:
raise ValueError(f"ERROR: Valid object must be one of {valid_objects}")
else:
Expand All @@ -1282,6 +1366,36 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True):
all_stix_objects = self.translate_stix_objects(all_stix_objects)
return all_stix_objects

def get_campaign_by_alias(self, campaign_alias, case=True, stix_format=True):
""" Extracts campaign STIX objects by alias name accross all ATT&CK matrices

Args:
campaign_alias (str) : Alias of threat actor group
case (bool) : case sensitive or not
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')

Returns:
List of STIX objects

"""
if not case:
all_campaigns = self.get_campaigns()
all_campaigns_list = list()
for campaign in all_campaigns:
if "aliases" in campaign.keys():
for alias in campaign['aliases']:
if campaign_alias.lower() in alias.lower():
all_campaigns_list.append(campaign)
else:
filter_objects = [
Filter('type', '=', 'campaign'),
Filter('aliases', '=', campaign_alias)
]
all_campaigns_list = self.COMPOSITE_DS.query(filter_objects)
if not stix_format:
all_campaigns_list = self.translate_stix_objects(all_campaigns_list)
return all_campaigns_list

def get_group_by_alias(self, group_alias, case=True, stix_format=True):
""" Extracts group STIX objects by alias name accross all ATT&CK matrices

Expand Down Expand Up @@ -1311,7 +1425,27 @@ def get_group_by_alias(self, group_alias, case=True, stix_format=True):
if not stix_format:
all_groups_list = self.translate_stix_objects(all_groups_list)
return all_groups_list


def get_campaigns_since_time(self, timestamp, stix_format=True):
""" Extracts campaings STIX objects since specific time accross all ATT&CK matrices

Args:
timestamp (timestamp): Timestamp
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')

Returns:
List of STIX objects

"""
filter_objects = [
Filter('type', '=', 'campaign'),
Filter('created', '>', timestamp)
]
all_campaigns_list = self.COMPOSITE_DS.query(filter_objects)
if not stix_format:
all_campaigns_list = self.translate_stix_objects(all_campaigns_list)
return all_campaigns_list

def get_techniques_since_time(self, timestamp, stix_format=True):
""" Extracts techniques STIX objects since specific time accross all ATT&CK matrices

Expand Down Expand Up @@ -1804,4 +1938,4 @@ def enrich_techniques_data_sources(self, stix_object):
if technique_ds:
new_data_sources = [ v for v in technique_ds.values()]
stix_object[i] = stix_object[i].new_version(x_mitre_data_sources = new_data_sources)
return stix_object
return stix_object