diff --git a/src/config/fact-core-config.toml b/src/config/fact-core-config.toml
index 6a878149c..45c057266 100644
--- a/src/config/fact-core-config.toml
+++ b/src/config/fact-core-config.toml
@@ -103,6 +103,10 @@ delay = 0.0
 name = "cpu_architecture"
 processes = 4
 
+[[backend.plugin]]
+name = "uefi"
+processes = 4
+
 [[backend.plugin]]
 name = "cve_lookup"
 processes = 4
diff --git a/src/plugins/analysis/uefi/__init__.py b/src/plugins/analysis/uefi/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/plugins/analysis/uefi/code/__init__.py b/src/plugins/analysis/uefi/code/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/plugins/analysis/uefi/code/uefi.py b/src/plugins/analysis/uefi/code/uefi.py
new file mode 100644
index 000000000..4e4eb517c
--- /dev/null
+++ b/src/plugins/analysis/uefi/code/uefi.py
@@ -0,0 +1,161 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from typing import List, Optional, TYPE_CHECKING
+
+from pydantic import BaseModel, Field
+
+from analysis.plugin import AnalysisPluginV0, Tag
+from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
+from helperFunctions.docker import run_docker_container
+
+from docker.types import Mount
+
+from helperFunctions.tag import TagColor
+
+if TYPE_CHECKING:
+    from io import FileIO
+
+DOCKER_IMAGE = 'fact/uefi'
+
+
+class Variant(BaseModel):
+    name: str = Field(description='The name of the vulnerability variant')
+    match: bool = Field(description='Whether there was a match for this vulnerability')
+    output: str = Field(description='The output of FwHunt')
+
+
+class Rule(BaseModel):
+    name: str = Field(description='The name of the rule')
+    category: str = Field(description='The rule category (e.g. vulnerabilities or mitigation failures)')
+    author: Optional[str] = Field(None, description='The author of the rule')
+    description: Optional[str] = Field(None, description='The description of the rule/vulnerability')
+    url: Optional[str] = Field(None, description='A link with more information for this rule/vulnerability')
+    cve: Optional[str] = Field(None, description='A list of related CVEs')
+    architecture: Optional[str] = Field(None, description='The affected architecture')
+    variants: List[Variant] = Field(description='The list of variants with matching information')
+
+
+class Schema(BaseModel):
+    vulnerabilities: List[Rule] = Field(description='A list of UEFI vulnerabilities')
+
+
+class UefiPluginError(Exception):
+    pass
+
+
+class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
+    def __init__(self):
+        super().__init__(
+            metadata=AnalysisPluginV0.MetaData(
+                name='uefi',
+                description='find vulnerabilities in UEFI modules using the tool FwHunt',
+                dependencies=['file_type'],
+                version='0.1.0',
+                Schema=Schema,
+                mime_whitelist=['application/x-dosexec', 'firmware/uefi'],
+            ),
+        )
+
+    def analyze(
+        self,
+        file_handle: FileIO,
+        virtual_file_path: dict[str, list[str]],
+        analyses: dict[str, BaseModel],
+    ) -> Schema | None:
+        del virtual_file_path
+
+        type_analysis = analyses['file_type']
+        if _is_no_uefi_module(type_analysis):
+            # only EFI modules are analyzed, not regular PE files
+            return None
+
+        return self._analyze_uefi_module(file_handle.name, _get_analysis_mode(type_analysis.mime))
+
+    def _analyze_uefi_module(self, path: str, mode: str) -> Schema | None:
+        with TemporaryDirectory() as tmp_dir:
+            output_file = Path(tmp_dir) / 'output.json'
+            output_file.touch()
+            run_docker_container(
+                DOCKER_IMAGE,
+                combine_stderr_stdout=True,
+                timeout=self.metadata.timeout,
+                mounts=[
+                    Mount('/input/file', path, type='bind'),
+                    Mount('/output/file', str(output_file), type='bind'),
+                ],
+                environment={'UEFI_ANALYSIS_MODE': mode},
+            )
+            try:
+                return _convert_json_to_schema(json.loads(output_file.read_text()))
+            except json.JSONDecodeError as error:
+                raise UefiPluginError('Could not load container output') from error
+
+    def summarize(self, result: Schema) -> list[str]:
+        summary = set()
+        for rule in result.vulnerabilities:
+            for variant in rule.variants:
+                if variant.match:
+                    summary.add(rule.category)
+                    break
+        return sorted(summary)
+
+    def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
+        del result
+        return [
+            Tag(
+                name=category,
+                value='UEFI vulnerability',
+                color=TagColor.ORANGE,
+                propagate=True,
+            )
+            for category in summary
+        ]
+
+
+def _convert_json_to_schema(fw_hunt_data: dict[str, dict]) -> Schema:
+    """
+    The output of the docker container has the following structure:
+    {
+        <rule name>: {
+            category: ...,
+            [author: ...,]
+            [description: ...,]
+            [url: ...,]
+            variants: {
+                <variant name>: {
+                    output: ...,
+                    match: ...
+                },
+                ...
+            },
+        },
+        ...
+    }
+    """
+    vulnerabilities = [
+        Rule(
+            name=rule_name,
+            category=data['category'],
+            author=data.get('author'),
+            description=data.get('description'),
+            url=data.get('url'),
+            architecture=data.get('architecture'),
+            cve=data.get('CVE'),
+            variants=[
+                Variant(name=variant_name, **variant_data) for variant_name, variant_data in data['variants'].items()
+            ],
+        )
+        for rule_name, data in fw_hunt_data.items()
+    ]
+    return Schema(vulnerabilities=vulnerabilities)
+
+
+def _is_no_uefi_module(type_analysis: BaseModel) -> bool:
+    return type_analysis.mime == 'application/x-dosexec' and 'EFI boot service driver' not in type_analysis.full
+
+
+def _get_analysis_mode(mime: str) -> str:
+    return 'firmware' if mime == 'firmware/uefi' else 'module'
diff --git a/src/plugins/analysis/uefi/docker/Dockerfile b/src/plugins/analysis/uefi/docker/Dockerfile
new file mode 100644
index 000000000..3d838517f
--- /dev/null
+++ b/src/plugins/analysis/uefi/docker/Dockerfile
@@ -0,0 +1,27 @@
+FROM alpine:3.18
+
+# install rizin
+ARG rizin_version="v0.6.2"
+ARG ARCHIVE="rizin-${rizin_version}-static-x86_64.tar.xz"
+RUN wget https://github.com/rizinorg/rizin/releases/download/${rizin_version}/${ARCHIVE} && \
+    tar xf ${ARCHIVE} && \
+    rm ${ARCHIVE}
+
+# clone FwHunt rules
+WORKDIR /work/FwHunt
+ARG fwhunt_sha="1f684f1d0d38ba061988c39e0ac4d43eaeec0e50"
+RUN apk add --no-cache git && \
+    git init && \
+    git remote add origin https://github.com/binarly-io/fwhunt && \
+    git fetch --depth 1 origin ${fwhunt_sha} && \
+    git checkout FETCH_HEAD && \
+    apk del git
+
+# install fwhunt-scan & python
+RUN apk add --no-cache python3 py3-pip && \
+    python3 -m pip install --no-cache-dir fwhunt-scan && \
+    apk del py3-pip
+
+COPY scan.py .
+
+ENTRYPOINT ["/work/FwHunt/scan.py"]
diff --git a/src/plugins/analysis/uefi/docker/scan.py b/src/plugins/analysis/uefi/docker/scan.py
new file mode 100755
index 000000000..c11dd8bf6
--- /dev/null
+++ b/src/plugins/analysis/uefi/docker/scan.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import json
+import os
+import re
+import sys
+import yaml
+from pathlib import Path
+from shlex import split
+from subprocess import run
+
+RULE_SUFFIXES = ['.yml', '.yaml']
+RULES = Path(__file__).parent / 'rules'
+INPUT_FILE = Path('/input/file')
+OUTPUT_FILE = Path('/output/file')
+BLACKLIST = [
+    'RsbStuffingCheck.yml',  # too many false positives
+]
+CLI_COLOR_REGEX = re.compile(rb'\x1b\[\d{1,3}m')
+RESULT_PARSING_REGEX = re.compile(r'Scanner result ([^\n]+?) \(variant: ([^\n]+?)\) ([^(]+?)(?: \(|\n|$)')
+NO_MATCH_STR = 'No threat detected'
+
+
+def main():
+    _validate_setup()
+    rule_files = _find_rule_files()
+    _scan_file(_load_rules(rule_files), rule_files)
+
+
+def _validate_setup():
+    if not INPUT_FILE.is_file():
+        print('error: input file not found')
+        sys.exit(1)
+    if not RULES.is_dir():
+        print('error: rules dir not found')
+        sys.exit(2)
+
+
+def _find_rule_files() -> list[Path]:
+    return [file for file in RULES.glob('**/*') if _is_rule_file(file) and file.name not in BLACKLIST]
+
+
+def _load_rules(rule_files: list[Path]) -> dict[str, dict]:
+    """
+    Rule structure should look something like this:
+    {
+        "<rule name>": {
+            "meta": {
+                "author": "...",
+                "name": "...",
+                "namespace": "<category>",
+                "description": "...",
+                "url": "...",
+                "CVE number": "...",
+                "advisory": "...",
+                ...
+            },
+            "variants": {
+                "<variant name>": {
+                    "<key>": {...}
+                },
+                ...
+            }
+        }
+    }
+    """
+    rules = {}
+    for file in rule_files:
+        with file.open('rb') as fp:
+            rule_data = yaml.safe_load(fp)
+        for rule_dict in rule_data.values():
+            rules[rule_dict['meta']['name']] = rule_dict
+    return rules
+
+
+def _scan_file(rules: dict[str, dict], rule_files: list[Path]):
+    rules_str = ' '.join(f'-r {file}' for file in rule_files)
+    mode = os.environ.get('UEFI_ANALYSIS_MODE', default='module')
+    proc = run(
+        split(f'fwhunt_scan_analyzer.py scan-{mode} {INPUT_FILE} {rules_str}'),
+        capture_output=True,
+    )
+    if proc.returncode != 0:
+        print(f'warning: Scan exited with return code {proc.returncode}: {proc.stderr}')
+    else:
+        output = CLI_COLOR_REGEX.sub(b'', proc.stdout).decode(errors='replace')
+        result = _parse_output(output, rules)
+        OUTPUT_FILE.write_text(json.dumps(result))
+
+
+def _parse_output(output: str, rules: dict[str, dict]) -> dict[str, dict]:
+    result = {}
+    for rule_name, variant, detected in RESULT_PARSING_REGEX.findall(output):
+        rule_data = rules.get(rule_name)
+        if rule_data is None:
+            print(f'error: rule {rule_name} not found')
+            sys.exit(3)
+        result.setdefault(
+            rule_name,
+            {
+                'category': rule_data['meta']['namespace'],
+                'description': rule_data['meta'].get('description'),
+                'author': rule_data['meta'].get('author'),
+                'url': rule_data['meta'].get('url', rule_data['meta'].get('advisory')),
+                'CVE': rule_data['meta'].get('CVE number'),
+                'architecture': rule_data['meta'].get('architecture'),
+                'variants': {},
+            },
+        )
+        result[rule_name]['variants'][variant] = {
+            'output': detected,
+            'match': NO_MATCH_STR not in detected,
+        }
+    return result
+
+
+def _is_rule_file(rule: Path) -> bool:
+    return rule.is_file() and rule.suffix in RULE_SUFFIXES
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/plugins/analysis/uefi/install.py b/src/plugins/analysis/uefi/install.py
new file mode 100755
index 000000000..66f11b8de
--- /dev/null
+++ b/src/plugins/analysis/uefi/install.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python3
+
+import logging
+from pathlib import Path
+
+try:
+    from helperFunctions.install import run_cmd_with_logging
+    from plugins.installer import AbstractPluginInstaller
+except ImportError:
+    import sys
+
+    SRC_PATH = Path(__file__).absolute().parent.parent.parent.parent
+    sys.path.append(str(SRC_PATH))
+
+    from helperFunctions.install import run_cmd_with_logging
+    from plugins.installer import AbstractPluginInstaller
+
+
+class UefiInstaller(AbstractPluginInstaller):
+    base_path = Path(__file__).resolve().parent
+
+    def install_docker_images(self):
+        run_cmd_with_logging('docker build -t fact/uefi ./docker')
+
+
+# Alias for generic use
+Installer = UefiInstaller
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO)
+    Installer().install()
diff --git a/src/plugins/analysis/uefi/test/__init__.py b/src/plugins/analysis/uefi/test/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/plugins/analysis/uefi/test/data/test_file.pe b/src/plugins/analysis/uefi/test/data/test_file.pe
new file mode 100644
index 000000000..34925606c
Binary files /dev/null and b/src/plugins/analysis/uefi/test/data/test_file.pe differ
diff --git a/src/plugins/analysis/uefi/test/test_plugin_uefi.py b/src/plugins/analysis/uefi/test/test_plugin_uefi.py
new file mode 100644
index 000000000..699e47dbb
--- /dev/null
+++ b/src/plugins/analysis/uefi/test/test_plugin_uefi.py
@@ -0,0 +1,36 @@
+from io import FileIO
+from pathlib import Path
+
+import pytest
+
+from ..code.uefi import AnalysisPlugin, Schema
+from plugins.analysis.file_type.code.file_type import AnalysisPlugin as FileType
+
+TEST_FILE = Path(__file__).parent / 'data' / 'test_file.pe'
+
+
+@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
+class TestUefi:
+    def test_analyze_summarize_and_tag(self, analysis_plugin):
+        assert TEST_FILE.is_file(), 'test file is missing'
+        dependencies = {
+            'file_type': FileType.Schema(
+                mime='application/x-dosexec',
+                full='MS-DOS executable PE32+ executable (DLL) (EFI boot service driver) x86-64, for MS Windows',
+            )
+        }
+        result = analysis_plugin.analyze(FileIO(str(TEST_FILE)), {}, dependencies)
+        assert isinstance(result, Schema)
+        assert len(result.vulnerabilities) > 0
+
+        rules_by_name = {r.name: r for r in result.vulnerabilities}
+        assert 'BRLY-2021-007' in rules_by_name
+        matching_rule = rules_by_name['BRLY-2021-007']
+        assert matching_rule.variants[0].match is True, 'rule did not match'
+
+        summary = analysis_plugin.summarize(result)
+        assert summary == [matching_rule.category]
+
+        tags = analysis_plugin.get_tags(result, summary)
+        assert len(tags) == 1
+        assert tags[0].name == matching_rule.category
diff --git a/src/plugins/analysis/uefi/view/uefi.html b/src/plugins/analysis/uefi/view/uefi.html
new file mode 100644
index 000000000..d247ae7d1
--- /dev/null
+++ b/src/plugins/analysis/uefi/view/uefi.html
@@ -0,0 +1,55 @@
+{% extends "analysis_plugins/general_information.html" %}
+
+{% block analysis_result_details %}
+    {% for category, result_list in analysis_result.vulnerabilities | groupby("category") %}
+        <tr>
+            <td colspan="2">{{ category | safe }}</td>
+        </tr>
+        <tr>
+            <td colspan="2">
+                <table class="table table-sm mb-0">
+                    {% for rule_results in result_list | sort(attribute="name") %}
+                        <tr>
+                            <td>{{ rule_results.name | safe }}</td>
+                            <td>
+                                <table class="table table-sm mb-0">
+                                    {% if rule_results.url %}
+                                        <tr>
+                                            <td>url</td>
+                                            <td>{{ rule_results.url | safe }}</td>
+                                        </tr>
+                                    {% endif %}
+                                    {% for key in ["author", "description", "cve", "architecture"] %}
+                                        {% if rule_results.get(key) %}
+                                            <tr>
+                                                <td>{{ key }}</td>
+                                                <td>{{ rule_results.get(key) | link_cve | safe }}</td>
+                                            </tr>
+                                        {% endif %}
+                                    {% endfor %}
+                                    <tr>
+                                        <td>variants</td>
+                                        <td>
+                                            <table class="table table-sm mb-0">
+                                                {% for variant_data in rule_results.variants %}
+                                                    <tr>
+                                                        <td>{{ variant_data.name | safe }}</td>
+                                                        {% if variant_data.match %}
+                                                            <td class="table-danger">{{ variant_data.output | safe }}</td>
+                                                        {% else %}
+                                                            <td>{{ variant_data.output | safe }}</td>
+                                                        {% endif %}
+                                                    </tr>
+                                                {% endfor %}
+                                            </table>
+                                        </td>
+                                    </tr>
+                                </table>
+                            </td>
+                        </tr>
+                    {% endfor %}
+                </table>
+            </td>
+        </tr>
+    {% endfor %}
+{% endblock %}
diff --git a/src/storage/db_interface_common.py b/src/storage/db_interface_common.py
index 70815cc73..8a968de09 100644
--- a/src/storage/db_interface_common.py
+++ b/src/storage/db_interface_common.py
@@ -32,6 +32,7 @@
     'known_vulnerabilities',
     'qemu_exec',
     'software_components',
+    'uefi',
     'users_and_passwords',
 ]
 Summary = Dict[str, List[str]]
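For reference (not part of the diff above), a minimal sketch of how the container output documented in `_convert_json_to_schema()` maps onto the plugin's pydantic `Schema`; the rule name and field values below are invented:

```python
# Hypothetical sample in the documented container-output shape; all values are made up.
from plugins.analysis.uefi.code.uefi import _convert_json_to_schema

fw_hunt_output = {
    'ExampleRule': {
        'category': 'vulnerabilities',
        'author': 'example author',
        'description': 'example description',
        'url': None,
        'CVE': None,
        'architecture': None,
        'variants': {
            'default': {'output': 'example FwHunt output', 'match': True},
        },
    },
}

# One Rule with one matching Variant is produced from the nested dict.
schema = _convert_json_to_schema(fw_hunt_output)
assert schema.vulnerabilities[0].category == 'vulnerabilities'
assert schema.vulnerabilities[0].variants[0].match is True
```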