diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index b794f3873..be9081aff 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -891,7 +891,7 @@ def create_parser(self, options: Optional[ParserOptions] = None) -> Configuratio } -def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None, *args, **kwargs) -> ConfigParser: +def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None, *args, **kwargs) -> ConfigurationParser: """Parses the content of an ``path`` or ``entry`` to a dictionary. Args: @@ -922,7 +922,7 @@ def parse_config( entry: FilesystemEntry, hint: Optional[str] = None, options: Optional[ParserOptions] = None, -) -> ConfigParser: +) -> ConfigurationParser: parser_type = _select_parser(entry, hint) parser = parser_type.create_parser(options) diff --git a/dissect/target/helpers/record.py b/dissect/target/helpers/record.py index c680866cd..54611e6c8 100644 --- a/dissect/target/helpers/record.py +++ b/dissect/target/helpers/record.py @@ -145,33 +145,40 @@ def DynamicDescriptor(types): # noqa COMMON_INTERFACE_ELEMENTS = [ ("string", "name"), + ("string[]", "mac"), ("string", "type"), ("boolean", "enabled"), - ("string", "mac"), ("net.ipaddress[]", "dns"), ("net.ipaddress[]", "ip"), ("net.ipaddress[]", "gateway"), + ("net.ipnetwork[]", "network"), ("string", "source"), ] UnixInterfaceRecord = TargetRecordDescriptor( "unix/network/interface", - COMMON_INTERFACE_ELEMENTS, + [ + *COMMON_INTERFACE_ELEMENTS, + ("boolean", "dhcp_ipv4"), # NetworkManager allows for dual-stack configurations. + ("boolean", "dhcp_ipv6"), + ("datetime", "last_connected"), + ("varint[]", "vlan"), + ("string", "configurator"), + ], ) WindowsInterfaceRecord = TargetRecordDescriptor( "windows/network/interface", [ *COMMON_INTERFACE_ELEMENTS, - ("varint", "vlan"), - ("net.ipnetwork[]", "network"), ("varint", "metric"), ("stringlist", "search_domain"), ("datetime", "first_connected"), ("datetime", "last_connected"), ("net.ipaddress[]", "subnetmask"), ("boolean", "dhcp"), + ("varint", "vlan"), ], ) @@ -179,10 +186,9 @@ def DynamicDescriptor(types): # noqa "macos/network/interface", [ *COMMON_INTERFACE_ELEMENTS, - ("varint", "vlan"), - ("net.ipnetwork[]", "network"), ("varint", "interface_service_order"), ("boolean", "dhcp"), + ("varint", "vlan"), ], ) diff --git a/dissect/target/helpers/utils.py b/dissect/target/helpers/utils.py index 0f3578dc9..7648e94e2 100644 --- a/dissect/target/helpers/utils.py +++ b/dissect/target/helpers/utils.py @@ -1,10 +1,12 @@ +from __future__ import annotations + import logging import re import urllib.parse from datetime import datetime, timezone, tzinfo from enum import Enum from pathlib import Path -from typing import BinaryIO, Callable, Iterator, Optional, Union +from typing import BinaryIO, Callable, Iterator, Optional, TypeVar, Union from dissect.util.ts import from_unix @@ -24,6 +26,23 @@ def findall(buf: bytes, needle: bytes) -> Iterator[int]: offset += 1 +T = TypeVar("T") + + +def to_list(value: T | list[T]) -> list[T]: + """Convert a single value or a list of values to a list. + + Args: + value: The value to convert. + + Returns: + A list of values. + """ + if not isinstance(value, list): + return [value] + return value + + class StrEnum(str, Enum): """Sortable and serializible string-based enum""" diff --git a/dissect/target/plugins/general/network.py b/dissect/target/plugins/general/network.py index 236c10c71..8ddf5e4b5 100644 --- a/dissect/target/plugins/general/network.py +++ b/dissect/target/plugins/general/network.py @@ -79,7 +79,7 @@ def with_ip(self, ip_addr: str) -> Iterator[InterfaceRecord]: @internal def with_mac(self, mac: str) -> Iterator[InterfaceRecord]: for interface in self.interfaces(): - if interface.mac == mac: + if mac in interface.mac: yield interface @internal diff --git a/dissect/target/plugins/os/unix/bsd/osx/network.py b/dissect/target/plugins/os/unix/bsd/osx/network.py index 23a9318b6..5017db750 100644 --- a/dissect/target/plugins/os/unix/bsd/osx/network.py +++ b/dissect/target/plugins/os/unix/bsd/osx/network.py @@ -84,9 +84,10 @@ def _interfaces(self) -> Iterator[MacInterfaceRecord]: network=network, interface_service_order=interface_service_order, dhcp=dhcp, + mac=[], _target=self.target, ) except Exception as e: - self.target.log.warning("Error reading configuration for network device %s: %s", name, e) + self.target.log.warning("Error reading configuration for network device %s", name, exc_info=e) continue diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py new file mode 100644 index 000000000..a4e55d296 --- /dev/null +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -0,0 +1,338 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from datetime import datetime, timezone +from ipaddress import ip_address, ip_interface +from typing import TYPE_CHECKING, Any, Iterator, Literal, NamedTuple + +from dissect.target.helpers import configutil +from dissect.target.helpers.record import UnixInterfaceRecord +from dissect.target.helpers.utils import to_list +from dissect.target.plugins.general.network import NetworkPlugin + +if TYPE_CHECKING: + from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface + + from dissect.target import Target + from dissect.target.target import TargetPath + + NetAddress = IPv4Address | IPv6Address + NetInterface = IPv4Interface | IPv6Interface + + +class LinuxNetworkPlugin(NetworkPlugin): + """Linux network interface plugin.""" + + def _interfaces(self) -> Iterator[UnixInterfaceRecord]: + """Try all available network configuration managers and aggregate the results.""" + for manager_cls in MANAGERS: + manager: LinuxNetworkConfigParser = manager_cls(self.target) + yield from manager.interfaces() + + +VlanIdByInterface = dict[str, set[int]] + + +class LinuxNetworkConfigParser: + def __init__(self, target: Target): + self._target = target + + def _config_files(self, config_paths: list[str], glob: str) -> list[TargetPath]: + """Returns all configuration files in config_paths matching the given extension.""" + all_files = [] + for config_path in config_paths: + paths = self._target.fs.path(config_path).glob(glob) + all_files.extend(config_file for config_file in paths if config_file.is_file()) + + return sorted(all_files, key=lambda p: p.stem) + + def interfaces(self) -> Iterator[UnixInterfaceRecord]: + """Parse network interfaces from configuration files.""" + yield from () + + +class NetworkManagerConfigParser(LinuxNetworkConfigParser): + """NetworkManager configuration parser. + + NetworkManager configuration files are generally in an INI-like format. + Note that Red Hat and Fedora deprecated ifcfg files. + Documentation: https://networkmanager.dev/docs/api/latest/nm-settings-keyfile.html + """ + + config_paths: list[str] = [ + "/etc/NetworkManager/system-connections/", + "/usr/lib/NetworkManager/system-connections/", + "/run/NetworkManager/system-connections/", + ] + + @dataclass + class ParserContext: + source: str + uuid: str | None = None + last_connected: datetime | None = None + name: str | None = None + mac_address: str | None = None + type: str = "" + dns: set[NetAddress] = field(default_factory=set) + ip_interfaces: set[NetInterface] = field(default_factory=set) + gateways: set[NetAddress] = field(default_factory=set) + dhcp_ipv4: bool = False + dhcp_ipv6: bool = False + vlan: set[int] = field(default_factory=set) + + def to_record(self) -> UnixInterfaceRecord: + return UnixInterfaceRecord( + source=self.source, + last_connected=self.last_connected, + name=self.name, + mac=[self.mac_address] if self.mac_address else [], + type=self.type, + dhcp_ipv4=self.dhcp_ipv4, + dhcp_ipv6=self.dhcp_ipv6, + dns=list(self.dns), + ip=[interface.ip for interface in self.ip_interfaces], + network=[interface.network for interface in self.ip_interfaces], + gateway=list(self.gateways), + vlan=list(self.vlan), + configurator="NetworkManager", + ) + + def interfaces(self) -> Iterator[UnixInterfaceRecord]: + connections: list[NetworkManagerConfigParser.ParserContext] = [] + vlan_id_by_interface: VlanIdByInterface = {} + + for connection_file_path in self._config_files(self.config_paths, "*"): + try: + config = configutil.parse(connection_file_path, hint="ini") + context = self.ParserContext(source=connection_file_path) + common_section: dict[str, str] = config.get("connection", {}) + context.type = common_section.get("type", "") + sub_type: dict[str, str] = config.get(context.type, {}) + + if context.type == "vlan": + self._parse_vlan(sub_type, vlan_id_by_interface) + continue + + for ip_version in ["ipv4", "ipv6"]: + ip_section: dict[str, str] = config.get(ip_version, {}) + for key, value in ip_section.items(): + self._parse_ip_section_key(key, value, context, ip_version) + + context.name = common_section.get("interface-name") + context.mac_address = sub_type.get("mac-address") + context.uuid = common_section.get("uuid") + context.source = str(connection_file_path) + context.last_connected = self._parse_lastconnected(common_section.get("timestamp", "")) + + connections.append(context) + + except Exception as e: + self._target.log.warning("Error parsing network config file %s", connection_file_path) + self._target.log.debug("", exc_info=e) + + for connection in connections: + vlan_ids_from_interface = vlan_id_by_interface.get(connection.name, set()) + connection.vlan.update(vlan_ids_from_interface) + + vlan_ids_from_uuid = vlan_id_by_interface.get(connection.uuid, set()) + connection.vlan.update(vlan_ids_from_uuid) + + yield connection.to_record() + + def _parse_route(self, route: str) -> NetAddress | None: + """Parse a route and return gateway IP address.""" + if (elements := route.split(",")) and len(elements) > 1: + return ip_address(elements[1]) + + return None + + def _parse_lastconnected(self, last_connected: str) -> datetime | None: + """Parse last connected timestamp.""" + if not last_connected: + return None + + return datetime.fromtimestamp(int(last_connected), timezone.utc) + + def _parse_ip_section_key( + self, key: str, value: str, context: ParserContext, ip_version: Literal["ipv4", "ipv6"] + ) -> None: + if not (trimmed := value.strip()): + return + + if key == "dns": + context.dns.update(ip_address(addr) for addr in trimmed.split(";") if addr) + elif key.startswith("address"): + # Undocumented: single gateway on address line. Observed when running: + # nmcli connection add type ethernet ... ip4 192.168.2.138/24 gw4 192.168.2.1 + ip, *gateway = trimmed.split(",", 1) + context.ip_interfaces.add(ip_interface(ip)) + if gateway: + context.gateways.add(ip_address(gateway[0])) + elif key.startswith("gateway"): + context.gateways.add(ip_address(trimmed)) + elif key == "method": + if ip_version == "ipv4": + context.dhcp_ipv4 = trimmed == "auto" + elif ip_version == "ipv6": + context.dhcp_ipv6 = trimmed == "auto" + elif key.startswith("route"): + if gateway := self._parse_route(value): + context.gateways.add(gateway) + + def _parse_vlan(self, sub_type: dict[str, Any], vlan_id_by_interface: VlanIdByInterface) -> None: + parent_interface = sub_type.get("parent") + vlan_id = sub_type.get("id") + if not parent_interface or not vlan_id: + return + + ids = vlan_id_by_interface.setdefault(parent_interface, set()) + ids.add(int(vlan_id)) + + +class SystemdNetworkConfigParser(LinuxNetworkConfigParser): + """Systemd network configuration parser. + + Systemd network configuration files are generally in an INI-like format with some quirks. + Note that drop-in directories are not yet supported. + + Documentation: https://www.freedesktop.org/software/systemd/man/latest/systemd.network.html + """ + + config_paths: list[str] = [ + "/etc/systemd/network/", + "/run/systemd/network/", + "/usr/lib/systemd/network/", + "/usr/local/lib/systemd/network/", + ] + + class DhcpConfig(NamedTuple): + ipv4: bool + ipv6: bool + + # Can be enclosed in brackets for IPv6. Can also have port, iface name, and SNI, which we ignore. + # Example: [1111:2222::3333]:9953%ifname#example.com + dns_ip_patttern = re.compile( + r"(?P(?:\d{1,3}\.){3}\d{1,3})|\[(?P\[?[0-9a-fA-F:]+\]?)\]" + ) + + def interfaces(self) -> Iterator: + virtual_networks = self._parse_virtual_networks() + yield from self._parse_networks(virtual_networks) + + def _parse_virtual_networks(self) -> VlanIdByInterface: + """Parse virtual network configurations from systemd network configuration files.""" + + virtual_networks: VlanIdByInterface = {} + for config_file in self._config_files(self.config_paths, "*.netdev"): + try: + virtual_network_config = configutil.parse(config_file, hint="systemd") + net_dev_section: dict[str, str] = virtual_network_config.get("NetDev", {}) + if net_dev_section.get("Kind") != "vlan": + continue + + vlan_id = virtual_network_config.get("VLAN", {}).get("Id") + if (name := net_dev_section.get("Name")) and vlan_id: + vlan_ids = virtual_networks.setdefault(name, set()) + vlan_ids.add(int(vlan_id)) + except Exception as e: + self._target.log.warning("Error parsing virtual network config file %s", config_file) + self._target.log.debug("", exc_info=e) + + return virtual_networks + + def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixInterfaceRecord]: + """Parse network configurations from systemd network configuration files.""" + for config_file in self._config_files(self.config_paths, "*.network"): + try: + config = configutil.parse(config_file, hint="systemd") + + match_section: dict[str, str] = config.get("Match", {}) + network_section: dict[str, str] = config.get("Network", {}) + link_section: dict[str, str] = config.get("Link", {}) + + ip_interfaces: set[NetInterface] = set() + gateways: set[NetAddress] = set() + dns: set[NetAddress] = set() + mac_addresses: set[str] = set() + + if link_mac := link_section.get("MACAddress"): + mac_addresses.add(link_mac) + mac_addresses.update(match_section.get("MACAddress", "").split()) + mac_addresses.update(match_section.get("PermanentMACAddress", "").split()) + + dns_value = to_list(network_section.get("DNS", [])) + dns.update(self._parse_dns_ip(dns_ip) for dns_ip in dns_value) + + address_value = to_list(network_section.get("Address", [])) + ip_interfaces.update(ip_interface(addr) for addr in address_value) + + gateway_value = to_list(network_section.get("Gateway", [])) + gateways.update(ip_address(gateway) for gateway in gateway_value) + + vlan_ids: set[int] = set() + vlan_names = to_list(network_section.get("VLAN", [])) + for vlan_name in vlan_names: + if ids := virtual_networks.get(vlan_name): + vlan_ids.update(ids) + + # There are possibly multiple route sections, but they are collapsed into one by the parser. + route_section: dict[str, Any] = config.get("Route", {}) + gateway_values = to_list(route_section.get("Gateway", [])) + gateways.update(filter(None, map(self._parse_gateway, gateway_values))) + + dhcp_ipv4, dhcp_ipv6 = self._parse_dhcp(network_section.get("DHCP")) + + yield UnixInterfaceRecord( + source=str(config_file), + type=match_section.get("Type"), + enabled=None, # Unknown, dependent on run-time state + dhcp_ipv4=dhcp_ipv4, + dhcp_ipv6=dhcp_ipv6, + name=match_section.get("Name"), + dns=list(dns), + mac=list(mac_addresses), + ip=[interface.ip for interface in ip_interfaces], + network=[interface.network for interface in ip_interfaces], + gateway=list(gateways), + vlan=list(vlan_ids), + configurator="systemd-networkd", + ) + except Exception as e: + self._target.log.warning("Error parsing network config file %s", config_file) + self._target.log.debug("", exc_info=e) + + def _parse_dns_ip(self, address: str) -> NetAddress: + """Parse DNS address from systemd network configuration file. + + The optional brackets and port number make this hard to parse. + See https://www.freedesktop.org/software/systemd/man/latest/systemd.network.html and search for DNS. + """ + + if match := self.dns_ip_patttern.search(address): + return ip_address(match.group("withoutBrackets") or match.group("withBrackets")) + + raise ValueError(f"Invalid DNS address format: {address}") + + def _parse_dhcp(self, value: str | None) -> DhcpConfig: + """Parse DHCP value from systemd network configuration file to a named tuple (ipv4, ipv6).""" + + if value is None or value == "no": + return self.DhcpConfig(ipv4=False, ipv6=False) + elif value == "yes": + return self.DhcpConfig(ipv4=True, ipv6=True) + elif value == "ipv4": + return self.DhcpConfig(ipv4=True, ipv6=False) + elif value == "ipv6": + return self.DhcpConfig(ipv4=False, ipv6=True) + + raise ValueError(f"Invalid DHCP value: {value}") + + def _parse_gateway(self, value: str | None) -> NetAddress | None: + if (not value) or (value in {"_dhcp4", "_ipv6ra"}): + return None + + return ip_address(value) + + +MANAGERS = [NetworkManagerConfigParser, SystemdNetworkConfigParser] diff --git a/dissect/target/plugins/os/windows/network.py b/dissect/target/plugins/os/windows/network.py index d29d00a39..d7c39ede8 100644 --- a/dissect/target/plugins/os/windows/network.py +++ b/dissect/target/plugins/os/windows/network.py @@ -281,7 +281,8 @@ def _interfaces(self) -> Iterator[WindowsInterfaceRecord]: pass # Extract the rest of the device information - device_info["mac"] = _try_value(subkey, "NetworkAddress") + if mac_address := _try_value(subkey, "NetworkAddress"): + device_info["mac"] = [mac_address] device_info["vlan"] = _try_value(subkey, "VlanID") if timestamp := _try_value(subkey, "NetworkInterfaceInstallTimestamp"): diff --git a/tests/_data/plugins/os/unix/linux/NetworkManager/vlan.nmconnection b/tests/_data/plugins/os/unix/linux/NetworkManager/vlan.nmconnection new file mode 100644 index 000000000..b12ae3d7b --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/NetworkManager/vlan.nmconnection @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6ebf03683c2328a8497a38cbd038184e0760c1334dda413d7f768ab5d9136807 +size 201 diff --git a/tests/_data/plugins/os/unix/linux/NetworkManager/vlan2.nmconnection b/tests/_data/plugins/os/unix/linux/NetworkManager/vlan2.nmconnection new file mode 100644 index 000000000..fa6cf64b8 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/NetworkManager/vlan2.nmconnection @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4b0ea3831e424cb610acd39ef1662f9a63d625b90fc962558d9cb31df9af7d00 +size 231 diff --git a/tests/_data/plugins/os/unix/linux/NetworkManager/wired-static.nmconnection b/tests/_data/plugins/os/unix/linux/NetworkManager/wired-static.nmconnection new file mode 100644 index 000000000..e5cf3f233 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/NetworkManager/wired-static.nmconnection @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:887db9ac918cad1d6b62eb796cf97ded4c2791610937fdaa14cf23e872dd8276 +size 409 diff --git a/tests/_data/plugins/os/unix/linux/NetworkManager/wireless.nmconnection b/tests/_data/plugins/os/unix/linux/NetworkManager/wireless.nmconnection new file mode 100644 index 000000000..ddf468f1a --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/NetworkManager/wireless.nmconnection @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9812f728545918f4c4f1ce519145be73986fab4f9b6140eb6c6ce7a8a11c8c75 +size 130 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan.netdev b/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan.netdev new file mode 100644 index 000000000..6951a194c --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan.netdev @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8ffcbd8f72fc41f21e1702332b5c6a8f2c3fb78085f21a3a3435cb3015d4dc23 +size 48 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan2.netdev b/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan2.netdev new file mode 100644 index 000000000..bdc099e64 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan2.netdev @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:38c91d7d3f1f55765d6da2bb76863f8f5be659f28c3e1204a9a17df683acdc58 +size 48 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/20-wired-static.network b/tests/_data/plugins/os/unix/linux/systemd.network/20-wired-static.network new file mode 100644 index 000000000..3914e18cf --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/20-wired-static.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2644524e0f127e1ce21751b32fa0d166405dd488fe435f4b7eb08faf7e4a8048 +size 132 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/30-wired-static-complex.network b/tests/_data/plugins/os/unix/linux/systemd.network/30-wired-static-complex.network new file mode 100644 index 000000000..e54039ba7 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/30-wired-static-complex.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ef770dec329eb6a7e09987b4c3d472a2e451019463c01e3cd7ad7f3bfb857862 +size 400 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv4.network b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv4.network new file mode 100644 index 000000000..292308719 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv4.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7395907b756bd08ba172bece9ce48c144a142999d0956b5e4dfe436e5119d484 +size 51 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv6.network b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv6.network new file mode 100644 index 000000000..51708ccd9 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv6.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:62a93e614178f653ccf928d2ca658888e029e67beff30ccb0efc88e3df8ac84d +size 51 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless.network b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless.network new file mode 100644 index 000000000..c481da488 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:23a6c023a9c8422c4ca865557653446c3ba5680a6b6e73f57f5361cc4aba7bc2 +size 50 diff --git a/tests/helpers/test_utils.py b/tests/helpers/test_utils.py index 903abf9c7..8011fda19 100644 --- a/tests/helpers/test_utils.py +++ b/tests/helpers/test_utils.py @@ -8,11 +8,23 @@ from dissect.target.helpers import fsutil, utils -def test_slugify(): +def test_to_list_single_value() -> None: + assert utils.to_list(1) == [1] + assert utils.to_list("a") == ["a"] + assert utils.to_list(None) == [None] + + +def test_to_list_list_value() -> None: + assert utils.to_list([1, 2, 3]) == [1, 2, 3] + assert utils.to_list(["a", "b", "c"]) == ["a", "b", "c"] + assert utils.to_list([]) == [] + + +def test_slugify() -> None: assert utils.slugify("foo/bar\\baz bla") == "foo_bar_baz_bla" -def test_filesystem_readinto(): +def test_filesystem_readinto() -> None: data = b"hello_world" mocked_file = mock_open(read_data=b"hello_world") @@ -22,7 +34,7 @@ def test_filesystem_readinto(): assert len(buffer) == 512 -def test_helpers_fsutil_year_rollover_helper(): +def test_helpers_fsutil_year_rollover_helper() -> None: vfs = VirtualFilesystem() content = """ diff --git a/tests/plugins/general/test_network.py b/tests/plugins/general/test_network.py index a8131235f..3819a3003 100644 --- a/tests/plugins/general/test_network.py +++ b/tests/plugins/general/test_network.py @@ -17,7 +17,7 @@ def network_record(request: pytest.FixtureRequest) -> InterfaceRecord: name="interface_name", type="physical", enabled=True, - mac="DE:AD:BE:EF:00:00", + mac=["DE:AD:BE:EF:00:00"], ip=["10.42.42.10"], gateway=["10.42.42.1"], dns=["8.8.8.8", "1.1.1.1"], diff --git a/tests/plugins/os/unix/bsd/osx/test_network.py b/tests/plugins/os/unix/bsd/osx/test_network.py index d21ea6ec5..b804e12ea 100644 --- a/tests/plugins/os/unix/bsd/osx/test_network.py +++ b/tests/plugins/os/unix/bsd/osx/test_network.py @@ -95,7 +95,7 @@ def dhcp(fake_plist: dict) -> Iterator[dict]: (0, "vlan", ["None"]), (0, "enabled", ["True"]), (0, "interface_service_order", ["0"]), - (0, "mac", ["None"]), + (0, "mac", []), (0, "vlan", ["None"]), ], 1, diff --git a/tests/plugins/os/unix/linux/test_network.py b/tests/plugins/os/unix/linux/test_network.py new file mode 100644 index 000000000..e29f0c3a1 --- /dev/null +++ b/tests/plugins/os/unix/linux/test_network.py @@ -0,0 +1,168 @@ +import posixpath +from datetime import datetime +from ipaddress import ip_address, ip_network +from typing import Counter +from unittest.mock import MagicMock, patch + +from dissect.target import Target +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.general.network import UnixInterfaceRecord +from dissect.target.plugins.os.unix.linux.network import ( + LinuxNetworkConfigParser, + LinuxNetworkPlugin, + NetworkManagerConfigParser, + SystemdNetworkConfigParser, +) + + +def test_networkmanager_parser(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + fixture_dir = "tests/_data/plugins/os/unix/linux/NetworkManager/" + fs_linux.map_dir("/etc/NetworkManager/system-connections", fixture_dir) + + network_manager_config_parser = NetworkManagerConfigParser(target_linux) + interfaces = list(network_manager_config_parser.interfaces()) + + assert len(interfaces) == 2 + wired, wireless = interfaces + + assert wired.name == "enp0s3" + assert wired.type == "ethernet" + assert wired.mac == ["08:00:27:5B:4A:EB"] + assert Counter(wired.ip) == Counter([ip_address("192.168.2.138"), ip_address("10.1.1.10")]) + assert wired.dns == [ip_address("88.88.88.88")] + assert Counter(wired.gateway) == Counter( + [ip_address("192.168.2.2"), ip_address("2620:52:0:2219:222:68ff:fe11:5403"), ip_address("192.168.2.3")] + ) + assert Counter(wired.network) == Counter([ip_network("192.168.2.0/24"), ip_network("10.1.0.0/16")]) + assert not wired.dhcp_ipv4 + assert not wired.dhcp_ipv6 + assert wired.enabled is None + assert wired.last_connected == datetime.fromisoformat("2024-10-29 07:59:54+00:00") + assert Counter(wired.vlan) == Counter([10, 11]) + assert wired.source == "/etc/NetworkManager/system-connections/wired-static.nmconnection" + assert wired.configurator == "NetworkManager" + + assert wireless.name == "wlp2s0" + assert wireless.type == "wifi" + assert wireless.mac == [] + assert wireless.ip == [] + assert wireless.dns == [] + assert wireless.gateway == [] + assert wireless.network == [] + assert wireless.dhcp_ipv4 + assert wireless.dhcp_ipv6 + assert wireless.enabled is None + assert wireless.last_connected is None + assert wireless.vlan == [] + assert wireless.source == "/etc/NetworkManager/system-connections/wireless.nmconnection" + assert wireless.configurator == "NetworkManager" + + +def test_systemd_network_parser(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + fixture_dir = "tests/_data/plugins/os/unix/linux/systemd.network/" + fs_linux.makedirs("/etc/systemd/network") + + fs_linux.map_file( + "/etc/systemd/network/20-wired-static.network", posixpath.join(fixture_dir, "20-wired-static.network") + ) + fs_linux.map_file( + "/etc/systemd/network/30-wired-static-complex.network", + posixpath.join(fixture_dir, "30-wired-static-complex.network"), + ) + fs_linux.map_file( + "/usr/lib/systemd/network/40-wireless.network", posixpath.join(fixture_dir, "40-wireless.network") + ) + fs_linux.map_file( + "/run/systemd/network/40-wireless-ipv4.network", posixpath.join(fixture_dir, "40-wireless-ipv4.network") + ) + fs_linux.map_file( + "/usr/local/lib/systemd/network/40-wireless-ipv6.network", + posixpath.join(fixture_dir, "40-wireless-ipv6.network"), + ) + fs_linux.map_file("/etc/systemd/network/20-vlan.netdev", posixpath.join(fixture_dir, "20-vlan.netdev")) + fs_linux.map_file("/etc/systemd/network/20-vlan2.netdev", posixpath.join(fixture_dir, "20-vlan2.netdev")) + + systemd_network_config_parser = SystemdNetworkConfigParser(target_linux) + interfaces = list(systemd_network_config_parser.interfaces()) + + assert len(interfaces) == 5 + wired_static, wired_static_complex, wireless, wireless_ipv4, wireless_ipv6 = interfaces + + assert wired_static.name == "enp1s0" + assert wired_static.type is None + assert wired_static.mac == ["aa::bb::cc::dd::ee::ff"] + assert wired_static.ip == [ip_address("10.1.10.9")] + assert wired_static.dns == [ip_address("10.1.10.1")] + assert wired_static.gateway == [ip_address("10.1.10.1")] + assert wired_static.network == [ip_network("10.1.10.0/24")] + assert not wired_static.dhcp_ipv4 + assert not wired_static.dhcp_ipv6 + assert wired_static.enabled is None + assert wired_static.last_connected is None + assert Counter(wired_static.vlan) == Counter([100, 101]) + assert wired_static.source == "/etc/systemd/network/20-wired-static.network" + assert wired_static.configurator == "systemd-networkd" + + assert wired_static_complex.name == "enp1s0" + assert wired_static_complex.type == "ether" + assert Counter(wired_static_complex.mac) == Counter( + ["aa::bb::cc::dd::ee::ff", "ff::ee::dd::cc::bb::aa", "cc::ff::bb::aa::dd", "bb::aa::dd::cc::ff"] + ) + assert Counter(wired_static_complex.ip) == Counter([ip_address("10.1.10.9"), ip_address("10.1.9.10")]) + assert Counter(wired_static_complex.dns) == Counter( + [ip_address("10.1.10.1"), ip_address("10.1.10.2"), ip_address("1111:2222::3333")] + ) + assert Counter(wired_static_complex.gateway) == Counter( + [ip_address("10.1.6.3"), ip_address("10.1.10.2"), ip_address("10.1.9.3")] + ) + assert Counter(wired_static_complex.network) == Counter([ip_network("10.1.0.0/16"), ip_network("10.1.9.0/24")]) + assert not wired_static_complex.dhcp_ipv4 + assert not wired_static_complex.dhcp_ipv6 + assert wired_static_complex.enabled is None + assert wired_static_complex.last_connected is None + assert wired_static_complex.vlan == [] + assert wired_static_complex.source == "/etc/systemd/network/30-wired-static-complex.network" + assert wired_static_complex.configurator == "systemd-networkd" + + assert wireless.name == "wlp2s0" + assert wireless.type == "wifi" + assert wireless.mac == [] + assert wireless.ip == [] + assert wireless.dns == [] + assert wireless.gateway == [] + assert wireless.network == [] + assert wireless.dhcp_ipv4 + assert wireless.dhcp_ipv6 + assert wireless.enabled is None + assert wireless.last_connected is None + assert wired_static_complex.vlan == [] + assert wireless.source == "/usr/lib/systemd/network/40-wireless.network" + assert wired_static_complex.configurator == "systemd-networkd" + + assert wireless_ipv4.dhcp_ipv4 + assert not wireless_ipv4.dhcp_ipv6 + assert wireless_ipv4.source == "/run/systemd/network/40-wireless-ipv4.network" + + assert not wireless_ipv6.dhcp_ipv4 + assert wireless_ipv6.dhcp_ipv6 + assert wireless_ipv6.source == "/usr/local/lib/systemd/network/40-wireless-ipv6.network" + + +def test_linux_network_plugin_interfaces(target_linux: Target) -> None: + """Assert that the LinuxNetworkPlugin aggregates from all Config Parsers.""" + + MockLinuxConfigParser1: LinuxNetworkConfigParser = MagicMock() + MockLinuxConfigParser1.return_value.interfaces.return_value = [] + + MockLinuxConfigParser2: LinuxNetworkConfigParser = MagicMock() + MockLinuxConfigParser2.return_value.interfaces.return_value = [UnixInterfaceRecord()] + + with patch( + "dissect.target.plugins.os.unix.linux.network.MANAGERS", [MockLinuxConfigParser1, MockLinuxConfigParser2] + ): + linux_network_plugin = LinuxNetworkPlugin(target_linux) + interfaces = list(linux_network_plugin.interfaces()) + + assert len(interfaces) == 1 + MockLinuxConfigParser1.return_value.interfaces.assert_called_once() + MockLinuxConfigParser2.return_value.interfaces.assert_called_once() diff --git a/tests/plugins/os/windows/test_network.py b/tests/plugins/os/windows/test_network.py index 2398e66d7..a9237fcdf 100644 --- a/tests/plugins/os/windows/test_network.py +++ b/tests/plugins/os/windows/test_network.py @@ -271,7 +271,7 @@ def test_windows_network_none( "ip": ["192.168.0.10"], "dns": ["192.168.0.2"], "gateway": ["192.168.0.1"], - "mac": "FE:EE:EE:EE:EE:ED", + "mac": ["FE:EE:EE:EE:EE:ED"], "subnetmask": ["255.255.255.0"], "network": ["192.168.0.0/24"], "first_connected": datetime.fromisoformat("2012-12-21 00:00:00+00:00"), @@ -287,7 +287,7 @@ def test_windows_network_none( "ip": ["10.0.0.10"], "dns": ["10.0.0.2"], "gateway": ["10.0.0.1"], - "mac": "FE:EE:EE:EE:EE:ED", + "mac": ["FE:EE:EE:EE:EE:ED"], "subnetmask": ["255.255.255.0"], "network": ["10.0.0.0/24"], "first_connected": datetime.fromisoformat("2012-12-21 00:00:00+00:00"), @@ -344,7 +344,7 @@ def test_network_dhcp_and_static( ips.update(interface.ip) dns.update(interface.dns) gateways.update(interface.gateway) - macs.add(interface.mac) + macs.update(interface.mac) assert interface.ip == expected["ip"] assert interface.dns == expected["dns"]