-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add stix-extract tool #48
base: develop
Are you sure you want to change the base?
Changes from all commits
28d58c2
2e6a280
f7ffdeb
5320414
a040345
de9c841
971c4fb
1654c50
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
"""This file defines the version of this module.""" | ||
__version__ = "1.5.1" | ||
__version__ = "1.5.2" |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,143 @@ | ||||||||||
#!/usr/bin/env python3 | ||||||||||
|
||||||||||
""" | ||||||||||
Comment on lines
+1
to
+3
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should not be included in a file that is part of a package. The
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hi @mcdonnnj, has the suggested change been made? The first line of stix_extract.py still reads `#!/usr/bin/env python3`. |
||||||||||
Extract valuable information from STIX (Structured Threat Information Expression) files. | ||||||||||
|
||||||||||
This script parses the STIX file to extract and print the following observables: | ||||||||||
- IP addresses, which are associated with network indicators. | ||||||||||
- Hashes (SHA256, SHA1, MD5) of files, prioritizing by hash type. | ||||||||||
- Fully Qualified Domain Names (FQDNs), which can help identify associated domains. | ||||||||||
- URLs, which could represent potential threat sources or command and control servers. | ||||||||||
|
||||||||||
The script prints each observable type in a separate section with a clear title for easy reading. | ||||||||||
|
||||||||||
Usage: | ||||||||||
stix-extract [<file>] | ||||||||||
|
||||||||||
Options: | ||||||||||
-h --help Show this screen. | ||||||||||
|
||||||||||
Arguments: | ||||||||||
file The path to the STIX xml file to parse. If not specified, reads from standard input. | ||||||||||
""" | ||||||||||
|
||||||||||
# Standard Python Libraries | ||||||||||
from collections import OrderedDict | ||||||||||
from io import TextIOWrapper | ||||||||||
import ipaddress | ||||||||||
import sys | ||||||||||
|
||||||||||
# Third-Party Libraries | ||||||||||
from docopt import docopt | ||||||||||
from stix.core import STIXPackage | ||||||||||
|
||||||||||
from ._version import __version__ | ||||||||||
|
||||||||||
|
||||||||||
def extract_stix_info(stix_file):
    """
    Extract valuable information (IP addresses, hashes, FQDNs, and URLs) from a STIX file.

    For each file observable at most one hash is kept, chosen by type
    priority: SHA256 > SHA1 > MD5. Hashes of any other type are ignored.

    Args:
        stix_file (str or file-like object): path to the STIX XML file to
            parse, or an open stream containing the XML (main() passes a
            text stream when reading from standard input).

    Returns:
        tuple: a tuple containing four lists of str - one for IP addresses,
        one for hashes, one for FQDNs, and one for URLs. The lists are in
        document order and are not de-duplicated.

    Raises:
        Exception: any error raised while parsing the STIX package is
            re-raised unchanged after a message is written to standard error.
    """
    # Load the STIX package from the XML file
    try:
        stix_package = STIXPackage.from_xml(stix_file)
    except Exception as e:
        # Terminate the message with a newline so it does not run into the
        # traceback (or the shell prompt) that follows it.
        sys.stderr.write(f"Error parsing STIX file: {e}\n")
        # A bare raise preserves the original traceback.
        raise

    # Lists to collect each kind of observable value
    ip_addresses = []
    hashes = []
    fqdns = []
    urls = []

    # Define hash type priority. Lower value means higher priority.
    hash_priority = OrderedDict([("SHA256", 0), ("SHA1", 1), ("MD5", 2)])

    # Iterate over each indicator in the STIX package.  The "or ()" guards
    # tolerate a package/indicator whose collection is None.
    for indicator in stix_package.indicators or ():
        for observable in indicator.observables or ():
            # NOTE(review): observables are assumed to be able to lack an
            # embedded object or properties (e.g. observable compositions) --
            # confirm against python-cybox.  Skip them instead of crashing.
            cybox_object = observable.object_
            properties = getattr(cybox_object, "properties", None)
            if properties is None:
                continue

            object_type = properties._XSI_TYPE
            if object_type == "AddressObjectType":
                # Convert cybox.common.properties.String to str
                ip_addresses.append(str(properties.address_value))
            elif object_type == "FileObjectType":
                best_hash = None
                best_priority = float("inf")
                for h in properties.hashes or ():
                    # Hashes without a type or without a simple hash value
                    # cannot be ranked or printed, so ignore them.
                    hash_type = getattr(h.type_, "value", None)
                    if hash_type not in hash_priority:
                        continue
                    if h.simple_hash_value is None:
                        continue
                    priority = hash_priority[hash_type]
                    if priority < best_priority:
                        # Convert to str
                        best_hash = str(h.simple_hash_value.value)
                        best_priority = priority
                if best_hash is not None:
                    hashes.append(best_hash)
            elif object_type == "DomainNameObjectType":
                # Convert cybox.common.properties.String to str
                fqdns.append(str(properties.value.value))
            elif object_type == "URIObjectType":
                # Convert cybox.common.properties.String to str
                urls.append(str(properties.value.value))

    return ip_addresses, hashes, fqdns, urls
|
||||||||||
|
||||||||||
def sort_ip_address(ip):
    """
    Build a sort key for an IP address so addresses order numerically.

    Grouping by IP version first keeps IPv4 and IPv6 addresses apart; within
    a version, addresses compare by their integer value rather than by their
    text (so "9.0.0.1" sorts before "10.0.0.1").

    Args:
        ip (str): an IP address

    Returns:
        tuple: a two-element tuple - the IP version (int) followed by the
        integer representation of the IP address (int).
    """
    address = ipaddress.ip_address(ip)
    version = address.version
    numeric_value = int(address)
    return (version, numeric_value)
|
||||||||||
|
||||||||||
def _print_section(title, values):
    """Print a banner holding the section title, then each value on its own line."""
    banner = "#" * 20
    print(f"\n{banner}\n# {title}\n{banner}\n")
    for value in values:
        print(value)


def main():
    """Parse command line arguments and extract information from the STIX file.

    The observables are read from the file named on the command line, or from
    standard input when no file is given, and printed to standard output in
    four sorted sections: IP addresses, hashes, FQDNs, and URLs.
    """
    # Parse command line arguments
    args = docopt(__doc__, version=__version__)
    # Read from the named file, or fall back to stdin decoded as UTF-8
    stix_file = args["<file>"] or TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
    # Extract data from the STIX file
    ip_addresses, hashes, fqdns, urls = extract_stix_info(stix_file)
    # Sort IP addresses naturally (by their integer representation); the
    # remaining observables are plain strings and sort lexicographically.
    ip_addresses.sort(key=sort_ip_address)
    hashes.sort()
    fqdns.sort()
    urls.sort()
    # Print IPs, hashes, FQDNs, and URLs with separators and titles
    _print_section("IP Addresses", ip_addresses)
    _print_section("Hashes", hashes)
    _print_section("FQDNs", fqdns)
    _print_section("URLs", urls)
|
||||||||||
|
||||||||||
# Run the command line interface when this module is executed directly,
# passing main()'s return value to the interpreter as the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
Comment on lines
+141
to
+143
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same rationale as the shebang comment.
Suggested change
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if this file had type hints.