From 28d58c2cb5156b0a12ac950c63a003cd2d13fb69 Mon Sep 17 00:00:00 2001 From: Felddy Date: Fri, 9 Jun 2023 17:09:24 -0400 Subject: [PATCH 1/8] Add stix extraction utility --- src/ioc_scan/stix_extract.py | 142 +++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100755 src/ioc_scan/stix_extract.py diff --git a/src/ioc_scan/stix_extract.py b/src/ioc_scan/stix_extract.py new file mode 100755 index 0000000..5c562d3 --- /dev/null +++ b/src/ioc_scan/stix_extract.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 + +""" +Extract valuable information from STIX (Structured Threat Information Expression) files. + +This script parses the STIX file to extract and print the following observables: +- IP addresses, which are associated with network indicators. +- Hashes (SHA256, SHA1, MD5) of files, prioritizing by hash type. +- Fully Qualified Domain Names (FQDNs), which can help identify associated domains. +- URLs, which could represent potential threat sources or command and control servers. + +The script prints each observable type in a separate section with a clear title for easy reading. + +Usage: + stix-extract [<file>] + +Options: + -h --help Show this screen. + +Arguments: + file The path to the STIX xml file to parse. If not specified, reads from standard input. +""" + +# Standard Python Libraries +from collections import OrderedDict +from io import TextIOWrapper +import ipaddress +import sys + +# Third-Party Libraries +from docopt import docopt +from stix.core import STIXPackage + + +def extract_stix_info(stix_file): + """ + Extract valuable information (IP addresses, hashes, FQDNs, and URLs) from a STIX file. + + Prioritize hashes based on their type: SHA256 > SHA1 > MD5. + + Args: + stix_file (str): path to the STIX file to parse + + Returns: + tuple: a tuple containing four lists - one for IP addresses, one for hashes, one for FQDNs, and one for URLs. 
+ """ + # Load the STIX package from the XML file + try: + stix_package = STIXPackage.from_xml(stix_file) + except Exception as e: + print(f"Error parsing STIX file: {e}") + raise e + + # Initialize lists to store IP addresses, hashes, FQDNs, and URLs + ip_addresses = [] + hashes = [] + fqdns = [] + urls = [] + + # Define hash type priority. Lower value means higher priority. + hash_priority = OrderedDict([("SHA256", 0), ("SHA1", 1), ("MD5", 2)]) + + # Iterate over each indicator in the STIX package + for indicator in stix_package.indicators: + for observable in indicator.observables: + object_type = observable.object_.properties._XSI_TYPE + if object_type == "AddressObjectType": + # Convert cybox.common.properties.String to str + ip_addresses.append(str(observable.object_.properties.address_value)) + elif object_type == "FileObjectType": + hashes_dict = observable.object_.properties.hashes + if hashes_dict: + best_hash = None + best_priority = float("inf") + for h in hashes_dict: + if ( + h.type_.value in hash_priority + and hash_priority[h.type_.value] < best_priority + ): + best_hash = str(h.simple_hash_value.value) # Convert to str + best_priority = hash_priority[h.type_.value] + if best_hash is not None: + hashes.append(best_hash) + elif object_type == "DomainNameObjectType": + # Convert cybox.common.properties.String to str + fqdns.append(str(observable.object_.properties.value.value)) + elif object_type == "URIObjectType": + # Convert cybox.common.properties.String to str + urls.append(str(observable.object_.properties.value.value)) + + return ip_addresses, hashes, fqdns, urls + + +def sort_ip_address(ip): + """ + Take an IP address as input and return a tuple that can be used for sorting. + + Args: + ip (str): an IP address + + Returns: + tuple: a tuple containing two elements - the IP version (int) and the integer representation of the IP address (int). 
+ """ + ip_version = ipaddress.ip_address(ip).version + ip_int = int(ipaddress.ip_address(ip)) + return ip_version, ip_int + + +def main(): + """Parse command line arguments and extract information from the STIX file.""" + # Parse command line arguments + args = docopt(__doc__, version="1.5.1") + # Extract data from the STIX file or from stdin + stix_file = ( + args[""] + if args[""] + else TextIOWrapper(sys.stdin.buffer, encoding="utf-8") + ) + # Extract data from the STIX file + ip_addresses, hashes, fqdns, urls = extract_stix_info(stix_file) + # Sort IP addresses naturally (by their integer representation) + ip_addresses.sort(key=sort_ip_address) + hashes.sort() + fqdns.sort() + urls.sort() + # Print IPs, hashes, FQDNs, and URLs with separators and titles + print(f"\n{'#' * 20}\n# IP Addresses\n{'#' * 20}\n") + for ip in ip_addresses: + print(ip) + print(f"\n{'#' * 20}\n# Hashes\n{'#' * 20}\n") + for hash in hashes: + print(hash) + print(f"\n{'#' * 20}\n# FQDNs\n{'#' * 20}\n") + for fqdn in fqdns: + print(fqdn) + print(f"\n{'#' * 20}\n# URLs\n{'#' * 20}\n") + for url in urls: + print(url) + + +if __name__ == "__main__": + sys.exit(main()) From 2e6a2804fdd66654088b8a4db744d3a63441e638 Mon Sep 17 00:00:00 2001 From: Felddy Date: Fri, 9 Jun 2023 17:11:52 -0400 Subject: [PATCH 2/8] Add required library and entry point --- setup.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 194791b..1e38227 100644 --- a/setup.py +++ b/setup.py @@ -90,7 +90,7 @@ def get_version(version_file): package_dir={"": "src"}, py_modules=[splitext(basename(path))[0] for path in glob("src/*.py")], include_package_data=True, - install_requires=["docopt", "setuptools >= 24.2.0"], + install_requires=["docopt", "setuptools >= 24.2.0", "stix"], extras_require={ "test": [ "coverage", @@ -107,11 +107,12 @@ def get_version(version_file): "pytest", ] }, - # Conveniently allows one to run the CLI tool as `ioc-scan` + # Conveniently allows one to run 
the CLI tools entry_points={ "console_scripts": [ "ioc-scan = ioc_scan.ioc_scan_cli:main", "ioc-scan-bare = ioc_scan.ioc_scanner:main", + "stix-extract = ioc_scan.stix_extract:main", ] }, ) From f7ffdeba665dfdb066eb6179252d3439ee0d827d Mon Sep 17 00:00:00 2001 From: Felddy Date: Fri, 9 Jun 2023 17:13:57 -0400 Subject: [PATCH 3/8] Update docs to include new util --- README.md | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/README.md b/README.md index e0e4f58..382fcef 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,43 @@ ansible --inventory=hosts-file cool-servers \ --become --ask-become-pass --user="ian.kilmister" ``` +## Helper utilities ## + +Additional helper tools and scripts are bundled with the ioc-scanner. + +### `stix-extract` ### + +```console +Extract valuable information from STIX (Structured Threat Information Expression) files. + +This script parses the STIX file to extract and print the following observables: +- IP addresses, which are associated with network indicators. +- Hashes (SHA256, SHA1, MD5) of files, prioritizing by hash type. +- Fully Qualified Domain Names (FQDNs), which can help identify associated domains. +- URLs, which could represent potential threat sources or command and control servers. + +The script prints each observable type in a separate section with a clear title for easy reading. + +Usage: + stix-extract [<file>] + +Options: + -h --help Show this screen. + +Arguments: + file The path to the STIX xml file to parse. If not specified, reads from standard input. +``` + +The `stix-extract` utility can be used alone or in conjunction with the +`ioc-scan` tool to scan for IoCs in a STIX file. + +```console +curl https://www.cisa.gov/sites/default/files/2023-06/aa23-158a.stix_.xml \ +| stix-extract | ioc-scan --stdin --target=. 
+``` + +### `ioc_scan_by_host.sh` ### + To scan for indicator strings on AWS instances that are accessible via [SSM](https://docs.aws.amazon.com/systems-manager/latest/userguide/what-is-systems-manager.html), the `ioc_scan_by_host.sh` shell script has been provided in the `extras` From 5320414ccf90d5850f164b7d247464ca5e90cf8b Mon Sep 17 00:00:00 2001 From: Felddy Date: Fri, 9 Jun 2023 18:51:51 -0400 Subject: [PATCH 4/8] Add tests for stix-extract --- tests/test_stix_extract.py | 236 +++++++++++++++++++++++++++++++++++++ 1 file changed, 236 insertions(+) create mode 100644 tests/test_stix_extract.py diff --git a/tests/test_stix_extract.py b/tests/test_stix_extract.py new file mode 100644 index 0000000..947979a --- /dev/null +++ b/tests/test_stix_extract.py @@ -0,0 +1,236 @@ +"""Test the stix_extract module.""" +# Standard Python Libraries +import os +import sys +import tempfile +from unittest.mock import MagicMock, patch + +# Third-Party Libraries +import pytest + +# cisagov Libraries +import ioc_scan +from ioc_scan import stix_extract +from ioc_scan.stix_extract import extract_stix_info, sort_ip_address + +PROJECT_VERSION = ioc_scan.__version__ + + +def test_extract_stix_info_ip(): + """Test extracting IP addresses from a STIX package.""" + observable_mock = MagicMock() + observable_mock.object_.properties._XSI_TYPE = "AddressObjectType" + observable_mock.object_.properties.address_value = "127.0.0.1" + indicator_mock = MagicMock() + indicator_mock.observables = [observable_mock] + stix_package_mock = MagicMock() + stix_package_mock.indicators = [indicator_mock] + with patch( + "ioc_scan.stix_extract.STIXPackage.from_xml", return_value=stix_package_mock + ): + ip_addresses, hashes, fqdns, urls = extract_stix_info("stix_file") + assert ip_addresses == ["127.0.0.1"] + assert hashes == [] + assert fqdns == [] + assert urls == [] + + +def test_sort_ip_address(): + """Test sorting IP addresses.""" + result = sort_ip_address("127.0.0.1") + assert result == (4, 
2130706433) + + +@pytest.fixture +def mock_domain_observable(): + """Return a mock STIX DomainNameObjectType observable.""" + observable = MagicMock() + observable.object_.properties._XSI_TYPE = "DomainNameObjectType" + observable.object_.properties.value.value = "www.example.com" + return observable + + +@pytest.fixture +def mock_uri_observable(): + """Return a mock STIX URIObjectType observable.""" + observable = MagicMock() + observable.object_.properties._XSI_TYPE = "URIObjectType" + observable.object_.properties.value.value = "www.example.com/path" + return observable + + +def test_extract_stix_info_with_domain_and_uri_observables( + mock_domain_observable, mock_uri_observable +): + """Test extracting FQDNs and URLs from a STIX package.""" + stix_package_mock = MagicMock() + stix_package_mock.indicators = [ + MagicMock(observables=[mock_domain_observable, mock_uri_observable]) + ] + with patch( + "ioc_scan.stix_extract.STIXPackage.from_xml", return_value=stix_package_mock + ): + ips, hashes, fqdns, urls = extract_stix_info("fake_file.xml") + assert fqdns == ["www.example.com"] + assert urls == ["www.example.com/path"] + + +@pytest.fixture +def mock_hash_observable_with_valid_types(): + """Return a mock STIX FileObjectType observable with valid hash types.""" + observable = MagicMock() + observable.object_.properties._XSI_TYPE = "FileObjectType" + observable.object_.properties.hashes = [ + MagicMock( + type_=MagicMock(value="SHA1"), + simple_hash_value=MagicMock(value="SHA1_HASH"), + ), + MagicMock( + type_=MagicMock(value="MD5"), simple_hash_value=MagicMock(value="MD5_HASH") + ), + MagicMock( + type_=MagicMock(value="SHA256"), + simple_hash_value=MagicMock(value="SHA256_HASH"), + ), + ] + return observable + + +@pytest.fixture +def mock_hash_observable_with_invalid_types(): + """Return a mock STIX FileObjectType observable with invalid hash types.""" + observable = MagicMock() + observable.object_.properties._XSI_TYPE = "FileObjectType" + 
observable.object_.properties.hashes = [ + MagicMock( + type_=MagicMock(value="INVALID1"), + simple_hash_value=MagicMock(value="INVALID1_HASH"), + ), + MagicMock( + type_=MagicMock(value="INVALID2"), + simple_hash_value=MagicMock(value="INVALID2_HASH"), + ), + MagicMock( + type_=MagicMock(value="INVALID3"), + simple_hash_value=MagicMock(value="INVALID3_HASH"), + ), + ] + return observable + + +@pytest.mark.parametrize( + "hash_observable, expected", + [ + ("mock_hash_observable_with_valid_types", ["SHA256_HASH"]), + ("mock_hash_observable_with_invalid_types", []), + ], +) +def test_extract_stix_info_with_hash_observable(hash_observable, expected, request): + """Test extracting hashes from a STIX package.""" + mock_hash_observable = request.getfixturevalue(hash_observable) + stix_package_mock = MagicMock() + stix_package_mock.indicators = [MagicMock(observables=[mock_hash_observable])] + with patch( + "ioc_scan.stix_extract.STIXPackage.from_xml", return_value=stix_package_mock + ): + ips, hashes, fqdns, urls = extract_stix_info("fake_file.xml") + assert hashes == expected + + +def test_extract_stix_info_with_invalid_stix_file(): + """Test invalid filename.""" + with pytest.raises(Exception): + extract_stix_info("invalid.stix") + + +def test_extract_stix_info_with_unexpected_object_type(): + """Test extracting observables from a STIX package with an unexpected object type.""" + observable_mock = MagicMock() + observable_mock.object_.properties._XSI_TYPE = "UnexpectedObjectType" + indicator_mock = MagicMock() + indicator_mock.observables = [observable_mock] + stix_package_mock = MagicMock() + stix_package_mock.indicators = [indicator_mock] + with patch( + "ioc_scan.stix_extract.STIXPackage.from_xml", return_value=stix_package_mock + ): + ip_addresses, hashes, fqdns, urls = extract_stix_info("stix_file") + assert ip_addresses == [] + assert hashes == [] + assert fqdns == [] + assert urls == [] + + +def test_extract_stix_info_with_file_object_without_hashes(): + """Test 
extracting observables from a STIX package where the file object does not have hashes.""" + observable_mock = MagicMock() + observable_mock.object_.properties._XSI_TYPE = "FileObjectType" + observable_mock.object_.properties.hashes = None + indicator_mock = MagicMock() + indicator_mock.observables = [observable_mock] + stix_package_mock = MagicMock() + stix_package_mock.indicators = [indicator_mock] + with patch( + "ioc_scan.stix_extract.STIXPackage.from_xml", return_value=stix_package_mock + ): + ip_addresses, hashes, fqdns, urls = extract_stix_info("stix_file") + assert ip_addresses == [] + assert hashes == [] + assert fqdns == [] + assert urls == [] + + +def test_version(capsys): + """Verify that version string sent to stdout, and agrees with the module.""" + with pytest.raises(SystemExit): + with patch.object(sys, "argv", ["bogus", "--version"]): + stix_extract.main() + captured = capsys.readouterr() + assert ( + captured.out == f"{PROJECT_VERSION}\n" + ), "standard output by '--version' should agree with module.__version__" + + +def test_help(capsys): + """Verify that the help text is sent to stdout.""" + with pytest.raises(SystemExit): + with patch.object(sys, "argv", ["bogus", "--help"]): + stix_extract.main() + captured = capsys.readouterr() + assert ( + "This script parses" in captured.out + ), "help text did not have expected string" + + +def test_main(): + """Test the main function of the script.""" + # Mock the command line arguments + with patch("ioc_scan.stix_extract.docopt") as mock_docopt: + # Create a temporary STIX file + temp_file = tempfile.NamedTemporaryFile(delete=False) + temp_file.write(b"") # Minimal XML content to prevent parse errors + temp_file.close() + + mock_docopt.return_value = {"": temp_file.name} + + # Mock the extraction function to return some test data + with patch("ioc_scan.stix_extract.extract_stix_info") as mock_extract: + mock_extract.return_value = ( + ["1.1.1.1", "2.2.2.2"], + ["hash1", "hash2"], + ["fqdn1", "fqdn2"], + 
["url1", "url2"], + ) + + # Mock the print function to do nothing + with patch("builtins.print") as mock_print: + stix_extract.main() + + # Verify the mock calls. + mock_docopt.assert_called_once_with( + stix_extract.__doc__, version=PROJECT_VERSION + ) + mock_extract.assert_called_once_with(temp_file.name) + assert mock_print.call_count == 12 # Check how many times print is called + + os.unlink(temp_file.name) # Delete the temporary file From a040345bf99cf25d2e6b5f4bc54fb55a5f386291 Mon Sep 17 00:00:00 2001 From: Felddy Date: Sat, 10 Jun 2023 00:00:22 -0400 Subject: [PATCH 5/8] Optimise calculation by reusing ip object --- src/ioc_scan/stix_extract.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ioc_scan/stix_extract.py b/src/ioc_scan/stix_extract.py index 5c562d3..147b2d6 100755 --- a/src/ioc_scan/stix_extract.py +++ b/src/ioc_scan/stix_extract.py @@ -101,9 +101,8 @@ def sort_ip_address(ip): Returns: tuple: a tuple containing two elements - the IP version (int) and the integer representation of the IP address (int). 
""" - ip_version = ipaddress.ip_address(ip).version - ip_int = int(ipaddress.ip_address(ip)) - return ip_version, ip_int + ip_obj = ipaddress.ip_address(ip) + return (ip_obj.version, int(ip_obj)) def main(): From de9c841f3a143a869750382e7b105832cd3f3655 Mon Sep 17 00:00:00 2001 From: Felddy Date: Sat, 10 Jun 2023 00:03:45 -0400 Subject: [PATCH 6/8] Direct error message to stderr --- src/ioc_scan/stix_extract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ioc_scan/stix_extract.py b/src/ioc_scan/stix_extract.py index 147b2d6..18526f9 100755 --- a/src/ioc_scan/stix_extract.py +++ b/src/ioc_scan/stix_extract.py @@ -48,7 +48,7 @@ def extract_stix_info(stix_file): try: stix_package = STIXPackage.from_xml(stix_file) except Exception as e: - print(f"Error parsing STIX file: {e}") + sys.stderr.write(f"Error parsing STIX file: {e}") raise e # Initialize lists to store IP addresses, hashes, FQDNs, and URLs From 971c4fb01eddb01979806a72c6190855308792a5 Mon Sep 17 00:00:00 2001 From: Felddy Date: Sat, 10 Jun 2023 00:08:08 -0400 Subject: [PATCH 7/8] Normalize version usage across tools --- src/ioc_scan/stix_extract.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ioc_scan/stix_extract.py b/src/ioc_scan/stix_extract.py index 18526f9..53b3b1a 100755 --- a/src/ioc_scan/stix_extract.py +++ b/src/ioc_scan/stix_extract.py @@ -31,6 +31,8 @@ from docopt import docopt from stix.core import STIXPackage +from ._version import __version__ + def extract_stix_info(stix_file): """ @@ -108,7 +110,7 @@ def sort_ip_address(ip): def main(): """Parse command line arguments and extract information from the STIX file.""" # Parse command line arguments - args = docopt(__doc__, version="1.5.1") + args = docopt(__doc__, version=__version__) # Extract data from the STIX file or from stdin stix_file = ( args[""] From 1654c509acd624f02b275230e2432b882118abad Mon Sep 17 00:00:00 2001 From: Felddy Date: Sat, 10 Jun 2023 00:08:43 -0400 Subject: [PATCH 8/8] 
Bump version patch --- src/ioc_scan/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ioc_scan/_version.py b/src/ioc_scan/_version.py index 14b98b6..caf37cf 100644 --- a/src/ioc_scan/_version.py +++ b/src/ioc_scan/_version.py @@ -1,2 +1,2 @@ """This file defines the version of this module.""" -__version__ = "1.5.1" +__version__ = "1.5.2"