From 6d6fea1641fe5f617ce1dea40ff7451318bf2099 Mon Sep 17 00:00:00 2001 From: Computer Network Investigation <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 16 Oct 2024 16:34:00 +0200 Subject: [PATCH 1/2] Add support for ISO 8601 formatted auth log timestamps (#901) --- dissect/target/plugins/os/unix/log/auth.py | 101 +++++++++++++------ tests/_data/plugins/os/unix/log/auth/iso.log | 3 + tests/plugins/os/unix/log/test_auth.py | 34 ++++++- 3 files changed, 103 insertions(+), 35 deletions(-) create mode 100644 tests/_data/plugins/os/unix/log/auth/iso.log diff --git a/dissect/target/plugins/os/unix/log/auth.py b/dissect/target/plugins/os/unix/log/auth.py index 6c2a0660c..393502e14 100644 --- a/dissect/target/plugins/os/unix/log/auth.py +++ b/dissect/target/plugins/os/unix/log/auth.py @@ -1,11 +1,25 @@ +import itertools +import logging import re from itertools import chain from typing import Iterator from dissect.target.exceptions import UnsupportedPluginError -from dissect.target.helpers.record import TargetRecordDescriptor +from dissect.target.helpers.fsutil import TargetPath, open_decompress +from dissect.target.helpers.record import DynamicDescriptor, TargetRecordDescriptor from dissect.target.helpers.utils import year_rollover_helper -from dissect.target.plugin import Plugin, export +from dissect.target.plugin import Plugin, alias, export + +log = logging.getLogger(__name__) + +_RE_TS = r"^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}" +_RE_TS_ISO = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}" + +RE_TS = re.compile(_RE_TS) +RE_TS_ISO = re.compile(_RE_TS_ISO) +RE_LINE = re.compile( + rf"(?P{_RE_TS}|{_RE_TS_ISO})\s(?P\S+)\s(?P\S+?)(\[(?P\d+)\])?:\s(?P.+)$" +) AuthLogRecord = TargetRecordDescriptor( "linux/log/auth", @@ -16,10 +30,6 @@ ], ) -_TS_REGEX = r"^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}" -RE_TS = re.compile(_TS_REGEX) -RE_TS_AND_HOSTNAME = re.compile(_TS_REGEX + r"\s\S+\s") - class AuthPlugin(Plugin): """Unix auth log plugin.""" @@ -29,34 +39,65 @@ def check_compatible(self) -> None: if not any(var_log.glob("auth.log*")) and not any(var_log.glob("secure*")): raise UnsupportedPluginError("No auth log files found") - @export(record=AuthLogRecord) - def securelog(self) -> Iterator[AuthLogRecord]: - """Return contents of /var/log/auth.log* and /var/log/secure*.""" - return self.authlog() + @alias("securelog") + @export(record=DynamicDescriptor(["datetime", "path", "string"])) + def authlog(self) -> Iterator[any]: + """Yield contents of ``/var/log/auth.log*`` and ``/var/log/secure*`` files. - @export(record=AuthLogRecord) - def authlog(self) -> Iterator[AuthLogRecord]: - """Return contents of /var/log/auth.log* and /var/log/secure*.""" + Order of returned events is not guaranteed to be chronological because of year + rollover detection efforts for log files without a year in the timestamp. - # Assuming no custom date_format template is set in syslog-ng or systemd (M d H:M:S) - # CentOS format: Jan 12 13:37:00 hostname daemon: message - # Debian format: Jan 12 13:37:00 hostname daemon[pid]: pam_unix(daemon:session): message + The following timestamp formats are recognised automatically. This plugin + assumes that no custom ``date_format`` template is set in ``syslog-ng`` or ``systemd`` + configuration (defaults to ``M d H:M:S``). + + ISO formatted authlog entries are parsed as can be found in Ubuntu 24.04 and later. + + .. code-block:: text + + CentOS format: Jan 12 13:37:00 hostname daemon: message + Debian format: Jan 12 13:37:00 hostname daemon[pid]: pam_unix(daemon:session): message + Ubuntu 24.04: 2024-01-12T13:37:00.000000+02:00 hostname daemon[pid]: pam_unix(daemon:session): message + + Resources: + - https://help.ubuntu.com/community/LinuxLogFiles + """ tzinfo = self.target.datetime.tzinfo var_log = self.target.fs.path("/var/log") for auth_file in chain(var_log.glob("auth.log*"), var_log.glob("secure*")): - for ts, line in year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo): - ts_and_hostname = re.search(RE_TS_AND_HOSTNAME, line) - if not ts_and_hostname: - self.target.log.warning("No timstamp and hostname found on one of the lines in %s.", auth_file) - self.target.log.debug("Skipping this line: %s", line) - continue - - message = line.replace(ts_and_hostname.group(0), "").strip() - yield AuthLogRecord( - ts=ts, - message=message, - source=auth_file, - _target=self.target, - ) + if is_iso_fmt(auth_file): + iterable = iso_readlines(auth_file) + + else: + iterable = year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo) + + for ts, line in iterable: + yield self._auth_log_builder.build_record(ts, auth_file, line) + + +def iso_readlines(file: TargetPath) -> Iterator[tuple[datetime, str]]: + """Iterator reading the provided auth log file in ISO format. Mimics ``year_rollover_helper`` behaviour.""" + + with open_decompress(file, "rt") as fh: + for line in fh: + if not (match := RE_TS_ISO.match(line)): + log.warning("No timestamp found in one of the lines in %s!", file) + log.debug("Skipping line: %s", line) + continue + + try: + ts = datetime.strptime(match[0], "%Y-%m-%dT%H:%M:%S.%f%z") + + except ValueError as e: + log.warning("Unable to parse ISO timestamp in line: %s", line) + log.debug("", exc_info=e) + continue + + yield ts, line + + +def is_iso_fmt(file: TargetPath) -> bool: + """Determine if the provided auth log file uses new ISO format logging or not.""" + return any(itertools.islice(iso_readlines(file), 0, 2)) diff --git a/tests/_data/plugins/os/unix/log/auth/iso.log b/tests/_data/plugins/os/unix/log/auth/iso.log new file mode 100644 index 000000000..2881562af --- /dev/null +++ b/tests/_data/plugins/os/unix/log/auth/iso.log @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:40bf6c229952ac458617f51273ed63a6b628877da90582e2e5e81a4d2b323309 +size 1281 diff --git a/tests/plugins/os/unix/log/test_auth.py b/tests/plugins/os/unix/log/test_auth.py index f17fce280..9de617716 100644 --- a/tests/plugins/os/unix/log/test_auth.py +++ b/tests/plugins/os/unix/log/test_auth.py @@ -6,11 +6,12 @@ from flow.record.fieldtypes import datetime as dt from dissect.target.filesystem import VirtualFilesystem -from dissect.target.plugins.os.unix.log.auth import AuthLogRecord, AuthPlugin +from dissect.target.plugins.os.unix.log.auth import AuthPlugin +from dissect.target.target import Target from tests._utils import absolute_path -def test_auth_plugin(target_unix, fs_unix: VirtualFilesystem): +def test_auth_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO("Europe/Amsterdam".encode())) data_path = "_data/plugins/os/unix/log/auth/auth.log" @@ -31,7 +32,7 @@ def test_auth_plugin(target_unix, fs_unix: VirtualFilesystem): assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" -def test_auth_plugin_with_gz(target_unix, fs_unix: VirtualFilesystem): +def test_auth_plugin_with_gz(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO("Pacific/Honolulu".encode())) empty_file = absolute_path("_data/plugins/os/unix/log/empty.log") @@ -55,7 +56,7 @@ def test_auth_plugin_with_gz(target_unix, fs_unix: VirtualFilesystem): assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" -def test_auth_plugin_with_bz(target_unix, fs_unix: VirtualFilesystem): +def test_auth_plugin_with_bz(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO("America/Nuuk".encode())) empty_file = absolute_path("_data/plugins/os/unix/log/empty.log") @@ -79,7 +80,7 @@ def test_auth_plugin_with_bz(target_unix, fs_unix: VirtualFilesystem): assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" -def test_auth_plugin_year_rollover(target_unix, fs_unix: VirtualFilesystem): +def test_auth_plugin_year_rollover(target_unix: Target, fs_unix: VirtualFilesystem) -> None: fs_unix.map_file_fh("/etc/timezone", BytesIO("Etc/UTC".encode())) data_path = "_data/plugins/os/unix/log/auth/secure" @@ -100,3 +101,26 @@ def test_auth_plugin_year_rollover(target_unix, fs_unix: VirtualFilesystem): assert isinstance(results[1], type(AuthLogRecord())) assert results[0].ts == dt(2021, 12, 31, 3, 14, 0, tzinfo=ZoneInfo("Etc/UTC")) assert results[1].ts == dt(2022, 1, 1, 13, 37, 0, tzinfo=ZoneInfo("Etc/UTC")) + + +def test_auth_plugin_iso_date_format(target_unix: Target, fs_unix: VirtualFilesystem) -> None: + """test if we correctly handle Ubuntu 24.04 ISO formatted dates.""" + + fs_unix.map_file("/var/log/auth.log", absolute_path("_data/plugins/os/unix/log/auth/iso.log")) + target_unix.add_plugin(AuthPlugin) + + results = sorted(list(target_unix.authlog()), key=lambda r: r.ts) + assert len(results) == 10 + + assert results[0].ts == datetime(2024, 12, 31, 11, 37, 1, 123456, tzinfo=timezone.utc) + assert results[0].service == "sudo" + assert results[0].pid is None + assert results[0].tty == "pts/0" + assert results[0].pwd == "/home/user" + assert results[0].effective_user == "root" + assert results[0].command == "/usr/bin/chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg" + assert results[0].source == "/var/log/auth.log" + assert ( + results[0].message + == "user : TTY=pts/0 ; PWD=/home/user ; USER=root ; COMMAND=/usr/bin/chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg" # noqa: E501 + ) From 8a2d49944fc35dce98a9440c65362ce525049276 Mon Sep 17 00:00:00 2001 From: Poeloe <22234727+Poeloe@users.noreply.github.com> Date: Thu, 19 Sep 2024 09:23:55 -0700 Subject: [PATCH 2/2] Refactor AuthLogPlugin Fixes #286 --- dissect/target/plugins/os/unix/log/auth.py | 314 +++++++++++++++++++-- tests/plugins/os/unix/log/test_auth.py | 203 ++++++++++++- 2 files changed, 487 insertions(+), 30 deletions(-) diff --git a/dissect/target/plugins/os/unix/log/auth.py b/dissect/target/plugins/os/unix/log/auth.py index 393502e14..4be43260e 100644 --- a/dissect/target/plugins/os/unix/log/auth.py +++ b/dissect/target/plugins/os/unix/log/auth.py @@ -1,38 +1,311 @@ +from __future__ import annotations + import itertools import logging import re +from abc import ABC, abstractmethod +from datetime import datetime +from functools import lru_cache from itertools import chain -from typing import Iterator +from pathlib import Path +from typing import Any, Iterator +from dissect.target import Target from dissect.target.exceptions import UnsupportedPluginError -from dissect.target.helpers.fsutil import TargetPath, open_decompress +from dissect.target.helpers.fsutil import open_decompress from dissect.target.helpers.record import DynamicDescriptor, TargetRecordDescriptor from dissect.target.helpers.utils import year_rollover_helper from dissect.target.plugin import Plugin, alias, export log = logging.getLogger(__name__) -_RE_TS = r"^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}" -_RE_TS_ISO = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}" - -RE_TS = re.compile(_RE_TS) -RE_TS_ISO = re.compile(_RE_TS_ISO) +RE_TS = re.compile(r"^[A-Za-z]{3}\s*\d{1,2}\s\d{1,2}:\d{2}:\d{2}") +RE_TS_ISO = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}") RE_LINE = re.compile( - rf"(?P{_RE_TS}|{_RE_TS_ISO})\s(?P\S+)\s(?P\S+?)(\[(?P\d+)\])?:\s(?P.+)$" + r""" + \d{2}:\d{2}\s # First match on the similar ending of the different timestamps + (?P\S+)\s # The hostname + (?P\S+?)(\[(?P\d+)\])?: # The service with optionally the PID between brackets + \s*(?P.+?)\s*$ # The log message stripped from spaces left and right + """, + re.VERBOSE, ) -AuthLogRecord = TargetRecordDescriptor( - "linux/log/auth", - [ - ("datetime", "ts"), - ("string", "message"), - ("path", "source"), - ], +# Generic regular expressions +RE_IPV4_ADDRESS = re.compile( + r""" + ((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3} # First three octets + (25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?) # Last octet + """, + re.VERBOSE, ) +RE_USER = re.compile(r"for ([^\s]+)") + + +class BaseService(ABC): + @classmethod + @abstractmethod + def parse(cls, message: str) -> dict[str, any]: + pass + + +class SudoService(BaseService): + """Parsing of sudo service messages in the auth log.""" + + RE_SUDO_COMMAND = re.compile( + r""" + TTY=(?P\w+\/\w+)\s;\s # The TTY -> TTY=pts/0 ; + PWD=(?P[\/\w]+)\s;\s # The current working directory -> PWD="/home/user" ; + USER=(?P\w+)\s;\s # The effective user -> USER=root ; + COMMAND=(?P.+)$ # The command -> COMMAND=/usr/bin/whoami + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + """Parse auth log message from sudo.""" + if not (match := cls.RE_SUDO_COMMAND.search(message)): + return {} + + additional_fields = {} + for key, value in match.groupdict().items(): + additional_fields[key] = value + + return additional_fields + + +class SshdService(BaseService): + """Class for parsing sshd messages in the auth log.""" + + RE_SSHD_PORTREGEX = re.compile(r"port\s(\d+)") + RE_USER = re.compile(r"for\s([^\s]+)") + + @classmethod + def parse(cls, message: str) -> dict[str, str | int]: + """Parse message from sshd""" + additional_fields = {} + if ip_address := RE_IPV4_ADDRESS.search(message): + field_name = "host_ip" if "listening" in message else "remote_ip" + additional_fields[field_name] = ip_address.group(0) + if port := cls.RE_SSHD_PORTREGEX.search(message): + additional_fields["port"] = int(port.group(1)) + if user := cls.RE_USER.search(message): + additional_fields["user"] = user.group(1) + # Accepted publickey for test_user from 8.8.8.8 IP port 12345 ssh2: RSA SHA256:123456789asdfghjklertzuio + if "Accepted publickey" in message: + ssh_protocol, encryption_algo, key_info = message.split()[-3:] + hash_algo, key_hash = key_info.split(":") + additional_fields["ssh_protocol"] = ssh_protocol.strip(":") + additional_fields["encryption_algorithm"] = encryption_algo + additional_fields["hash_algorithm"] = hash_algo + additional_fields["key_hash"] = key_hash + if (failed := "Failed" in message) or "Accepted" in message: + action_type = "failed" if failed else "accepted" + additional_fields["action"] = f"{action_type} authentication" + additional_fields["authentication_type"] = "password" if "password" in message else "publickey" + + return additional_fields + + +class SystemdLogindService(BaseService): + """Class for parsing systemd-logind messages in the auth log.""" + + RE_SYSTEMD_LOGIND_WATCHING = re.compile( + r""" + (?PWatching\ssystem\sbuttons)\s # Action is "Watching system buttons" + on\s(?P[^\s]+)\s # The device the button is related to -> /dev/input/event0 + \((?P.*?)\) # The device (button) name -> (Power button) + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str): + """Parse auth log message from systemd-logind.""" + additional_fields = {} + # Example: Nov 14 07:14:09 ubuntu-1 systemd-logind[4]: Removed session 4. + if "Removed" in message: + additional_fields["action"] = "removed session" + additional_fields["session"] = message.split()[-1].strip(".") + elif "Watching" in message and (match := cls.RE_SYSTEMD_LOGIND_WATCHING.search(message)): + for key, value in match.groupdict().items(): + additional_fields[key] = value + # Example: New session 4 of user sampleuser. + elif "New session" in message: + parts = message.removeprefix("New session ").split() + additional_fields["action"] = "new session" + additional_fields["session"] = parts[0] + additional_fields["user"] = parts[-1].strip(".") + # Example: Session 4 logged out. Waiting for processes to exit. + elif "logged out" in message: + session = message.removeprefix("Session ").split(maxsplit=1)[0] + additional_fields["action"] = "logged out session" + additional_fields["session"] = session + # Example: New seat seat0. + elif "New seat" in message: + seat = message.split()[-1].strip(".") + additional_fields["action"] = "new seat" + additional_fields["seat"] = seat + + return additional_fields + + +class SuService(BaseService): + """Class for parsing su messages in the auth log.""" + + RE_SU_BY = re.compile(r"by\s([^\s]+)") + RE_SU_ON = re.compile(r"on\s([^\s]+)") + RE_SU_COMMAND = re.compile(r"'(.*?)'") + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + additional_fields = {} + if user := RE_USER.search(message): + additional_fields["user"] = user.group(1) + if by := cls.RE_SU_BY.search(message): + additional_fields["by"] = by.group(1) + if on := cls.RE_SU_ON.search(message): + additional_fields["device"] = on.group(1) + if command := cls.RE_SU_COMMAND.search(message): + additional_fields["command"] = command.group(1) + if (failed := "failed" in message) or "Successful" in message: + additional_fields["su_result"] = "failed" if failed else "success" + + return additional_fields + + +class PkexecService(BaseService): + """Class for parsing pkexec messages in the auth log.""" + + RE_PKEXEC_COMMAND = re.compile( + r""" + (?P\S+?):\sExecuting\scommand\s # Starts with actual user -> user: + \[USER=(?P[^\]]+)\]\s # The impersonated user -> [USER=root] + \[TTY=(?P[^\]]+)\]\s # The tty -> [TTY=unknown] + \[CWD=(?P[^\]]+)\]\s # Current working directory -> [CWD=/home/user] + \[COMMAND=(?P[^\]]+)\] # Command -> [COMMAND=/usr/lib/example] + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + """Parse auth log message from pkexec""" + additional_fields = {} + if exec_cmd := cls.RE_PKEXEC_COMMAND.search(message): + additional_fields["action"] = "executing command" + for key, value in exec_cmd.groupdict().items(): + if value and value.isdigit(): + value = int(value) + additional_fields[key] = value + + return additional_fields + + +class PamUnixService(BaseService): + RE_PAM_UNIX = re.compile( + r""" + pam_unix\([^\s]+:session\):\s(?Psession\s\w+)\s # Session action, usually opened or closed + for\suser\s(?P[^\s\(]+)(?:\(uid=(?P\d+)\))? # User may contain uid like: root(uid=0) + (?:\sby\s\(uid=(?P\d+)\))?$ # Opened action also contains by + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message): + """Parse auth log message from pluggable authentication modules (PAM).""" + if not (match := cls.RE_PAM_UNIX.search(message)): + return {} + + additional_fields = {} + for key, value in match.groupdict().items(): + if value and value.isdigit(): + value = int(value) + additional_fields[key] = value + + return additional_fields + + +class AuthLogRecordBuilder: + """Class for dynamically creating auth log records.""" + + RECORD_NAME = "linux/log/auth" + SERVICES: dict[str, BaseService] = { + "su": SuService, + "sudo": SudoService, + "sshd": SshdService, + "systemd-logind": SystemdLogindService, + "pkexec": PkexecService, + } + + def __init__(self, target: Target): + self._create_event_descriptor = lru_cache(4096)(self._create_event_descriptor) + self.target = target + + def _parse_additional_fields(self, service: str | None, message: str) -> dict[str, Any]: + """Parse additional fields in the message based on the service.""" + if "pam_unix(" in message: + return PamUnixService.parse(message) + + if service not in self.SERVICES: + self.target.log.debug("Service %s is not recognised, no additional fields could be parsed", service) + return {} + + try: + return self.SERVICES[service].parse(message) + except Exception as e: + self.target.log.warning("Parsing additional fields in message '%s' for service %s failed", message, service) + self.target.log.debug("", exc_info=e) + raise e + + def build_record(self, ts: datetime, source: Path, line: str) -> TargetRecordDescriptor: + """Builds an ``AuthLog`` event record.""" + + record_fields = [ + ("datetime", "ts"), + ("path", "source"), + ("string", "service"), + ("varint", "pid"), + ("string", "message"), + ] + + record_values = { + "ts": ts, + "message": line, + "service": None, + "pid": None, + "source": source, + "_target": self.target, + } + + match = RE_LINE.search(line) + if match: + record_values.update(match.groupdict()) + + for key, value in self._parse_additional_fields(record_values["service"], line).items(): + record_type = "string" + if isinstance(value, int): + record_type = "varint" + + record_fields.append((record_type, key)) + record_values[key] = value + + # tuple conversion here is needed for lru_cache + desc = self._create_event_descriptor(tuple(record_fields)) + return desc(**record_values) + + def _create_event_descriptor(self, record_fields) -> TargetRecordDescriptor: + return TargetRecordDescriptor(self.RECORD_NAME, record_fields) class AuthPlugin(Plugin): - """Unix auth log plugin.""" + """Unix authentication log plugin.""" + + def __init__(self, target: Target): + super().__init__(target) + self._auth_log_builder = AuthLogRecordBuilder(target) def check_compatible(self) -> None: var_log = self.target.fs.path("/var/log") @@ -41,7 +314,7 @@ def check_compatible(self) -> None: @alias("securelog") @export(record=DynamicDescriptor(["datetime", "path", "string"])) - def authlog(self) -> Iterator[any]: + def authlog(self) -> Iterator[Any]: """Yield contents of ``/var/log/auth.log*`` and ``/var/log/secure*`` files. Order of returned events is not guaranteed to be chronological because of year @@ -69,7 +342,6 @@ def authlog(self) -> Iterator[any]: for auth_file in chain(var_log.glob("auth.log*"), var_log.glob("secure*")): if is_iso_fmt(auth_file): iterable = iso_readlines(auth_file) - else: iterable = year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo) @@ -77,9 +349,8 @@ def authlog(self) -> Iterator[any]: yield self._auth_log_builder.build_record(ts, auth_file, line) -def iso_readlines(file: TargetPath) -> Iterator[tuple[datetime, str]]: +def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]: """Iterator reading the provided auth log file in ISO format. Mimics ``year_rollover_helper`` behaviour.""" - with open_decompress(file, "rt") as fh: for line in fh: if not (match := RE_TS_ISO.match(line)): @@ -89,7 +360,6 @@ def iso_readlines(file: TargetPath) -> Iterator[tuple[datetime, str]]: try: ts = datetime.strptime(match[0], "%Y-%m-%dT%H:%M:%S.%f%z") - except ValueError as e: log.warning("Unable to parse ISO timestamp in line: %s", line) log.debug("", exc_info=e) @@ -98,6 +368,6 @@ def iso_readlines(file: TargetPath) -> Iterator[tuple[datetime, str]]: yield ts, line -def is_iso_fmt(file: TargetPath) -> bool: +def is_iso_fmt(file: Path) -> bool: """Determine if the provided auth log file uses new ISO format logging or not.""" return any(itertools.islice(iso_readlines(file), 0, 2)) diff --git a/tests/plugins/os/unix/log/test_auth.py b/tests/plugins/os/unix/log/test_auth.py index 9de617716..4bfb63646 100644 --- a/tests/plugins/os/unix/log/test_auth.py +++ b/tests/plugins/os/unix/log/test_auth.py @@ -1,8 +1,12 @@ +from __future__ import annotations + from datetime import datetime, timezone from io import BytesIO +from pathlib import Path from unittest.mock import patch from zoneinfo import ZoneInfo +import pytest from flow.record.fieldtypes import datetime as dt from dissect.target.filesystem import VirtualFilesystem @@ -27,9 +31,8 @@ def test_auth_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None: results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("Europe/Amsterdam")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_with_gz(target_unix: Target, fs_unix: VirtualFilesystem) -> None: @@ -51,9 +54,8 @@ def test_auth_plugin_with_gz(target_unix: Target, fs_unix: VirtualFilesystem) -> results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("Pacific/Honolulu")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_with_bz(target_unix: Target, fs_unix: VirtualFilesystem) -> None: @@ -75,9 +77,8 @@ def test_auth_plugin_with_bz(target_unix: Target, fs_unix: VirtualFilesystem) -> results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("America/Nuuk")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_year_rollover(target_unix: Target, fs_unix: VirtualFilesystem) -> None: @@ -97,12 +98,198 @@ def test_auth_plugin_year_rollover(target_unix: Target, fs_unix: VirtualFilesyst assert len(results) == 2 results.reverse() - assert isinstance(results[0], type(AuthLogRecord())) - assert isinstance(results[1], type(AuthLogRecord())) assert results[0].ts == dt(2021, 12, 31, 3, 14, 0, tzinfo=ZoneInfo("Etc/UTC")) assert results[1].ts == dt(2022, 1, 1, 13, 37, 0, tzinfo=ZoneInfo("Etc/UTC")) +@pytest.mark.parametrize( + "message, results", + [ + pytest.param( + "Mar 29 10:43:01 ubuntu-1 sshd[1193]: Accepted password for test_user from 8.8.8.8 port 52942 ssh2", + { + "service": "sshd", + "pid": 1193, + "action": "accepted authentication", + "authentication_type": "password", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 52942, + }, + id="sshd: accepted password", + ), + pytest.param( + "Jun 4 22:14:15 ubuntu-1 sshd[41458]: Failed password for root from 8.8.8.8 port 22 ssh2", + { + "service": "sshd", + "pid": 41458, + "action": "failed authentication", + "authentication_type": "password", + "user": "root", + "remote_ip": "8.8.8.8", + "port": 22, + }, + id="sshd: failed password", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: Accepted publickey for test_user " + "from 8.8.8.8 port 12345 ssh2: RSA SHA256:123456789asdfghjklertzuio", + { + "service": "sshd", + "pid": 1361, + "action": "accepted authentication", + "authentication_type": "publickey", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 12345, + "ssh_protocol": "ssh2", + "encryption_algorithm": "RSA", + "hash_algorithm": "SHA256", + "key_hash": "123456789asdfghjklertzuio", + }, + id="sshd: accepted publickey", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: Failed publickey for test_user from 8.8.8.8 port 12345 ssh2.", + { + "service": "sshd", + "pid": 1361, + "action": "failed authentication", + "authentication_type": "publickey", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 12345, + }, + id="sshd: failed publickey", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 sshd[1291]: Server listening on 127.0.0.1 port 22.", + { + "service": "sshd", + "pid": 1291, + "host_ip": "127.0.0.1", + "port": 22, + }, + id="sshd: listening", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: pam_unix(sshd:session): session opened for user test_user by (uid=0)", + { + "service": "sshd", + "pid": 1361, + "action": "session opened", + "user": "test_user", + "user_uid": None, + "by_uid": 0, + }, + id="sshd: pam_unix", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: pam_unix(sshd:session): session opened " + "for user root(uid=0) by (uid=0)", + { + "service": "sshd", + "pid": 1361, + "action": "session opened", + "user": "root", + "user_uid": 0, + "by_uid": 0, + }, + id="sshd: pam_unix", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 systemd-logind[1118]: Watching system buttons " + "on /dev/input/event0 (Power Button)", + { + "service": "systemd-logind", + "pid": 1118, + "action": "Watching system buttons", + "device": "/dev/input/event0", + "device_name": "Power Button", + }, + id="systemd-logind: watching system buttons", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 systemd-logind[1118]: New seat seat0.", + { + "service": "systemd-logind", + "pid": 1118, + "action": "new seat", + "seat": "seat0", + }, + id="systemd-logind: new seat", + ), + pytest.param( + "Mar 27 13:10:08 ubuntu-1 sudo: ubuntu : TTY=pts/0 ; PWD=/home/test_user ; " + "USER=root ; COMMAND=/usr/bin/apt-key add -", + { + "service": "sudo", + "pid": None, + "tty": "pts/0", + "pwd": "/home/test_user", + "effective_user": "root", + "command": "/usr/bin/apt-key add -", + }, + id="sudo: command", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 su[1521]: Successful su for user by root", + {"service": "su", "pid": 1521, "su_result": "success", "user": "user", "by": "root"}, + id="su: success", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 su[1531]: 'su root' failed for user by root", + { + "service": "su", + "pid": 1531, + "su_result": "failed", + "command": "su root", + "user": "user", + "by": "root", + }, + id="su: failed", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 pkexec[1531]: user: Executing command [USER=root] " + "[TTY=unknown] [CWD=/home/user] [COMMAND=/usr/lib/update-notifier/package-system-locked]", + { + "service": "pkexec", + "pid": 1531, + "action": "executing command", + "user": "user", + "effective_user": "root", + "tty": "unknown", + "cwd": "/home/user", + "command": "/usr/lib/update-notifier/package-system-locked", + }, + id="pkexec: executing command", + ), + pytest.param( + "Mar 27 13:17:01 ubuntu-1 CRON[2623]: pam_unix(cron:session): session closed for user root", + { + "service": "CRON", + "pid": 2623, + "action": "session closed", + "user": "root", + }, + id="cron: pam_unix", + ), + ], +) +def test_auth_plugin_additional_fields( + target_unix, fs_unix: VirtualFilesystem, tmp_path: Path, message: str, results: dict[str, str | int] +) -> None: + data_path = tmp_path / "auth.log" + data_path.write_text(message) + fs_unix.map_file("var/log/auth.log", data_path) + + target_unix.add_plugin(AuthPlugin) + record = list(target_unix.authlog())[0] + + for key, value in results.items(): + assert getattr(record, key) == value + + def test_auth_plugin_iso_date_format(target_unix: Target, fs_unix: VirtualFilesystem) -> None: """test if we correctly handle Ubuntu 24.04 ISO formatted dates."""