diff --git a/docsrc/conf.py b/docsrc/conf.py index e8acf6f98..40c3c2f32 100644 --- a/docsrc/conf.py +++ b/docsrc/conf.py @@ -69,7 +69,6 @@ 'docker', 'docker-compose', 'email-validator', - 'fact_helper_file', 'flaky', 'flask', 'flask_login', diff --git a/src/helperFunctions/magic.py b/src/helperFunctions/magic.py new file mode 100644 index 000000000..aee7db78f --- /dev/null +++ b/src/helperFunctions/magic.py @@ -0,0 +1,46 @@ +"""This is a wrapper around pymagic. +It aims to provide the same API but with the ability to load multiple magic +files in the default api. +""" +from __future__ import annotations + +import os +from os import PathLike + +import magic as pymagic + +from helperFunctions.fileSystem import get_src_dir + +# On ubuntu this is provided by the libmagic-mgc package +_default_magic = os.getenv('MAGIC', '/usr/lib/file/magic.mgc') +_fact_magic = f'{get_src_dir()}/bin/firmware.mgc' +_internal_symlink_magic = f'{get_src_dir()}/bin/internal_symlink_magic.mgc' +_magic_file = f'{_internal_symlink_magic}:{_fact_magic}:{_default_magic}' + +_instances = {} + + +def _get_magic_instance(**kwargs): + """Returns an instance of pymagic.Magic""" + # Dicts are not hashable but sorting and creating a tuple is a valid hash + key = hash(tuple(sorted(kwargs.items()))) + instance = _instances.get(key) + if instance is None: + instance = _instances[key] = pymagic.Magic(**kwargs) + return instance + + +def from_file(filename: bytes | str | PathLike, magic_file: str | None = _magic_file, **kwargs) -> str: + """Like pymagic's ``magic.from_file`` but it accepts all keyword arguments + that ``magic.Magic`` accepts. + """ + m = _get_magic_instance(magic_file=magic_file, **kwargs) + return m.from_file(filename) + + +def from_buffer(buf: bytes | str, magic_file: str | None = _magic_file, **kwargs) -> str: + """Like pymagic's ``magic.from_buffer`` but it accepts all keyword arguments + that ``magic.Magic`` accepts. 
+ """ + instance = _get_magic_instance(magic_file=magic_file, **kwargs) + return instance.from_buffer(buf) diff --git a/src/install/common.py b/src/install/common.py index 2e0427c6a..22fcf3d43 100644 --- a/src/install/common.py +++ b/src/install/common.py @@ -46,6 +46,13 @@ def main(distribution): BIN_DIR.mkdir(exist_ok=True) + run_cmd_with_logging( + f'wget -O {BIN_DIR / "firmware"} https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.0/firmware' + ) + run_cmd_with_logging(f'file -C -m {BIN_DIR / "firmware"}') + run_cmd_with_logging(f'file -C -m {INSTALL_DIR / "internal_symlink_magic"}') + run_cmd_with_logging(f'mv {INSTALL_DIR / "internal_symlink_magic.mgc"} {BIN_DIR}') + apt_packages_path = INSTALL_DIR / 'apt-pkgs-common.txt' dnf_packages_path = INSTALL_DIR / 'dnf-pkgs-common.txt' diff --git a/src/install/internal_symlink_magic b/src/install/internal_symlink_magic new file mode 100644 index 000000000..39ca8ab8a --- /dev/null +++ b/src/install/internal_symlink_magic @@ -0,0 +1,6 @@ +# ====================== fact internal ====================== + +# ---- fact internal link representation ---- +0 string symbolic\ link\ -> symbolic link +>17 string x to '%s' +!:mime inode/symlink diff --git a/src/install/requirements_common.txt b/src/install/requirements_common.txt index 8037a8c8a..312047099 100644 --- a/src/install/requirements_common.txt +++ b/src/install/requirements_common.txt @@ -26,8 +26,6 @@ pydantic==2.4.0 # Config parsing toml==0.10.2 -git+https://github.com/fkie-cad/fact_helper_file.git - # Common code modules git+https://github.com/fkie-cad/common_helper_files.git git+https://github.com/fkie-cad/common_helper_filter.git diff --git a/src/plugins/analysis/file_type/code/file_type.py b/src/plugins/analysis/file_type/code/file_type.py index 32ee77f8e..09fa0cd97 100644 --- a/src/plugins/analysis/file_type/code/file_type.py +++ b/src/plugins/analysis/file_type/code/file_type.py @@ -4,11 +4,11 @@ from typing import List import pydantic 
-from fact_helper_file import get_file_type_from_path from pydantic import Field from analysis.plugin import AnalysisPluginV0 from analysis.plugin.compat import AnalysisBasePluginAdapterMixin +from helperFunctions import magic if typing.TYPE_CHECKING: import io @@ -39,9 +39,7 @@ def summarize(self, result: Schema) -> List[str]: def analyze(self, file_handle: io.FileIO, virtual_file_path: str, analyses: dict) -> Schema: del virtual_file_path, analyses - file_dict = get_file_type_from_path(file_handle.name) - return AnalysisPlugin.Schema( - mime=file_dict['mime'], - full=file_dict['full'], + mime=magic.from_file(file_handle.name, mime=True), + full=magic.from_file(file_handle.name, mime=False), ) diff --git a/src/plugins/analysis/qemu_exec/code/qemu_exec.py b/src/plugins/analysis/qemu_exec/code/qemu_exec.py index 0d338fc89..9dff239b3 100644 --- a/src/plugins/analysis/qemu_exec/code/qemu_exec.py +++ b/src/plugins/analysis/qemu_exec/code/qemu_exec.py @@ -16,11 +16,11 @@ from common_helper_files import get_binary_from_file, safe_rglob from docker.errors import DockerException from docker.types import Mount -from fact_helper_file import get_file_type_from_path from requests.exceptions import ReadTimeout import config from analysis.PluginBase import AnalysisBasePlugin +from helperFunctions import magic from helperFunctions.docker import run_docker_container from helperFunctions.tag import TagColor from helperFunctions.uid import create_uid @@ -125,7 +125,10 @@ def _find_relevant_files(self, extracted_files_dir: Path): result = [] for path in safe_rglob(extracted_files_dir): if path.is_file() and not path.is_symlink(): - file_type = get_file_type_from_path(path.absolute()) + file_type = { + 'full': magic.from_file(path.absolute(), mime=False), + 'mime': magic.from_file(path.absolute(), mime=True), + } if self._has_relevant_type(file_type): result.append((f'/{path.relative_to(Path(self.root_path))}', file_type['full'])) return result diff --git 
a/src/test/acceptance/test_io_routes.py b/src/test/acceptance/test_io_routes.py index 65570e189..823b6f637 100644 --- a/src/test/acceptance/test_io_routes.py +++ b/src/test/acceptance/test_io_routes.py @@ -1,6 +1,6 @@ import pytest -from fact_helper_file import get_file_type_from_binary +from helperFunctions import magic from storage.db_interface_comparison import ComparisonDbInterface from test.common_helper import create_test_firmware @@ -68,4 +68,4 @@ def test_pdf_download(self, test_client, backend_db): assert response.status_code == 200, 'pdf download failed' # noqa: PLR2004 device = self.test_fw.device_name.replace(' ', '_') assert response.headers['Content-Disposition'] == f'attachment; filename={device}_analysis_report.pdf' - assert get_file_type_from_binary(response.data)['mime'] == 'application/pdf' + assert magic.from_buffer(response.data, mime=True) == 'application/pdf' diff --git a/src/test/integration/helperFunctions/test_pdf.py b/src/test/integration/helperFunctions/test_pdf.py index d1f6c87de..9776cea03 100644 --- a/src/test/integration/helperFunctions/test_pdf.py +++ b/src/test/integration/helperFunctions/test_pdf.py @@ -2,8 +2,7 @@ import os from pathlib import Path -from fact_helper_file import get_file_type_from_binary - +from helperFunctions import magic from helperFunctions.pdf import build_pdf_report from test.common_helper import TEST_FW @@ -21,5 +20,5 @@ def test_build_pdf_report(): pdf_path = build_pdf_report(TEST_FW, docker_mount_base_dir) - assert get_file_type_from_binary(pdf_path.read_bytes())['mime'] == 'application/pdf' + assert magic.from_buffer(pdf_path.read_bytes(), mime=True) == 'application/pdf' assert pdf_path.name == f"{TEST_FW.device_name.replace(' ', '_')}_analysis_report.pdf" diff --git a/src/test/unit/helperFunctions/test_magic.py b/src/test/unit/helperFunctions/test_magic.py new file mode 100644 index 000000000..664e8f298 --- /dev/null +++ b/src/test/unit/helperFunctions/test_magic.py @@ -0,0 +1,13 @@ +from 
helperFunctions import magic + + +def test_internal_magic(): + assert magic.from_buffer('symbolic link -> /foo/bar', mime=True) == 'inode/symlink' + + +def test_firmware_magic(): + assert magic.from_buffer('BOOTLOADER!', mime=False) == 'Mediatek bootloader' + + +def test_magic_from_file(): + assert magic.from_file('/dev/null', mime=True) == 'inode/chardevice' diff --git a/src/unpacker/unpack.py b/src/unpacker/unpack.py index e048a4b18..bd013315a 100644 --- a/src/unpacker/unpack.py +++ b/src/unpacker/unpack.py @@ -6,10 +6,9 @@ from time import time from typing import TYPE_CHECKING, Optional -from fact_helper_file import get_file_type_from_path - import config from analysis.PluginBase import sanitize_processed_analysis +from helperFunctions import magic from helperFunctions.fileSystem import file_is_empty, get_relative_object_path from helperFunctions.tag import TagColor from objects.file import FileObject @@ -94,7 +93,8 @@ def generate_objects_and_store_files( continue current_file = FileObject(file_path=str(path)) current_virtual_path = get_relative_object_path(path, extraction_dir) - current_file.temporary_data['parent_fo_type'] = get_file_type_from_path(parent.file_path)['mime'] + current_file.temporary_data['parent_fo_type'] = magic.from_file(parent.file_path, mime=True) + if current_file.uid not in extracted_files: # the same file can be contained multiple times in one archive -> only the VFP needs an update self.unpacking_locks.set_unpacking_lock(current_file.uid) diff --git a/src/web_interface/components/io_routes.py b/src/web_interface/components/io_routes.py index c9c9c5afc..7666994a5 100644 --- a/src/web_interface/components/io_routes.py +++ b/src/web_interface/components/io_routes.py @@ -6,10 +6,10 @@ from time import sleep import requests -from fact_helper_file import get_file_type_from_binary from flask import Response, make_response, redirect, render_template, request import config +from helperFunctions import magic from helperFunctions.database import 
get_shared_session from helperFunctions.pdf import build_pdf_report from helperFunctions.task_conversion import check_for_errors, convert_analysis_task_to_fw_obj, create_analysis_task @@ -82,7 +82,7 @@ def _prepare_file_download(self, uid: str, packed: bool = False) -> str | Respon def _get_file_download_mime(self, binary: bytes, uid: str) -> str: type_analysis = self.db.frontend.get_analysis(uid, 'file_type') mime = type_analysis.get('mime') if type_analysis is not None else None - return mime or get_file_type_from_binary(binary)['mime'] + return mime or magic.from_buffer(binary, mime=True) @roles_accepted(*PRIVILEGES['download']) @AppRoute('/ida-download/', GET)