Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uefi plugin #1143

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions src/config/fact-core-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ delay = 0.0
name = "cpu_architecture"
processes = 4

[[backend.plugin]]
name = "uefi"
processes = 4

[[backend.plugin]]
name = "cve_lookup"
processes = 4
Expand Down
Empty file.
Empty file.
161 changes: 161 additions & 0 deletions src/plugins/analysis/uefi/code/uefi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from __future__ import annotations

import json
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Optional, TYPE_CHECKING

from pydantic import BaseModel, Field

from analysis.plugin import AnalysisPluginV0, Tag
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.docker import run_docker_container

from docker.types import Mount

from helperFunctions.tag import TagColor

if TYPE_CHECKING:
from io import FileIO

DOCKER_IMAGE = 'fact/uefi'


class Variant(BaseModel):
name: str = Field(description='The name of the vulnerability variant')
match: bool = Field(description='Whether there was a match for this vulnerability')
output: str = Field(description='The output of FwHunt')


class Rule(BaseModel):
name: str = Field(description='The name of the rule')
category: str = Field(description='The rule category (e.g. vulnerabilities or mitigation failures)')
author: Optional[str] = Field(None, description='The Author of the rule')
description: Optional[str] = Field(None, description='The description of the rule/vulnerability')
url: Optional[str] = Field(None, description='A link with more information for this rule/vulnerability')
cve: Optional[str] = Field(None, description='A list of related CVEs')
architecture: Optional[str] = Field(None, description='The affected architecture')
variants: List[Variant] = Field(description='The list of variants with matching information')


class Schema(BaseModel):
vulnerabilities: List[Rule] = Field(description='A list of UEFI vulnerabilities')


class UefiPluginError(Exception):
pass


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
name='uefi',
description='find vulnerabilities in UEFI modules using the tool FwHunt',
dependencies=['file_type'],
version='0.1.0',
Schema=Schema,
mime_whitelist=['application/x-dosexec', 'firmware/uefi'],
),
)

def analyze(
self,
file_handle: FileIO,
virtual_file_path: dict[str, list[str]],
analyses: dict[str, BaseModel],
) -> Schema | None:
del virtual_file_path

type_analysis = analyses['file_type']
if _is_no_uefi_module(type_analysis):
# only EFI modules are analyzed, not regular PE files
return None

return self._analyze_uefi_module(file_handle.name, _get_analysis_mode(type_analysis.mime))

def _analyze_uefi_module(self, path: str, mode: str) -> Schema | None:
with TemporaryDirectory() as tmp_dir:
output_file = Path(tmp_dir) / 'output.json'
output_file.touch()
run_docker_container(
DOCKER_IMAGE,
combine_stderr_stdout=True,
timeout=self.TIMEOUT,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the Mixin and should not be used.
Also I'd rather use functions instead of methods since plugins don't maintain state.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops I didn't intend to use it. This is more of an artifact from the pre-V1 plugin

mounts=[
Mount('/input/file', path, type='bind'),
Mount('/output/file', str(output_file), type='bind'),
],
environment={'UEFI_ANALYSIS_MODE': mode},
)
try:
return _convert_json_to_schema(json.loads(output_file.read_text()))
except json.JSONDecodeError as error:
raise UefiPluginError('Could not load container output') from error

def summarize(self, result: Schema) -> list[str]:
summary = set()
for rule in result.vulnerabilities:
for variant in rule.variants:
if variant.match:
summary.add(rule.category)
continue
return sorted(summary)

def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
del result
return [
Tag(
name=category,
value='UEFI vulnerability',
color=TagColor.ORANGE,
propagate=True,
)
for category in summary
]


def _convert_json_to_schema(fw_hunt_data: dict[str, dict]) -> Schema:
"""
The output of the docker container has the following structure:
{
<rule_name>: {
category: ...,
[author: ...,]
[description: ...,]
[url: ...,]
variants: {
<name>: {
output: ...,
match: ...
},
...
},
},
...
}
"""
vulnerabilities = [
Rule(
name=rule_name,
category=data['category'],
author=data.get('author'),
description=data.get('description'),
url=data.get('url'),
architecture=data.get('architecture'),
cve=data.get('CVE'),
variants=[
Variant(name=variant_name, **variant_data) for variant_name, variant_data in data['variants'].items()
],
)
for rule_name, data in fw_hunt_data.items()
]
return Schema(vulnerabilities=vulnerabilities)


def _is_no_uefi_module(type_analysis: BaseModel) -> bool:
return type_analysis.mime == 'application/x-dosexec' and 'EFI boot service driver' not in type_analysis.full


def _get_analysis_mode(mime: str) -> str:
return 'firmware' if mime == 'firmware/uefi' else 'module'
27 changes: 27 additions & 0 deletions src/plugins/analysis/uefi/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
FROM alpine:3.18

# install rizin
ARG rizin_version="v0.6.2"
ARG ARCHIVE="rizin-${rizin_version}-static-x86_64.tar.xz"
RUN wget https://github.com/rizinorg/rizin/releases/download/${rizin_version}/${ARCHIVE} && \
tar xf ${ARCHIVE} && \
rm ${ARCHIVE}

# clone FwHunt rules
WORKDIR /work/FwHunt
ARG fwhunt_sha="1f684f1d0d38ba061988c39e0ac4d43eaeec0e50"
RUN apk add --virtual --no-cache git && \
git init && \
git remote add origin https://github.com/binarly-io/fwhunt && \
git fetch --depth 1 origin ${fwhunt_sha} && \
git checkout FETCH_HEAD && \
apk del git

# install fwhunt-scan & python
RUN apk add --virtual --no-cache python3 py3-pip && \
python3 -m pip install --no-cache-dir fwhunt-scan && \
apk del py3-pip

COPY scan.py .

ENTRYPOINT ["/work/FwHunt/scan.py"]
123 changes: 123 additions & 0 deletions src/plugins/analysis/uefi/docker/scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/env python3
from __future__ import annotations

import json
import os
import re
import sys
import yaml
from pathlib import Path
from shlex import split
from subprocess import run

RULE_SUFFIXES = ['.yml', '.yaml']
RULES = Path(__file__).parent / 'rules'
INPUT_FILE = Path('/input/file')
OUTPUT_FILE = Path('/output/file')
BLACKLIST = [
'RsbStuffingCheck.yml', # too many false positives
]
CLI_COLOR_REGEX = re.compile(rb'\x1b\[\d{1,3}m')
RESULT_PARSING_REGEX = re.compile(r'Scanner result ([^\n]+?) \(variant: ([^\n]+?)\) ([^(]+?)(?: \(|\n|$)')
NO_MATCH_STR = 'No threat detected'


def main():
_validate_setup()
rule_files = _find_rule_files()
_scan_file(_load_rules(rule_files), rule_files)


def _validate_setup():
if not INPUT_FILE.is_file():
print('error: input file not found')
sys.exit(1)
if not RULES.is_dir():
print('error: rules dir not found')
sys.exit(2)


def _find_rule_files() -> list[Path]:
return [file for file in RULES.glob('**/*') if _is_rule_file(file) and file.name not in BLACKLIST]


def _load_rules(rule_files: list[Path]) -> dict[str, dict]:
"""
Rule structure should look something like this:
{
"<rule_name>": {
"meta": {
"author": "...",
"name": "...",
"namespace": "<rule_type>",
"description": "...",
"url": "...",
"CVE number": "...",
"advisory": "...",
...
},
"variants": {
"<variant_name>": {
"<requirement>": {...}
},
...
}
}
}
"""
rules = {}
for file in rule_files:
with file.open('rb') as fp:
rule_data = yaml.safe_load(fp)
for rule_dict in rule_data.values():
rules[rule_dict['meta']['name']] = rule_dict
return rules


def _scan_file(rules: dict[str, dict], rule_files: list[Path]):
rules_str = ' '.join(f'-r {file}' for file in rule_files)
mode = os.environ.get('UEFI_ANALYSIS_MODE', default='module')
proc = run(
split(f'fwhunt_scan_analyzer.py scan-{mode} {INPUT_FILE} {rules_str}'),
capture_output=True,
)
if proc.returncode != 0:
print(f'warning: Scan exited with return code {proc.returncode}: {proc.stderr}')
else:
output = CLI_COLOR_REGEX.sub(b'', proc.stdout).decode(errors='replace')
result = _parse_output(output, rules)
OUTPUT_FILE.write_text(json.dumps(result))


def _parse_output(output: str, rules: dict[str, dict]) -> dict[str, dict]:
result = {}
for rule_name, variant, detected in RESULT_PARSING_REGEX.findall(output):
rule_data = rules.get(rule_name)
if rule_data is None:
print(f'error: rule {rule_name} not found')
sys.exit(3)
result.setdefault(
rule_name,
{
'category': rule_data['meta']['namespace'],
'description': rule_data['meta'].get('description'),
'author': rule_data['meta'].get('author'),
'url': rule_data['meta'].get('url', rule_data['meta'].get('advisory')),
'CVE': rule_data['meta'].get('CVE number'),
'architecture': rule_data['meta'].get('architecture'),
'variants': {},
},
)
result[rule_name]['variants'][variant] = {
'output': detected,
'match': NO_MATCH_STR not in detected,
}
return result


def _is_rule_file(rule: Path) -> bool:
return rule.is_file() and rule.suffix in RULE_SUFFIXES


if __name__ == '__main__':
main()
31 changes: 31 additions & 0 deletions src/plugins/analysis/uefi/install.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env python3

import logging
from pathlib import Path

try:
from helperFunctions.install import run_cmd_with_logging
from plugins.installer import AbstractPluginInstaller
except ImportError:
import sys

SRC_PATH = Path(__file__).absolute().parent.parent.parent.parent
sys.path.append(str(SRC_PATH))

from helperFunctions.install import run_cmd_with_logging
from plugins.installer import AbstractPluginInstaller


class UefiInstaller(AbstractPluginInstaller):
base_path = Path(__file__).resolve().parent

def install_docker_images(self):
run_cmd_with_logging('docker build -t fact/uefi ./docker')


# Alias for generic use
Installer = UefiInstaller

if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
Installer().install()
Empty file.
Binary file added src/plugins/analysis/uefi/test/data/test_file.pe
Binary file not shown.
36 changes: 36 additions & 0 deletions src/plugins/analysis/uefi/test/test_plugin_uefi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from io import FileIO
from pathlib import Path

import pytest

from ..code.uefi import AnalysisPlugin, Schema
from plugins.analysis.file_type.code.file_type import AnalysisPlugin as FileType

TEST_FILE = Path(__file__).parent / 'data' / 'test_file.pe'


@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestFileSystemMetadata:
def test_analyze_summarize_and_tag(self, analysis_plugin):
assert TEST_FILE.is_file(), 'test file is missing'
dependencies = {
'file_type': FileType.Schema(
mime='application/x-dosexec',
full='MS-DOS executable PE32+ executable (DLL) (EFI boot service driver) x86-64, for MS Windows',
)
}
result = analysis_plugin.analyze(FileIO(str(TEST_FILE)), {}, dependencies)
assert isinstance(result, Schema)
assert len(result.vulnerabilities) > 0

rules_by_name = {r.name: r for r in result.vulnerabilities}
assert 'BRLY-2021-007' in rules_by_name
matching_rule = rules_by_name['BRLY-2021-007']
assert matching_rule.variants[0].match is True, 'rule did not match'

summary = analysis_plugin.summarize(result)
assert summary == [matching_rule.category]

tags = analysis_plugin.get_tags(result, summary)
assert len(tags) == 1
assert tags[0].name == matching_rule.category
Loading
Loading