diff --git a/dissect/target/container.py b/dissect/target/container.py index 2e24af3e5..90a5d60c2 100644 --- a/dissect/target/container.py +++ b/dissect/target/container.py @@ -239,7 +239,7 @@ def open(item: Union[list, str, BinaryIO, Path], *args, **kwargs): log.info("Failed to import %s", container) log.debug("", exc_info=e) except Exception as e: - raise ContainerError(f"Failed to open container {item}", cause=e) + raise ContainerError(f"Failed to open container {item}") from e finally: if first_fh_opened: first_fh.close() diff --git a/dissect/target/exceptions.py b/dissect/target/exceptions.py index aea713290..77dc540b8 100644 --- a/dissect/target/exceptions.py +++ b/dissect/target/exceptions.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import sys import traceback @@ -7,13 +9,12 @@ class Error(Exception): """Generic dissect.target error""" - def __init__(self, message=None, cause=None, extra=None): + def __init__(self, message: str | None = None, extra: list[Exception] | None = None): if extra: exceptions = "\n\n".join(["".join(traceback.format_exception_only(type(e), e)) for e in extra]) message = f"{message}\n\nAdditionally, the following exceptions occurred:\n\n{exceptions}" super().__init__(message) - self.__cause__ = cause self.__extra__ = extra @@ -72,15 +73,15 @@ class PluginNotFoundError(PluginError): """Plugin cannot be found.""" -class FileNotFoundError(FilesystemError): +class FileNotFoundError(FilesystemError, FileNotFoundError): """The requested path could not be found.""" -class IsADirectoryError(FilesystemError): +class IsADirectoryError(FilesystemError, IsADirectoryError): """The entry is a directory.""" -class NotADirectoryError(FilesystemError): +class NotADirectoryError(FilesystemError, NotADirectoryError): """The entry is not a directory.""" diff --git a/dissect/target/filesystem.py b/dissect/target/filesystem.py index e22c099ec..9e1d8c166 100644 --- a/dissect/target/filesystem.py +++ b/dissect/target/filesystem.py @@ -1142,7 +1142,7 @@ def get(self, path: str, relentry: Optional[FilesystemEntry] = None) -> Filesyst try: return entry.top.get(fsutil.join(*parts[i:], alt_separator=self.alt_separator)) except FilesystemError as e: - raise FileNotFoundError(full_path, cause=e) + raise FileNotFoundError(full_path) from e else: raise FileNotFoundError(full_path) @@ -1715,7 +1715,7 @@ def open(fh: BinaryIO, *args, **kwargs) -> Filesystem: log.info("Failed to import %s", filesystem) log.debug("", exc_info=e) except Exception as e: - raise FilesystemError(f"Failed to open filesystem for {fh}", cause=e) + raise FilesystemError(f"Failed to open filesystem for {fh}") from e finally: fh.seek(offset) diff --git a/dissect/target/filesystems/btrfs.py b/dissect/target/filesystems/btrfs.py index e24ad08bc..2d1fbc55e 100644 --- a/dissect/target/filesystems/btrfs.py +++ b/dissect/target/filesystems/btrfs.py @@ -79,13 +79,13 @@ def _get_node(self, path: str, node: Optional[btrfs.INode] = None) -> btrfs.INod try: return self.subvolume.get(path, node) except btrfs.FileNotFoundError as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e except btrfs.NotADirectoryError as e: - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e except btrfs.NotASymlinkError as e: - raise NotASymlinkError(path, cause=e) + raise NotASymlinkError(path) from e except btrfs.Error as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class BtrfsFilesystemEntry(FilesystemEntry): diff --git a/dissect/target/filesystems/extfs.py b/dissect/target/filesystems/extfs.py index e9334e9cb..d015a83bf 100644 --- a/dissect/target/filesystems/extfs.py +++ b/dissect/target/filesystems/extfs.py @@ -33,13 +33,13 @@ def _get_node(self, path: str, node: Optional[extfs.INode] = None) -> extfs.INod try: return self.extfs.get(path, node) except extfs.FileNotFoundError as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e except extfs.NotADirectoryError as e: - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e except extfs.NotASymlinkError as e: - raise NotASymlinkError(path, cause=e) + raise NotASymlinkError(path) from e except extfs.Error as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class ExtFilesystemEntry(FilesystemEntry): diff --git a/dissect/target/filesystems/fat.py b/dissect/target/filesystems/fat.py index a204345c3..c57859daf 100644 --- a/dissect/target/filesystems/fat.py +++ b/dissect/target/filesystems/fat.py @@ -1,4 +1,5 @@ import datetime +import math import stat from typing import BinaryIO, Iterator, Optional, Union @@ -41,11 +42,11 @@ def _get_entry( try: return self.fatfs.get(path, dirent=entry) except fat_exc.FileNotFoundError as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e except fat_exc.NotADirectoryError as e: - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e except fat_exc.Error as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class FatFilesystemEntry(FilesystemEntry): @@ -100,16 +101,21 @@ def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: def lstat(self) -> fsutil.stat_result: """Return the stat information of the given path, without resolving links.""" # mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime - st_info = [ - (stat.S_IFDIR if self.is_dir() else stat.S_IFREG) | 0o777, - self.entry.cluster, - id(self.fs), - 1, - 0, - 0, - self.entry.size, - self.entry.atime.replace(tzinfo=self.fs.tzinfo).timestamp(), - self.entry.mtime.replace(tzinfo=self.fs.tzinfo).timestamp(), - self.entry.ctime.replace(tzinfo=self.fs.tzinfo).timestamp(), - ] - return fsutil.stat_result(st_info) + st_info = fsutil.stat_result( + [ + (stat.S_IFDIR if self.is_dir() else stat.S_IFREG) | 0o777, + self.entry.cluster, + id(self.fs), + 1, + 0, + 0, + self.entry.size, + self.entry.atime.replace(tzinfo=self.fs.tzinfo).timestamp(), + self.entry.mtime.replace(tzinfo=self.fs.tzinfo).timestamp(), + self.entry.ctime.replace(tzinfo=self.fs.tzinfo).timestamp(), + ] + ) + + st_info.st_blocks = math.ceil(self.entry.size / self.entry.fs.cluster_size) + st_info.st_blksize = self.entry.fs.cluster_size + return st_info diff --git a/dissect/target/filesystems/ffs.py b/dissect/target/filesystems/ffs.py index 35158c96d..acc54a702 100644 --- a/dissect/target/filesystems/ffs.py +++ b/dissect/target/filesystems/ffs.py @@ -39,13 +39,13 @@ def _get_node(self, path: str, node: Optional[ffs.INode] = None) -> ffs.INode: try: return self.ffs.get(path, node) except ffs.FileNotFoundError as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e except ffs.NotADirectoryError as e: - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e except ffs.NotASymlinkError as e: - raise NotASymlinkError(path, cause=e) + raise NotASymlinkError(path) from e except ffs.Error as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class FfsFilesystemEntry(FilesystemEntry): @@ -126,6 +126,12 @@ def lstat(self) -> fsutil.stat_result: ] ) + # Note: stat on linux always returns the default block size of 4096 + # We are returning the actual block size of the filesystem, as on BSD + st_info.st_blksize = self.fs.ffs.block_size + # Note: st_blocks * 512 can be lower than st_blksize because FFS employs fragments + st_info.st_blocks = self.entry.nblocks + # Set the nanosecond resolution separately st_info.st_atime_ns = self.entry.atime_ns st_info.st_mtime_ns = self.entry.mtime_ns @@ -134,5 +140,6 @@ def lstat(self) -> fsutil.stat_result: # FFS2 has a birth time, FFS1 does not if btime := self.entry.btime: st_info.st_birthtime = btime.timestamp() + st_info.st_birthtime_ns = self.entry.btime_ns return st_info diff --git a/dissect/target/filesystems/jffs.py b/dissect/target/filesystems/jffs.py index 2d0122940..cc3b78a89 100644 --- a/dissect/target/filesystems/jffs.py +++ b/dissect/target/filesystems/jffs.py @@ -35,13 +35,13 @@ def _get_node(self, path: str, node: Optional[jffs2.INode] = None) -> jffs2.INod try: return self.jffs2.get(path, node) except jffs2.FileNotFoundError as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e except jffs2.NotADirectoryError as e: - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e except jffs2.NotASymlinkError as e: - raise NotASymlinkError(path, cause=e) + raise NotASymlinkError(path) from e except jffs2.Error as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class JFFSFilesystemEntry(FilesystemEntry): @@ -76,13 +76,13 @@ def scandir(self) -> Iterator[FilesystemEntry]: entry_path = fsutil.join(self.path, name, alt_separator=self.fs.alt_separator) yield JFFSFilesystemEntry(self.fs, entry_path, entry) - def is_dir(self, follow_symlinks: bool = False) -> bool: + def is_dir(self, follow_symlinks: bool = True) -> bool: try: return self._resolve(follow_symlinks).entry.is_dir() except FilesystemError: return False - def is_file(self, follow_symlinks: bool = False) -> bool: + def is_file(self, follow_symlinks: bool = True) -> bool: try: return self._resolve(follow_symlinks).entry.is_file() except FilesystemError: @@ -97,7 +97,7 @@ def readlink(self) -> str: return self.entry.link - def stat(self, follow_symlinks: bool = False) -> fsutil.stat_result: + def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: return self._resolve(follow_symlinks).lstat() def lstat(self) -> fsutil.stat_result: diff --git a/dissect/target/filesystems/ntfs.py b/dissect/target/filesystems/ntfs.py index e384a6908..a54442cbe 100644 --- a/dissect/target/filesystems/ntfs.py +++ b/dissect/target/filesystems/ntfs.py @@ -48,12 +48,12 @@ def _get_record(self, path: str, root: Optional[MftRecord] = None) -> MftRecord: try: path = path.rsplit(":", maxsplit=1)[0] return self.ntfs.mft.get(path, root=root) - except NtfsFileNotFoundError: - raise FileNotFoundError(path) + except NtfsFileNotFoundError as e: + raise FileNotFoundError(path) from e except NtfsNotADirectoryError as e: - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e except NtfsError as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class NtfsFilesystemEntry(FilesystemEntry): diff --git a/dissect/target/filesystems/smb.py b/dissect/target/filesystems/smb.py index d00de8537..f90e9da68 100644 --- a/dissect/target/filesystems/smb.py +++ b/dissect/target/filesystems/smb.py @@ -58,10 +58,10 @@ def _get_entry(self, path: str) -> SharedFile: except SessionError as e: if e.error == STATUS_NOT_A_DIRECTORY: # STATUS_NOT_A_DIRECTORY - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e else: # 0xC000000F is STATUS_NO_SUCH_FILE, but everything else should raise a FileNotFoundError anyway - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e if len(result) != 1: raise FileNotFoundError(path) @@ -106,7 +106,7 @@ def open(self) -> SmbStream: try: return SmbStream(self.fs.conn, self.fs.share_name, self.path, self.entry.get_filesize()) except SessionError as e: - raise FilesystemError(f"Failed to open file: {self.path}", cause=e) + raise FilesystemError(f"Failed to open file: {self.path}") from e def is_dir(self, follow_symlinks: bool = True) -> bool: try: diff --git a/dissect/target/filesystems/squashfs.py b/dissect/target/filesystems/squashfs.py index af378a242..d0323b3bc 100644 --- a/dissect/target/filesystems/squashfs.py +++ b/dissect/target/filesystems/squashfs.py @@ -32,13 +32,13 @@ def _get_node(self, path: str, node: Optional[INode] = None) -> INode: try: return self.squashfs.get(path, node) except exceptions.FileNotFoundError as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e except exceptions.NotADirectoryError as e: - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e except exceptions.NotASymlinkError as e: - raise NotASymlinkError(path, cause=e) + raise NotASymlinkError(path) from e except exceptions.Error as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class SquashFSFilesystemEntry(FilesystemEntry): diff --git a/dissect/target/filesystems/vmfs.py b/dissect/target/filesystems/vmfs.py index 64503b43a..66d001f81 100644 --- a/dissect/target/filesystems/vmfs.py +++ b/dissect/target/filesystems/vmfs.py @@ -41,13 +41,13 @@ def _get_node(self, path: str, node: Optional[FileDescriptor] = None) -> FileDes try: return self.vmfs.get(path, node) except vmfs.FileNotFoundError as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e except vmfs.NotADirectoryError as e: - raise NotADirectoryError(path, cause=e) + raise NotADirectoryError(path) from e except vmfs.NotASymlinkError as e: - raise NotASymlinkError(path, cause=e) + raise NotASymlinkError(path) from e except vmfs.Error as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class VmfsFilesystemEntry(FilesystemEntry): diff --git a/dissect/target/filesystems/xfs.py b/dissect/target/filesystems/xfs.py index d63af1c2d..68099399b 100644 --- a/dissect/target/filesystems/xfs.py +++ b/dissect/target/filesystems/xfs.py @@ -35,14 +35,14 @@ def get(self, path: str) -> FilesystemEntry: def _get_node(self, path: str, node: Optional[xfs.INode] = None) -> xfs.INode: try: return self.xfs.get(path, node) - except xfs.FileNotFoundError: - raise FileNotFoundError(path) - except xfs.NotADirectoryError: - raise NotADirectoryError(path) - except xfs.NotASymlinkError: - raise NotASymlinkError(path) + except xfs.FileNotFoundError as e: + raise FileNotFoundError(path) from e + except xfs.NotADirectoryError as e: + raise NotADirectoryError(path) from e + except xfs.NotASymlinkError as e: + raise NotASymlinkError(path) from e except xfs.Error as e: - raise FileNotFoundError(path, cause=e) + raise FileNotFoundError(path) from e class XfsFilesystemEntry(FilesystemEntry): diff --git a/dissect/target/helpers/data/windowsZones.xml b/dissect/target/helpers/data/windowsZones.xml index 562315a8e..b0fc6af3b 100644 --- a/dissect/target/helpers/data/windowsZones.xml +++ b/dissect/target/helpers/data/windowsZones.xml @@ -5,8 +5,9 @@ Copyright © 1991-2013 Unicode, Inc. CLDR data files are interpreted according to the LDML specification (http://unicode.org/reports/tr35/) For terms of use, see http://www.unicode.org/copyright.html -NOTE: This file should be updated every ~6 months. -Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/windowsZones.xml +NOTE: This file should be updated every ~6 months. +Updated at: 2024-10-28 +Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/windowsZones.xml --> @@ -33,7 +34,6 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - @@ -52,7 +52,7 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - + @@ -63,7 +63,6 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - @@ -73,15 +72,14 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - - + + - - + + - @@ -100,10 +98,9 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - - + + - @@ -111,7 +108,7 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - + @@ -136,9 +133,8 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - + - @@ -424,7 +420,7 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - + @@ -541,7 +537,8 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - + + @@ -573,13 +570,12 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - - + + - @@ -656,7 +652,7 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - + @@ -713,7 +709,7 @@ Source: https://github.com/unicode-org/cldr/blob/main/common/supplemental/window - + diff --git a/dissect/target/helpers/regutil.py b/dissect/target/helpers/regutil.py index 24dbf1beb..f55fcb8cc 100644 --- a/dissect/target/helpers/regutil.py +++ b/dissect/target/helpers/regutil.py @@ -1,4 +1,5 @@ -""" Registry related abstractions """ +"""Registry related abstractions""" + from __future__ import annotations import fnmatch @@ -645,7 +646,7 @@ def key(self, key: str) -> RegistryKey: try: return RegfKey(self, self.hive.open(key)) except regf.RegistryKeyNotFoundError as e: - raise RegistryKeyNotFoundError(key, cause=e) + raise RegistryKeyNotFoundError(key) from e class RegfKey(RegistryKey): @@ -675,7 +676,7 @@ def subkey(self, subkey: str) -> RegistryKey: try: return RegfKey(self.hive, self.key.subkey(subkey)) except regf.RegistryKeyNotFoundError as e: - raise RegistryKeyNotFoundError(subkey, cause=e) + raise RegistryKeyNotFoundError(subkey) from e def subkeys(self) -> list[RegistryKey]: return [RegfKey(self.hive, k) for k in self.key.subkeys()] @@ -684,7 +685,7 @@ def value(self, value: str) -> RegistryValue: try: return RegfValue(self.hive, self.key.value(value)) except regf.RegistryValueNotFoundError as e: - raise RegistryValueNotFoundError(value, cause=e) + raise RegistryValueNotFoundError(value) from e def values(self) -> list[RegistryValue]: return [RegfValue(self.hive, v) for v in self.key.values()] diff --git a/dissect/target/loader.py b/dissect/target/loader.py index 62f36f7a6..08d5e29b0 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -205,4 +205,5 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader: register("smb", "SmbLoader") register("cb", "CbLoader") register("cyber", "CyberLoader") +register("proxmox", "ProxmoxLoader") register("multiraw", "MultiRawLoader") # Should be last diff --git a/dissect/target/loaders/proxmox.py b/dissect/target/loaders/proxmox.py new file mode 100644 index 000000000..9d8dd2f80 --- /dev/null +++ b/dissect/target/loaders/proxmox.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import re +from pathlib import Path + +from dissect.target import container +from dissect.target.loader import Loader +from dissect.target.target import Target + +RE_VOLUME_ID = re.compile(r"(?:file=)?([^:]+):([^,]+)") + + +class ProxmoxLoader(Loader): + """Loader for Proxmox VM configuration files. + + Proxmox uses volume identifiers in the format of ``storage_id:volume_id``. The ``storage_id`` maps to a + storage configuration in ``/etc/pve/storage.cfg``. The ``volume_id`` is the name of the volume within + that configuration. + + This loader currently does not support parsing the storage configuration, so it will attempt to open the + volume directly from the same directory as the configuration file, or from ``/dev/pve/`` (default LVM config). + If the volume is not found, it will log a warning. + """ + + def __init__(self, path: Path, **kwargs): + path = path.resolve() + super().__init__(path) + self.base_dir = path.parent + + @staticmethod + def detect(path: Path) -> bool: + if path.suffix.lower() != ".conf": + return False + + with path.open("rb") as fh: + lines = fh.read(512).split(b"\n") + needles = [b"cpu:", b"memory:", b"name:"] + return all(any(needle in line for line in lines) for needle in needles) + + def map(self, target: Target) -> None: + with self.path.open("rt") as fh: + for line in fh: + if not (line := line.strip()): + continue + + key, value = line.split(":", 1) + value = value.strip() + + if key.startswith(("scsi", "sata", "ide", "virtio")) and key[-1].isdigit(): + # https://pve.proxmox.com/wiki/Storage + if match := RE_VOLUME_ID.match(value): + storage_id, volume_id = match.groups() + + # TODO: parse the storage information from /etc/pve/storage.cfg + # For now, let's try a few assumptions + disk_path = None + if (path := self.base_dir.joinpath(volume_id)).exists(): + disk_path = path + elif (path := self.base_dir.joinpath("/dev/pve/").joinpath(volume_id)).exists(): + disk_path = path + + if disk_path: + try: + target.disks.add(container.open(disk_path)) + except Exception: + target.log.exception("Failed to open disk: %s", disk_path) + else: + target.log.warning("Unable to find disk: %s:%s", storage_id, volume_id) diff --git a/dissect/target/loaders/vma.py b/dissect/target/loaders/vma.py index cc982825a..177d56507 100644 --- a/dissect/target/loaders/vma.py +++ b/dissect/target/loaders/vma.py @@ -1,4 +1,4 @@ -from dissect.hypervisor import vma +from dissect.archive import vma from dissect.target.containers.raw import RawContainer from dissect.target.loader import Loader diff --git a/dissect/target/loaders/xva.py b/dissect/target/loaders/xva.py index 0a1834146..4b1358c75 100644 --- a/dissect/target/loaders/xva.py +++ b/dissect/target/loaders/xva.py @@ -1,4 +1,4 @@ -from dissect.hypervisor import xva +from dissect.archive import xva from dissect.target.containers.raw import RawContainer from dissect.target.loader import Loader diff --git a/dissect/target/plugin.py b/dissect/target/plugin.py index e553d3a28..7b4fcaab5 100644 --- a/dissect/target/plugin.py +++ b/dissect/target/plugin.py @@ -57,17 +57,18 @@ class OperatingSystem(StrEnum): - LINUX = "linux" - WINDOWS = "windows" - ESXI = "esxi" + ANDROID = "android" BSD = "bsd" + CITRIX = "citrix-netscaler" + ESXI = "esxi" + FORTIOS = "fortios" + IOS = "ios" + LINUX = "linux" OSX = "osx" + PROXMOX = "proxmox" UNIX = "unix" - ANDROID = "android" VYOS = "vyos" - IOS = "ios" - FORTIOS = "fortios" - CITRIX = "citrix-netscaler" + WINDOWS = "windows" def export(*args, **kwargs) -> Callable: @@ -806,7 +807,7 @@ def load(plugin_desc: PluginDescriptor) -> Type[Plugin]: module = importlib.import_module(module) return getattr(module, plugin_desc["class"]) except Exception as e: - raise PluginError(f"An exception occurred while trying to load a plugin: {module}", cause=e) + raise PluginError(f"An exception occurred while trying to load a plugin: {module}") from e def failed() -> list[dict[str, Any]]: diff --git a/dissect/target/plugins/apps/database/__init__.py b/dissect/target/plugins/apps/database/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dissect/target/plugins/child/proxmox.py b/dissect/target/plugins/child/proxmox.py new file mode 100644 index 000000000..914fc789c --- /dev/null +++ b/dissect/target/plugins/child/proxmox.py @@ -0,0 +1,23 @@ +from typing import Iterator + +from dissect.target.exceptions import UnsupportedPluginError +from dissect.target.helpers.record import ChildTargetRecord +from dissect.target.plugin import ChildTargetPlugin + + +class ProxmoxChildTargetPlugin(ChildTargetPlugin): + """Child target plugin that yields from the VM listing.""" + + __type__ = "proxmox" + + def check_compatible(self) -> None: + if self.target.os != "proxmox": + raise UnsupportedPluginError("Not a Proxmox operating system") + + def list_children(self) -> Iterator[ChildTargetRecord]: + for vm in self.target.vmlist(): + yield ChildTargetRecord( + type=self.__type__, + path=vm.path, + _target=self.target, + ) diff --git a/dissect/target/plugins/general/loaders.py b/dissect/target/plugins/general/loaders.py index ee3e5187a..aad67f9dc 100644 --- a/dissect/target/plugins/general/loaders.py +++ b/dissect/target/plugins/general/loaders.py @@ -1,6 +1,8 @@ +import json + from dissect.target.helpers.docs import INDENT_STEP, get_docstring from dissect.target.loader import LOADERS_BY_SCHEME -from dissect.target.plugin import Plugin, export +from dissect.target.plugin import Plugin, arg, export class LoaderListPlugin(Plugin): @@ -10,7 +12,12 @@ def check_compatible(self) -> None: pass @export(output="none") - def loaders(self) -> None: + # NOTE: We would prefer to re-use arguments across plugins from argparse in query.py, but that is not possible yet. + # For now we use --as-json, but in the future this should be changed to inherit --json from target-query. + # https://github.com/fox-it/dissect.target/pull/841 + # https://github.com/fox-it/dissect.target/issues/889 + @arg("--as-json", dest="as_json", action="store_true") + def loaders(self, as_json: bool = False) -> None: """List the available loaders.""" loaders_info = {} @@ -21,6 +28,12 @@ def loaders(self) -> None: except ImportError: continue - print("Available loaders:") - for loader_name, loader_description in sorted(loaders_info.items()): - print(f"{INDENT_STEP}{loader_name} - {loader_description}") + loaders = sorted(loaders_info.items()) + + if as_json: + print(json.dumps([{"name": name, "description": desc} for name, desc in loaders]), end="") + + else: + print("Available loaders:") + for loader_name, loader_description in loaders: + print(f"{INDENT_STEP}{loader_name} - {loader_description}") diff --git a/dissect/target/plugins/general/network.py b/dissect/target/plugins/general/network.py index a15930a18..236c10c71 100644 --- a/dissect/target/plugins/general/network.py +++ b/dissect/target/plugins/general/network.py @@ -53,22 +53,22 @@ def interfaces(self) -> Iterator[InterfaceRecord]: @export def ips(self) -> list[IPAddress]: """Return IP addresses as list of :class:`IPAddress`.""" - return list(self._get_record_type("ip")) + return list(set(self._get_record_type("ip"))) @export def gateways(self) -> list[IPAddress]: """Return gateways as list of :class:`IPAddress`.""" - return list(self._get_record_type("gateway")) + return list(set(self._get_record_type("gateway"))) @export def macs(self) -> list[str]: """Return MAC addresses as list of :class:`str`.""" - return list(self._get_record_type("mac")) + return list(set(self._get_record_type("mac"))) @export - def dns(self) -> list[str]: + def dns(self) -> list[str | IPAddress]: """Return DNS addresses as list of :class:`str`.""" - return list(self._get_record_type("dns")) + return list(set(self._get_record_type("dns"))) @internal def with_ip(self, ip_addr: str) -> Iterator[InterfaceRecord]: diff --git a/dissect/target/plugins/general/plugins.py b/dissect/target/plugins/general/plugins.py index 80949600f..de7ca2e32 100644 --- a/dissect/target/plugins/general/plugins.py +++ b/dissect/target/plugins/general/plugins.py @@ -1,5 +1,8 @@ +from __future__ import annotations + +import json import textwrap -from typing import Dict, List, Type, Union +from typing import Iterator, Type from dissect.target import plugin from dissect.target.helpers.docs import INDENT_STEP, get_plugin_overview @@ -23,7 +26,8 @@ def categorize_plugins(plugins_selection: list[dict] = None) -> dict: return output_dict -def get_exported_plugins(): +def get_exported_plugins() -> list: + """Returns list of exported plugins.""" return [p for p in plugin.plugins() if len(p["exports"])] @@ -50,10 +54,10 @@ def update_dict_recursive(source_dict: dict, updated_dict: dict) -> dict: def output_plugin_description_recursive( - structure_dict: Union[Dict, Plugin], + structure_dict: dict | Plugin, print_docs: bool, - indentation_step=0, -) -> List[str]: + indentation_step: int = 0, +) -> list[str]: """Create plugin overview with identations.""" if isinstance(structure_dict, type) and issubclass(structure_dict, Plugin): @@ -78,10 +82,10 @@ def get_plugin_description( def get_description_dict( - structure_dict: Dict, + structure_dict: dict, print_docs: bool, indentation_step: int, -) -> List[str]: +) -> list[str]: """Returns a list of indented descriptions.""" output_descriptions = [] @@ -105,10 +109,17 @@ def check_compatible(self) -> None: @export(output="none", cache=False) @arg("--docs", dest="print_docs", action="store_true") - def plugins(self, plugins: list[dict] = None, print_docs: bool = False) -> None: - """Print all registered plugins to stdout.""" + # NOTE: We would prefer to re-use arguments across plugins from argparse in query.py, but that is not possible yet. + # For now we use --as-json, but in the future this should be changed to inherit --json from target-query. + # https://github.com/fox-it/dissect.target/pull/841 + # https://github.com/fox-it/dissect.target/issues/889 + @arg("--as-json", dest="as_json", action="store_true") + def plugins(self, plugins: list[Plugin] = None, print_docs: bool = False, as_json: bool = False) -> None: + """Print all available plugins.""" + + dict_plugins = list({p.path: p.plugin_desc for p in plugins}.values()) + categorized_plugins = dict(sorted(categorize_plugins(dict_plugins).items())) - categorized_plugins = dict(sorted(categorize_plugins(plugins).items())) plugin_descriptions = output_plugin_description_recursive(categorized_plugins, print_docs) plugins_list = textwrap.indent( @@ -142,4 +153,32 @@ def plugins(self, plugins: list[dict] = None, print_docs: bool = False) -> None: "Failed to load:", failed_list, ] - print("\n".join(output_lines)) + + if as_json: + out = {"loaded": list(generate_plugins_json(plugins))} + + if failed_plugins := plugin.failed(): + out["failed"] = [ + {"module": p["module"], "stacktrace": "".join(p["stacktrace"])} for p in failed_plugins + ] + + print(json.dumps(out), end="") + + else: + print("\n".join(output_lines)) + + +def generate_plugins_json(plugins: list[Plugin]) -> Iterator[dict]: + """Generates JSON output of a list of :class:`Plugin`s.""" + + for p in plugins: + func = getattr(p.class_object, p.method_name) + description = getattr(func, "__doc__", None) + summary = description.split("\n\n", 1)[0].strip() if description else None + + yield { + "name": p.name, + "output": p.output_type, + "description": summary, + "path": p.path, + } diff --git a/dissect/target/plugins/os/unix/_os.py b/dissect/target/plugins/os/unix/_os.py index 2bbac84e9..5b2d46cf0 100644 --- a/dissect/target/plugins/os/unix/_os.py +++ b/dissect/target/plugins/os/unix/_os.py @@ -4,7 +4,6 @@ import re import uuid from pathlib import Path -from struct import unpack from typing import Iterator from flow.record.fieldtypes import posix_path @@ -19,10 +18,30 @@ log = logging.getLogger(__name__) +# https://en.wikipedia.org/wiki/Executable_and_Linkable_Format#ISA +ARCH_MAP = { + 0x00: "unknown", + 0x02: "sparc", + 0x03: "x86", + 0x08: "mips", + 0x14: "powerpc32", + 0x15: "powerpc64", + 0x16: "s390", # and s390x + 0x28: "aarch32", # armv7 + 0x2A: "superh", + 0x32: "ia-64", + 0x3E: "x86_64", + 0xB7: "aarch64", # armv8 + 0xF3: "riscv64", + 0xF7: "bpf", +} + + class UnixPlugin(OSPlugin): def __init__(self, target: Target): super().__init__(target) self._add_mounts() + self._add_devices() self._hostname_dict = self._parse_hostname_string() self._hosts_dict = self._parse_hosts_string() self._os_release = self._parse_os_release() @@ -247,6 +266,17 @@ def _add_mounts(self) -> None: self.target.log.debug("Mounting %s (%s) at %s", fs, fs.volume, mount_point) self.target.fs.mount(mount_point, fs) + def _add_devices(self) -> None: + """Add some virtual block devices to the target. + + Currently only adds LVM devices. + """ + vfs = self.target.fs.append_layer() + + for volume in self.target.volumes: + if volume.vs and volume.vs.__type__ == "lvm": + vfs.map_file_fh(f"/dev/{volume.raw.vg.name}/{volume.raw.name}", volume) + def _parse_os_release(self, glob: str | None = None) -> dict[str, str]: """Parse files containing Unix version information. @@ -289,37 +319,30 @@ def _parse_os_release(self, glob: str | None = None) -> dict[str, str]: continue return os_release - def _get_architecture(self, os: str = "unix", path: str = "/bin/ls") -> str | None: - arch_strings = { - 0x00: "Unknown", - 0x02: "SPARC", - 0x03: "x86", - 0x08: "MIPS", - 0x14: "PowerPC", - 0x16: "S390", - 0x28: "ARM", - 0x2A: "SuperH", - 0x32: "IA-64", - 0x3E: "x86_64", - 0xB7: "AArch64", - 0xF3: "RISC-V", - } - - for fs in self.target.filesystems: - if fs.exists(path): - fh = fs.open(path) - fh.seek(4) - # ELF - e_ident[EI_CLASS] - bits = unpack("B", fh.read(1))[0] - fh.seek(18) - # ELF - e_machine - arch = unpack("H", fh.read(2))[0] - arch = arch_strings.get(arch) - - if bits == 1: # 32 bit system - return f"{arch}_32-{os}" - else: - return f"{arch}-{os}" + def _get_architecture(self, os: str = "unix", path: Path | str = "/bin/ls") -> str | None: + """Determine architecture by reading an ELF header of a binary on the target. + + Resources: + - https://en.wikipedia.org/wiki/Executable_and_Linkable_Format#ISA + """ + + if not isinstance(path, TargetPath): + for fs in [self.target.fs, *self.target.filesystems]: + if (path := fs.path(path)).exists(): + break + + if not path.exists(): + return + + fh = path.open("rb") + fh.seek(4) # ELF - e_ident[EI_CLASS] + bits = fh.read(1)[0] + + fh.seek(18) # ELF - e_machine + e_machine = int.from_bytes(fh.read(2), "little") + arch = ARCH_MAP.get(e_machine, "unknown") + + return f"{arch}_32-{os}" if bits == 1 and not arch[-2:] == "32" else f"{arch}-{os}" def parse_fstab( diff --git a/dissect/target/plugins/os/unix/bsd/osx/_os.py b/dissect/target/plugins/os/unix/bsd/osx/_os.py index 4b08afad6..6c99e6e62 100644 --- a/dissect/target/plugins/os/unix/bsd/osx/_os.py +++ b/dissect/target/plugins/os/unix/bsd/osx/_os.py @@ -41,10 +41,7 @@ def hostname(self) -> Optional[str]: @export(property=True) def ips(self) -> Optional[list[str]]: - ips = set() - for ip in self.target.network.ips(): - ips.add(str(ip)) - return list(ips) + return list(set(map(str, self.target.network.ips()))) @export(property=True) def version(self) -> Optional[str]: diff --git a/dissect/target/plugins/os/unix/linux/_os.py b/dissect/target/plugins/os/unix/linux/_os.py index cd8d5d594..fb2e6367d 100644 --- a/dissect/target/plugins/os/unix/linux/_os.py +++ b/dissect/target/plugins/os/unix/linux/_os.py @@ -34,17 +34,15 @@ def detect(cls, target: Target) -> Filesystem | None: @export(property=True) def ips(self) -> list[str]: """Returns a list of static IP addresses and DHCP lease IP addresses found on the host system.""" - ips = [] + ips = set() for ip_set in self.network_manager.get_config_value("ips"): - for ip in ip_set: - ips.append(ip) + ips.update(ip_set) for ip in parse_unix_dhcp_log_messages(self.target, iter_all=False): - if ip not in ips: - ips.append(ip) + ips.add(ip) - return ips + return list(ips) @export(property=True) def dns(self) -> list[str]: diff --git a/dissect/target/plugins/os/unix/linux/debian/proxmox/__init__.py b/dissect/target/plugins/os/unix/linux/debian/proxmox/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dissect/target/plugins/os/unix/linux/debian/proxmox/_os.py b/dissect/target/plugins/os/unix/linux/debian/proxmox/_os.py new file mode 100644 index 000000000..a96f1f7b5 --- /dev/null +++ b/dissect/target/plugins/os/unix/linux/debian/proxmox/_os.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +import stat +from io import BytesIO +from typing import BinaryIO + +from dissect.sql import sqlite3 +from dissect.util.stream import BufferedStream + +from dissect.target.filesystem import ( + Filesystem, + VirtualDirectory, + VirtualFile, + VirtualFilesystem, +) +from dissect.target.helpers import fsutil +from dissect.target.plugins.os.unix._os import OperatingSystem, export +from dissect.target.plugins.os.unix.linux.debian._os import DebianPlugin +from dissect.target.target import Target + + +class ProxmoxPlugin(DebianPlugin): + @classmethod + def detect(cls, target: Target) -> Filesystem | None: + for fs in target.filesystems: + if fs.exists("/etc/pve") or fs.exists("/var/lib/pve"): + return fs + + return None + + @classmethod + def create(cls, target: Target, sysvol: Filesystem) -> ProxmoxPlugin: + obj = super().create(target, sysvol) + + if (config_db := target.fs.path("/var/lib/pve-cluster/config.db")).exists(): + with config_db.open("rb") as fh: + vfs = _create_pmxcfs(fh, obj.hostname) + + target.fs.mount("/etc/pve", vfs) + + return obj + + @export(property=True) + def version(self) -> str: + """Returns Proxmox VE version with underlying OS release.""" + + for pkg in self.target.dpkg.status(): + if pkg.name == "proxmox-ve": + distro_name = self._os_release.get("PRETTY_NAME", "") + return f"{pkg.name} {pkg.version} ({distro_name})" + + @export(property=True) + def os(self) -> str: + return OperatingSystem.PROXMOX.value + + +DT_DIR = 4 +DT_REG = 8 + + +def _create_pmxcfs(fh: BinaryIO, hostname: str | None = None) -> VirtualFilesystem: + # https://pve.proxmox.com/wiki/Proxmox_Cluster_File_System_(pmxcfs) + db = sqlite3.SQLite3(fh) + + entries = {row.inode: row for row in db.table("tree")} + + vfs = VirtualFilesystem() + for entry in entries.values(): + if entry.type == DT_DIR: + cls = ProxmoxConfigDirectoryEntry + elif entry.type == DT_REG: + cls = ProxmoxConfigFileEntry + else: + raise ValueError(f"Unknown pmxcfs file type: {entry.type}") + + parts = [] + current = entry + while current.parent != 0: + parts.append(current.name) + current = entries[current.parent] + parts.append(current.name) + + path = "/".join(parts[::-1]) + vfs.map_file_entry(path, cls(vfs, path, entry)) + + if hostname: + node_root = vfs.path(f"nodes/{hostname}") + vfs.symlink(str(node_root), "local") + vfs.symlink(str(node_root / "lxc"), "lxc") + vfs.symlink(str(node_root / "openvz"), "openvz") + vfs.symlink(str(node_root / "qemu-server"), "qemu-server") + + # TODO: .version, .members, .vmlist, maybe .clusterlog and .rrd? + + return vfs + + +class ProxmoxConfigFileEntry(VirtualFile): + def open(self) -> BinaryIO: + return BufferedStream(BytesIO(self.entry.data or b"")) + + def lstat(self) -> fsutil.stat_result: + # ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime'] + return fsutil.stat_result( + [ + stat.S_IFREG | 0o640, + self.entry.inode, + id(self.fs), + 1, + 0, + 0, + len(self.entry.data) if self.entry.data else 0, + 0, + self.entry.mtime, + 0, + ] + ) + + +class ProxmoxConfigDirectoryEntry(VirtualDirectory): + def __init__(self, fs: VirtualFilesystem, path: str, entry: sqlite3.Row): + super().__init__(fs, path) + self.entry = entry + + def lstat(self) -> fsutil.stat_result: + """Return the stat information of the given path, without resolving links.""" + # ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime'] + return fsutil.stat_result( + [ + stat.S_IFDIR | 0o755, + self.entry.inode, + id(self.fs), + 1, + 0, + 0, + 0, + 0, + self.entry.mtime, + 0, + ] + ) diff --git a/dissect/target/plugins/os/unix/linux/debian/proxmox/vm.py b/dissect/target/plugins/os/unix/linux/debian/proxmox/vm.py new file mode 100644 index 000000000..29c72b4c9 --- /dev/null +++ b/dissect/target/plugins/os/unix/linux/debian/proxmox/vm.py @@ -0,0 +1,29 @@ +from typing import Iterator + +from dissect.target.exceptions import UnsupportedPluginError +from dissect.target.helpers.record import TargetRecordDescriptor +from dissect.target.plugin import Plugin, export + +VirtualMachineRecord = TargetRecordDescriptor( + "proxmox/vm", + [ + ("string", "path"), + ], +) + + +class VirtualMachinePlugin(Plugin): + """Plugin to list Proxmox virtual machines.""" + + def check_compatible(self) -> None: + if self.target.os != "proxmox": + raise UnsupportedPluginError("Not a Proxmox operating system") + + @export(record=VirtualMachineRecord) + def vmlist(self) -> Iterator[VirtualMachineRecord]: + """List Proxmox virtual machines on this node.""" + for config in self.target.fs.path("/etc/pve/qemu-server").iterdir(): + yield VirtualMachineRecord( + path=config, + _target=self.target, + ) diff --git a/dissect/target/plugins/os/unix/linux/fortios/_os.py b/dissect/target/plugins/os/unix/linux/fortios/_os.py index ccbee1f69..0eb4ee9b1 100644 --- a/dissect/target/plugins/os/unix/linux/fortios/_os.py +++ b/dissect/target/plugins/os/unix/linux/fortios/_os.py @@ -6,7 +6,7 @@ from datetime import datetime from io import BytesIO from tarfile import ReadError -from typing import BinaryIO, Iterator, Optional, TextIO, Union +from typing import BinaryIO, Iterator, TextIO from dissect.util import cpio from dissect.util.compression import xz @@ -73,10 +73,11 @@ def _load_config(self) -> dict: return config @classmethod - def detect(cls, target: Target) -> Optional[Filesystem]: + def detect(cls, target: Target) -> Filesystem | None: for fs in target.filesystems: - # Tested on FortiGate and FortiAnalyzer, other Fortinet devices may look different. - if fs.exists("/rootfs.gz") and (fs.exists("/.fgtsum") or fs.exists("/.fmg_sign") or fs.exists("/flatkc")): + # Tested on FortiGate, FortiAnalyzer and FortiManager. + # Other Fortinet devices may look different. + if fs.exists("/rootfs.gz") and (any(map(fs.exists, (".fgtsum", ".fmg_sign", "flatkc", "system.conf")))): return fs @classmethod @@ -212,7 +213,7 @@ def version(self) -> str: return "FortiOS Unknown" @export(record=FortiOSUserRecord) - def users(self) -> Iterator[Union[FortiOSUserRecord, UnixUserRecord]]: + def users(self) -> Iterator[FortiOSUserRecord | UnixUserRecord]: """Return local users of the FortiOS system.""" # Possible unix-like users @@ -224,7 +225,7 @@ def users(self) -> Iterator[Union[FortiOSUserRecord, UnixUserRecord]]: yield FortiOSUserRecord( name=username, password=":".join(entry.get("password", [])), - groups=[entry["accprofile"][0]], + groups=list(entry.get("accprofile", [])), home="/root", _target=self.target, ) @@ -233,69 +234,79 @@ def users(self) -> Iterator[Union[FortiOSUserRecord, UnixUserRecord]]: self.target.log.debug("", exc_info=e) # FortiManager administrative users - try: - for username, entry in self._config["global-config"]["system"]["admin"]["user"].items(): - yield FortiOSUserRecord( - name=username, - password=":".join(entry.get("password", [])), - groups=[entry["profileid"][0]], - home="/root", - _target=self.target, - ) - except KeyError as e: - self.target.log.warning("Exception while parsing FortiManager admin users") - self.target.log.debug("", exc_info=e) - - # Local users - try: - local_groups = local_groups_to_users(self._config["root-config"]["user"]["group"]) - for username, entry in self._config["root-config"]["user"].get("local", {}).items(): - try: - password = decrypt_password(entry["passwd"][-1]) - except (ValueError, RuntimeError): - password = ":".join(entry.get("passwd", [])) - - yield FortiOSUserRecord( - name=username, - password=password, - groups=local_groups.get(username, []), - home=None, - _target=self.target, - ) - except KeyError as e: - self.target.log.warning("Exception while parsing FortiOS local users") - self.target.log.debug("", exc_info=e) - - # Temporary guest users - try: - for _, entry in self._config["root-config"]["user"]["group"].get("guestgroup", {}).get("guest", {}).items(): - try: - password = decrypt_password(entry.get("password")[-1]) - except (ValueError, RuntimeError): - password = ":".join(entry.get("password")) + if self._config.get("global-config", {}).get("system", {}).get("admin", {}).get("user"): + try: + for username, entry in self._config["global-config"]["system"]["admin"]["user"].items(): + yield FortiOSUserRecord( + name=username, + password=":".join(entry.get("password", [])), + groups=list(entry.get("profileid", [])), + home="/root", + _target=self.target, + ) + except KeyError as e: + self.target.log.warning("Exception while parsing FortiManager admin users") + self.target.log.debug("", exc_info=e) + + if self._config.get("root-config", {}).get("user", {}).get("local"): + # Local users + try: + local_groups = local_groups_to_users(self._config["root-config"]["user"]["group"]) + except KeyError as e: + self.target.log.warning("Unable to get local user groups in root config") + self.target.log.debug("", exc_info=e) + local_groups = {} - yield FortiOSUserRecord( - name=entry["user-id"][0], - password=password, - groups=["guestgroup"], - home=None, - _target=self.target, - ) - except KeyError as e: - self.target.log.warning("Exception while parsing FortiOS temporary guest users") - self.target.log.debug("", exc_info=e) + try: + for username, entry in self._config["root-config"]["user"].get("local", {}).items(): + try: + password = decrypt_password(entry["passwd"][-1]) + except (ValueError, RuntimeError): + password = ":".join(entry.get("passwd", [])) + + yield FortiOSUserRecord( + name=username, + password=password, + groups=local_groups.get(username, []), + home=None, + _target=self.target, + ) + except KeyError as e: + self.target.log.warning("Exception while parsing FortiOS local users") + self.target.log.debug("", exc_info=e) + + if self._config.get("root-config", {}).get("user", {}).get("group", {}).get("guestgroup"): + # Temporary guest users + try: + for _, entry in ( + self._config["root-config"]["user"]["group"].get("guestgroup", {}).get("guest", {}).items() + ): + try: + password = decrypt_password(entry.get("password")[-1]) + except (ValueError, RuntimeError): + password = ":".join(entry.get("password")) + + yield FortiOSUserRecord( + name=entry["user-id"][0], + password=password, + groups=["guestgroup"], + home=None, + _target=self.target, + ) + except KeyError as e: + self.target.log.warning("Exception while parsing FortiOS temporary guest users") + self.target.log.debug("", exc_info=e) @export(property=True) def os(self) -> str: return OperatingSystem.FORTIOS.value @export(property=True) - def architecture(self) -> Optional[str]: + def architecture(self) -> str | None: """Return architecture FortiOS runs on.""" - paths = ["/lib/libav.so", "/bin/ctr"] - for path in paths: - if self.target.fs.path(path).exists(): - return self._get_architecture(path=path) + for path in ["/lib/libav.so", "/bin/ctr", "/bin/grep"]: + if (bin := self.target.fs.path(path)).exists(): + return self._get_architecture(path=bin) class ConfigNode(dict): @@ -528,7 +539,7 @@ def decrypt_rootfs(fh: BinaryIO, key: bytes, iv: bytes) -> BinaryIO: return BytesIO(result) -def _kdf_7_4_x(key_data: Union[str, bytes]) -> tuple[bytes, bytes]: +def _kdf_7_4_x(key_data: str | bytes) -> tuple[bytes, bytes]: """Derive 32 byte key and 16 byte IV from 32 byte seed. As the IV needs to be 16 bytes, we return the first 16 bytes of the sha256 hash. @@ -542,7 +553,7 @@ def _kdf_7_4_x(key_data: Union[str, bytes]) -> tuple[bytes, bytes]: return key, iv -def get_kernel_hash(sysvol: Filesystem) -> Optional[str]: +def get_kernel_hash(sysvol: Filesystem) -> str | None: """Return the SHA256 hash of the (compressed) kernel.""" kernel_files = ["flatkc", "vmlinuz", "vmlinux"] for k in kernel_files: diff --git a/dissect/target/plugins/os/unix/log/auth.py b/dissect/target/plugins/os/unix/log/auth.py index 6c2a0660c..4be43260e 100644 --- a/dissect/target/plugins/os/unix/log/auth.py +++ b/dissect/target/plugins/os/unix/log/auth.py @@ -1,62 +1,373 @@ +from __future__ import annotations + +import itertools +import logging import re +from abc import ABC, abstractmethod +from datetime import datetime +from functools import lru_cache from itertools import chain -from typing import Iterator +from pathlib import Path +from typing import Any, Iterator +from dissect.target import Target from dissect.target.exceptions import UnsupportedPluginError -from dissect.target.helpers.record import TargetRecordDescriptor +from dissect.target.helpers.fsutil import open_decompress +from dissect.target.helpers.record import DynamicDescriptor, TargetRecordDescriptor from dissect.target.helpers.utils import year_rollover_helper -from dissect.target.plugin import Plugin, export - -AuthLogRecord = TargetRecordDescriptor( - "linux/log/auth", - [ - ("datetime", "ts"), - ("string", "message"), - ("path", "source"), - ], +from dissect.target.plugin import Plugin, alias, export + +log = logging.getLogger(__name__) + +RE_TS = re.compile(r"^[A-Za-z]{3}\s*\d{1,2}\s\d{1,2}:\d{2}:\d{2}") +RE_TS_ISO = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}") +RE_LINE = re.compile( + r""" + \d{2}:\d{2}\s # First match on the similar ending of the different timestamps + (?P\S+)\s # The hostname + (?P\S+?)(\[(?P\d+)\])?: # The service with optionally the PID between brackets + \s*(?P.+?)\s*$ # The log message stripped from spaces left and right + """, + re.VERBOSE, +) + +# Generic regular expressions +RE_IPV4_ADDRESS = re.compile( + r""" + ((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3} # First three octets + (25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?) # Last octet + """, + re.VERBOSE, ) +RE_USER = re.compile(r"for ([^\s]+)") + + +class BaseService(ABC): + @classmethod + @abstractmethod + def parse(cls, message: str) -> dict[str, any]: + pass + + +class SudoService(BaseService): + """Parsing of sudo service messages in the auth log.""" + + RE_SUDO_COMMAND = re.compile( + r""" + TTY=(?P\w+\/\w+)\s;\s # The TTY -> TTY=pts/0 ; + PWD=(?P[\/\w]+)\s;\s # The current working directory -> PWD="/home/user" ; + USER=(?P\w+)\s;\s # The effective user -> USER=root ; + COMMAND=(?P.+)$ # The command -> COMMAND=/usr/bin/whoami + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + """Parse auth log message from sudo.""" + if not (match := cls.RE_SUDO_COMMAND.search(message)): + return {} + + additional_fields = {} + for key, value in match.groupdict().items(): + additional_fields[key] = value + + return additional_fields + + +class SshdService(BaseService): + """Class for parsing sshd messages in the auth log.""" + + RE_SSHD_PORTREGEX = re.compile(r"port\s(\d+)") + RE_USER = re.compile(r"for\s([^\s]+)") + + @classmethod + def parse(cls, message: str) -> dict[str, str | int]: + """Parse message from sshd""" + additional_fields = {} + if ip_address := RE_IPV4_ADDRESS.search(message): + field_name = "host_ip" if "listening" in message else "remote_ip" + additional_fields[field_name] = ip_address.group(0) + if port := cls.RE_SSHD_PORTREGEX.search(message): + additional_fields["port"] = int(port.group(1)) + if user := cls.RE_USER.search(message): + additional_fields["user"] = user.group(1) + # Accepted publickey for test_user from 8.8.8.8 IP port 12345 ssh2: RSA SHA256:123456789asdfghjklertzuio + if "Accepted publickey" in message: + ssh_protocol, encryption_algo, key_info = message.split()[-3:] + hash_algo, key_hash = key_info.split(":") + additional_fields["ssh_protocol"] = ssh_protocol.strip(":") + additional_fields["encryption_algorithm"] = encryption_algo + additional_fields["hash_algorithm"] = hash_algo + additional_fields["key_hash"] = key_hash + if (failed := "Failed" in message) or "Accepted" in message: + action_type = "failed" if failed else "accepted" + additional_fields["action"] = f"{action_type} authentication" + additional_fields["authentication_type"] = "password" if "password" in message else "publickey" + + return additional_fields + + +class SystemdLogindService(BaseService): + """Class for parsing systemd-logind messages in the auth log.""" + + RE_SYSTEMD_LOGIND_WATCHING = re.compile( + r""" + (?PWatching\ssystem\sbuttons)\s # Action is "Watching system buttons" + on\s(?P[^\s]+)\s # The device the button is related to -> /dev/input/event0 + \((?P.*?)\) # The device (button) name -> (Power button) + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str): + """Parse auth log message from systemd-logind.""" + additional_fields = {} + # Example: Nov 14 07:14:09 ubuntu-1 systemd-logind[4]: Removed session 4. + if "Removed" in message: + additional_fields["action"] = "removed session" + additional_fields["session"] = message.split()[-1].strip(".") + elif "Watching" in message and (match := cls.RE_SYSTEMD_LOGIND_WATCHING.search(message)): + for key, value in match.groupdict().items(): + additional_fields[key] = value + # Example: New session 4 of user sampleuser. + elif "New session" in message: + parts = message.removeprefix("New session ").split() + additional_fields["action"] = "new session" + additional_fields["session"] = parts[0] + additional_fields["user"] = parts[-1].strip(".") + # Example: Session 4 logged out. Waiting for processes to exit. + elif "logged out" in message: + session = message.removeprefix("Session ").split(maxsplit=1)[0] + additional_fields["action"] = "logged out session" + additional_fields["session"] = session + # Example: New seat seat0. + elif "New seat" in message: + seat = message.split()[-1].strip(".") + additional_fields["action"] = "new seat" + additional_fields["seat"] = seat + + return additional_fields + + +class SuService(BaseService): + """Class for parsing su messages in the auth log.""" + + RE_SU_BY = re.compile(r"by\s([^\s]+)") + RE_SU_ON = re.compile(r"on\s([^\s]+)") + RE_SU_COMMAND = re.compile(r"'(.*?)'") + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + additional_fields = {} + if user := RE_USER.search(message): + additional_fields["user"] = user.group(1) + if by := cls.RE_SU_BY.search(message): + additional_fields["by"] = by.group(1) + if on := cls.RE_SU_ON.search(message): + additional_fields["device"] = on.group(1) + if command := cls.RE_SU_COMMAND.search(message): + additional_fields["command"] = command.group(1) + if (failed := "failed" in message) or "Successful" in message: + additional_fields["su_result"] = "failed" if failed else "success" + + return additional_fields + + +class PkexecService(BaseService): + """Class for parsing pkexec messages in the auth log.""" + + RE_PKEXEC_COMMAND = re.compile( + r""" + (?P\S+?):\sExecuting\scommand\s # Starts with actual user -> user: + \[USER=(?P[^\]]+)\]\s # The impersonated user -> [USER=root] + \[TTY=(?P[^\]]+)\]\s # The tty -> [TTY=unknown] + \[CWD=(?P[^\]]+)\]\s # Current working directory -> [CWD=/home/user] + \[COMMAND=(?P[^\]]+)\] # Command -> [COMMAND=/usr/lib/example] + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + """Parse auth log message from pkexec""" + additional_fields = {} + if exec_cmd := cls.RE_PKEXEC_COMMAND.search(message): + additional_fields["action"] = "executing command" + for key, value in exec_cmd.groupdict().items(): + if value and value.isdigit(): + value = int(value) + additional_fields[key] = value -_TS_REGEX = r"^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}" -RE_TS = re.compile(_TS_REGEX) -RE_TS_AND_HOSTNAME = re.compile(_TS_REGEX + r"\s\S+\s") + return additional_fields + + +class PamUnixService(BaseService): + RE_PAM_UNIX = re.compile( + r""" + pam_unix\([^\s]+:session\):\s(?Psession\s\w+)\s # Session action, usually opened or closed + for\suser\s(?P[^\s\(]+)(?:\(uid=(?P\d+)\))? # User may contain uid like: root(uid=0) + (?:\sby\s\(uid=(?P\d+)\))?$ # Opened action also contains by + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message): + """Parse auth log message from pluggable authentication modules (PAM).""" + if not (match := cls.RE_PAM_UNIX.search(message)): + return {} + + additional_fields = {} + for key, value in match.groupdict().items(): + if value and value.isdigit(): + value = int(value) + additional_fields[key] = value + + return additional_fields + + +class AuthLogRecordBuilder: + """Class for dynamically creating auth log records.""" + + RECORD_NAME = "linux/log/auth" + SERVICES: dict[str, BaseService] = { + "su": SuService, + "sudo": SudoService, + "sshd": SshdService, + "systemd-logind": SystemdLogindService, + "pkexec": PkexecService, + } + + def __init__(self, target: Target): + self._create_event_descriptor = lru_cache(4096)(self._create_event_descriptor) + self.target = target + + def _parse_additional_fields(self, service: str | None, message: str) -> dict[str, Any]: + """Parse additional fields in the message based on the service.""" + if "pam_unix(" in message: + return PamUnixService.parse(message) + + if service not in self.SERVICES: + self.target.log.debug("Service %s is not recognised, no additional fields could be parsed", service) + return {} + + try: + return self.SERVICES[service].parse(message) + except Exception as e: + self.target.log.warning("Parsing additional fields in message '%s' for service %s failed", message, service) + self.target.log.debug("", exc_info=e) + raise e + + def build_record(self, ts: datetime, source: Path, line: str) -> TargetRecordDescriptor: + """Builds an ``AuthLog`` event record.""" + + record_fields = [ + ("datetime", "ts"), + ("path", "source"), + ("string", "service"), + ("varint", "pid"), + ("string", "message"), + ] + + record_values = { + "ts": ts, + "message": line, + "service": None, + "pid": None, + "source": source, + "_target": self.target, + } + + match = RE_LINE.search(line) + if match: + record_values.update(match.groupdict()) + + for key, value in self._parse_additional_fields(record_values["service"], line).items(): + record_type = "string" + if isinstance(value, int): + record_type = "varint" + + record_fields.append((record_type, key)) + record_values[key] = value + + # tuple conversion here is needed for lru_cache + desc = self._create_event_descriptor(tuple(record_fields)) + return desc(**record_values) + + def _create_event_descriptor(self, record_fields) -> TargetRecordDescriptor: + return TargetRecordDescriptor(self.RECORD_NAME, record_fields) class AuthPlugin(Plugin): - """Unix auth log plugin.""" + """Unix authentication log plugin.""" + + def __init__(self, target: Target): + super().__init__(target) + self._auth_log_builder = AuthLogRecordBuilder(target) def check_compatible(self) -> None: var_log = self.target.fs.path("/var/log") if not any(var_log.glob("auth.log*")) and not any(var_log.glob("secure*")): raise UnsupportedPluginError("No auth log files found") - @export(record=AuthLogRecord) - def securelog(self) -> Iterator[AuthLogRecord]: - """Return contents of /var/log/auth.log* and /var/log/secure*.""" - return self.authlog() + @alias("securelog") + @export(record=DynamicDescriptor(["datetime", "path", "string"])) + def authlog(self) -> Iterator[Any]: + """Yield contents of ``/var/log/auth.log*`` and ``/var/log/secure*`` files. + + Order of returned events is not guaranteed to be chronological because of year + rollover detection efforts for log files without a year in the timestamp. + + The following timestamp formats are recognised automatically. This plugin + assumes that no custom ``date_format`` template is set in ``syslog-ng`` or ``systemd`` + configuration (defaults to ``M d H:M:S``). + + ISO formatted authlog entries are parsed as can be found in Ubuntu 24.04 and later. - @export(record=AuthLogRecord) - def authlog(self) -> Iterator[AuthLogRecord]: - """Return contents of /var/log/auth.log* and /var/log/secure*.""" + .. code-block:: text - # Assuming no custom date_format template is set in syslog-ng or systemd (M d H:M:S) - # CentOS format: Jan 12 13:37:00 hostname daemon: message - # Debian format: Jan 12 13:37:00 hostname daemon[pid]: pam_unix(daemon:session): message + CentOS format: Jan 12 13:37:00 hostname daemon: message + Debian format: Jan 12 13:37:00 hostname daemon[pid]: pam_unix(daemon:session): message + Ubuntu 24.04: 2024-01-12T13:37:00.000000+02:00 hostname daemon[pid]: pam_unix(daemon:session): message + + Resources: + - https://help.ubuntu.com/community/LinuxLogFiles + """ tzinfo = self.target.datetime.tzinfo var_log = self.target.fs.path("/var/log") for auth_file in chain(var_log.glob("auth.log*"), var_log.glob("secure*")): - for ts, line in year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo): - ts_and_hostname = re.search(RE_TS_AND_HOSTNAME, line) - if not ts_and_hostname: - self.target.log.warning("No timstamp and hostname found on one of the lines in %s.", auth_file) - self.target.log.debug("Skipping this line: %s", line) - continue - - message = line.replace(ts_and_hostname.group(0), "").strip() - yield AuthLogRecord( - ts=ts, - message=message, - source=auth_file, - _target=self.target, - ) + if is_iso_fmt(auth_file): + iterable = iso_readlines(auth_file) + else: + iterable = year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo) + + for ts, line in iterable: + yield self._auth_log_builder.build_record(ts, auth_file, line) + + +def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]: + """Iterator reading the provided auth log file in ISO format. Mimics ``year_rollover_helper`` behaviour.""" + with open_decompress(file, "rt") as fh: + for line in fh: + if not (match := RE_TS_ISO.match(line)): + log.warning("No timestamp found in one of the lines in %s!", file) + log.debug("Skipping line: %s", line) + continue + + try: + ts = datetime.strptime(match[0], "%Y-%m-%dT%H:%M:%S.%f%z") + except ValueError as e: + log.warning("Unable to parse ISO timestamp in line: %s", line) + log.debug("", exc_info=e) + continue + + yield ts, line + + +def is_iso_fmt(file: Path) -> bool: + """Determine if the provided auth log file uses new ISO format logging or not.""" + return any(itertools.islice(iso_readlines(file), 0, 2)) diff --git a/dissect/target/plugins/os/unix/log/journal.py b/dissect/target/plugins/os/unix/log/journal.py index 278b35026..1c6373938 100644 --- a/dissect/target/plugins/os/unix/log/journal.py +++ b/dissect/target/plugins/os/unix/log/journal.py @@ -277,6 +277,11 @@ def get_optional(value: str, to_type: Callable) -> Any | None: return None +# Sometimes stringy None is inserted by external tools like Ansible +def int_or_none(value: str) -> int | None: + return int(value) if value and value != "None" else None + + class JournalFile: """Parse Systemd Journal file format. @@ -427,30 +432,30 @@ def journal(self) -> Iterator[JournalRecord]: ts=entry.get("ts"), message=entry.get("message"), message_id=entry.get("message_id"), - priority=get_optional(entry.get("priority"), int), + priority=int_or_none(entry.get("priority")), code_file=get_optional(entry.get("code_file"), path_function), - code_line=get_optional(entry.get("code_line"), int), + code_line=int_or_none(entry.get("code_line")), code_func=entry.get("code_func"), - errno=get_optional(entry.get("errno"), int), + errno=int_or_none(entry.get("errno")), invocation_id=entry.get("invocation_id"), user_invocation_id=entry.get("user_invocation_id"), syslog_facility=entry.get("syslog_facility"), syslog_identifier=entry.get("syslog_identifier"), - syslog_pid=get_optional(entry.get("syslog_pid"), int), + syslog_pid=int_or_none(entry.get("syslog_pid")), syslog_raw=entry.get("syslog_raw"), documentation=entry.get("documentation"), - tid=get_optional(entry.get("tid"), int), + tid=int_or_none(entry.get("tid")), unit=entry.get("unit"), user_unit=entry.get("user_unit"), - pid=get_optional(entry.get("pid"), int), - uid=get_optional(entry.get("uid"), int), - gid=get_optional(entry.get("gid"), int), + pid=int_or_none(entry.get("pid")), + uid=int_or_none(entry.get("uid")), + gid=int_or_none(entry.get("gid")), comm=entry.get("comm"), exe=get_optional(entry.get("exe"), path_function), cmdline=entry.get("cmdline"), cap_effective=entry.get("cap_effective"), - audit_session=get_optional(entry.get("audit_session"), int), - audit_loginuid=get_optional(entry.get("audit_loginuid"), int), + audit_session=int_or_none(entry.get("audit_session")), + audit_loginuid=int_or_none(entry.get("audit_loginuid")), systemd_cgroup=get_optional(entry.get("systemd_cgroup"), path_function), systemd_slice=entry.get("systemd_slice"), systemd_unit=entry.get("systemd_unit"), diff --git a/dissect/target/plugins/os/unix/log/utmp.py b/dissect/target/plugins/os/unix/log/utmp.py index 684aa5e51..4298c514d 100644 --- a/dissect/target/plugins/os/unix/log/utmp.py +++ b/dissect/target/plugins/os/unix/log/utmp.py @@ -1,17 +1,17 @@ -import gzip +from __future__ import annotations + import ipaddress import struct from collections import namedtuple from typing import Iterator from dissect.cstruct import cstruct -from dissect.util.stream import BufferedStream from dissect.util.ts import from_unix from dissect.target.exceptions import UnsupportedPluginError -from dissect.target.helpers.fsutil import TargetPath +from dissect.target.helpers.fsutil import TargetPath, open_decompress from dissect.target.helpers.record import TargetRecordDescriptor -from dissect.target.plugin import OperatingSystem, Plugin, export +from dissect.target.plugin import Plugin, alias, export from dissect.target.target import Target UTMP_FIELDS = [ @@ -27,16 +27,12 @@ BtmpRecord = TargetRecordDescriptor( "linux/log/btmp", - [ - *UTMP_FIELDS, - ], + UTMP_FIELDS, ) WtmpRecord = TargetRecordDescriptor( "linux/log/wtmp", - [ - *UTMP_FIELDS, - ], + UTMP_FIELDS, ) utmp_def = """ @@ -104,24 +100,13 @@ class UtmpFile: """utmp maintains a full accounting of the current status of the system""" - def __init__(self, target: Target, path: TargetPath): - self.fh = target.fs.path(path).open() - - if "gz" in path: - self.compressed = True - else: - self.compressed = False + def __init__(self, path: TargetPath): + self.fh = open_decompress(path, "rb") def __iter__(self): - if self.compressed: - gzip_entry = BufferedStream(gzip.open(self.fh, mode="rb")) - byte_stream = gzip_entry - else: - byte_stream = self.fh - while True: try: - entry = c_utmp.entry(byte_stream) + entry = c_utmp.entry(self.fh) r_type = "" if entry.ut_type in c_utmp.Type: @@ -151,7 +136,7 @@ def __iter__(self): # ut_addr_v6 is parsed as IPv4 address. This could not lead to incorrect results. ut_addr = ipaddress.ip_address(struct.pack(" None: - if not self.target.os == OperatingSystem.LINUX and not any( - [ - list(self.target.fs.glob(self.BTMP_GLOB)), - list(self.target.fs.glob(self.WTMP_GLOB)), - ] - ): - raise UnsupportedPluginError("No WTMP or BTMP log files found") + if not any(self.btmp_paths + self.wtmp_paths + self.utmp_paths): + raise UnsupportedPluginError("No wtmp and/or btmp log files found") + + def _build_record(self, record: TargetRecordDescriptor, entry: UTMP_ENTRY) -> Iterator[BtmpRecord | WtmpRecord]: + return record( + ts=entry.ts, + ut_type=entry.ut_type, + ut_pid=entry.ut_pid, + ut_user=entry.ut_user, + ut_line=entry.ut_line, + ut_id=entry.ut_id, + ut_host=entry.ut_host, + ut_addr=entry.ut_addr, + _target=self.target, + ) @export(record=BtmpRecord) def btmp(self) -> Iterator[BtmpRecord]: @@ -192,26 +187,18 @@ def btmp(self) -> Iterator[BtmpRecord]: - https://en.wikipedia.org/wiki/Utmp - https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/ """ - btmp_paths = self.target.fs.glob(self.BTMP_GLOB) - for btmp_path in btmp_paths: - btmp = UtmpFile(self.target, btmp_path) - - for entry in btmp: - yield BtmpRecord( - ts=entry.ts, - ut_type=entry.ut_type, - ut_pid=entry.ut_pid, - ut_user=entry.ut_user, - ut_line=entry.ut_line, - ut_id=entry.ut_id, - ut_host=entry.ut_host, - ut_addr=entry.ut_addr, - _target=self.target, - ) + for path in self.btmp_paths: + if not path.is_file(): + self.target.log.warning("Unable to parse btmp file: %s is not a file", path) + continue + for entry in UtmpFile(path): + yield self._build_record(BtmpRecord, entry) + + @alias("utmp") @export(record=WtmpRecord) def wtmp(self) -> Iterator[WtmpRecord]: - """Return the content of the wtmp log files. + """Yield contents of wtmp log files. The wtmp file contains the historical data of the utmp file. The utmp file contains information about users logins at which terminals, logouts, system events and current status of the system, system boot time @@ -220,19 +207,11 @@ def wtmp(self) -> Iterator[WtmpRecord]: References: - https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/ """ - wtmp_paths = self.target.fs.glob(self.WTMP_GLOB) - for wtmp_path in wtmp_paths: - wtmp = UtmpFile(self.target, wtmp_path) - - for entry in wtmp: - yield WtmpRecord( - ts=entry.ts, - ut_type=entry.ut_type, - ut_pid=entry.ut_pid, - ut_user=entry.ut_user, - ut_line=entry.ut_line, - ut_id=entry.ut_id, - ut_host=entry.ut_host, - ut_addr=entry.ut_addr, - _target=self.target, - ) + + for path in self.wtmp_paths + self.utmp_paths: + if not path.is_file(): + self.target.log.warning("Unable to parse wtmp file: %s is not a file", path) + continue + + for entry in UtmpFile(path): + yield self._build_record(WtmpRecord, entry) diff --git a/dissect/target/plugins/os/windows/_os.py b/dissect/target/plugins/os/windows/_os.py index 97197d4b2..1e99f6e8b 100644 --- a/dissect/target/plugins/os/windows/_os.py +++ b/dissect/target/plugins/os/windows/_os.py @@ -12,6 +12,14 @@ from dissect.target.plugin import OperatingSystem, OSPlugin, export from dissect.target.target import Target +ARCH_MAP = { + "x86": 32, + "IA64": 64, + "ARM64": 64, + "EM64T": 64, + "AMD64": 64, +} + class WindowsPlugin(OSPlugin): CURRENT_VERSION_KEY = "HKLM\\Software\\Microsoft\\Windows NT\\CurrentVersion" @@ -101,7 +109,7 @@ def hostname(self) -> str | None: @export(property=True) def ips(self) -> list[str]: - return self.target.network.ips() + return list(set(map(str, self.target.network.ips()))) def _get_version_reg_value(self, value_name: str) -> Any: try: @@ -265,19 +273,11 @@ def architecture(self) -> str | None: Dict: arch: architecture, bitness: bits """ - arch_strings = { - "x86": 32, - "IA64": 64, - "ARM64": 64, - "EM64T": 64, - "AMD64": 64, - } - key = "HKLM\\SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment" try: arch = self.target.registry.key(key).value("PROCESSOR_ARCHITECTURE").value - bits = arch_strings.get(arch) + bits = ARCH_MAP.get(arch) # return {"arch": arch, "bitness": bits} if bits == 64: diff --git a/dissect/target/plugins/os/windows/generic.py b/dissect/target/plugins/os/windows/generic.py index 3083a0176..f72695fe2 100644 --- a/dissect/target/plugins/os/windows/generic.py +++ b/dissect/target/plugins/os/windows/generic.py @@ -1,8 +1,10 @@ from __future__ import annotations +import struct from datetime import datetime from typing import Iterator +from dissect.util.sid import read_sid from dissect.util.ts import from_unix from dissect.target.exceptions import RegistryError, UnsupportedPluginError @@ -10,7 +12,10 @@ RegistryRecordDescriptorExtension, UserRecordDescriptorExtension, ) -from dissect.target.helpers.record import create_extended_descriptor +from dissect.target.helpers.record import ( + TargetRecordDescriptor, + create_extended_descriptor, +) from dissect.target.plugin import Plugin, export UserRegistryRecordDescriptor = create_extended_descriptor( @@ -113,6 +118,15 @@ ], ) +ComputerSidRecord = TargetRecordDescriptor( + "windows/sid/computer", + [ + ("datetime", "ts"), + ("string", "sidtype"), + ("string", "sid"), + ], +) + class GenericPlugin(Plugin): """Generic Windows plugin. @@ -573,3 +587,36 @@ def codepage(self) -> str | None: return self.target.registry.key(key).value("ACP").value except RegistryError: pass + + @export(record=ComputerSidRecord) + def sid(self) -> Iterator[ComputerSidRecord]: + """Return the machine- and optional domain SID of the system.""" + + try: + key = self.target.registry.key("HKLM\\SAM\\SAM\\Domains\\Account") + + # The machine SID is stored in the last 12 bytes of the V value as little-endian + # The machine SID differs from a 'normal' binary SID as only holds 3 values and lacks a prefix / Revision + # NOTE: Consider moving this to dissect.util.sid if we encounter this more often + sid = struct.unpack_from(" None: + if not self.instances: + raise UnsupportedPluginError("System does not seem to be running SQL Server") + + @export(record=MssqlErrorlogRecord) + def errorlog(self) -> Iterator[MssqlErrorlogRecord]: + """Return all Microsoft SQL Server ERRORLOG messages. + + These log files contain information such as: + - Logon failures + - Enabling/disabling of features, such as xp_cmdshell + + Yields MssqlErrorlogRecord instances with fields: + + .. code-block:: text + + ts (datetime): Timestamp of the log line. + instance (str): SQL Server instance name. + process (str): Process name. + message (str): Log message. + path (Path): Path to the log file. + + References: + - https://learn.microsoft.com/en-us/sql/relational-databases/logs/view-offline-log-files + """ + + for instance, log_path in self.instances: + for errorlog in log_path.glob(self.FILE_GLOB): + # The errorlog includes a BOM, so endianess gets determined automatically + fh = errorlog.open(mode="rt", encoding="utf-16", errors="surrogateescape") + buf = "" + + for line in fh: + if ts := RE_TIMESTAMP_PATTERN.match(line): + yield MssqlErrorlogRecord( + ts=datetime.strptime(ts.group(), "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc), + instance=instance, + # The process name is a fixed-width field and is always 12 characters long. + process=buf[23:35].strip(), + message=buf[35:].strip(), + path=errorlog, + _target=self.target, + ) + buf = "" + + buf += line + + def _find_instances(self) -> list[str, TargetPath]: + instances = [] + + for subkey in self.target.registry.key(self.MSSQL_KEY).subkeys(): + if subkey.name.startswith("MSSQL") and "." in subkey.name: + instances.append( + ( + subkey.name, + self.target.fs.path(subkey.subkey("SQLServerAgent").value("ErrorLogFile").value).parent, + ) + ) + return instances diff --git a/dissect/target/plugins/os/windows/network.py b/dissect/target/plugins/os/windows/network.py index 1a07037d1..d29d00a39 100644 --- a/dissect/target/plugins/os/windows/network.py +++ b/dissect/target/plugins/os/windows/network.py @@ -1,6 +1,7 @@ from __future__ import annotations from enum import IntEnum +from functools import lru_cache from typing import Iterator from dissect.util.ts import wintimestamp @@ -12,6 +13,7 @@ from dissect.target.helpers.record import WindowsInterfaceRecord from dissect.target.helpers.regutil import RegistryKey from dissect.target.plugins.general.network import NetworkPlugin +from dissect.target.target import Target class IfTypes(IntEnum): @@ -222,15 +224,32 @@ def _try_value(subkey: RegistryKey, value: str) -> str | list | None: return None +def _get_config_value(key: RegistryKey, name: str) -> set: + value = _try_value(key, name) + if not value or value in ("", "0.0.0.0", None, [], ["0.0.0.0"]): + return set() + + if isinstance(value, list): + return set(value) + + return {value} + + class WindowsNetworkPlugin(NetworkPlugin): """Windows network interface plugin.""" + def __init__(self, target: Target): + super().__init__(target) + self._extract_network_device_config = lru_cache(128)(self._extract_network_device_config) + def _interfaces(self) -> Iterator[WindowsInterfaceRecord]: + """Yields found Windows interfaces used by :meth:`NetworkPlugin.interfaces() `.""" # noqa: E501 + # Get all the network interfaces - for keys in self.target.registry.keys( + for key in self.target.registry.keys( "HKLM\\SYSTEM\\CurrentControlSet\\Control\\Class\\{4d36e972-e325-11ce-bfc1-08002be10318}" ): - for subkey in keys.subkeys(): + for subkey in key.subkeys(): device_info = {} if (net_cfg_instance_id := _try_value(subkey, "NetCfgInstanceId")) is None: @@ -238,25 +257,28 @@ def _interfaces(self) -> Iterator[WindowsInterfaceRecord]: continue # Extract the network device configuration for given interface id - config = self._extract_network_device_config(net_cfg_instance_id) - if config is None or all(not conf for conf in config): - # if no configuration is found or all configurations are empty, skip this network interface + if not (config := self._extract_network_device_config(net_cfg_instance_id)): continue - # Extract the network device name for given interface id - name_key = self.target.registry.key( - f"HKLM\\SYSTEM\\CurrentControlSet\\Control\\Network\\" - f"{{4D36E972-E325-11CE-BFC1-08002BE10318}}\\{net_cfg_instance_id}\\Connection" - ) - if value_name := _try_value(name_key, "Name"): - device_info["name"] = value_name - - # Extract the metric value from the REGISTRY_KEY_INTERFACE key - interface_key = self.target.registry.key( - f"HKLM\\SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Interfaces\\{net_cfg_instance_id}" - ) - if value_metric := _try_value(interface_key, "InterfaceMetric"): - device_info["metric"] = value_metric + # Extract a network device name for given interface id + try: + name_key = self.target.registry.key( + f"HKLM\\SYSTEM\\CurrentControlSet\\Control\\Network\\{{4D36E972-E325-11CE-BFC1-08002BE10318}}\\{net_cfg_instance_id}\\Connection" # noqa: E501 + ) + if value_name := _try_value(name_key, "Name"): + device_info["name"] = value_name + except RegistryKeyNotFoundError: + pass + + # Extract the metric value from the interface registry key + try: + interface_key = self.target.registry.key( + f"HKLM\\SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Interfaces\\{net_cfg_instance_id}" # noqa: E501 + ) + if value_metric := _try_value(interface_key, "InterfaceMetric"): + device_info["metric"] = value_metric + except RegistryKeyNotFoundError: + pass # Extract the rest of the device information device_info["mac"] = _try_value(subkey, "NetworkAddress") @@ -270,96 +292,89 @@ def _interfaces(self) -> Iterator[WindowsInterfaceRecord]: # Yield a record for each non-empty configuration for conf in config: - if conf: - # Create a copy of device_info to avoid overwriting - record_info = device_info.copy() - record_info.update(conf) - yield WindowsInterfaceRecord( - **record_info, - source=f"HKLM\\SYSTEM\\{subkey.path}", - _target=self.target, - ) - - def _extract_network_device_config( - self, interface_id: str - ) -> list[dict[str, str | list], dict[str, str | list]] | None: - dhcp_config = {} - static_config = {} + # If no configuration is found or all configurations are empty, + # skip this network interface. + if not conf or not any( + [ + conf["dns"], + conf["ip"], + conf["gateway"], + conf["subnetmask"], + conf["search_domain"], + ] + ): + continue + + # Create a copy of device_info to avoid overwriting + record_info = device_info.copy() + record_info.update(conf) + yield WindowsInterfaceRecord( + **record_info, + source=f"HKLM\\SYSTEM\\{subkey.path}", + _target=self.target, + ) + + def _extract_network_device_config(self, interface_id: str) -> list[dict[str, set | bool | None]]: + """Extract network device configuration from the given interface_id for all ControlSets on the system.""" + + dhcp_config = { + "gateway": set(), + "ip": set(), + "dns": set(), + "subnetmask": set(), + "search_domain": set(), + "network": set(), + } + + static_config = { + "ip": set(), + "dns": set(), + "subnetmask": set(), + "search_domain": set(), + "gateway": set(), + "network": set(), + } # Get the registry keys for the given interface id try: - keys = self.target.registry.key( - f"HKLM\\SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Interfaces\\{interface_id}" + keys = list( + self.target.registry.keys( + f"HKLM\\SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Interfaces\\{interface_id}" + ) ) except RegistryKeyNotFoundError: - return None + return [] if not len(keys): - return None - - # Extract DHCP configuration from the registry - dhcp_gateway = _try_value(keys, "DhcpDefaultGateway") - if dhcp_gateway not in ["", "0.0.0.0", None, []]: - dhcp_config["gateway"] = dhcp_gateway - - dhcp_ip = _try_value(keys, "DhcpIPAddress") - if dhcp_ip not in ["", "0.0.0.0", None]: - dhcp_config["ip"] = [dhcp_ip] - - dhcp_dns = _try_value(keys, "DhcpNameServer") - if dhcp_dns not in ["", "0.0.0.0", None]: - dhcp_config["dns"] = dhcp_dns.split(" ") - - dhcp_subnetmask = _try_value(keys, "DhcpSubnetMask") - if dhcp_subnetmask not in ["", "0.0.0.0", None]: - dhcp_config["subnetmask"] = [dhcp_subnetmask] - - dhcp_domain = _try_value(keys, "DhcpDomain") - if dhcp_domain not in ["", None]: - dhcp_config["search_domain"] = [dhcp_domain] + return [] + + for key in keys: + # Extract DHCP configuration from the registry + dhcp_config["gateway"].update(_get_config_value(key, "DhcpDefaultGateway")) + dhcp_config["ip"].update(_get_config_value(key, "DhcpIPAddress")) + dhcp_config["subnetmask"].update(_get_config_value(key, "DhcpSubnetMask")) + dhcp_config["search_domain"].update(_get_config_value(key, "DhcpDomain")) + dhcp_config["dns"].update(_get_config_value(key, "DhcpNameServer")) + + # Extract static configuration from the registry + static_config["gateway"].update(_get_config_value(key, "DefaultGateway")) + static_config["dns"].update(_get_config_value(key, "NameServer")) + static_config["search_domain"].update(_get_config_value(key, "Domain")) + static_config["ip"].update(_get_config_value(key, "IPAddress")) + static_config["subnetmask"].update(_get_config_value(key, "SubnetMask")) if len(dhcp_config) > 0: - dhcp_enable = _try_value(keys, "EnableDHCP") - dhcp_config["enabled"] = dhcp_enable == 1 + dhcp_config["enabled"] = _try_value(key, "EnableDHCP") == 1 dhcp_config["dhcp"] = True - # Extract static configuration from the registry - static_gateway = _try_value(keys, "DefaultGateway") - if static_gateway not in ["", None, []]: - static_config["gateway"] = static_gateway - - static_ip = _try_value(keys, "IPAddress") - if static_ip not in ["", "0.0.0.0", ["0.0.0.0"], None, []]: - static_config["ip"] = static_ip if isinstance(static_ip, list) else [static_ip] - - static_dns = _try_value(keys, "NameServer") - if static_dns not in ["", "0.0.0.0", None]: - static_config["dns"] = static_dns.split(",") - - static_subnetmask = _try_value(keys, "SubnetMask") - if static_subnetmask not in ["", "0.0.0.0", ["0.0.0.0"], None, []]: - static_config["subnetmask"] = ( - static_subnetmask if isinstance(static_subnetmask, list) else [static_subnetmask] - ) - - static_domain = _try_value(keys, "Domain") - if static_domain not in ["", None]: - static_config["search_domain"] = [static_domain] - if len(static_config) > 0: static_config["enabled"] = None static_config["dhcp"] = False - # Combine ip and subnetmask for extraction - combined_configs = [ - (dhcp_config, dhcp_config.get("ip", []), dhcp_config.get("subnetmask", [])), - (static_config, static_config.get("ip", []), static_config.get("subnetmask", [])), - ] - # Iterate over combined ip/subnet lists - for config, ips, subnet_masks in combined_configs: - for network_address in self.calculate_network(ips, subnet_masks): - config.setdefault("network", []).append(network_address) + for config in (dhcp_config, static_config): + if (ips := config.get("ip")) and (masks := config.get("subnetmask")): + config["network"].update(set(self.calculate_network(ips, masks))) # Return both configurations return [dhcp_config, static_config] diff --git a/dissect/target/plugins/os/windows/syscache.py b/dissect/target/plugins/os/windows/syscache.py index 689be227b..9bc658a53 100644 --- a/dissect/target/plugins/os/windows/syscache.py +++ b/dissect/target/plugins/os/windows/syscache.py @@ -77,7 +77,8 @@ def syscache(self) -> Iterator[SyscacheRecord]: full_path = None if mft: try: - full_path = self.target.fs.path("\\".join(["sysvol", mft.mft(file_segment).fullpath()])) + if path := mft(file_segment).full_path(): + full_path = self.target.fs.path("\\".join(["sysvol", path])) except ntfs.Error: pass diff --git a/dissect/target/plugins/os/windows/wer.py b/dissect/target/plugins/os/windows/wer.py index cf03ab2dd..9b722b11d 100644 --- a/dissect/target/plugins/os/windows/wer.py +++ b/dissect/target/plugins/os/windows/wer.py @@ -4,6 +4,7 @@ from defusedxml import ElementTree from dissect.util.ts import wintimestamp +from flow.record.base import is_valid_field_name from dissect.target.exceptions import UnsupportedPluginError from dissect.target.helpers.fsutil import Path @@ -11,7 +12,9 @@ from dissect.target.plugin import Plugin, export from dissect.target.target import Target -camel_case_patterns = [re.compile(r"(\S)([A-Z][a-z]+)"), re.compile(r"([a-z0-9])([A-Z])"), re.compile(r"(\w)[.\s](\w)")] +CAMEL_CASE_PATTERNS = [re.compile(r"(\S)([A-Z][a-z]+)"), re.compile(r"([a-z0-9])([A-Z])"), re.compile(r"(\w)[.\s](\w)")] +RE_VALID_KEY_START_CHARS = re.compile(r"[a-zA-Z]") +RE_VALID_KEY_CHARS = re.compile(r"[a-zA-Z0-9_]") def _create_record_descriptor(record_name: str, record_fields: list[tuple[str, str]]) -> TargetRecordDescriptor: @@ -49,13 +52,25 @@ def check_compatible(self) -> None: def _sanitize_key(self, key: str) -> str: # Convert camel case to snake case - for pattern in camel_case_patterns: + for pattern in CAMEL_CASE_PATTERNS: key = pattern.sub(r"\1_\2", key) - # Keep only basic characters in key - key = re.sub(r"[^a-zA-Z0-9_]", "", key) - - return key.lower() + clean_key = "" + separator = "_" + prev_encoded = False + for idx, char in enumerate(key): + if prev_encoded: + clean_key += separator + clean_key += char + if not is_valid_field_name(clean_key): + clean_key = clean_key[:-1] + prefix = f"{separator}x" if (idx != 0 and not prev_encoded) else "x" + clean_key += prefix + char.encode("utf-8").hex() + prev_encoded = True + else: + prev_encoded = False + + return clean_key.lower() def _collect_wer_data(self, wer_file: Path) -> tuple[list[tuple[str, str]], dict[str, str]]: """Parse data from a .wer file.""" diff --git a/dissect/target/target.py b/dissect/target/target.py index 9d581e26a..c3a2ab516 100644 --- a/dissect/target/target.py +++ b/dissect/target/target.py @@ -236,7 +236,7 @@ def open(cls, path: Union[str, Path]) -> Target: try: loader_instance = loader_cls(path, parsed_path=parsed_path) except Exception as e: - raise TargetError(f"Failed to initiate {loader_cls.__name__} for target {path}: {e}", cause=e) + raise TargetError(f"Failed to initiate {loader_cls.__name__} for target {path}: {e}") from e return cls._load(path, loader_instance) return cls.open_raw(path) @@ -428,7 +428,7 @@ def _load(cls, path: Union[str, Path], ldr: loader.Loader) -> Target: target.apply() return target except Exception as e: - raise TargetError(f"Failed to load target: {path}", cause=e) + raise TargetError(f"Failed to load target: {path}") from e def _init_os(self) -> None: """Internal function that attemps to load an OSPlugin for this target.""" @@ -541,7 +541,7 @@ def add_plugin( except PluginError: raise except Exception as e: - raise PluginError(f"An exception occurred while trying to initialize a plugin: {plugin_cls}", cause=e) + raise PluginError(f"An exception occurred while trying to initialize a plugin: {plugin_cls}") from e else: p = plugin_cls @@ -556,8 +556,8 @@ def add_plugin( raise except Exception as e: raise UnsupportedPluginError( - f"An exception occurred while checking for plugin compatibility: {plugin_cls}", cause=e - ) + f"An exception occurred while checking for plugin compatibility: {plugin_cls}" + ) from e self._register_plugin_functions(p) @@ -614,9 +614,8 @@ def get_function(self, function: str) -> FunctionTuple: # Just take the last known cause for now raise UnsupportedPluginError( f"Unsupported function `{function}` for target with OS plugin {self._os_plugin}", - cause=causes[0] if causes else None, extra=causes[1:] if len(causes) > 1 else None, - ) + ) from causes[0] if causes else None # We still ended up with no compatible plugins if function not in self._functions: diff --git a/dissect/target/tools/info.py b/dissect/target/tools/info.py index e105d2df3..5b1f466a2 100644 --- a/dissect/target/tools/info.py +++ b/dissect/target/tools/info.py @@ -137,7 +137,7 @@ def print_target_info(target: Target) -> None: continue if isinstance(value, list): - value = ", ".join(value) + value = ", ".join(map(str, value)) if isinstance(value, datetime): value = value.isoformat(timespec="microseconds") diff --git a/dissect/target/tools/query.py b/dissect/target/tools/query.py index b97223874..5ab83560e 100644 --- a/dissect/target/tools/query.py +++ b/dissect/target/tools/query.py @@ -169,31 +169,40 @@ def main(): # Show the list of available plugins for the given optional target and optional # search pattern, only display plugins that can be applied to ANY targets if args.list: - collected_plugins = {} + collected_plugins = [] if targets: for plugin_target in Target.open_all(targets, args.children): funcs, _ = find_plugin_functions(plugin_target, args.list, compatibility=True, show_hidden=True) for func in funcs: - collected_plugins[func.path] = func.plugin_desc + collected_plugins.append(func) else: funcs, _ = find_plugin_functions(Target(), args.list, compatibility=False, show_hidden=True) for func in funcs: - collected_plugins[func.path] = func.plugin_desc + collected_plugins.append(func) - # Display in a user friendly manner target = Target() fparser = generate_argparse_for_bound_method(target.plugins, usage_tmpl=USAGE_FORMAT_TMPL) fargs, rest = fparser.parse_known_args(rest) + # Display in a user friendly manner if collected_plugins: - target.plugins(list(collected_plugins.values())) + if args.json: + print('{"plugins": ', end="") + target.plugins(collected_plugins, as_json=args.json) # No real targets specified, show the available loaders if not targets: fparser = generate_argparse_for_bound_method(target.loaders, usage_tmpl=USAGE_FORMAT_TMPL) fargs, rest = fparser.parse_known_args(rest) - target.loaders(**vars(fargs)) + del fargs.as_json + if args.json: + print(', "loaders": ', end="") + target.loaders(**vars(fargs), as_json=args.json) + + if args.json: + print("}") + parser.exit() if not targets: diff --git a/dissect/target/tools/utils.py b/dissect/target/tools/utils.py index eddb8d5c0..8dcac009a 100644 --- a/dissect/target/tools/utils.py +++ b/dissect/target/tools/utils.py @@ -230,15 +230,15 @@ def get_target_attribute(target: Target, func: PluginFunction) -> Union[Plugin, target.add_plugin(plugin_class) except UnsupportedPluginError as e: raise UnsupportedPluginError( - f"Unsupported function `{func.method_name}` for target with plugin {func.class_object}", cause=e - ) + f"Unsupported function `{func.method_name}` for target with plugin {func.class_object}" + ) from e _, target_attr = plugin_factory(target, plugin_class, func.method_name, func.plugin_desc["namespace"]) return target_attr def plugin_function_with_argparser( - target_attr: Union[Plugin, Callable] + target_attr: Union[Plugin, Callable], ) -> tuple[Optional[Iterator], Optional[argparse.ArgumentParser]]: """Resolves which plugin function to execute, and creates the argument parser for said plugin.""" plugin_method = None diff --git a/dissect/target/volume.py b/dissect/target/volume.py index 93b61f0ac..76dffcaf6 100644 --- a/dissect/target/volume.py +++ b/dissect/target/volume.py @@ -334,7 +334,7 @@ def open(fh: BinaryIO, *args, **kwargs) -> DissectVolumeSystem: try: return disk.DissectVolumeSystem(fh) except Exception as e: - raise VolumeSystemError(f"Failed to load volume system for {fh}", cause=e) + raise VolumeSystemError(f"Failed to load volume system for {fh}") from e finally: fh.seek(offset) @@ -353,7 +353,7 @@ def is_lvm_volume(volume: BinaryIO) -> bool: log.info("Failed to import %s", logical_vs) log.debug("", exc_info=e) except Exception as e: - raise VolumeSystemError(f"Failed to detect logical volume for {volume}", cause=e) + raise VolumeSystemError(f"Failed to detect logical volume for {volume}") from e return False @@ -372,7 +372,7 @@ def is_encrypted(volume: BinaryIO) -> bool: log.info("Failed to import %s", manager) log.debug("", exc_info=e) except Exception as e: - raise VolumeSystemError(f"Failed to detect encrypted volume for {volume}", cause=e) + raise VolumeSystemError(f"Failed to detect encrypted volume for {volume}") from e return False @@ -422,4 +422,4 @@ def open_lvm(volumes: list[BinaryIO], *args, **kwargs) -> Iterator[VolumeSystem] log.info("Failed to import %s", logical_vs) log.debug("", exc_info=e) except Exception as e: - raise VolumeSystemError(f"Failed to load logical volume system for {volumes}", cause=e) + raise VolumeSystemError(f"Failed to load logical volume system for {volumes}") from e diff --git a/pyproject.toml b/pyproject.toml index 6881e0486..787ce9a75 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "dissect.regf>=3.3,<4", "dissect.util>=3,<4", "dissect.volume>=2,<4", - "flow.record~=3.16.0", + "flow.record~=3.18.0", "structlog", ] dynamic = ["version"] @@ -47,6 +47,7 @@ repository = "https://github.com/fox-it/dissect.target" [project.optional-dependencies] full = [ "asn1crypto", + "dissect.archive>=1,<2", "dissect.btrfs>=1,<2", "dissect.cim>=3,<4", "dissect.clfs>=1,<2", @@ -55,6 +56,7 @@ full = [ "dissect.extfs>=3,<4", "dissect.fat>=3,<4", "dissect.ffs>=3,<4", + "dissect.fve>=4,<5; platform_system != 'Windows' or platform_python_implementation != 'PyPy'", "dissect.jffs>=1,<2", "dissect.ole>=3,<4", "dissect.shellitem>=3,<4", @@ -79,6 +81,7 @@ full = [ ] dev = [ "dissect.target[full,mqtt,yara]", + "dissect.archive[dev]>=1.0.dev,<2.0.dev", "dissect.btrfs[dev]>=1.0.dev,<2.0.dev", "dissect.cim[dev]>=3.0.dev,<4.0.dev", "dissect.clfs[dev]>=1.0.dev,<2.0.dev", @@ -90,6 +93,7 @@ dev = [ "dissect.extfs[dev]>=3.0.dev,<4.0.dev", "dissect.fat[dev]>=3.0.dev,<4.0.dev", "dissect.ffs[dev]>=3.0.dev,<4.0.dev", + "dissect.fve[dev]>=4.0.dev,<5.0.dev; platform_system != 'Windows' or platform_python_implementation != 'PyPy'", "dissect.hypervisor[dev]>=3.0.dev,<4.0.dev", "dissect.jffs[dev]>=1.0.dev,<2.0.dev", "dissect.ntfs[dev]>=3.4.dev,<4.0.dev", @@ -101,7 +105,7 @@ dev = [ "dissect.util>=3.0.dev,<4.0.dev", "dissect.vmfs[dev]>=3.0.dev,<4.0.dev", "dissect.volume[dev]>=3.0.dev,<4.0.dev", - "dissect.xfs[dev]>=3.0.dev,<4.0.dev", + "dissect.xfs[dev]>=3.0.dev,<4.0.dev", ] yara = [ # Grab the dependencies for dissect.target diff --git a/tests/_data/plugins/os/unix/linux/debian/proxmox/_os/config.db b/tests/_data/plugins/os/unix/linux/debian/proxmox/_os/config.db new file mode 100644 index 000000000..793bb301a --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/debian/proxmox/_os/config.db @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e96dcf15741eeab70e3df48ca90b3e8ea2971c5d9916d38f2eba4c7a34536d7f +size 40960 diff --git a/tests/_data/plugins/os/unix/log/auth/iso.log b/tests/_data/plugins/os/unix/log/auth/iso.log new file mode 100644 index 000000000..2881562af --- /dev/null +++ b/tests/_data/plugins/os/unix/log/auth/iso.log @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:40bf6c229952ac458617f51273ed63a6b628877da90582e2e5e81a4d2b323309 +size 1281 diff --git a/tests/_data/plugins/os/windows/log/mssql/errorlog b/tests/_data/plugins/os/windows/log/mssql/errorlog new file mode 100644 index 000000000..8d6db3d75 --- /dev/null +++ b/tests/_data/plugins/os/windows/log/mssql/errorlog @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8a05c7ed7e81f21d323eb28a8cfeab53060a4e0a0a475127282de1f70bd4d0ba +size 27622 diff --git a/tests/_data/plugins/os/windows/syscache/Syscache-mft.hve b/tests/_data/plugins/os/windows/syscache/Syscache-mft.hve new file mode 100644 index 000000000..223626d19 --- /dev/null +++ b/tests/_data/plugins/os/windows/syscache/Syscache-mft.hve @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e80985a82a3edb23f802148e5eee1a6c462acf5e5a663c80393c9b7ef4a7b11d +size 786432 diff --git a/tests/_data/plugins/os/windows/wer/wer_test.wer b/tests/_data/plugins/os/windows/wer/wer_test.wer index adad7a5d8..40d89b83a 100644 --- a/tests/_data/plugins/os/windows/wer/wer_test.wer +++ b/tests/_data/plugins/os/windows/wer/wer_test.wer @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:4f3847f6971f000fe39e7b25f4dd4d901ad6e7af3cc9abd7839808937d6d3c3b -size 1612 +oid sha256:13ec7c6cefef55ce9ea8924c3d4d16e06974462a8a9bc9d71b2d187c3919f106 +size 1906 diff --git a/tests/conftest.py b/tests/conftest.py index 53b8a295f..8cd7049fd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -227,14 +227,23 @@ def hive_hklm() -> Iterator[VirtualHive]: hive = VirtualHive() # set current control set to ControlSet001 and mock it - controlset_key = "SYSTEM\\ControlSet001" + change_controlset(hive, 1) + + yield hive + + +def change_controlset(hive: VirtualHive, num: int) -> None: + """Update the current control set of the given HKLM hive.""" + + if not isinstance(num, int) or num > 999 or num < 1: + raise ValueError("ControlSet integer must be between 1 and 999") + + controlset_key = f"SYSTEM\\ControlSet{num:>03}" hive.map_key(controlset_key, VirtualKey(hive, controlset_key)) select_key = "SYSTEM\\Select" hive.map_key(select_key, VirtualKey(hive, select_key)) - hive.map_value(select_key, "Current", VirtualValue(hive, "Current", 1)) - - yield hive + hive.map_value(select_key, "Current", VirtualValue(hive, "Current", num)) @pytest.fixture diff --git a/tests/filesystems/test_ffs.py b/tests/filesystems/test_ffs.py new file mode 100644 index 000000000..f22051052 --- /dev/null +++ b/tests/filesystems/test_ffs.py @@ -0,0 +1,67 @@ +from datetime import datetime +from typing import Iterator +from unittest.mock import Mock, patch + +import pytest + +from dissect.target.filesystems.ffs import FfsFilesystem, FfsFilesystemEntry + +NANOSECONDS_IN_SECOND = 1_000_000_000 + + +@pytest.fixture +def ffs_fs() -> Iterator[FfsFilesystem]: + with patch("dissect.ffs.ffs.FFS"): + ffs_fs = FfsFilesystem(Mock()) + ffs_fs.ffs.block_size = 32 * 1024 + yield ffs_fs + + +@pytest.fixture +def ffs_fs_entry(ffs_fs: FfsFilesystem) -> Iterator[FfsFilesystemEntry]: + atime = datetime(2024, 10, 1, 12, 0, 0) + mtime = datetime(2024, 10, 2, 12, 0, 0) + ctime = datetime(2024, 10, 3, 12, 0, 0) + btime = datetime(2024, 10, 4, 12, 0, 0) + + raw_inode = Mock(di_uid=1000, di_nlink=1, di_guid=999, di_size=165002) + inode = Mock( + mode=0o100664, + inum=4, + inode=raw_inode, + nblocks=323, + atime=atime, + atime_ns=atime.timestamp() * NANOSECONDS_IN_SECOND, + mtime=mtime, + mtime_ns=mtime.timestamp() * NANOSECONDS_IN_SECOND, + ctime=ctime, + ctime_ns=ctime.timestamp() * NANOSECONDS_IN_SECOND, + btime=btime, + btime_ns=btime.timestamp() * NANOSECONDS_IN_SECOND, + is_file=lambda: True, + is_dir=lambda: False, + is_symlink=lambda: False, + ) + + entry = FfsFilesystemEntry(ffs_fs, "/some_file", inode) + yield entry + + +def test_jffs2_stat(ffs_fs_entry: FfsFilesystemEntry) -> None: + stat = ffs_fs_entry.stat() + + entry = ffs_fs_entry.entry + assert stat.st_mode == entry.mode + assert stat.st_ino == entry.inum + assert stat.st_dev == id(ffs_fs_entry.fs) + assert stat.st_nlink == entry.inode.di_nlink + assert stat.st_uid == entry.inode.di_uid + assert stat.st_gid == entry.inode.di_gid + assert stat.st_size == entry.inode.di_size + assert stat.st_atime == entry.atime.timestamp() + assert stat.st_mtime == entry.mtime.timestamp() + assert stat.st_ctime == entry.ctime.timestamp() + assert stat.st_birthtime == entry.btime.timestamp() + assert stat.st_birthtime_ns == entry.btime_ns + assert stat.st_blksize == 32 * 1024 + assert stat.st_blocks == 323 diff --git a/tests/plugins/general/test_network.py b/tests/plugins/general/test_network.py index 6e3a1e9b3..a8131235f 100644 --- a/tests/plugins/general/test_network.py +++ b/tests/plugins/general/test_network.py @@ -34,7 +34,7 @@ def test_base_network_plugin(target_bare: Target, network_record: InterfaceRecor assert network.ips() == ["10.42.42.10"] assert network.gateways() == ["10.42.42.1"] assert network.macs() == ["DE:AD:BE:EF:00:00"] - assert network.dns() == ["8.8.8.8", "1.1.1.1"] + assert sorted(list(map(str, network.dns()))) == ["1.1.1.1", "8.8.8.8"] assert len(list(network.in_cidr("10.42.42.0/24"))) == 1 assert len(list(network.in_cidr("10.43.42.0/24"))) == 0 diff --git a/tests/plugins/general/test_plugins.py b/tests/plugins/general/test_plugins.py index 4bd27bcfd..c2745cec1 100644 --- a/tests/plugins/general/test_plugins.py +++ b/tests/plugins/general/test_plugins.py @@ -29,7 +29,7 @@ def test_update_dict(): def test_plugin_description(): description = [x for x in output_plugin_description_recursive(PluginListPlugin, False)] - assert description == ["plugins - Print all registered plugins to stdout. (output: no output)"] + assert description == ["plugins - Print all available plugins. (output: no output)"] def test_plugin_description_compacting(): @@ -39,7 +39,7 @@ def test_plugin_description_compacting(): assert description == [ "hello:", " world:", - " plugins - Print all registered plugins to stdout. (output: no output)", + " plugins - Print all available plugins. (output: no output)", ] @@ -54,9 +54,9 @@ def test_plugin_description_in_dict_multiple(): "hello:", " world:", " data:", - " plugins - Print all registered plugins to stdout. (output: no output)", + " plugins - Print all available plugins. (output: no output)", " data2:", - " plugins - Print all registered plugins to stdout. (output: no output)", + " plugins - Print all available plugins. (output: no output)", ] diff --git a/tests/plugins/os/unix/linux/debian/proxmox/__init__.py b/tests/plugins/os/unix/linux/debian/proxmox/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/plugins/os/unix/linux/debian/proxmox/test__os.py b/tests/plugins/os/unix/linux/debian/proxmox/test__os.py new file mode 100644 index 000000000..07abfe9ec --- /dev/null +++ b/tests/plugins/os/unix/linux/debian/proxmox/test__os.py @@ -0,0 +1,79 @@ +from io import BytesIO + +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.os.unix.linux.debian.proxmox._os import ProxmoxPlugin +from dissect.target.target import Target +from tests._utils import absolute_path + + +def test_proxmox_os(target_bare: Target) -> None: + fs = VirtualFilesystem() + + fs.map_file_fh("/etc/hostname", BytesIO(b"pve")) + fs.map_file( + "/var/lib/pve-cluster/config.db", absolute_path("_data/plugins/os/unix/linux/debian/proxmox/_os/config.db") + ) + fs.makedirs("/etc/pve") + fs.makedirs("/var/lib/pve") + + target_bare.filesystems.add(fs) + + assert ProxmoxPlugin.detect(target_bare) + target_bare._os_plugin = ProxmoxPlugin + target_bare.apply() + + assert target_bare.os == "proxmox" + assert sorted(list(map(str, target_bare.fs.path("/etc/pve").rglob("*")))) == [ + "/etc/pve/__version__", + "/etc/pve/authkey.pub", + "/etc/pve/authkey.pub.old", + "/etc/pve/corosync.conf", + "/etc/pve/datacenter.cfg", + "/etc/pve/firewall", + "/etc/pve/ha", + "/etc/pve/local", + "/etc/pve/lxc", + "/etc/pve/mapping", + "/etc/pve/nodes", + "/etc/pve/nodes/pve", + "/etc/pve/nodes/pve-btrfs", + "/etc/pve/nodes/pve-btrfs/lrm_status", + "/etc/pve/nodes/pve-btrfs/lrm_status.tmp.971", + "/etc/pve/nodes/pve-btrfs/lxc", + "/etc/pve/nodes/pve-btrfs/openvz", + "/etc/pve/nodes/pve-btrfs/priv", + "/etc/pve/nodes/pve-btrfs/pve-ssl.key", + "/etc/pve/nodes/pve-btrfs/pve-ssl.pem", + "/etc/pve/nodes/pve-btrfs/qemu-server", + "/etc/pve/nodes/pve-btrfs/ssh_known_hosts", + "/etc/pve/nodes/pve/lrm_status", + "/etc/pve/nodes/pve/lxc", + "/etc/pve/nodes/pve/openvz", + "/etc/pve/nodes/pve/priv", + "/etc/pve/nodes/pve/pve-ssl.key", + "/etc/pve/nodes/pve/pve-ssl.pem", + "/etc/pve/nodes/pve/qemu-server", + "/etc/pve/nodes/pve/qemu-server/100.conf", + "/etc/pve/nodes/pve/ssh_known_hosts", + "/etc/pve/openvz", + "/etc/pve/priv", + "/etc/pve/priv/acme", + "/etc/pve/priv/authkey.key", + "/etc/pve/priv/authorized_keys", + "/etc/pve/priv/known_hosts", + "/etc/pve/priv/lock", + "/etc/pve/priv/pve-root-ca.key", + "/etc/pve/priv/pve-root-ca.srl", + "/etc/pve/pve-root-ca.pem", + "/etc/pve/pve-www.key", + "/etc/pve/qemu-server", + "/etc/pve/sdn", + "/etc/pve/storage.cfg", + "/etc/pve/user.cfg", + "/etc/pve/virtual-guest", + "/etc/pve/vzdump.cron", + ] + + vmlist = list(target_bare.vmlist()) + assert len(vmlist) == 1 + assert vmlist[0].path == "/etc/pve/qemu-server/100.conf" diff --git a/tests/plugins/os/unix/linux/fortios/__init__.py b/tests/plugins/os/unix/linux/fortios/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/plugins/os/unix/linux/fortios/test__os.py b/tests/plugins/os/unix/linux/fortios/test__os.py new file mode 100644 index 000000000..2989abf65 --- /dev/null +++ b/tests/plugins/os/unix/linux/fortios/test__os.py @@ -0,0 +1,98 @@ +import gzip +from io import BytesIO + +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.os.unix.linux.fortios._os import FortiOSPlugin +from dissect.target.plugins.os.unix.linux.fortios.generic import GenericPlugin +from dissect.target.plugins.os.unix.linux.fortios.locale import LocalePlugin +from dissect.target.target import Target + + +def test_fortigate_os(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """test if we detect FortiGate OS correctly.""" + + global_config = """\ + #config-version=FGVM64-7.4.2-FW-build2571-231219:opmode=0:vdom=0 + config system global + set alias "FortiGate-VM64" + set hostname "FortiGate-VM64" + set timezone "US/Pacific" + end + config system admin + edit "admin" + set accprofile "super_admin" + set vdom "root" + set password ENC SH22zS4+QvU399DXuDApIVHu5fGh3wQCwO1aGeqlbA08G9tB/DvJsqLdG9HA18= + next + end + config system dns + set primary 96.45.45.45 + set secondary 96.45.46.46 + end + """ + + iface_config = """\ + edit "port1" + set vdom "root" + set ip 1.2.3.4 255.255.255.0 + set allowaccess https + set type physical + set snmp-index 1 + next + """ + + root_config = """\ + config user group + edit "Guest-group" + set member "guest" + next + end + config user local + edit "guest" + set type password + set passwd ENC pATZu+74jg21Ktwn9zMDS/bGcYumPFDZMnBKh+86851cd0Ig2CS1zbqQa7mpUGpCNfnDKlDkjobzwIlUbXkHgRYxBWWf99DtTvm7g7UsEBGnf8Xa06ZNd62b5Zb4MVfKQJ/uh5Ky0dI4RujLfv8PqrU7VVKKTAPUdzEtC5ehWZzUxRAFohNM6WhklTUpLV58M+zoRA== + next + end + """ # noqa: E501 + + fs_unix.map_file_fh("/.flatkc", BytesIO(b"")) + fs_unix.map_file_fh("/rootfs.gz", BytesIO(b"")) + fs_unix.map_file_fh("/data/config/sys_global.conf.gz", BytesIO(gzip.compress(global_config.encode()))) + fs_unix.map_file_fh("/data/config/global_system_interface.gz", BytesIO(gzip.compress(iface_config.encode()))) + fs_unix.map_file_fh("/data/config/sys_vd_root+root.conf.gz", BytesIO(gzip.compress(root_config.encode()))) + fs_unix.map_file_fh( + "/bin/grep", BytesIO(bytes.fromhex("7f454c4602010100000000000000000002003e0001000000004b4000000000")) + ) + + target_unix.add_plugin(FortiOSPlugin) + target_unix.add_plugin(LocalePlugin) + target_unix.add_plugin(GenericPlugin) + + # tests FortiOSPlugin.detect() indirectly + assert target_unix.os == "fortios" + + target_unix._os_plugin = FortiOSPlugin + target_unix.apply() + + assert target_unix.os == "fortios" + assert target_unix.hostname == "FortiGate-VM64" + assert target_unix.version == "FortiGate VM 7.4.2 (build 2571, 2023-12-19)" + assert target_unix.ips == ["1.2.3.4"] + assert target_unix.dns == ["96.45.45.45", "96.45.46.46"] + assert target_unix.architecture == "x86_64-unix" + assert target_unix.language == "en_US" + assert target_unix.timezone == "US/Pacific" + + users = list(target_unix.users()) + assert len(users) == 2 + assert users[0].hostname == "FortiGate-VM64" + assert users[0].name == "admin" + assert users[0].groups == ["super_admin"] + assert users[0].password == "ENC:SH22zS4+QvU399DXuDApIVHu5fGh3wQCwO1aGeqlbA08G9tB/DvJsqLdG9HA18=" + assert users[0].home == "/root" + + assert users[1].hostname == "FortiGate-VM64" + assert users[1].name == "guest" + assert users[1].groups == ["Guest-group"] + assert users[1].password == "guest" + assert users[1].home is None diff --git a/tests/plugins/os/unix/log/test_auth.py b/tests/plugins/os/unix/log/test_auth.py index f17fce280..4bfb63646 100644 --- a/tests/plugins/os/unix/log/test_auth.py +++ b/tests/plugins/os/unix/log/test_auth.py @@ -1,16 +1,21 @@ +from __future__ import annotations + from datetime import datetime, timezone from io import BytesIO +from pathlib import Path from unittest.mock import patch from zoneinfo import ZoneInfo +import pytest from flow.record.fieldtypes import datetime as dt from dissect.target.filesystem import VirtualFilesystem -from dissect.target.plugins.os.unix.log.auth import AuthLogRecord, AuthPlugin +from dissect.target.plugins.os.unix.log.auth import AuthPlugin +from dissect.target.target import Target from tests._utils import absolute_path -def test_auth_plugin(target_unix, fs_unix: VirtualFilesystem): +def test_auth_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO("Europe/Amsterdam".encode())) data_path = "_data/plugins/os/unix/log/auth/auth.log" @@ -26,12 +31,11 @@ def test_auth_plugin(target_unix, fs_unix: VirtualFilesystem): results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("Europe/Amsterdam")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" -def test_auth_plugin_with_gz(target_unix, fs_unix: VirtualFilesystem): +def test_auth_plugin_with_gz(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO("Pacific/Honolulu".encode())) empty_file = absolute_path("_data/plugins/os/unix/log/empty.log") @@ -50,12 +54,11 @@ def test_auth_plugin_with_gz(target_unix, fs_unix: VirtualFilesystem): results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("Pacific/Honolulu")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" -def test_auth_plugin_with_bz(target_unix, fs_unix: VirtualFilesystem): +def test_auth_plugin_with_bz(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO("America/Nuuk".encode())) empty_file = absolute_path("_data/plugins/os/unix/log/empty.log") @@ -74,12 +77,11 @@ def test_auth_plugin_with_bz(target_unix, fs_unix: VirtualFilesystem): results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("America/Nuuk")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" -def test_auth_plugin_year_rollover(target_unix, fs_unix: VirtualFilesystem): +def test_auth_plugin_year_rollover(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO("Etc/UTC".encode())) data_path = "_data/plugins/os/unix/log/auth/secure" @@ -96,7 +98,216 @@ def test_auth_plugin_year_rollover(target_unix, fs_unix: VirtualFilesystem): assert len(results) == 2 results.reverse() - assert isinstance(results[0], type(AuthLogRecord())) - assert isinstance(results[1], type(AuthLogRecord())) assert results[0].ts == dt(2021, 12, 31, 3, 14, 0, tzinfo=ZoneInfo("Etc/UTC")) assert results[1].ts == dt(2022, 1, 1, 13, 37, 0, tzinfo=ZoneInfo("Etc/UTC")) + + +@pytest.mark.parametrize( + "message, results", + [ + pytest.param( + "Mar 29 10:43:01 ubuntu-1 sshd[1193]: Accepted password for test_user from 8.8.8.8 port 52942 ssh2", + { + "service": "sshd", + "pid": 1193, + "action": "accepted authentication", + "authentication_type": "password", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 52942, + }, + id="sshd: accepted password", + ), + pytest.param( + "Jun 4 22:14:15 ubuntu-1 sshd[41458]: Failed password for root from 8.8.8.8 port 22 ssh2", + { + "service": "sshd", + "pid": 41458, + "action": "failed authentication", + "authentication_type": "password", + "user": "root", + "remote_ip": "8.8.8.8", + "port": 22, + }, + id="sshd: failed password", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: Accepted publickey for test_user " + "from 8.8.8.8 port 12345 ssh2: RSA SHA256:123456789asdfghjklertzuio", + { + "service": "sshd", + "pid": 1361, + "action": "accepted authentication", + "authentication_type": "publickey", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 12345, + "ssh_protocol": "ssh2", + "encryption_algorithm": "RSA", + "hash_algorithm": "SHA256", + "key_hash": "123456789asdfghjklertzuio", + }, + id="sshd: accepted publickey", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: Failed publickey for test_user from 8.8.8.8 port 12345 ssh2.", + { + "service": "sshd", + "pid": 1361, + "action": "failed authentication", + "authentication_type": "publickey", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 12345, + }, + id="sshd: failed publickey", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 sshd[1291]: Server listening on 127.0.0.1 port 22.", + { + "service": "sshd", + "pid": 1291, + "host_ip": "127.0.0.1", + "port": 22, + }, + id="sshd: listening", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: pam_unix(sshd:session): session opened for user test_user by (uid=0)", + { + "service": "sshd", + "pid": 1361, + "action": "session opened", + "user": "test_user", + "user_uid": None, + "by_uid": 0, + }, + id="sshd: pam_unix", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: pam_unix(sshd:session): session opened " + "for user root(uid=0) by (uid=0)", + { + "service": "sshd", + "pid": 1361, + "action": "session opened", + "user": "root", + "user_uid": 0, + "by_uid": 0, + }, + id="sshd: pam_unix", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 systemd-logind[1118]: Watching system buttons " + "on /dev/input/event0 (Power Button)", + { + "service": "systemd-logind", + "pid": 1118, + "action": "Watching system buttons", + "device": "/dev/input/event0", + "device_name": "Power Button", + }, + id="systemd-logind: watching system buttons", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 systemd-logind[1118]: New seat seat0.", + { + "service": "systemd-logind", + "pid": 1118, + "action": "new seat", + "seat": "seat0", + }, + id="systemd-logind: new seat", + ), + pytest.param( + "Mar 27 13:10:08 ubuntu-1 sudo: ubuntu : TTY=pts/0 ; PWD=/home/test_user ; " + "USER=root ; COMMAND=/usr/bin/apt-key add -", + { + "service": "sudo", + "pid": None, + "tty": "pts/0", + "pwd": "/home/test_user", + "effective_user": "root", + "command": "/usr/bin/apt-key add -", + }, + id="sudo: command", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 su[1521]: Successful su for user by root", + {"service": "su", "pid": 1521, "su_result": "success", "user": "user", "by": "root"}, + id="su: success", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 su[1531]: 'su root' failed for user by root", + { + "service": "su", + "pid": 1531, + "su_result": "failed", + "command": "su root", + "user": "user", + "by": "root", + }, + id="su: failed", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 pkexec[1531]: user: Executing command [USER=root] " + "[TTY=unknown] [CWD=/home/user] [COMMAND=/usr/lib/update-notifier/package-system-locked]", + { + "service": "pkexec", + "pid": 1531, + "action": "executing command", + "user": "user", + "effective_user": "root", + "tty": "unknown", + "cwd": "/home/user", + "command": "/usr/lib/update-notifier/package-system-locked", + }, + id="pkexec: executing command", + ), + pytest.param( + "Mar 27 13:17:01 ubuntu-1 CRON[2623]: pam_unix(cron:session): session closed for user root", + { + "service": "CRON", + "pid": 2623, + "action": "session closed", + "user": "root", + }, + id="cron: pam_unix", + ), + ], +) +def test_auth_plugin_additional_fields( + target_unix, fs_unix: VirtualFilesystem, tmp_path: Path, message: str, results: dict[str, str | int] +) -> None: + data_path = tmp_path / "auth.log" + data_path.write_text(message) + fs_unix.map_file("var/log/auth.log", data_path) + + target_unix.add_plugin(AuthPlugin) + record = list(target_unix.authlog())[0] + + for key, value in results.items(): + assert getattr(record, key) == value + + +def test_auth_plugin_iso_date_format(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """test if we correctly handle Ubuntu 24.04 ISO formatted dates.""" + + fs_unix.map_file("/var/log/auth.log", absolute_path("_data/plugins/os/unix/log/auth/iso.log")) + target_unix.add_plugin(AuthPlugin) + + results = sorted(list(target_unix.authlog()), key=lambda r: r.ts) + assert len(results) == 10 + + assert results[0].ts == datetime(2024, 12, 31, 11, 37, 1, 123456, tzinfo=timezone.utc) + assert results[0].service == "sudo" + assert results[0].pid is None + assert results[0].tty == "pts/0" + assert results[0].pwd == "/home/user" + assert results[0].effective_user == "root" + assert results[0].command == "/usr/bin/chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg" + assert results[0].source == "/var/log/auth.log" + assert ( + results[0].message + == "user : TTY=pts/0 ; PWD=/home/user ; USER=root ; COMMAND=/usr/bin/chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg" # noqa: E501 + ) diff --git a/tests/plugins/os/unix/log/test_utmp.py b/tests/plugins/os/unix/log/test_utmp.py index d1544f745..40664bd81 100644 --- a/tests/plugins/os/unix/log/test_utmp.py +++ b/tests/plugins/os/unix/log/test_utmp.py @@ -13,6 +13,7 @@ def test_utmp_ipv6(target_linux: Target, fs_linux: VirtualFilesystem) -> None: target_linux.add_plugin(UtmpPlugin) results = list(target_linux.btmp()) + assert len(results) == 6 # IPv4 address results[0].ut_host == "127.0.0.1" @@ -35,6 +36,7 @@ def test_wtmp_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None: results = list(target_linux.wtmp()) assert len(results) == 70 + result = results[-1] assert result.ts == datetime(2021, 11, 12, 10, 12, 54, tzinfo=timezone.utc) assert result.ut_type == "USER_PROCESS" @@ -54,6 +56,7 @@ def test_btmp_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None: results = list(target_linux.btmp()) assert len(results) == 10 + result = results[-1] assert result.ts == datetime(2021, 11, 30, 23, 2, 9, tzinfo=timezone.utc) assert result.ut_type == "LOGIN_PROCESS" @@ -63,3 +66,11 @@ def test_btmp_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None: assert result.ut_id == "" assert result.ut_host == "8.210.13.5" assert result.ut_addr == "8.210.13.5" + + +def test_utmp_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + """test if we correctly parse a /var/run/utmp file.""" + fs_linux.map_file("var/run/utmp", absolute_path("_data/plugins/os/unix/log/wtmp/wtmp")) + target_linux.add_plugin(UtmpPlugin) + results = list(target_linux.utmp()) + assert len(results) == 70 diff --git a/tests/plugins/os/unix/test__os.py b/tests/plugins/os/unix/test__os.py index 8e8eecaab..4093c5d9b 100644 --- a/tests/plugins/os/unix/test__os.py +++ b/tests/plugins/os/unix/test__os.py @@ -146,3 +146,21 @@ def test_users(target_unix_users: Target) -> None: assert users[1].gid == 1000 assert users[1].home == posix_path("/home/user") assert users[1].shell == "/bin/bash" + + +@pytest.mark.parametrize( + "expected_arch, elf_buf", + [ + # https://launchpad.net/ubuntu/+source/coreutils/9.4-3.1ubuntu1 + ("x86_64-unix", "7f454c4602010100000000000000000003003e0001000000a06d000000000000"), # amd64 + ("aarch64-unix", "7f454c460201010000000000000000000300b70001000000405e000000000000"), # arm64 + ("aarch32-unix", "7f454c4601010100000000000000000003002800010000001d40000034000000"), # armhf + ("x86_32-unix", "7f454c460101010000000000000000000300030001000000e042000034000000"), # i386 + ("powerpc64-unix", "7f454c4602010100000000000000000003001500010000007470000000000000"), # ppc64el + ("riscv64-unix", "7f454c460201010000000000000000000300f30001000000685a000000000000"), # riscv64 + ], +) +def test_architecture(target_unix: Target, fs_unix: VirtualFilesystem, expected_arch: str, elf_buf: str) -> None: + """test if we correctly parse unix architecture.""" + fs_unix.map_file_fh("/bin/ls", BytesIO(bytes.fromhex(elf_buf))) + assert target_unix.architecture == expected_arch diff --git a/tests/plugins/os/unix/test_ips.py b/tests/plugins/os/unix/test_ips.py index d8591e453..c86d8505e 100644 --- a/tests/plugins/os/unix/test_ips.py +++ b/tests/plugins/os/unix/test_ips.py @@ -222,3 +222,27 @@ def test_clean_ips(input: str, expected_output: set) -> None: """Test the cleaning of dirty ip addresses.""" assert NetworkManager.clean_ips({input}) == expected_output + + +def test_regression_ips_unique_strings(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """Regression test for https://github.com/fox-it/dissect.target/issues/877""" + + config = """ + network: + ethernets: + eth0: + addresses: ['1.2.3.4'] + """ + fs_unix.map_file_fh("/etc/netplan/01-netcfg.yaml", BytesIO(textwrap.dedent(config).encode())) + fs_unix.map_file_fh("/etc/netplan/02-netcfg.yaml", BytesIO(textwrap.dedent(config).encode())) + + syslog = "Apr 4 13:37:04 localhost dhclient[4]: bound to 1.2.3.4 -- renewal in 1337 seconds." + fs_unix.map_file_fh("/var/log/syslog", BytesIO(textwrap.dedent(syslog).encode())) + + target_unix.add_plugin(LinuxPlugin) + + assert isinstance(target_unix.ips, list) + assert all([isinstance(ip, str) for ip in target_unix.ips]) + + assert len(target_unix.ips) == 1 + assert target_unix.ips == ["1.2.3.4"] diff --git a/tests/plugins/os/windows/log/test_mssql.py b/tests/plugins/os/windows/log/test_mssql.py new file mode 100644 index 000000000..d1b283501 --- /dev/null +++ b/tests/plugins/os/windows/log/test_mssql.py @@ -0,0 +1,35 @@ +from dissect.target.filesystem import Filesystem +from dissect.target.helpers.regutil import VirtualHive, VirtualKey, VirtualValue +from dissect.target.plugins.os.windows.log.mssql import MssqlPlugin +from tests._utils import absolute_path + + +def test_mssql_errorlog(target_win_users, hive_hklm: VirtualHive, fs_win: Filesystem) -> None: + errorlog_file = absolute_path("_data/plugins/os/windows/log/mssql/errorlog") + target_errorlog_name = "/sysvol/Temp/MSSQL/Log/ERRORLOG" + + _, _, map_path = target_errorlog_name.partition("sysvol/") + fs_win.map_file(map_path, errorlog_file) + + errorlog_name = "SOFTWARE\\Microsoft\\Microsoft SQL Server\\MSSQL69.MyInstance\\SQLServerAgent" + errorlog_key = VirtualKey(hive_hklm, errorlog_name) + hive_hklm.map_key(errorlog_name, errorlog_key) + errorlog_key.add_value( + "ErrorLogFile", VirtualValue(hive_hklm, "ErrorLogFile", "C:\\Temp\\MSSQL\\Log\\SQLAGENT.OUT") + ) + + datapath_name = "SOFTWARE\\Microsoft\\Microsoft SQL Server\\MSSQL69.MyInstance\\MSSQLServer" + datapath_key = VirtualKey(hive_hklm, datapath_name) + hive_hklm.map_key(datapath_name, datapath_key) + datapath_key.add_value("DefaultData", VirtualValue(hive_hklm, "DefaultData", "C:\\Temp\\MSSQL\\Data")) + + target_win_users.add_plugin(MssqlPlugin) + records = list(target_win_users.mssql()) + assert len(records) == 101 + + record = records[51] + assert str(record.ts) == "2024-04-08 12:16:41.190000+00:00" + assert record.instance == "MSSQL69.MyInstance" + assert record.process == "Server" + assert record.message.startswith("The SQL Server Network Interface library could not register") + assert record.path == "C:\\Temp\\MSSQL\\Log\\ERRORLOG" diff --git a/tests/plugins/os/windows/test_generic.py b/tests/plugins/os/windows/test_generic.py index 41488c354..ecb879154 100644 --- a/tests/plugins/os/windows/test_generic.py +++ b/tests/plugins/os/windows/test_generic.py @@ -2,9 +2,11 @@ from dissect.target.helpers.regutil import VirtualKey, VirtualValue from dissect.target.plugins.os.windows.generic import GenericPlugin +from dissect.target.plugins.os.windows.registry import VirtualHive +from dissect.target.target import Target -def test_windows_generic_install_date(target_win_users, fs_win, hive_hklm): +def test_windows_generic_install_date(target_win_users: Target, hive_hklm: VirtualHive): currentversion_key_name = "Software\\Microsoft\\Windows NT\\CurrentVersion" currentversion_key = VirtualKey(hive_hklm, currentversion_key_name) currentversion_key.add_value("InstallDate", VirtualValue(hive_hklm, "InstallDate", 0)) @@ -12,3 +14,33 @@ def test_windows_generic_install_date(target_win_users, fs_win, hive_hklm): target_win_users.add_plugin(GenericPlugin) assert target_win_users.install_date == from_unix(0) + + +def test_windows_generic_sid(target_win: Target, hive_hklm: VirtualHive): + value_machine = b"\x00\x00\x00\x00H\x01\x00\x00\x03\x00\x01\x00H\x01\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00`\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00`\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x14\x80(\x01\x00\x008\x01\x00\x00\x14\x00\x00\x00D\x00\x00\x00\x02\x000\x00\x02\x00\x00\x00\x02\xc0\x14\x00z\x04\x05\x01\x01\x01\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x02\xc0\x14\x00\xff\x07\x0f\x00\x01\x01\x00\x00\x00\x00\x00\x05\x07\x00\x00\x00\x02\x00\xe4\x00\x08\x00\x00\x00\x00\x00\x14\x00\x85\x03\x02\x00\x01\x01\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x18\x00\x85\x03\x02\x00\x01\x02\x00\x00\x00\x00\x00\x05 \x00\x00\x00!\x02\x00\x00\x00\x00\x18\x00\xdf\x07\x0f\x00\x01\x02\x00\x00\x00\x00\x00\x05 \x00\x00\x00 \x02\x00\x00\x00\x00\x18\x00\x85\x03\x02\x00\x01\x02\x00\x00\x00\x00\x00\x05 \x00\x00\x00#\x02\x00\x00\x00\x00\x18\x00\xd5\x03\x02\x00\x01\x02\x00\x00\x00\x00\x00\x05 \x00\x00\x00$\x02\x00\x00\x00\x008\x00\x85\x03\x02\x00\x01\n\x00\x00\x00\x00\x00\x0f\x03\x00\x00\x00\x00\x04\x00\x00\xde\xa2(g!>\xd2\xaf\x19\xad]y\xb0\xc1\x07)'V\xfc \xd8\xadf\xf6\x10\xf2h\xfa\xdf*\xf8\x0f\x01\x00\x18\x00P\x00\x00\x00\x01\x02\x00\x00\x00\x00\x00\x05 \x00\x00\x00#\x02\x00\x00\x01\x00\x18\x00P\x00\x00\x00\x01\x02\x00\x00\x00\x00\x00\x05 \x00\x00\x00#\x02\x00\x00\x01\x02\x00\x00\x00\x00\x00\x05 \x00\x00\x00 \x02\x00\x00\x01\x02\x00\x00\x00\x00\x00\x05 \x00\x00\x00 \x02\x00\x00\x01\x04\x00\x00\x00\x00\x00\x05\x15\x00\x00\x00\xc3,HH\xddV\r\xd1\xab\xbe\x8a\xe0" # noqa: E501 + + value_domain = ( + b"\x01\x05\x00\x00\x00\x00\x00\x05\x15\x00\x00\x00\xc8\x13\x95z\x9c\xf0\xcd\x83I\xd9\x9e\xb9E\x06\x00\x00" + ) + machine_sid_key_name = "SAM\\SAM\\Domains\\Account" + domain_sid_key_name = "SECURITY\\Policy\\PolMachineAccountS" + + machine_sid_key = VirtualKey(hive_hklm, machine_sid_key_name) + domain_sid_key = VirtualKey(hive_hklm, domain_sid_key_name) + + machine_sid_key.add_value("V", VirtualValue(hive_hklm, "V", value_machine)) + domain_sid_key.add_value("(Default)", VirtualValue(hive_hklm, "(Default)", value_domain)) + + hive_hklm.map_key(machine_sid_key_name, machine_sid_key) + hive_hklm.map_key(domain_sid_key_name, domain_sid_key) + target_win.add_plugin(GenericPlugin) + + records = list(target_win.sid()) + + assert len(records) == 2 + + assert records[0].sid == "S-1-5-21-1212689603-3507312349-3767189163" + assert records[0].sidtype == "Machine" + + assert records[1].sid == "S-1-5-21-2056590280-2211311772-3114195273-1605" + assert records[1].sidtype == "Domain" diff --git a/tests/plugins/os/windows/test_network.py b/tests/plugins/os/windows/test_network.py index ed865f1eb..2398e66d7 100644 --- a/tests/plugins/os/windows/test_network.py +++ b/tests/plugins/os/windows/test_network.py @@ -2,11 +2,15 @@ from dataclasses import dataclass from datetime import datetime -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch import pytest +from dissect.target.helpers.regutil import VirtualHive, VirtualKey +from dissect.target.plugins.os.windows._os import WindowsPlugin +from dissect.target.plugins.os.windows.network import WindowsNetworkPlugin from dissect.target.target import Target +from tests.conftest import change_controlset @dataclass @@ -102,7 +106,7 @@ class MockRegVal: "dns": [], "mac": [], "network": [], - "search_domain": None, + "search_domain": [], "first_connected": None, "vlan": None, "name": None, @@ -136,7 +140,7 @@ class MockRegVal: "first_connected": None, "dns": [], "network": [], - "search_domain": None, + "search_domain": [], "vlan": None, "metric": None, "name": None, @@ -170,7 +174,9 @@ def test_windows_network( with ( patch("dissect.target.plugins.os.windows.generic.GenericPlugin", return_value=""), - patch("dissect.target.plugins.os.windows._os.WindowsPlugin.hostname", return_value="hostname"), + patch( + "dissect.target.plugins.os.windows._os.WindowsPlugin.hostname", property(MagicMock(return_value="hostname")) + ), patch.object(target_win, "registry", mock_registry), ): network = target_win.network @@ -223,7 +229,9 @@ def test_windows_network_none( mock_registry.values.return_value = list(mock_value_dict.values()) with ( - patch("dissect.target.plugins.os.windows._os.WindowsPlugin.hostname", return_value="hostname"), + patch( + "dissect.target.plugins.os.windows._os.WindowsPlugin.hostname", property(MagicMock(return_value="hostname")) + ), patch.object(target_win, "registry", mock_registry), ): network = target_win.network @@ -319,26 +327,29 @@ def test_network_dhcp_and_static( with ( patch("dissect.target.plugins.os.windows.generic.GenericPlugin", return_value=""), - patch("dissect.target.plugins.os.windows._os.WindowsPlugin.hostname", return_value="hostname"), + patch( + "dissect.target.plugins.os.windows._os.WindowsPlugin.hostname", property(MagicMock(return_value="hostname")) + ), patch.object(target_win, "registry", mock_registry), ): - ips = [] - dns = [] - gateways = [] - macs = [] - network = target_win.network interfaces = list(network.interfaces()) + ips = set() + dns = set() + gateways = set() + macs = set() + for interface, expected in zip(interfaces, expected_values): + ips.update(interface.ip) + dns.update(interface.dns) + gateways.update(interface.gateway) + macs.add(interface.mac) + assert interface.ip == expected["ip"] - ips.extend(interface.ip) assert interface.dns == expected["dns"] - dns.extend(interface.dns) assert interface.gateway == expected["gateway"] - gateways.extend(interface.gateway) assert interface.mac == expected["mac"] - macs.append(interface.mac) assert interface.network == expected["network"] assert interface.first_connected == expected["first_connected"] assert interface.type == expected["type"] @@ -350,7 +361,56 @@ def test_network_dhcp_and_static( assert interface.dhcp == expected["dhcp"] assert interface.enabled == expected["enabled"] - assert network.ips() == ips - assert network.dns() == dns - assert network.gateways() == gateways - assert network.macs() == macs + assert network.ips() == list(ips) + assert network.dns() == list(dns) + assert network.gateways() == list(gateways) + assert network.macs() == list(macs) + + +@patch( + "dissect.target.plugins.os.windows.registry.RegistryPlugin.controlsets", + property(MagicMock(return_value=["ControlSet001", "ControlSet002", "ControlSet003"])), +) +def test_regression_duplicate_ips(target_win: Target, hive_hklm: VirtualHive) -> None: + """Regression test for https://github.com/fox-it/dissect.target/issues/877""" + + change_controlset(hive_hklm, 3) + + # register the interfaces + kvs = [ + ( + "SYSTEM\\ControlSet001\\Control\\Class\\{4d36e972-e325-11ce-bfc1-08002be10318}\\0001", + "{some-net-cfg-instance-uuid}", + ), + ( + "SYSTEM\\ControlSet002\\Control\\Class\\{4d36e972-e325-11ce-bfc1-08002be10318}\\0002", + "{some-net-cfg-instance-uuid}", + ), + ( + "SYSTEM\\ControlSet003\\Control\\Class\\{4d36e972-e325-11ce-bfc1-08002be10318}\\0003", + "{some-net-cfg-instance-uuid}", + ), + ] + for name, value in kvs: + key = VirtualKey(hive_hklm, name) + key.add_value("NetCfgInstanceId", value) + hive_hklm.map_key(name, key) + + # register interface dhcp ip addresses for three different control sets + kvs = [ + ("SYSTEM\\ControlSet001\\Services\\Tcpip\\Parameters\\Interfaces\\{some-net-cfg-instance-uuid}", "1.2.3.4"), + ("SYSTEM\\ControlSet002\\Services\\Tcpip\\Parameters\\Interfaces\\{some-net-cfg-instance-uuid}", "1.2.3.4"), + ("SYSTEM\\ControlSet003\\Services\\Tcpip\\Parameters\\Interfaces\\{some-net-cfg-instance-uuid}", "5.6.7.8"), + ] + for name, value in kvs: + key = VirtualKey(hive_hklm, name) + key.add_value("DhcpIPAddress", value) + hive_hklm.map_key(name, key) + + target_win.add_plugin(WindowsPlugin) + target_win.add_plugin(WindowsNetworkPlugin) + + assert isinstance(target_win.ips, list) + assert all([isinstance(ip, str) for ip in target_win.ips]) + assert len(target_win.ips) == 2 + assert sorted(target_win.ips) == ["1.2.3.4", "5.6.7.8"] diff --git a/tests/plugins/os/windows/test_syscache.py b/tests/plugins/os/windows/test_syscache.py index 5efd60a8f..69e7231d6 100644 --- a/tests/plugins/os/windows/test_syscache.py +++ b/tests/plugins/os/windows/test_syscache.py @@ -1,3 +1,4 @@ +from dissect.target.filesystems.ntfs import NtfsFilesystem from dissect.target.plugins.os.windows.syscache import SyscachePlugin from tests._utils import absolute_path @@ -10,3 +11,24 @@ def test_syscache_plugin(target_win, fs_win): results = list(target_win.syscache()) assert len(results) == 401 + + +def test_syscache_plugin_real_mft(target_win, fs_win): + filesystem = NtfsFilesystem(mft=open(absolute_path("_data/plugins/filesystem/ntfs/mft/mft.raw"), "rb")) + + # We need to change the type of the mocked filesystem, since syscache.py checks for explicit value + target_win.fs.mounts["sysvol"].__type__ = "ntfs" + target_win.fs.mounts["sysvol"].ntfs = filesystem.ntfs + + syscache_file = absolute_path("_data/plugins/os/windows/syscache/Syscache-mft.hve") + fs_win.map_file("System Volume Information/Syscache.hve", syscache_file) + + target_win.add_plugin(SyscachePlugin) + + results = list(target_win.syscache()) + assert len(results) == 401 + + filepaths = [entry.path for entry in results] + assert filepaths.count(None) == 399 + assert "sysvol\\NamelessDirectory\\totally_normal.txt" in filepaths + assert "sysvol\\text_document.txt" in filepaths diff --git a/tests/plugins/os/windows/test_wer.py b/tests/plugins/os/windows/test_wer.py index 6d4765c06..30f9f51ea 100644 --- a/tests/plugins/os/windows/test_wer.py +++ b/tests/plugins/os/windows/test_wer.py @@ -14,9 +14,11 @@ def test_wer_plugin(target_win, fs_win): "sig", "dynamic_sig", "dynamic_signatures_parameter1", - "ui1", - "spcial_charactr", - "невидимый", + "ui_x5b_1_x5d", + "sp_xc3a9_cial_charact_xc3a9_r", + "xd0bd_xd0b5_xd0b2_xd0b8_xd0b4_xd0b8_xd0bc_xd18b_xd0b9", + "x5f_start_with_an_", + "x33__start_with_a_3", ] records = list(target_win.wer()) @@ -29,12 +31,6 @@ def test_wer_plugin(target_win, fs_win): record = wer_record_map["wer_test.wer"] for test in tests: record_field = getattr(record, test, None) - - # Check if expected line has been skipped - if record_field is None: - assert test == "невидимый" - continue - assert record_field == f"test_{test}" assert record.ts == datetime.datetime(2022, 10, 4, 11, 0, 0, 0, tzinfo=datetime.timezone.utc) diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py new file mode 100644 index 000000000..29e049430 --- /dev/null +++ b/tests/test_exceptions.py @@ -0,0 +1,19 @@ +import pytest + +from dissect.target import exceptions + + +@pytest.mark.parametrize( + "exc, std", + [ + (exceptions.FileNotFoundError, FileNotFoundError), + (exceptions.IsADirectoryError, IsADirectoryError), + (exceptions.NotADirectoryError, NotADirectoryError), + ], +) +def test_filesystem_error_subclass(exc: exceptions.Error, std: Exception) -> None: + assert issubclass(exc, (std, exceptions.FilesystemError)) + assert isinstance(exc(), (std, exceptions.FilesystemError)) + + with pytest.raises(std): + raise exc() diff --git a/tests/tools/test_query.py b/tests/tools/test_query.py index c37610a54..d8f51b68a 100644 --- a/tests/tools/test_query.py +++ b/tests/tools/test_query.py @@ -1,3 +1,4 @@ +import json import os import re from typing import Any, Optional @@ -244,3 +245,56 @@ def test_target_query_dry_run(capsys: pytest.CaptureFixture, monkeypatch: pytest " execute: network.interfaces (general.network.interfaces)\n" " execute: osinfo (general.osinfo.osinfo)\n" ) + + +def test_target_query_list_json(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: + """test if target-query --list --json output is formatted as we expect it to be.""" + + with monkeypatch.context() as m: + m.setattr("sys.argv", ["target-query", "-l", "-j"]) + with pytest.raises((SystemExit, IndexError, ImportError)): + target_query() + out, _ = capsys.readouterr() + + try: + output = json.loads(out) + except json.JSONDecodeError: + pass + + # test the generic structure of the returned dictionary. + assert isinstance(output, dict), "Could not load JSON output of 'target-query --list --json'" + assert output["plugins"], "Expected a dictionary of plugins" + assert output["loaders"], "Expected a dictionary of loaders" + assert len(output["plugins"]["loaded"]) > 200, "Expected more loaded plugins" + assert not output["plugins"].get("failed"), "Some plugin(s) failed to initialize" + + def get_plugin(plugins: list[dict], needle: str) -> dict: + match = [p for p in output["plugins"]["loaded"] if p["name"] == needle] + return match[0] if match else False + + # general plugin + users_plugin = get_plugin(output, "users") + assert users_plugin == { + "name": "users", + "description": "Return the users available in the target.", + "output": "record", + "path": "general.default.users", + } + + # namespaced plugin + plocate_plugin = get_plugin(output, "plocate.locate") + assert plocate_plugin == { + "name": "plocate.locate", + "description": "Yield file and directory names from the plocate.db.", + "output": "record", + "path": "os.unix.locate.plocate.locate", + } + + # regular plugin + sam_plugin = get_plugin(output, "sam") + assert sam_plugin == { + "name": "sam", + "description": "Dump SAM entries", + "output": "record", + "path": "os.windows.credential.sam.sam", + }