Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ISO 8601 formatted auth log timestamps #901

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 91 additions & 42 deletions dissect/target/plugins/os/unix/log/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import itertools
import logging
import re
from abc import ABC, abstractmethod
from datetime import datetime
Expand All @@ -10,13 +12,22 @@

from dissect.target import Target
from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.fsutil import TargetPath, open_decompress
from dissect.target.helpers.record import DynamicDescriptor, TargetRecordDescriptor
from dissect.target.helpers.utils import year_rollover_helper
from dissect.target.plugin import Plugin, export
from dissect.target.plugin import Plugin, alias, export

log = logging.getLogger(__name__)

_RE_TS = r"^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}"
_RE_TS_ISO = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}"

RE_TS = re.compile(_RE_TS)
RE_TS_ISO = re.compile(_RE_TS_ISO)
RE_LINE = re.compile(
rf"(?P<ts>{_RE_TS}|{_RE_TS_ISO})\s(?P<hostname>\S+)\s(?P<service>\S+?)(\[(?P<pid>\d+)\])?:\s(?P<message>.+)$"
)

_TS_REGEX = r"^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}"
RE_TS = re.compile(_TS_REGEX)
RE_TS_AND_HOSTNAME = re.compile(_TS_REGEX + r"\s\S+\s")
# Generic regular expressions
IPV4_ADDRESS_REGEX = re.compile(
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}" # First three octets
Expand Down Expand Up @@ -161,7 +172,7 @@
"""Class for parsing pkexec messages in the auth log"""

PKEXEC_COMMAND_REGEX = re.compile(
r"(?P<user>.*?):\sExecuting\scommand\s" # Starts with actual user -> user:
r"(?P<user>\S+?):\sExecuting\scommand\s" # Starts with actual user -> user:
r"\[USER=(?P<effective_user>[^\]]+)\]\s" # The impersonated user -> [USER=root]
r"\[TTY=(?P<tty>[^\]]+)\]\s" # The tty -> [TTY=unknown]
r"\[CWD=(?P<cwd>[^\]]+)\]\s" # Current working directory -> [CWD=/home/user]
Expand Down Expand Up @@ -230,8 +241,9 @@
self.target.log.debug("", exc_info=e)
raise e

def build_record(self, ts: datetime, source: Path, service: str, pid: int, message: str) -> TargetRecordDescriptor:
def build_record(self, ts: datetime, source: Path, line: str) -> TargetRecordDescriptor:
"""Builds an AuthLog event record"""

record_fields = [
("datetime", "ts"),
("path", "source"),
Expand All @@ -241,15 +253,23 @@
("string", "message"),
]

record_values = {}
record_values["ts"] = ts
record_values["source"] = source
record_values["service"] = service
record_values["pid"] = pid
record_values["message"] = message
record_values["_target"] = self.target

for key, value in self._parse_additional_fields(service, message).items():
record_values = {
"ts": ts,
"message": line,
"service": None,
"pid": None,
"source": source,
"_target": self.target,
}

match = RE_LINE.match(line)
if match:
values = match.groupdict()
del values["ts"]
values["message"] = values["message"].strip()
record_values.update(values)

for key, value in self._parse_additional_fields(record_values["service"], line).items():
record_type = "string"
if isinstance(value, int):
record_type = "varint"
Expand All @@ -266,47 +286,76 @@


class AuthPlugin(Plugin):
"""Unix authentication log plugin."""

def __init__(self, target: Target):
super().__init__(target)
self.target
self._auth_log_builder = AuthLogRecordBuilder(target)

def check_compatible(self) -> None:
var_log = self.target.fs.path("/var/log")
if not any(var_log.glob("auth.log*")) and not any(var_log.glob("secure*")):
raise UnsupportedPluginError("No auth log files found")

@export(record=DynamicDescriptor(["datetime", "path", "string"]))
def securelog(self) -> Iterator[any]:
"""Return contents of /var/log/auth.log* and /var/log/secure*."""
return self.authlog()

@alias("securelog")
@export(record=DynamicDescriptor(["datetime", "path", "string"]))
def authlog(self) -> Iterator[any]:
"""Return contents of /var/log/auth.log* and /var/log/secure*."""
"""Yield contents of ``/var/log/auth.log*`` and ``/var/log/secure*`` files.

Order of returned events is not guaranteed to be chronological because of year
rollover detection efforts for log files without a year in the timestamp.

The following timestamp formats are recognised automatically. This plugin
assumes that no custom ``date_format`` template is set in ``syslog-ng`` or ``systemd``
configuration (defaults to ``M d H:M:S``).

ISO formatted authlog entries are parsed as can be found in Ubuntu 24.04 and later.

# Assuming no custom date_format template is set in syslog-ng or systemd (M d H:M:S)
# CentOS format: Jan 12 13:37:00 hostname daemon: message
# Debian format: Jan 12 13:37:00 hostname daemon[pid]: pam_unix(daemon:session): message
.. code-block:: text

CentOS format: Jan 12 13:37:00 hostname daemon: message
Debian format: Jan 12 13:37:00 hostname daemon[pid]: pam_unix(daemon:session): message
Ubuntu 24.04: 2024-01-12T13:37:00.000000+02:00 hostname daemon[pid]: pam_unix(daemon:session): message

Resources:
- https://help.ubuntu.com/community/LinuxLogFiles
"""

tzinfo = self.target.datetime.tzinfo

var_log = self.target.fs.path("/var/log")
for auth_file in chain(var_log.glob("auth.log*"), var_log.glob("secure*")):
for idx, (ts, line) in enumerate(year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo)):
ts_and_hostname = re.search(RE_TS_AND_HOSTNAME, line)
if not ts_and_hostname:
self.target.log.warning("No timestamp and hostname found on line %d for file %s.", idx, auth_file)
self.target.log.debug("Skipping line %d: %s", idx, line)
continue

info = line.replace(ts_and_hostname.group(0), "").strip()
service, _message = info.split(":", maxsplit=1)
message = _message.strip()
# Get the PID, if present. Example: CRON[1] --> pid=1
pid = None
if "[" in service:
service, _pid = service.split("[")[:2]
pid = _pid.strip("]")

yield self._auth_log_builder.build_record(ts, auth_file, service, pid, message)
if is_iso_fmt(auth_file):
iterable = iso_readlines(auth_file)

else:
iterable = year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo)

for ts, line in iterable:
yield self._auth_log_builder.build_record(ts, auth_file, line)


def iso_readlines(file: TargetPath) -> Iterator[tuple[datetime, str]]:
"""Iterator reading the provided auth log file in ISO format. Mimics ``year_rollover_helper`` behaviour."""

with open_decompress(file, "rt") as fh:
for line in fh:
if not (match := RE_TS_ISO.match(line)):
log.warning("No timestamp found in one of the lines in %s!", file)
log.debug("Skipping line: %s", line)
continue

try:
ts = datetime.strptime(match[0], "%Y-%m-%dT%H:%M:%S.%f%z")
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

except ValueError as e:
log.warning("Unable to parse ISO timestamp in line: %s", line)
log.debug("", exc_info=e)
continue

Check warning on line 354 in dissect/target/plugins/os/unix/log/auth.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/log/auth.py#L351-L354

Added lines #L351 - L354 were not covered by tests

yield ts, line


def is_iso_fmt(file: TargetPath) -> bool:
"""Determine if the provided auth log file uses new ISO format logging or not."""
return any(itertools.islice(iso_readlines(file), 0, 2))
3 changes: 3 additions & 0 deletions tests/_data/plugins/os/unix/log/auth/iso.log
Git LFS file not shown
34 changes: 29 additions & 5 deletions tests/plugins/os/unix/log/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@

from dissect.target.filesystem import VirtualFilesystem
from dissect.target.plugins.os.unix.log.auth import AuthPlugin
from dissect.target.target import Target
from tests._utils import absolute_path


def test_auth_plugin(target_unix, fs_unix: VirtualFilesystem):
def test_auth_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
fs_unix.map_file_fh("/etc/timezone", BytesIO("Europe/Amsterdam".encode()))

data_path = "_data/plugins/os/unix/log/auth/auth.log"
Expand All @@ -34,7 +35,7 @@ def test_auth_plugin(target_unix, fs_unix: VirtualFilesystem):
assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)"


def test_auth_plugin_with_gz(target_unix, fs_unix: VirtualFilesystem):
def test_auth_plugin_with_gz(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
fs_unix.map_file_fh("/etc/timezone", BytesIO("Pacific/Honolulu".encode()))

empty_file = absolute_path("_data/plugins/os/unix/log/empty.log")
Expand All @@ -57,7 +58,7 @@ def test_auth_plugin_with_gz(target_unix, fs_unix: VirtualFilesystem):
assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)"


def test_auth_plugin_with_bz(target_unix, fs_unix: VirtualFilesystem):
def test_auth_plugin_with_bz(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
fs_unix.map_file_fh("/etc/timezone", BytesIO("America/Nuuk".encode()))

empty_file = absolute_path("_data/plugins/os/unix/log/empty.log")
Expand All @@ -80,7 +81,7 @@ def test_auth_plugin_with_bz(target_unix, fs_unix: VirtualFilesystem):
assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)"


def test_auth_plugin_year_rollover(target_unix, fs_unix: VirtualFilesystem):
def test_auth_plugin_year_rollover(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
fs_unix.map_file_fh("/etc/timezone", BytesIO("Etc/UTC".encode()))

data_path = "_data/plugins/os/unix/log/auth/secure"
Expand Down Expand Up @@ -276,7 +277,7 @@ def test_auth_plugin_year_rollover(target_unix, fs_unix: VirtualFilesystem):
],
)
def test_auth_plugin_additional_fields(
target_unix, fs_unix: VirtualFilesystem, tmp_path: Path, message: str, results: dict[str, str | int]
target_unix: Target, fs_unix: VirtualFilesystem, tmp_path: Path, message: str, results: dict[str, str | int]
) -> None:
data_path = tmp_path / "auth.log"
data_path.write_text(message)
Expand All @@ -287,3 +288,26 @@ def test_auth_plugin_additional_fields(

for key, value in results.items():
assert getattr(record, key) == value


def test_auth_plugin_iso_date_format(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
"""test if we correctly handle Ubuntu 24.04 ISO formatted dates."""

fs_unix.map_file("/var/log/auth.log", absolute_path("_data/plugins/os/unix/log/auth/iso.log"))
target_unix.add_plugin(AuthPlugin)

results = sorted(list(target_unix.authlog()), key=lambda r: r.ts)
assert len(results) == 10

assert results[0].ts == datetime(2024, 12, 31, 11, 37, 1, 123456, tzinfo=timezone.utc)
assert results[0].service == "sudo"
assert results[0].pid is None
assert results[0].tty == "pts/0"
assert results[0].pwd == "/home/user"
assert results[0].effective_user == "root"
assert results[0].command == "/usr/bin/chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg"
assert results[0].source == "/var/log/auth.log"
assert (
results[0].message
== "user : TTY=pts/0 ; PWD=/home/user ; USER=root ; COMMAND=/usr/bin/chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg" # noqa: E501
)
Loading