#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations

import argparse
import dataclasses
import logging
import re
import shutil
import sys
from difflib import diff_bytes, unified_diff
from fnmatch import fnmatch, translate
from io import BytesIO
from typing import Iterable, Iterator, TextIO

from flow.record import (
    IGNORE_FIELDS_FOR_COMPARISON,
    Record,
    RecordOutput,
    set_ignored_fields_for_comparison,
)

from dissect.target import Target
from dissect.target.exceptions import FileNotFoundError
from dissect.target.filesystem import FilesystemEntry
from dissect.target.helpers import fsutil
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import arg
from dissect.target.tools.query import record_output
from dissect.target.tools.shell import (
    ExtendedCmd,
    TargetCli,
    arg_str_to_arg_list,
    build_pipe_stdout,
    fmt_ls_colors,
    print_extensive_file_stat,
    python_shell,
    run_cli,
)
from dissect.target.tools.utils import (
    catch_sigpipe,
    configure_generic_arguments,
    generate_argparse_for_bound_method,
    process_generic_arguments,
)

log = logging.getLogger(__name__)
# Disable logging's "handler of last resort" and its reporting of errors raised while logging.
logging.lastResort = None
logging.raiseExceptions = False

# Number of bytes read per step when comparing the contents of two files.
BLOCK_SIZE = 2048
# Default number of bytes compared per file before the file is assumed to be unchanged.
FILE_LIMIT = BLOCK_SIZE * 16

# Fields shared by all filesystem differential records.
FILE_DIFF_RECORD_FIELDS = [
    ("string", "src_target"),
    ("string", "dst_target"),
    ("string", "path"),
]
# Fields shared by all plugin-output differential records; ``record`` wraps the compared record.
RECORD_DIFF_RECORD_FIELDS = [
    ("string", "src_target"),
    ("string", "dst_target"),
    ("record", "record"),
]

FileDeletedRecord = TargetRecordDescriptor("differential/file/deleted", FILE_DIFF_RECORD_FIELDS)
FileCreatedRecord = TargetRecordDescriptor("differential/file/created", FILE_DIFF_RECORD_FIELDS)
# Modified files additionally carry the diff lines of the first differing block.
FileModifiedRecord = TargetRecordDescriptor(
    "differential/file/modified",
    FILE_DIFF_RECORD_FIELDS
    + [
        ("bytes[]", "diff"),
    ],
)

RecordCreatedRecord = TargetRecordDescriptor("differential/record/created", RECORD_DIFF_RECORD_FIELDS)
RecordDeletedRecord = TargetRecordDescriptor("differential/record/deleted", RECORD_DIFF_RECORD_FIELDS)
RecordUnchangedRecord = TargetRecordDescriptor("differential/record/unchanged", RECORD_DIFF_RECORD_FIELDS)
@dataclasses.dataclass
class DifferentialEntry:
    """Signifies a change for a FilesystemEntry between two versions of a target."""

    # Full path and base name of the changed entry (taken from the source entry).
    path: str
    name: str
    # The entry as found on the source and on the destination target.
    src_target_entry: FilesystemEntry
    dst_target_entry: FilesystemEntry
    # Unified-diff lines describing the first differing block of content.
    diff: list[bytes]


@dataclasses.dataclass
class DirectoryDifferential:
    """For a given directory, contains the unchanged, created, modified and deleted entries.

    Subdirectories are not stored separately; they appear as regular entries in one of the lists.
    """

    directory: str
    unchanged: list[FilesystemEntry] = dataclasses.field(default_factory=list)
    created: list[FilesystemEntry] = dataclasses.field(default_factory=list)
    modified: list[DifferentialEntry] = dataclasses.field(default_factory=list)
    deleted: list[FilesystemEntry] = dataclasses.field(default_factory=list)


def likely_unchanged(src: fsutil.stat_result, dst: fsutil.stat_result) -> bool:
    """Determine whether or not, based on the file stats, we can assume a file hasn't been changed.

    Two files are considered likely unchanged when size, modification time and change time all match.
    """
    return (src.st_size, src.st_mtime, src.st_ctime) == (dst.st_size, dst.st_mtime, dst.st_ctime)


def get_plugin_output_records(plugin_name: str, plugin_arg_parts: list[str], target: Target) -> Iterable[Record]:
    """Command execution helper for target plugins.

    Highly similar to target-shell's _exec_target, however this function only accepts plugins that output
    records, and returns an iterable of records rather than a function that outputs to stdout.

    Args:
        plugin_name: Dotted attribute path of the plugin (function) on the target.
        plugin_arg_parts: Command line style arguments to parse and pass to the plugin.
        target: The target to resolve the plugin on.

    Returns:
        The records produced by the plugin, or an empty list if the arguments could not be parsed.

    Raises:
        ValueError: If the resolved plugin does not output records.
    """
    attr = target
    for part in plugin_name.split("."):
        attr = getattr(attr, part)

    if getattr(attr, "__output__", "default") != "record":
        raise ValueError("Comparing plugin output is only supported for plugins outputting records.")

    if not callable(attr):
        # Property-style plugin: the attribute itself already is the output
        return attr

    argparser = generate_argparse_for_bound_method(attr)
    try:
        args = argparser.parse_args(plugin_arg_parts)
    except SystemExit:
        # argparse already reported the problem. Return an empty iterable (rather than ``False``) so
        # callers that iterate over or build a set from the result keep working.
        return []

    return attr(**vars(args))
class TargetComparison:
    """This class wraps functionality that for two given targets can identify similarities and differences
    between them. Currently supports differentiating between the target filesystems, and between plugin outputs.

    Args:
        src_target: The target that is treated as the 'old' state.
        dst_target: The target that is treated as the 'new' state.
        deep: Compare file contents even when the file stats suggest the file is unchanged.
        file_limit: Number of bytes to compare before a file is assumed unchanged (0 for no limit).
    """

    def __init__(
        self,
        src_target: Target,
        dst_target: Target,
        deep: bool = False,
        file_limit: int = FILE_LIMIT,
    ):
        self.src_target = src_target
        self.dst_target = dst_target
        self.deep = deep
        self.file_limit = file_limit

    def _diff_file_contents(self, src_entry: FilesystemEntry, dst_entry: FilesystemEntry) -> list[bytes] | None:
        """Compare two files block by block and return the diff of the first differing block.

        Returns ``None`` when no difference was found before reaching the end of the file or the file limit.
        """
        src_fh = src_entry.open()
        dst_fh = dst_entry.open()

        while True:
            chunk_a = src_fh.read(BLOCK_SIZE)
            chunk_b = dst_fh.read(BLOCK_SIZE)
            if chunk_a != chunk_b:
                # We immediately stop after discovering a difference in file contents. This means that we won't
                # return a full diff of the file, merely the first block where a difference is observed.
                return list(diff_bytes(unified_diff, [chunk_a], [chunk_b]))

            if not chunk_a:
                # End of file
                return None

            # A limit of 0 means 'no limit', so only enforce the limit when it is set
            if self.file_limit and src_fh.tell() > self.file_limit:
                return None

    def scandir(self, path: str) -> DirectoryDifferential:
        """Scan a given directory for files that have been unchanged, modified, created or deleted from one
        target to the next. Add these results (as well as subdirectories) to a DirectoryDifferential object.

        Raises:
            ValueError: If ``path`` is not a directory on either target.
        """
        exists_as_directory_src = self.src_target.fs.exists(path) and self.src_target.fs.get(path).is_dir()
        exists_as_directory_dst = self.dst_target.fs.exists(path) and self.dst_target.fs.get(path).is_dir()

        if not (exists_as_directory_src and exists_as_directory_dst):
            if exists_as_directory_src:
                # Path only exists on src target, hence all entries can be considered 'deleted'
                return DirectoryDifferential(path, deleted=list(self.src_target.fs.scandir(path)))
            if exists_as_directory_dst:
                # Path only exists on dst target, hence all entries can be considered 'created'
                return DirectoryDifferential(path, created=list(self.dst_target.fs.scandir(path)))
            raise ValueError(f"{path} is not a directory on either the source or destination target!")

        src_target_entries = list(self.src_target.fs.scandir(path))
        dst_target_entries = list(self.dst_target.fs.scandir(path))

        # Index the source entries by path for constant-time pairing; the first entry wins on duplicates
        src_entries_by_path = {}
        for entry in src_target_entries:
            src_entries_by_path.setdefault(entry.path, entry)
        dst_target_children_paths = {entry.path for entry in dst_target_entries}

        unchanged = []
        modified = []
        deleted = [entry for entry in src_target_entries if entry.path not in dst_target_children_paths]
        created = [entry for entry in dst_target_entries if entry.path not in src_entries_by_path]

        for dst_entry in dst_target_entries:
            src_entry = src_entries_by_path.get(dst_entry.path)
            if src_entry is None:
                # Only present on the destination target, already registered as 'created'
                continue

            # It's possible that there is an entry, but upon trying to retrieve its stats / content, we get a
            # FileNotFoundError. We account for this by wrapping both stat retrievals in a try except
            src_target_notfound = False
            dst_target_notfound = False
            src_target_isdir = None
            dst_target_isdir = None

            try:
                src_target_stat = src_entry.stat()
                src_target_isdir = src_entry.is_dir()
            except FileNotFoundError:
                src_target_notfound = True

            try:
                dst_target_stat = dst_entry.stat()
                dst_target_isdir = dst_entry.is_dir()
            except FileNotFoundError:
                dst_target_notfound = True

            if src_target_notfound or dst_target_notfound:
                if src_target_notfound and dst_target_notfound:
                    # Not found on both
                    unchanged.append(src_entry)
                elif src_target_notfound:
                    created.append(dst_entry)
                else:
                    deleted.append(src_entry)
                # We can't continue as we cannot access the stats (or buffer)
                continue

            if src_target_isdir or dst_target_isdir:
                if src_target_isdir == dst_target_isdir:
                    unchanged.append(src_entry)
                else:
                    # Went from a file to a dir, or from a dir to a file. Either way, we consider the source
                    # entry 'deleted' and the dst entry 'created'
                    deleted.append(src_entry)
                    created.append(dst_entry)
                continue

            if not self.deep and likely_unchanged(src_target_stat, dst_target_stat):
                unchanged.append(src_entry)
                continue

            # If we get here, we have two files that we need to compare contents of
            content_difference = self._diff_file_contents(src_entry, dst_entry)
            if content_difference is None:
                unchanged.append(src_entry)
            else:
                modified.append(
                    DifferentialEntry(
                        src_entry.path,
                        src_entry.name,
                        src_entry,
                        dst_entry,
                        content_difference,
                    )
                )

        return DirectoryDifferential(path, unchanged, created, modified, deleted)

    def walkdir(
        self,
        path: str,
        exclude: list[str] | str | None = None,
        already_iterated: list[str] = None,
    ) -> Iterator[DirectoryDifferential]:
        """Recursively iterate directories and yield DirectoryDifferentials.

        Args:
            path: Directory to start walking from.
            exclude: One or more fnmatch patterns; matching subdirectories (right-padded with '/') are skipped.
            already_iterated: Paths that have already been visited. Updated in place while walking.
        """
        if already_iterated is None:
            already_iterated = []

        if path in already_iterated:
            return

        if exclude is not None and not isinstance(exclude, list):
            exclude = [exclude]

        already_iterated.append(path)

        diff = self.scandir(path)
        yield diff

        # Check if the scandir lead to the discovery of new directories that we have to scan for differentials.
        # Directories are never 'modified', but they can be unchanged, created or deleted.
        subentries = diff.created + diff.unchanged + diff.deleted
        subdirectories = [entry for entry in subentries if entry.is_dir()]
        for subdirectory in subdirectories:
            if subdirectory.path in already_iterated:
                continue

            # Right-pad with a '/'
            subdirectory_path = subdirectory.path if subdirectory.path.endswith("/") else subdirectory.path + "/"
            if exclude and any(fnmatch(subdirectory_path, pattern) for pattern in exclude):
                continue
            yield from self.walkdir(subdirectory.path, exclude, already_iterated)

    def differentiate_plugin_outputs(self, plugin_name: str, plugin_arg_parts: list[str]) -> Iterator[Record]:
        """Run a plugin on the source and destination targets and yield RecordUnchanged, RecordCreated and
        RecordDeleted records.

        There is no equivalent for the FileModifiedRecord. For files and directories, we can use the path to
        reliably track changes from one target to the next. There is no equivalent for plugin outputs, so we
        just assume that all records are either deleted (only on src), created (only on dst) or unchanged (on
        both).
        """
        # NOTE(review): this is the value bound at import time; confirm it still reflects the active
        # configuration if other code calls set_ignored_fields_for_comparison as well.
        old_ignored_values = IGNORE_FIELDS_FOR_COMPARISON
        set_ignored_fields_for_comparison(["_generated", "_source", "hostname", "domain"])

        # Restore the comparison settings even if the caller stops iterating early or a plugin raises
        try:
            # ``or ()`` guards against a falsy non-iterable result from the helper
            src_records = set(get_plugin_output_records(plugin_name, plugin_arg_parts, self.src_target) or ())
            src_records_seen = set()

            for dst_record in get_plugin_output_records(plugin_name, plugin_arg_parts, self.dst_target) or ():
                if dst_record in src_records:
                    src_records_seen.add(dst_record)
                    yield RecordUnchangedRecord(
                        src_target=self.src_target.path, dst_target=self.dst_target.path, record=dst_record
                    )
                else:
                    yield RecordCreatedRecord(
                        src_target=self.src_target.path, dst_target=self.dst_target.path, record=dst_record
                    )

            for record in src_records - src_records_seen:
                yield RecordDeletedRecord(
                    src_target=self.src_target.path, dst_target=self.dst_target.path, record=record
                )
        finally:
            set_ignored_fields_for_comparison(old_ignored_values)
class DifferentialCli(ExtendedCmd):
    """CLI for browsing the differential between two or more targets."""

    doc_header_prefix = "Target Diff\n" "==========\n"
    doc_header_suffix = "\n\nDocumented commands (type help <topic>):"
    doc_header_multiple_targets = "Use 'list', 'prev' and 'next' to list and select targets to differentiate between."

    # Diff line prefixes and the ls color used to print them, checked in order
    _DIFF_LINE_COLORS = ((b"@@", "ln"), (b"+", "ex"), (b"-", "or"))

    def __init__(self, *targets: Target, deep: bool = False, limit: int = FILE_LIMIT):
        if len(targets) < 2:
            raise ValueError("Provide two or more targets to differentiate between.")

        self.targets = targets
        self.deep = deep
        self.limit = limit

        self.src_index = 0
        self.dst_index = 0
        self.comparison: TargetComparison = None

        self.cwd = "/"
        self.alt_separator = "/"

        doc_header_middle = self.doc_header_multiple_targets if len(targets) > 2 else ""
        self.doc_header = self.doc_header_prefix + doc_header_middle + self.doc_header_suffix

        # Initialize the underlying cmd.Cmd first: printing help relies on the attributes it sets up
        start_in_cyber = any(target.props.get("cyber") for target in self.targets)
        super().__init__(start_in_cyber)

        self._select_source_and_dest(0, 1)
        if len(self.targets) > 2:
            # Some help may be nice if you are diffing more than 2 targets at once
            self.do_help(arg=None)

    @property
    def src_target(self) -> Target:
        return self.targets[self.src_index]

    @property
    def dst_target(self) -> Target:
        return self.targets[self.dst_index]

    @property
    def prompt(self) -> str:
        src_name = self.comparison.src_target.name
        dst_name = self.comparison.dst_target.name
        prompt_base = f"({src_name}/{dst_name})" if src_name != dst_name else src_name

        suffix = f"{prompt_base}/diff {self.cwd}>"

        if len(self.targets) <= 2:
            return suffix

        # Visualize which of the targets are currently selected, e.g. '[ O O . . ]'
        chain = "".join(
            "O " if i in (self.src_index, self.dst_index) else ". " for i in range(len(self.targets))
        )
        return f"[ {chain}] {suffix}"

    def _select_source_and_dest(self, src_index: int, dst_index: int) -> None:
        """Set local variables according to newly selected source and destination index, and re-instantiate
        TargetComparison.

        Raises:
            IndexError: If one of the indices does not refer to a loaded target.
            NotImplementedError: If the selected targets use different path separators.
        """
        # Validate before changing any state, so that a failed selection leaves the CLI usable
        for index in (src_index, dst_index):
            if not 0 <= index < len(self.targets):
                raise IndexError(f"Target index {index} is out of range.")

        src_target = self.targets[src_index]
        dst_target = self.targets[dst_index]
        if src_target.fs.alt_separator != dst_target.fs.alt_separator:
            raise NotImplementedError("No support for handling targets with different path separators")

        self.src_index = src_index
        self.dst_index = dst_index
        if not src_target.fs.exists(self.cwd) and not dst_target.fs.exists(self.cwd):
            log.warning("The current directory exists on neither of the selected targets.")

        self.alt_separator = src_target.fs.alt_separator
        self.comparison = TargetComparison(src_target, dst_target, self.deep, self.limit)

    def _resolve_path(self, path: str | None) -> str:
        """Return ``path`` as an absolute path relative to the current directory (cwd when omitted)."""
        if path is None:
            return self.cwd
        return fsutil.abspath(path, cwd=str(self.cwd), alt_separator=self.alt_separator)

    def _split_path(self, path: str) -> tuple[str, str]:
        """Resolve ``path`` against the current directory and split it in a (directory, name) tuple."""
        base_dir, _, name = self._resolve_path(path).rpartition("/")
        # An empty directory part means the entry lives directly in the root directory
        return base_dir or "/", name

    def _annotate_differential(
        self,
        diff: DirectoryDifferential,
        unchanged: bool = True,
        created: bool = True,
        modified: bool = True,
        deleted: bool = True,
        absolute: bool = False,
    ) -> list[tuple[FilesystemEntry | DifferentialEntry, str]]:
        """Given a DirectoryDifferential instance, construct a list of tuples where the first element is a
        FilesystemEntry / DifferentialEntry and the second a color-formatted string.

        The boolean arguments select which kinds of entries are included; ``absolute`` selects whether the
        full path or only the name of the entry is shown. The result is sorted by entry name.
        """
        r = []

        attr = "path" if absolute else "name"
        if unchanged:
            for entry in diff.unchanged:
                color = "di" if entry.is_dir() else "fi"
                r.append((entry, fmt_ls_colors(color, getattr(entry, attr))))

        if created:
            for entry in diff.created:
                color = "tw" if entry.is_dir() else "ex"
                r.append((entry, fmt_ls_colors(color, f"{getattr(entry, attr)} (created)")))

        if modified:
            for entry in diff.modified:
                # Modified entries are always files
                r.append((entry, fmt_ls_colors("ln", f"{getattr(entry, attr)} (modified)")))

        if deleted:
            for entry in diff.deleted:
                color = "su" if entry.is_dir() else "or"
                r.append((entry, fmt_ls_colors(color, f"{getattr(entry, attr)} (deleted)")))

        r.sort(key=lambda e: e[0].name)
        return r

    def _targets_with_path(self, path: str, warn_when_incomplete: bool = False) -> list[Target]:
        """Return targets where a given path exists, checking the src and dst target of this class. Optionally
        log a warning if the path only exists on one of the two targets."""
        candidates = (self.comparison.src_target, self.comparison.dst_target)
        targets_with_path = [target for target in candidates if target.fs.exists(path)]
        if warn_when_incomplete and len(targets_with_path) == 1:
            log.warning("'%s' is only present on '%s'.", path, targets_with_path[0])
        return targets_with_path

    def _write_entry_contents_to_stdout(self, entry: FilesystemEntry, stdout: TextIO):
        """Copy the contents of a FilesystemEntry to stdout, followed by a newline."""
        # Flush pending text first, so it does not end up after the raw bytes we write below
        stdout.flush()
        fh = entry.open()
        shutil.copyfileobj(fh, stdout.buffer)
        stdout.buffer.flush()
        # Write the trailing newline to the given stream, so piped output stays together
        print("", file=stdout)

    def completedefault(self, text: str, line: str, begidx: int, endidx: int):
        """Autocomplete based on files / directories found in the current path."""
        path = line[:begidx].rsplit(" ")[-1]
        textlower = text.lower()

        path = fsutil.abspath(path, cwd=str(self.cwd), alt_separator=self.alt_separator)

        try:
            diff = self.comparison.scandir(path)
        except ValueError:
            # Not a directory on either target: nothing to complete
            return []

        names = [item.name for group in [diff.created, diff.modified, diff.unchanged, diff.deleted] for item in group]
        return [name for name in names if name.lower().startswith(textlower)]

    def do_list(self, line):
        """Prints a list of targets to differentiate between. Useful when differentiating between three or more
        targets. Looks quite bad on small terminal screens."""
        columns = ["#", "Name", "Path", "From", "To"]

        rows = [
            [
                f"{i:2d}",
                target.name,
                str(target.path),
                "**" if i == self.src_index else "",
                "**" if i == self.dst_index else "",
            ]
            for i, target in enumerate(self.targets)
        ]

        longest_name = max(len(row[1]) + 4 for row in rows)
        longest_path = max(len(row[2]) + 4 for row in rows)
        name_len = max(10, longest_name)
        path_len = max(15, longest_path)

        fmt = "{:^5} | {:<" + str(name_len) + "} | {:<" + str(path_len) + "} | {:^6} | {:^6} |"
        print(fmt.format(*columns))
        print("")
        for row in rows:
            print(fmt.format(*row))
        print("")

    @arg("-a", "--absolute", action="store_true", help="Only move the destination target one position back.")
    def cmd_previous(self, args: argparse.Namespace, line: str) -> bool | None:
        """When three or more targets are available, move the 'comparison window' one position back."""
        last_index = len(self.targets) - 1

        src_index = 0 if args.absolute else self.src_index - 1
        if src_index < 0:
            src_index = last_index
        dst_index = self.dst_index - 1
        if dst_index < 0:
            dst_index = last_index

        # The source target always comes before the destination target
        if dst_index <= src_index:
            src_index, dst_index = dst_index, src_index
        self._select_source_and_dest(src_index, dst_index)

    @arg("-a", "--absolute", action="store_true", help="Only move the destination target one position back.")
    def cmd_prev(self, args: argparse.Namespace, line: str) -> bool | None:
        """Alias for previous."""
        self.cmd_previous(args, line)

    @arg("-a", "--absolute", action="store_true", help="Only move the destination target one position forward.")
    def cmd_next(self, args: argparse.Namespace, line: str) -> bool | None:
        """When three or more targets are available, move the 'comparison window' one position forward."""
        target_count = len(self.targets)
        dst_index = (self.dst_index + 1) % target_count
        # Parenthesized on purpose: '%' binds tighter than '+', which would prevent the index from wrapping
        src_index = 0 if args.absolute else (self.src_index + 1) % target_count

        # The source target always comes before the destination target
        if dst_index <= src_index:
            src_index, dst_index = dst_index, src_index
        self._select_source_and_dest(src_index, dst_index)

    def do_cd(self, path: str) -> None:
        """Change directory to the given path."""
        path = fsutil.abspath(path, cwd=str(self.cwd), alt_separator=self.alt_separator)
        if not self._targets_with_path(path, warn_when_incomplete=True):
            return
        self.cwd = path

    @arg("path", nargs="?")
    @arg("-l", action="store_true")
    @arg("-a", "--all", action="store_true")  # ignored but included for proper argument parsing
    @arg("-h", "--human-readable", action="store_true")
    def cmd_ls(self, args: argparse.Namespace, stdout: TextIO):
        """List contents of a directory for two targets."""
        # Resolve relative paths against the current directory, like 'cd' does
        path = self._resolve_path(args.path)
        diff = self.comparison.scandir(path)
        results = self._annotate_differential(diff)
        if not args.l:
            print("\n".join([name for _, name in results]), file=stdout)
            return

        for entry, name in results:
            if isinstance(entry, DifferentialEntry):
                # We have to choose for which version of this file we are going to print detailed info. The
                # destination target seems to make the most sense: it is likely newer
                entry = entry.dst_target_entry
            print_extensive_file_stat(stdout, name, entry)

    @arg("path", nargs="?")
    def cmd_cat(self, args: argparse.Namespace, stdout: TextIO):
        """Output the contents of a file."""
        if not args.path:
            print("No path provided.", file=stdout)
            return

        base_dir, name = self._split_path(args.path)
        directory_differential = self.comparison.scandir(base_dir)

        # Entry groups that exist in a single version, with the only target they are present on (if any)
        single_version_groups = (
            (directory_differential.unchanged, None),
            (directory_differential.created, self.comparison.dst_target),
            (directory_differential.deleted, self.comparison.src_target),
        )
        for entries, only_present_on in single_version_groups:
            entry = next((entry for entry in entries if entry.name == name), None)
            if entry is None:
                continue
            if only_present_on is not None:
                log.warning("'%s' is only present on '%s'.", entry.name, only_present_on.path)
            return self._write_entry_contents_to_stdout(entry, stdout)

        for entry in directory_differential.modified:
            if entry.name == name:
                log.warning(
                    "Concatinating latest version of '%s'. Use 'diff' to differentiate between target versions.",
                    entry.name,
                )
                return self._write_entry_contents_to_stdout(entry.dst_target_entry, stdout)

        print(f"File {name} not found.", file=stdout)

    @arg("path", nargs="?")
    def cmd_diff(self, args: argparse.Namespace, stdout: TextIO):
        """Output the difference in file contents between two targets."""
        if not args.path:
            print("No path provided.", file=stdout)
            return

        base_dir, name = self._split_path(args.path)
        directory_differential = self.comparison.scandir(base_dir)

        for entry in directory_differential.modified:
            if entry.name != name:
                continue

            primary_fh_lines = entry.src_target_entry.open().readlines()
            secondary_fh_lines = entry.dst_target_entry.open().readlines()

            # Flush pending text first, so it does not end up after the raw bytes we write below
            stdout.flush()
            for chunk in diff_bytes(unified_diff, primary_fh_lines, secondary_fh_lines):
                color = next((color for prefix, color in self._DIFF_LINE_COLORS if chunk.startswith(prefix)), None)
                if color is not None:
                    # surrogateescape round-trips arbitrary (non UTF-8) bytes through the str-based formatter
                    text = chunk.decode(errors="surrogateescape")
                    chunk = fmt_ls_colors(color, text).encode(errors="surrogateescape")
                stdout.buffer.write(chunk)

            stdout.buffer.flush()
            print("", file=stdout)
            return

        # Check if this file is even present on one of the targets
        files = directory_differential.unchanged + directory_differential.created + directory_differential.deleted
        if any(entry.name == name for entry in files):
            print(f"No two versions available for {name} to differentiate between.", file=stdout)
        else:
            print(f"File {name} not found.", file=stdout)

    @arg("index", type=str)
    @arg("type", choices=["src", "dst"])
    def cmd_set(self, args: argparse.Namespace, stdout: TextIO):
        """Change either the source or destination target for differentiation. Index can be given relative (when
        prefixed with '+' or '-', e.g. "set dst +1") or absolute (e.g. set src 0)."""
        index = args.index.strip()
        pos = self.src_index if args.type == "src" else self.dst_index

        if index.startswith(("+", "-")):
            multiplier = 1 if index[0] == "+" else -1
            offset = index[1:].strip()
            if not offset.isdigit():
                # Report invalid relative input the same way as invalid absolute input
                raise ValueError(f"Could not set {args.type} to {index}.")
            pos += int(offset) * multiplier
        elif index.isdigit():
            pos = int(index)
        else:
            raise ValueError(f"Could not set {args.type} to {index}.")

        if not 0 <= pos < len(self.targets):
            raise ValueError(f"Could not set {args.type} to {pos}: no such target.")

        if args.type == "src":
            self._select_source_and_dest(pos, self.dst_index)
        else:
            self._select_source_and_dest(self.src_index, pos)

    @arg("target", choices=["src", "dst"])
    def cmd_enter(self, args: argparse.Namespace, stdout: TextIO):
        """Open a subshell for the source or destination target."""
        target = self.src_target if args.target == "src" else self.dst_target
        cli = TargetCli(target)
        if target.fs.exists(self.cwd):
            cli.chdir(self.cwd)

        # Cyber doesn't work well with subshells
        cli.cyber = False
        run_cli(cli)

    @arg("path", nargs="?")
    @arg("-name", default="*")
    @arg("-iname")
    @arg("-c", "--created", action="store_true")
    @arg("-m", "--modified", action="store_true")
    @arg("-d", "--deleted", action="store_true")
    @arg("-u", "--unchanged", action="store_true")
    def cmd_find(self, args: argparse.Namespace, stdout: TextIO) -> bool | None:
        """Search for files in a directory hierarchy."""
        path = self._resolve_path(args.path)
        if not path:
            return

        if not self._targets_with_path(path, warn_when_incomplete=True):
            # The path exists on neither target, so there is nothing to walk
            return

        if args.iname:
            pattern = re.compile(translate(args.iname), re.IGNORECASE)
        else:
            pattern = re.compile(translate(args.name))

        # Without explicit filters we show everything that changed, but leave out unchanged entries
        include_all_changes = not (args.created or args.modified or args.deleted or args.unchanged)

        include_unchanged = args.unchanged
        include_modified = include_all_changes or args.modified
        include_created = include_all_changes or args.created
        include_deleted = include_all_changes or args.deleted

        for differential in self.comparison.walkdir(path):
            for entry, line in self._annotate_differential(
                differential, include_unchanged, include_created, include_modified, include_deleted, absolute=True
            ):
                if not pattern.match(entry.name):
                    continue

                print(line, file=stdout)

    def do_plugin(self, line: str):
        """Yield RecordCreated, RecordUnchanged and RecordDeleted Records by comparing plugin outputs for two
        targets."""
        argparts = arg_str_to_arg_list(line)
        pipeparts = []
        if "|" in argparts:
            pipeidx = argparts.index("|")
            argparts, pipeparts = argparts[:pipeidx], argparts[pipeidx + 1 :]

        if len(argparts) < 1:
            raise ValueError("Provide a plugin name, and optionally parameters to pass to the plugin.")

        plugin = argparts.pop(0)

        iterator = self.comparison.differentiate_plugin_outputs(plugin, argparts)
        if not pipeparts:
            for record in iterator:
                print(record, file=sys.stdout)
            return

        try:
            with build_pipe_stdout(pipeparts) as pipe_stdin:
                rs = RecordOutput(pipe_stdin.buffer)
                for record in iterator:
                    rs.write(record)
        except OSError as e:
            # in case of a failure in a subprocess
            print(e)

    def do_python(self, line: str) -> bool | None:
        """drop into a Python shell."""
        python_shell(list(self.targets))
def make_target_pairs(targets: tuple[Target, ...], absolute: bool = False) -> list[tuple[Target, Target]]:
    """Make 'pairs' of targets that we are going to compare against one another.

    A list of targets can be treated in two ways: compare every target with the one that came before it, or
    compare all targets against a 'base' target (which has to be supplied as initial target in the list).

    Args:
        targets: The targets to pair, in the order in which they should be compared.
        absolute: Compare every target against the first target instead of against its predecessor.

    Returns:
        A list of (source, destination) tuples. Empty when fewer than two targets are given.
    """
    if len(targets) < 2:
        # Nothing to pair (also avoids indexing into an empty sequence)
        return []

    if absolute:
        base_target = targets[0]
        return [(base_target, target) for target in targets[1:]]

    # Every target is compared against the one that came before it
    return list(zip(targets, targets[1:]))


def differentiate_target_filesystems(
    *targets: Target,
    deep: bool = False,
    limit: int = FILE_LIMIT,
    absolute: bool = False,
    include: list[str] = None,
    exclude: list[str] = None,
) -> Iterator[Record]:
    """Given a list of targets, compare targets against one another and yield File[Created|Modified|Deleted]Records
    indicating the differences between them.

    Args:
        targets: Two or more targets to compare.
        deep: Compare file contents even if the file stats suggest the file is unchanged.
        limit: Number of bytes to compare per file before assuming it is unchanged.
        absolute: Compare every target against the first target instead of against its predecessor.
        include: Paths to check for differences. The root directory is checked when omitted.
        exclude: Patterns of paths that should not be checked for differences.

    Raises:
        ValueError: If fewer than two targets are given.
    """
    if len(targets) < 2:
        raise ValueError("Provide two or more targets to differentiate between.")

    paths = ["/"] if include is None else include

    for src_target, dst_target in make_target_pairs(targets, absolute):
        comparison = TargetComparison(src_target, dst_target, deep, limit)
        # Every record refers to the pair of targets it was derived from
        pair_fields = {"src_target": src_target.path, "dst_target": dst_target.path}

        for path in paths:
            for directory_diff in comparison.walkdir(path, exclude=exclude):
                for creation_entry in directory_diff.created:
                    yield FileCreatedRecord(path=creation_entry.path, **pair_fields)

                for deletion_entry in directory_diff.deleted:
                    yield FileDeletedRecord(path=deletion_entry.path, **pair_fields)

                for entry_difference in directory_diff.modified:
                    yield FileModifiedRecord(
                        path=entry_difference.path,
                        diff=entry_difference.diff,
                        **pair_fields,
                    )
def differentiate_target_plugin_outputs(
    *targets: Target, absolute: bool = False, plugin: str, plugin_args: str | list[str] = ""
) -> Iterator[Record]:
    """Given a list of targets, yield records indicating which records from this plugin are new, unmodified or
    deleted.

    Args:
        targets: Two or more targets to compare.
        absolute: Compare every target against the first target instead of against its predecessor.
        plugin: Name of the plugin (function) to execute on every target.
        plugin_args: Arguments for the plugin, either as a single string or as a list of argument parts.

    Raises:
        ValueError: If fewer than two targets are given.
    """
    if len(targets) < 2:
        raise ValueError("Provide two or more targets to differentiate between.")

    if isinstance(plugin_args, str):
        # The argument parser expects a list of parts; a plain string would be split per character
        plugin_args = arg_str_to_arg_list(plugin_args)

    for src_target, dst_target in make_target_pairs(targets, absolute):
        comparison = TargetComparison(src_target, dst_target)
        yield from comparison.differentiate_plugin_outputs(plugin, plugin_args)


@catch_sigpipe
def main() -> None:
    """Entry point of target-diff: parse the command line and run the shell, fs or query mode."""
    help_formatter = argparse.ArgumentDefaultsHelpFormatter
    parser = argparse.ArgumentParser(
        description="target-diff",
        fromfile_prefix_chars="@",
        formatter_class=help_formatter,
    )

    parser.add_argument(
        "-d",
        "--deep",
        action="store_true",
        help="Compare file contents even if metadata suggests they have been left unchanged",
    )
    parser.add_argument(
        "-l",
        "--limit",
        default=FILE_LIMIT,
        type=int,
        help="How many bytes to compare before assuming a file is left unchanged (0 for no limit)",
    )
    # A mode is required: without it there are no targets to open
    subparsers = parser.add_subparsers(help="Mode for differentiating targets", dest="mode", required=True)

    shell_mode = subparsers.add_parser("shell", help="Open an interactive shell to compare two or more targets.")
    shell_mode.add_argument("targets", metavar="TARGETS", nargs="*", help="Targets to differentiate between")

    fs_mode = subparsers.add_parser("fs", help="Yield records about differences between target filesystems.")
    fs_mode.add_argument("targets", metavar="TARGETS", nargs="*", help="Targets to differentiate between")
    fs_mode.add_argument("-s", "--strings", action="store_true", help="print records as strings")
    fs_mode.add_argument("-e", "--exclude", action="append", help="Path(s) on targets not to check for differences")
    fs_mode.add_argument(
        "-i",
        "--include",
        action="append",
        help="Path(s) on targets to check for differences (all will be checked if left omitted)",
    )
    fs_mode.add_argument(
        "-a",
        "--absolute",
        action="store_true",
        help=(
            "Treat every target as an absolute. The first given target is treated as the 'base' target to compare "
            "subsequent targets against. If omitted, every target is treated as a 'delta' and compared against the "
            "target that came before it."
        ),
    )

    query_mode = subparsers.add_parser("query", help="Differentiate plugin outputs between two or more targets.")
    query_mode.add_argument("targets", metavar="TARGETS", nargs="*", help="Targets to differentiate between")
    query_mode.add_argument("-s", "--strings", action="store_true", help="print records as strings")
    query_mode.add_argument(
        "-p",
        "--parameters",
        type=str,
        required=False,
        default="",
        help="Parameters for the plugin",
    )
    query_mode.add_argument(
        "-f",
        "--plugin",
        type=str,
        required=True,
        help="Function to execute",
    )
    query_mode.add_argument(
        "-a",
        "--absolute",
        action="store_true",
        help=(
            "Treat every target as an absolute. The first given target is treated as the 'base' target to compare "
            "subsequent targets against. If omitted, every target is treated as a 'delta' and compared against the "
            "target that came before it."
        ),
    )

    configure_generic_arguments(parser)

    args = parser.parse_args()
    process_generic_arguments(args)

    if len(args.targets) < 2:
        # Fail early with a usage message instead of failing halfway through opening targets
        parser.error("Provide two or more targets to differentiate between.")

    target_list = [Target.open(path) for path in args.targets]
    if args.mode == "shell":
        cli = DifferentialCli(*target_list, deep=args.deep, limit=args.limit)
        run_cli(cli)
        return

    writer = record_output(args.strings)
    if args.mode == "fs":
        iterator = differentiate_target_filesystems(
            *target_list,
            deep=args.deep,
            limit=args.limit,
            absolute=args.absolute,
            include=args.include,
            exclude=args.exclude,
        )
    else:
        # The only remaining mode is 'query'
        iterator = differentiate_target_plugin_outputs(
            *target_list,
            absolute=args.absolute,
            plugin=args.plugin,
            plugin_args=arg_str_to_arg_list(args.parameters),
        )

    for record in iterator:
        writer.write(record)


if __name__ == "__main__":
    main()
Returns a tuple containing a + boolean whether or not a custom command execution was performed, and the result of said execution.""" if line == "EOF": - return True + return True, True - # Override default command execution to first attempt complex - # command execution, and then target plugin command execution + # Override default command execution to first attempt complex command execution command, command_args_str, line = self.parseline(line) try: - return self._exec_command(command, command_args_str) + return True, self._exec_command(command, command_args_str) except AttributeError: - pass - - if self.target.has_function(command): - return self._exec_target(command, command_args_str) + return False, None + def default(self, line: str): + try: + handled, response = self.check_custom_command_execution(line) + if handled: + return response + except Exception: + # For unhandled exceptions, we do not want the 'unknown syntax' error, but a stacktrace. + traceback.print_exc() + return return cmd.Cmd.default(self, line) def emptyline(self) -> None: @@ -174,10 +181,7 @@ def _exec( argparts = [] if command_args_str is not None: - lexer = shlex.shlex(command_args_str, posix=True, punctuation_chars=True) - lexer.wordchars += "$" - lexer.whitespace_split = True - argparts = list(lexer) + argparts = arg_str_to_arg_list(command_args_str) try: if "|" in argparts: @@ -191,7 +195,8 @@ def _exec( print(e) else: ctx = contextlib.nullcontext() - if self.target.props.get("cyber") and not no_cyber: + + if self.cyber and not no_cyber: ctx = cyber.cyber(color=None, run_at_end=True) with ctx: @@ -215,6 +220,42 @@ def _exec_(argparts: list[str], stdout: TextIO) -> bool: no_cyber = cmdfunc.__func__ in (TargetCli.cmd_registry, TargetCli.cmd_enter) return self._exec(_exec_, command_args_str, no_cyber) + def do_clear(self, line: str) -> Optional[bool]: + """clear the terminal screen""" + os.system("cls||clear") + + def do_exit(self, line: str) -> Optional[bool]: + """exit shell""" + return 
True + + def do_cyber(self, line: str): + """cyber""" + self.cyber = not self.cyber + word, color = {False: ("D I S E N", cyber.Color.RED), True: ("E N", cyber.Color.YELLOW)}[self.cyber] + with cyber.cyber(color=color): + print(f"C Y B E R - M O D E - {word} G A G E D") + + +class TargetCmd(ExtendedCmd): + def __init__(self, target: Target): + self.target = target + start_in_cyber = self.target.props.get("cyber") + + super().__init__(start_in_cyber) + + def check_custom_command_execution(self, line: str) -> tuple[bool, Any]: + handled, response = super().check_custom_command_execution(line) + if handled: + return handled, response + + # The parent class has already attempted complex command execution, we now attempt target plugin command + # execution + command, command_args_str, line = self.parseline(line) + + if self.target.has_function(command): + return True, self._exec_target(command, command_args_str) + return False, None + def _exec_target(self, func: str, command_args_str: str) -> Optional[bool]: """Command exection helper for target plugins.""" attr = self.target @@ -262,23 +303,6 @@ def do_python(self, line: str) -> Optional[bool]: """drop into a Python shell""" python_shell([self.target]) - def do_clear(self, line: str) -> Optional[bool]: - """clear the terminal screen""" - os.system("cls||clear") - - def do_cyber(self, line: str) -> Optional[bool]: - """cyber""" - self.target.props["cyber"] = not bool(self.target.props.get("cyber")) - word, color = {False: ("D I S E N", cyber.Color.RED), True: ("E N", cyber.Color.YELLOW)}[ - self.target.props["cyber"] - ] - with cyber.cyber(color=color): - print(f"C Y B E R - M O D E - {word} G A G E D") - - def do_exit(self, line: str) -> Optional[bool]: - """exit shell""" - return True - class TargetHubCli(cmd.Cmd): """Hub Cli for interacting with multiple targets.""" @@ -550,7 +574,18 @@ def _print_ls(self, args: argparse.Namespace, path: fsutil.TargetPath, depth: in if len(contents) > 1: print(f"total 
{len(contents)}", file=stdout) for target_path, name in contents: - self.print_extensive_file_stat(args=args, stdout=stdout, target_path=target_path, name=name) + try: + entry = target_path.get() + stat = entry.lstat() + show_time = stat.st_mtime + if args.use_ctime: + show_time = stat.st_ctime + elif args.use_atime: + show_time = stat.st_atime + except FileNotFoundError: + entry = None + show_time = None + print_extensive_file_stat(stdout, name, entry, show_time) if target_path.is_dir(): subdirs.append(target_path) @@ -558,32 +593,6 @@ def _print_ls(self, args: argparse.Namespace, path: fsutil.TargetPath, depth: in for subdir in subdirs: self._print_ls(args, subdir, depth + 1, stdout) - def print_extensive_file_stat( - self, args: argparse.Namespace, stdout: TextIO, target_path: fsutil.TargetPath, name: str - ) -> None: - """Print the file status.""" - try: - entry = target_path.get() - stat = entry.lstat() - symlink = f" -> {entry.readlink()}" if entry.is_symlink() else "" - show_time = stat.st_mtime - if args.use_ctime: - show_time = stat.st_ctime - elif args.use_atime: - show_time = stat.st_atime - utc_time = datetime.datetime.utcfromtimestamp(show_time).isoformat() - - print( - f"{stat_modestr(stat)} {stat.st_uid:4d} {stat.st_gid:4d} {stat.st_size:6d} {utc_time} {name}{symlink}", - file=stdout, - ) - - except FileNotFoundError: - print( - f"?????????? ? ? ? ????-??-??T??:??:??.?????? 
def arg_str_to_arg_list(args: str) -> list[str]:
    """Convert a command line string to a list of command line arguments.

    The string is tokenized by a POSIX-mode ``shlex`` lexer created with ``punctuation_chars=True``, with ``$``
    added to the word characters and ``whitespace_split`` enabled.

    Args:
        args: The raw argument string as typed by the user.

    Returns:
        The tokens produced by the lexer, in order.
    """
    lexer = shlex.shlex(args, posix=True, punctuation_chars=True)
    lexer.wordchars += "$"
    lexer.whitespace_split = True
    return list(lexer)


def print_extensive_file_stat(
    stdout: TextIO, name: str, entry: Optional["FilesystemEntry"] = None, timestamp: Optional[float] = None
) -> None:
    """Print a single ``ls -l`` style status line for a filesystem entry.

    When no entry is given, or when the entry raises ``FileNotFoundError`` while being inspected, a placeholder
    line consisting of question marks is printed instead.

    Args:
        stdout: The stream to print the line to.
        name: The name to display for the entry.
        entry: The filesystem entry to describe, if it could be resolved.
        timestamp: Seconds since the epoch to display; defaults to the ``st_mtime`` of the entry.
    """
    if entry is not None:
        try:
            stat = entry.lstat()
            if timestamp is None:
                timestamp = stat.st_mtime
            symlink = f" -> {entry.readlink()}" if entry.is_symlink() else ""

            # ``utcfromtimestamp`` is deprecated since Python 3.12. Convert with an explicit UTC timezone and
            # drop it again so the rendered ISO string stays naive (no "+00:00" suffix), as before.
            utc_time = (
                datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).replace(tzinfo=None).isoformat()
            )

            print(
                f"{stat_modestr(stat)} {stat.st_uid:4d} {stat.st_gid:4d} {stat.st_size:6d} {utc_time} {name}{symlink}",
                file=stdout,
            )
            return
        except FileNotFoundError:
            # Fall through to the placeholder line below.
            pass

    print(f"?????????? ? ? ? ????-??-??T??:??:??.?????? {name}", file=stdout)
differentiate_target_plugin_outputs, + likely_unchanged, +) +from dissect.target.tools.diff import main as target_diff +from tests._utils import absolute_path +from tests.conftest import make_os_target + +PASSWD_CONTENTS = """ + root:x:0:0:root:/root:/bin/bash + user:x:1000:1000:user:/home/user:/bin/bash + """ + + +class TargetUnixFactory: + def __init__(self, tmp_path: Path): + self.tmp_path = tmp_path + + def new(self, hostname: str) -> tuple[Target, VirtualFilesystem]: + """Initialize a virtual unix target.""" + fs = VirtualFilesystem() + + fs.makedirs("var") + fs.makedirs("etc") + fs.map_file_fh("/etc/hostname", BytesIO(hostname.encode())) + + return make_os_target(self.tmp_path, UnixPlugin, root_fs=fs), fs + + +@pytest.fixture +def target_unix_factory(tmp_path: Path) -> TargetUnixFactory: + """This fixture returns a factory that can instantiate virtual unix targets from a blueprint. This can then be used + to create a fixture for the source target and the destination target, without them 'bleeding' into each other.""" + return TargetUnixFactory(tmp_path) + + +@pytest.fixture +def src_target(target_unix_factory) -> Iterator[Target]: + target, fs_unix = target_unix_factory.new("src_target") + + passwd_contents = PASSWD_CONTENTS + "\nsrc_user:x:1001:1001:src_user:/home/src_user:/bin/bash" + + fs_unix.map_file_fh("/etc/passwd", BytesIO(textwrap.dedent(passwd_contents).encode())) + + fs_unix.map_file_fh("changes/unchanged", BytesIO(b"Unchanged")) + fs_unix.map_file_fh("changes/changed", BytesIO(b"Hello From Source Target")) + fs_unix.map_file_fh("changes/only_on_src", BytesIO(b"FooBarBaz")) + + fs_unix.map_file_fh("changes/subdirectory_both/on_both", BytesIO(b"On Both")) + fs_unix.map_file_fh("changes/subdirectory_src/only_on_src", BytesIO(b"Hello From Source Target")) + + fs_unix.map_file_fh("changes/file_on_src", BytesIO(b"Hello From Source Target")) + fs_unix.map_file_fh("changes/dir_on_src/file", BytesIO(b"Hello From Source Target")) + yield target + +
+@pytest.fixture +def dst_target(target_unix_factory) -> Iterator[Target]: + target, fs_unix = target_unix_factory.new("dst_target") + + passwd_contents = PASSWD_CONTENTS + "\ndst_user:x:1002:1002:dst_user:/home/dst_user:/bin/bash" + + fs_unix.map_file_fh("/etc/passwd", BytesIO(textwrap.dedent(passwd_contents).encode())) + + fs_unix.map_file_fh("changes/unchanged", BytesIO(b"Unchanged")) + fs_unix.map_file_fh("changes/changed", BytesIO(b"Hello From Destination Target")) + fs_unix.map_file_fh("changes/only_on_dst", BytesIO(b"BazBarFoo")) + + fs_unix.map_file_fh("changes/subdirectory_both/on_both", BytesIO(b"On Both")) + fs_unix.map_file_fh("changes/subdirectory_dst/only_on_dst", BytesIO(b"Hello From Destination Target")) + + fs_unix.map_file_fh("changes/dir_on_src", BytesIO(b"Hello From Destination Target")) + fs_unix.map_file_fh("changes/file_on_src/file", BytesIO(b"Hello From Destination Target")) + yield target + + +def test_scandir(src_target: Target, dst_target: Target) -> None: + comparison = TargetComparison(src_target, dst_target, deep=True) + diff = comparison.scandir("changes") + + assert len(diff.deleted) == 4 + assert diff.deleted[0].name == "only_on_src" + assert diff.deleted[0].open().read() == b"FooBarBaz" + assert diff.deleted[1].name == "subdirectory_src" + assert diff.deleted[2].name == "dir_on_src" + assert diff.deleted[3].open().read() == b"Hello From Source Target" + + assert len(diff.created) == 4 + assert diff.created[0].open().read() == b"BazBarFoo" + assert diff.created[0].name == "only_on_dst" + assert diff.created[1].name == "subdirectory_dst" + + assert diff.created[2].name == "dir_on_src" + assert diff.created[2].open().read() == b"Hello From Destination Target" + assert diff.created[3].name == "file_on_src" + assert diff.created[3].is_dir() + + assert len(diff.unchanged) == 2 + assert diff.unchanged[0].open().read() == b"Unchanged" + assert diff.unchanged[0].name == "unchanged" + + assert diff.unchanged[1].name == "subdirectory_both" + 
+ assert len(diff.modified) == 1 + differential_entry = diff.modified[0] + assert differential_entry.src_target_entry.open().read() == b"Hello From Source Target" + assert differential_entry.dst_target_entry.open().read() == b"Hello From Destination Target" + assert differential_entry.diff == [ + b"--- \n", + b"+++ \n", + b"@@ -1 +1 @@\n", + b"-Hello From Source Target", + b"+Hello From Destination Target", + ] + + +def test_walkdir(src_target: Target, dst_target: Target) -> None: + comparison = TargetComparison(src_target, dst_target, deep=True) + differentials = list(comparison.walkdir("changes")) + + assert len(differentials) == 6 + assert sorted(differential.directory for differential in differentials) == [ + "/changes/dir_on_src", + "/changes/file_on_src", + "/changes/subdirectory_both", + "/changes/subdirectory_dst", + "/changes/subdirectory_src", + "changes", + ] + + assert differentials[0].directory == "changes" + + subdirectories_only_on_dst = ["/changes/subdirectory_dst", "/changes/file_on_src"] + for subdirectory in subdirectories_only_on_dst: + differential = next((differential for differential in differentials if differential.directory == subdirectory)) + + # All entries should be 'created' as this directory doesn't exist on the source target + assert len(differential.modified) == 0 + assert len(differential.deleted) == 0 + assert len(differential.unchanged) == 0 + assert len(differential.created) == 1 + assert differential.created[0].open().read() == b"Hello From Destination Target" + + subdirectories_only_on_src = ["/changes/subdirectory_src", "/changes/dir_on_src"] + + for subdirectory in subdirectories_only_on_src: + differential = next((differential for differential in differentials if differential.directory == subdirectory)) + + # All entries should be 'deleted' as this directory doesn't exist on the destination target + assert len(differential.modified) == 0 + assert len(differential.deleted) == 1 + assert len(differential.unchanged) == 0 +
assert len(differential.created) == 0 + assert differential.deleted[0].open().read() == b"Hello From Source Target" + + # All entries should be 'unchanged' as this folder is identical on both + assert len(differentials[3].modified) == 0 + assert len(differentials[3].deleted) == 0 + assert len(differentials[3].unchanged) == 1 + assert len(differentials[3].created) == 0 + assert differentials[3].unchanged[0].open().read() == b"On Both" + + +def test_likely_unchanged() -> None: + # ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime'] + mock_stat = stat_result([0o1777, 1, 2, 3, 1337, 7331, 999, 0, 0, 0]) + mock_stat_accessed = stat_result([0o1777, 1, 2, 3, 1337, 7331, 999, 999, 0, 0]) + mock_stat_changed = stat_result([0o1777, 1, 2, 3, 1337, 7331, 999, 999, 999, 0]) + + assert likely_unchanged(mock_stat, mock_stat_accessed) + assert not likely_unchanged(mock_stat, mock_stat_changed) + + +def test_differentiate_filesystems(src_target: Target, dst_target: Target) -> None: + records = list(differentiate_target_filesystems(src_target, dst_target, deep=True, exclude="/etc/*")) + + created = [record for record in records if "created" in record._desc.name] + modified = [record for record in records if "modified" in record._desc.name] + deleted = [record for record in records if "deleted" in record._desc.name] + + assert len(created) == 6 + assert all(record._desc.name == "differential/file/created" for record in created) + + assert len(modified) == 1 + assert all(record._desc.name == "differential/file/modified" for record in modified) + + assert len(deleted) == 6 + assert all(record._desc.name == "differential/file/deleted" for record in deleted) + + +def test_differentiate_plugins(src_target: Target, dst_target: Target) -> None: + records = list(differentiate_target_plugin_outputs(src_target, dst_target, plugin="users")) + assert len(records) == 4 + + created = [record for record in records if "created" in record._desc.name] + unchanged = 
[record for record in records if "unchanged" in record._desc.name] + deleted = [record for record in records if "deleted" in record._desc.name] + + assert len(unchanged) == 2 + assert len(created) == 1 + assert len(deleted) == 1 + + assert created[0].record.name == "dst_user" + assert created[0].record.hostname == "dst_target" + assert deleted[0].record.name == "src_user" + assert deleted[0].record.hostname == "src_target" + + +def test_shell_ls(src_target: Target, dst_target: Target, capsys, monkeypatch) -> None: + monkeypatch.setattr(shell, "LS_COLORS", {}) + + cli = DifferentialCli(src_target, dst_target, deep=True) + cli.onecmd("ls changes") + + captured = capsys.readouterr() + + expected = [ + "changed (modified)", + "dir_on_src (created)", + "dir_on_src (deleted)", + "file_on_src (created)", + "file_on_src (deleted)", + "only_on_dst (created)", + "only_on_src (deleted)", + "subdirectory_both", + "subdirectory_dst (created)", + "subdirectory_src (deleted)", + "unchanged", + ] + + assert captured.out == "\n".join(expected) + "\n" + + +def test_shell_find(src_target: Target, dst_target: Target, capsys, monkeypatch) -> None: + monkeypatch.setattr(shell, "LS_COLORS", {}) + + cli = DifferentialCli(src_target, dst_target, deep=True) + cli.onecmd("find /changes -cmd") + + captured = capsys.readouterr() + + expected = [ + "/changes/changed (modified)", + "/changes/dir_on_src (created)", + "/changes/dir_on_src (deleted)", + "/changes/file_on_src (created)", + "/changes/file_on_src (deleted)", + "/changes/only_on_dst (created)", + "/changes/only_on_src (deleted)", + "/changes/subdirectory_dst (created)", + "/changes/subdirectory_src (deleted)", + "/changes/subdirectory_dst/only_on_dst (created)", + "/changes/file_on_src/file (created)", + "/changes/subdirectory_src/only_on_src (deleted)", + "/changes/dir_on_src/file (deleted)", + ] + + assert captured.out == "\n".join(expected) + "\n" + + +def test_shell_cat(src_target: Target, dst_target: Target, capsys) -> None: + cli 
= DifferentialCli(src_target, dst_target, deep=True) + + cli.onecmd("cat /changes/unchanged") + captured = capsys.readouterr() + assert captured.out == "Unchanged\n" + + cli.onecmd("cat /changes/subdirectory_dst/only_on_dst") + captured = capsys.readouterr() + assert captured.out == "Hello From Destination Target\n" + + cli.onecmd("cat /changes/subdirectory_src/only_on_src") + captured = capsys.readouterr() + assert captured.out == "Hello From Source Target\n" + + # When a file is present on both, we want the last version of the file to be outputted. + cli.onecmd("cat /changes/changed") + captured = capsys.readouterr() + assert captured.out == "Hello From Destination Target\n" + + +def test_shell_plugin(src_target: Target, dst_target: Target, capsys) -> None: + cli = DifferentialCli(src_target, dst_target, deep=True) + + cli.onecmd("plugin users") + captured = capsys.readouterr() + + assert "differential/record/created" in captured.out + assert "differential/record/unchanged" in captured.out + assert "differential/record/deleted" in captured.out + + +def test_target_diff_shell(capsys, monkeypatch) -> None: + with monkeypatch.context() as m: + m.setattr(shell, "LS_COLORS", {}) + src_target_path = absolute_path("_data/tools/diff/src.tar") + dst_target_path = absolute_path("_data/tools/diff/dst.tar") + m.setattr("sys.argv", ["target-diff", "--deep", "shell", src_target_path, dst_target_path]) + m.setattr("sys.stdin", StringIO("ls changes")) + target_diff() + out, err = capsys.readouterr() + out = out.replace("(src_target/dst_target)/diff />", "").strip() + + expected = [ + "changed (modified)", + "only_on_dst (created)", + "only_on_src (deleted)", + "subdirectory_both", + "subdirectory_dst (created)", + "subdirectory_src (deleted)", + "unchanged", + ] + + assert out == "\n".join(expected) + assert "unrecognized arguments" not in err + + +def test_target_diff_fs(capsys, monkeypatch) -> None: + with monkeypatch.context() as m: + src_target_path = 
absolute_path("_data/tools/diff/src.tar") + dst_target_path = absolute_path("_data/tools/diff/dst.tar") + m.setattr("sys.argv", ["target-diff", "--deep", "fs", "--strings", src_target_path, dst_target_path]) + target_diff() + out, _ = capsys.readouterr() + + assert "differential/file/created" in out + assert "differential/file/modified" in out + assert "differential/file/deleted" in out + + +def test_target_diff_query(capsys, monkeypatch) -> None: + with monkeypatch.context() as m: + src_target_path = absolute_path("_data/tools/diff/src.tar") + dst_target_path = absolute_path("_data/tools/diff/dst.tar") + m.setattr("sys.argv", ["target-diff", "query", "--strings", "-f", "users", src_target_path, dst_target_path]) + target_diff() + out, _ = capsys.readouterr() + + assert "differential/record/created" in out + assert "differential/record/unchanged" in out + assert "differential/record/deleted" in out diff --git a/tests/tools/test_shell.py b/tests/tools/test_shell.py index 098d6f5ad..5e73c5751 100644 --- a/tests/tools/test_shell.py +++ b/tests/tools/test_shell.py @@ -9,7 +9,7 @@ from dissect.target.exceptions import FileNotFoundError from dissect.target.filesystem import FilesystemEntry -from dissect.target.helpers.fsutil import TargetPath, normalize, stat_result +from dissect.target.helpers.fsutil import normalize, stat_result from dissect.target.tools import shell from dissect.target.tools.shell import ( TargetCli, @@ -18,6 +18,7 @@ build_pipe_stdout, ) from dissect.target.tools.shell import main as target_shell +from dissect.target.tools.shell import print_extensive_file_stat from tests._utils import absolute_path GREP_MATCH = "test1 and test2" @@ -179,44 +180,30 @@ def test_target_cli_print_extensive_file_stat(target_win, capsys): mock_entry = MagicMock(spec_set=FilesystemEntry) mock_entry.lstat.return_value = mock_stat mock_entry.is_symlink.return_value = False - mock_path = MagicMock(spec_set=TargetPath) - mock_path.get.return_value = mock_entry - mock_args = 
MagicMock() - - cli = TargetCli(target_win) - cli.print_extensive_file_stat(mock_args, sys.stdout, mock_path, "foo") + print_extensive_file_stat(sys.stdout, "foo", mock_entry) captured = capsys.readouterr() assert captured.out == "-rwxrwxrwx 1337 7331 999 1970-01-01T00:00:00 foo\n" -def test_target_cli_print_extensive_file_stat_symlink(target_win, capsys): +def test_print_extensive_file_stat_symlink(target_win, capsys): mock_stat = stat_result([0o1777, 1, 2, 3, 1337, 7331, 999, 0, 0, 0]) mock_entry = MagicMock(spec_set=FilesystemEntry) mock_entry.lstat.return_value = mock_stat mock_entry.is_symlink.return_value = True mock_entry.readlink.return_value = "bar" - mock_path = MagicMock(spec_set=TargetPath) - mock_path.get.return_value = mock_entry - mock_args = MagicMock() - - cli = TargetCli(target_win) - cli.print_extensive_file_stat(mock_args, sys.stdout, mock_path, "foo") + print_extensive_file_stat(sys.stdout, "foo", mock_entry) captured = capsys.readouterr() assert captured.out == "-rwxrwxrwx 1337 7331 999 1970-01-01T00:00:00 foo -> bar\n" -def test_target_cli_print_extensive_file_stat_fail(target_win, capsys): - mock_path = MagicMock(spec_set=TargetPath) - mock_path.get.side_effect = FileNotFoundError("ERROR") - - mock_args = MagicMock() - - cli = TargetCli(target_win) - cli.print_extensive_file_stat(mock_args, sys.stdout, mock_path, "foo") +def test_print_extensive_file_stat_fail(target_win, capsys): + mock_entry = MagicMock(spec_set=FilesystemEntry) + mock_entry.lstat.side_effect = FileNotFoundError("ERROR") + print_extensive_file_stat(sys.stdout, "foo", mock_entry) captured = capsys.readouterr() assert captured.out == "?????????? ? ? ? ????-??-??T??:??:??.?????? foo\n"