-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Linux network plugin: NetworkManager & systemd-networkd #932
Conversation
f2e7ce1
to
b91d450
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #932 +/- ##
==========================================
+ Coverage 77.82% 77.90% +0.07%
==========================================
Files 323 324 +1
Lines 27647 27850 +203
==========================================
+ Hits 21517 21696 +179
- Misses 6130 6154 +24
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
dhcp_ipv4, dhcp_ipv6 = self._parse_dhcp(network_section.get("DHCP")) | ||
if link_mac := link_section.get("MACAddress"): | ||
mac_addresses.add(link_mac) | ||
if match_macs := match_section.get("MACAddress"): | ||
mac_addresses.update(match_macs.split(" ")) | ||
if permanent_macs := match_section.get("PermanentMACAddress"): | ||
mac_addresses.update(permanent_macs.split(" ")) | ||
|
||
if dns_value := network_section.get("DNS"): | ||
if isinstance(dns_value, str): | ||
dns_value = [dns_value] | ||
dns.update({self._parse_dns_ip(dns_ip) for dns_ip in dns_value}) | ||
|
||
if address_value := network_section.get("Address"): | ||
if isinstance(address_value, str): | ||
address_value = [address_value] | ||
ip_interfaces.update({ip_interface(addr) for addr in address_value}) | ||
|
||
if gateway_value := network_section.get("Gateway"): | ||
if isinstance(gateway_value, str): | ||
gateway_value = [gateway_value] | ||
gateways.update({ip_address(gateway) for gateway in gateway_value}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be an idea to create some helper functions and change it to something like this:
def _splitter(value: str | None, seperator=" ") -> list[str]:
if not value:
return []
return value.split(seperator)
def _create_list(func, value: str | list | None):
if not value:
return []
if isinstance(value, str):
value = [value]
return value
then for the code itself:
mac_addresses.update(_splitter(match_section.get("MACAddress"))
ip_interfaces.update(map(ip_address, create_list(network_section.get("Address")))
Or create a function with a map inside of it, cause this is still a bit lengthy ofc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With regard to _splitter
I re-read the documentation of str.split()
and if you don´t specify a separator, it will separate on white space, and return an empty list if the string is empty.
With regard to _create_list
Wrote general purpose to_list
in dissect/target/helpers/utils.py
.
Did not opt for the map in this case, one line for value retrieval and one line for the update is fine imo
vlan_values = network_section.get("VLAN", []) | ||
vlan_ids = { | ||
virtual_networks[vlan_name] | ||
for vlan_name in ([vlan_values] if isinstance(vlan_values, str) else vlan_values) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would put this above the comprehension statement. It becomes a bit more readable then. Currently a lot is going on imho
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplified with to_list
configurator="systemd-networkd", | ||
) | ||
except Exception as e: | ||
self._target.log.warning("Error parsing network config file %s: %s", config_file, e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its better to put e
inside a self._target.log.debug("", exc_info=e)
there are some examples for that everywhere. such as dissect/target/container.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a warning is better; that way, the analyst can respond by analyzing the potentially broken configuration file manually.
There are apparently also some wrong examples; I took this from
self.target.log.warning("Error reading configuration for network device %s: %s", name, e) |
Will fix it there too.
|
||
# Can be enclosed in brackets for IPv6. Can also have port, iface name, and SNI, which we ignore. | ||
# Example: [1111:2222::3333]:9953%ifname#example.com | ||
dns_ip_patttern = re.compile(r"((?:\d{1,3}\.){3}\d{1,3})|\[(\[?[0-9a-fA-F:]+\]?)\]") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be better to use capture groups for this, instead of using indexes of 1 and 2 on L276?
Would make it a bit more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, although Imo it is still unreadable.
Alternative could be parser combinators, but we don't use them yet. I would have to search for a suitable python implementation.
match = self.dns_ip_patttern.search(address) | ||
if match: | ||
return ip_address(match.group(1) or match.group(2)) | ||
else: | ||
raise ValueError(f"Invalid DNS address format: {address}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match = self.dns_ip_patttern.search(address) | |
if match: | |
return ip_address(match.group(1) or match.group(2)) | |
else: | |
raise ValueError(f"Invalid DNS address format: {address}") | |
if match:= self.dns_ip_patttern.search(address): | |
return ip_address(match.group(1) or match.group(2)) | |
raise ValueError(f"Invalid DNS address format: {address}") |
def _parse_lastconnected(self, value: str) -> datetime | None: | ||
"""Parse last connected timestamp.""" | ||
if not value: | ||
return None | ||
|
||
timestamp_int = int(value) | ||
return datetime.fromtimestamp(timestamp_int, timezone.utc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def _parse_lastconnected(self, value: str) -> datetime | None: | |
"""Parse last connected timestamp.""" | |
if not value: | |
return None | |
timestamp_int = int(value) | |
return datetime.fromtimestamp(timestamp_int, timezone.utc) | |
def _parse_lastconnected(self, last_connected: str) -> datetime | None: | |
"""Parse last connected timestamp.""" | |
if not last_connected: | |
return None | |
return datetime.fromtimestamp(int(last_connected), timezone.utc) |
vlan_id_by_interface[parent_interface] = int(vlan_id) | ||
continue | ||
|
||
dns = set[ip_address]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dns = set[ip_address]() | |
dns: set[ip_address] = set() |
|
||
|
||
class LinuxConfigParser: | ||
VlanIdByName = dict[str, int] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could put this type definition outside the class. Then it can just be used by any class without needing to first call LinuxConfigParser
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I want to prevent polluting the module scope. But moved it, since the requirement to fully qualify decreases readability
yield from manager.interfaces() | ||
|
||
|
||
class LinuxConfigParser: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class LinuxConfigParser: | |
class LinuxNetworkParser: |
wouldn't this be more accurate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed into LinuxNetworkConfigParser
, which is perhaps even better
83258be
to
3b56d58
Compare
T = TypeVar("T") | ||
|
||
|
||
def to_list(value: T | list[T]) -> list[T]: | ||
"""Convert a single value or a list of values to a list. | ||
|
||
Args: | ||
value: The value to convert. | ||
|
||
Returns: | ||
A list of values. | ||
""" | ||
if not isinstance(value, list): | ||
return [value] | ||
return value | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a bit overkill to put here as no other class uses it yet, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel the method is too general to put anywhere else, although the module names helpers
and utils
are a bit nondescript.
Besides, if we keep it in for example the SystemdNetworkConfigParser
, then it is likely that no one is going to use it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
until we find everyone is making it again and move it then :P but i get your point
source=self.source, | ||
last_connected=self.last_connected, | ||
name=self.name, | ||
mac=[self.mac_address] if self.mac_address else [], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not going to use to_list
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has different semantics than to_list
: it maps None
to an empty list, which to_list
does not do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I was remembering an implementation where None
was transformed into an empty list. So that is fair
uuid = vlan_id_by_interface.get(context.uuid) if context.uuid else None | ||
name = vlan_id_by_interface.get(connection.name) if connection.name else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to do the if conditions here, the get
on a dictionary returns a None
by default. if context.uuid
is not, it will try to index it using a None
which returns a None
cause it does not exist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am defending against lookup of None
in vlan_id_by_interface
, which would be a logic error.
But perhaps overly cautious, removed.
However, there is an edge case where there are multiple vlans bound to the same interface, where the first vlan is bound by iface name, and the second by uuid.
Rewrote and adjusted unit test
elif key.startswith("route"): | ||
if gateway := self._parse_route(value): | ||
context.gateways.add(gateway) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elif key.startswith("route"): | |
if gateway := self._parse_route(value): | |
context.gateways.add(gateway) | |
elif key.startswith("route") and (gateway := self._parse_route(value)): | |
context.gateways.add(gateway) |
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at this again, I actually prefer the nested if because they inner and outer if have different purposes.
The outer if
switches on the key type, and the inner if
is part of the action of handling a certain configuration value. This way, we can scan more easily over the different key types.
I made all other outer clauses consistent. This leads to more indentation but I think it is for the better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, so you want to treat it like some kind of switch/match statement. That's fair
elif key == "method" and ip_version == "ipv4": | ||
context.dhcp_ipv4 = value == "auto" | ||
elif key == "method" and ip_version == "ipv6": | ||
context.dhcp_ipv6 = value == "auto" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elif key == "method" and ip_version == "ipv4": | |
context.dhcp_ipv4 = value == "auto" | |
elif key == "method" and ip_version == "ipv6": | |
context.dhcp_ipv6 = value == "auto" | |
elif key == "method": | |
setattr(context, f"dhcp_{ip_version}", value == "auto") |
tho maybe this is a bit too much magic :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clever, but I think the original is more readable.
if key == "dns" and (stripped := value.rstrip(";")): | ||
context.dns.update({ip_address(addr) for addr in stripped.split(";")}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if key == "dns" and (stripped := value.rstrip(";")): | |
context.dns.update({ip_address(addr) for addr in stripped.split(";")}) | |
if key == "dns": | |
context.dns.update({ip_address(addr) for addr in value.split(";") if addr}) |
wouldn't this also work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would also work, but I think the original more clearly reflects that we are guarding against a trailing ;
.
Then again, maybe empty values are somehow allowed so your suggestion is slightly more robust.
Applied suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the empty values get ignored due to the if addr
if addr =''
for example. that is some truethy magic in python. same with an empty list and such
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With maybe empty values are somehow allowed
I meant according to the Network Manager spec. (I applied your suggestion)
parent_interface = sub_type.get("parent", None) | ||
vlan_id = sub_type.get("id", None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get()
doesn't need an explicit None
as it is the default for dicts :)
vlan_id | ||
for vlan_name in to_list(vlan_names) | ||
if (vlan_id := virtual_networks.get(vlan_name)) is not None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
vlan_id | |
for vlan_name in to_list(vlan_names) | |
if (vlan_id := virtual_networks.get(vlan_name)) is not None | |
virtual_networks.get(_name) for _name in to_list(vlan_names) if _name in virtual_networks |
isn't this a bit more clear?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I don't like is that the lookup will be performed twice.
In any case, rewrote as an explicit loop. Also fixed a type-inconsistency.
Also adjusted unit test to check for multiple vlans.
99bb458
to
e3982a7
Compare
return | ||
|
||
if key == "dns": | ||
context.dns.update({ip_address(addr) for addr in trimmed.split(";") if addr}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed you put these explicitly inside a set, however this isn't needed as the update already accepts any kind of iterator.
context.dns.update({ip_address(addr) for addr in trimmed.split(";") if addr}) | |
context.dns.update(ip_address(addr) for addr in trimmed.split(";") if addr) |
this is similar to the previous soltion, but doesn't allocate a new set first.
I think you can do this for all the ones inside this file.
elif key.startswith("route"): | ||
if gateway := self._parse_route(value): | ||
context.gateways.add(gateway) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, so you want to treat it like some kind of switch/match statement. That's fair
if gateway := self._parse_route(value): | ||
context.gateways.add(gateway) | ||
|
||
def _parse_vlan(self, sub_type: dict["str", any], vlan_id_by_interface: VlanIdByInterface) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the "str" wasn't needed like that, and any is a function, use from typing import Any
instead
def _parse_vlan(self, sub_type: dict["str", any], vlan_id_by_interface: VlanIdByInterface) -> None: | |
def _parse_vlan(self, sub_type: dict[str, Any], vlan_id_by_interface: VlanIdByInterface) -> None: |
|
||
yield UnixInterfaceRecord( | ||
source=str(config_file), | ||
type=match_section.get("Type", None), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type=match_section.get("Type", None), | |
type=match_section.get("Type"), |
enabled=None, # Unknown, dependent on run-time state | ||
dhcp_ipv4=dhcp_ipv4, | ||
dhcp_ipv6=dhcp_ipv6, | ||
name=match_section.get("Name", None), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name=match_section.get("Name", None), | |
name=match_section.get("Name"), |
import re | ||
from dataclasses import dataclass, field | ||
from datetime import datetime, timezone | ||
from ipaddress import ip_address, ip_interface | ||
from typing import Iterator, Literal, NamedTuple | ||
|
||
from dissect.target import Target | ||
from dissect.target.helpers import configutil | ||
from dissect.target.helpers.record import UnixInterfaceRecord | ||
from dissect.target.helpers.utils import to_list | ||
from dissect.target.plugins.general.network import NetworkPlugin | ||
from dissect.target.target import TargetPath |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import re | |
from dataclasses import dataclass, field | |
from datetime import datetime, timezone | |
from ipaddress import ip_address, ip_interface | |
from typing import Iterator, Literal, NamedTuple | |
from dissect.target import Target | |
from dissect.target.helpers import configutil | |
from dissect.target.helpers.record import UnixInterfaceRecord | |
from dissect.target.helpers.utils import to_list | |
from dissect.target.plugins.general.network import NetworkPlugin | |
from dissect.target.target import TargetPath | |
from ipaddress import ip_address, ip_interface | |
from typing import TYPE_CHECKING, Iterator, Literal, NamedTuple | |
from dissect.target.helpers import configutil | |
from dissect.target.helpers.record import UnixInterfaceRecord | |
from dissect.target.helpers.utils import to_list | |
from dissect.target.plugins.general.network import NetworkPlugin | |
if TYPE_CHECKING: | |
from ipaddress import IPv4Address, IPv6Address | |
from dissect.target import Target | |
from dissect.target.target import TargetPath | |
NetAddress = IPv4Address | IPv6Address |
maybe something like this for _parse_dns_ip
.
with NetAddress as the return type of _parse_nds_ip
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, also added NetInterface
self._target.log.warning("Error parsing network config file %s", config_file, exc_info=e) | ||
|
||
def _parse_dns_ip(self, address: str) -> ip_address: | ||
"""Parse DNS address from systemd network configuration file. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ip_address isn't a type but a function here.
Maybe its better to put the type definition at the top. I'll put an example there
|
||
raise ValueError(f"Invalid DHCP value: {value}") | ||
|
||
def _parse_gateway(self, value: str | None) -> ip_address | None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same thing here ip_address
is a function, not a type
match = self.dns_ip_patttern.search(address) | ||
if not match: | ||
raise ValueError(f"Invalid DNS address format: {address}") | ||
|
||
return ip_address(match.group("withoutBrackets") or match.group("withBrackets")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match = self.dns_ip_patttern.search(address) | |
if not match: | |
raise ValueError(f"Invalid DNS address format: {address}") | |
return ip_address(match.group("withoutBrackets") or match.group("withBrackets")) | |
if match := self.dns_ip_patttern.search(address): | |
return ip_address(match.group("withoutBrackets") or match.group("withBrackets")) | |
raise ValueError(f"Invalid DNS address format: {address}") |
a bit more readable IMHO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wanted to keep the happy path on the left, but fair. Applied
vlan_ids = virtual_networks.setdefault(name, set()) | ||
vlan_ids.add(int(vlan_id)) | ||
except Exception as e: | ||
self._target.log.warning("Error parsing virtual network config file %s", config_file, exc_info=e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also the issue with too much information in the warning
log of a target.
The base class uses this field so we cannot make it a singleton/collection depending on os.
- connections with same iface name caused conflicts. - support vlan matching on uuid.
c695d83
to
f4dbcf9
Compare
f4dbcf9
to
6cd8b30
Compare
if mac_address := _try_value(subkey, "NetworkAddress"): | ||
device_info["mac"] = [mac_address] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at this, wouldn't network.mac
fail for windows if it doesn't have a NetworkAddress
now? Cause it checks it with an in
for a None
value.
to avoid that:
if mac_address := _try_value(subkey, "NetworkAddress"): | |
device_info["mac"] = [mac_address] | |
mac_address = _try_value(subkey, "NetworkAddress") | |
device_info["mac"] = [mac_address] if mac_address else [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flow record constructor creates an empty list in that case.
We also have a unit test for this case.
(but good challenge)
connections.append(context) | ||
|
||
except Exception as e: | ||
self._target.log.warning("Error parsing network config file %s: %s", connection_file_path, e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe also an idea to put a debug exc_info here?
6cd8b30
to
608f8c3
Compare
Implement the
NetworkPlugin
for Linux.Initial support is for:
Theips
method of the Linux OS plugin is implemented using the new parser; other methods such asdns
anddhcp
still use the legacy solution until we implemented parsers for the remaining network configuration systems. Let me know if this is wrong.Update: I retained the old
ips
. The new implementation can be invoked by prefixing with thenetwork
namespace, i.e.-f network.interfaces
While doing research I discovered that systemd supports "drop-in" configuration directories (see https://www.freedesktop.org/software/systemd/man/latest/systemd.unit.html) . Since this feature applicable to all systemd domains and not only networking, I propose we extend the systemd config parser with support for this (#933). I checked this with Stefan de Reuver.
Keep in mind that unlike the OSX and Windows implementations, which report actual values, the Linux implementation reports configuration of interfaces, or "potential" interfaces, as there is often no way to determine which configuration was active. I think this results in some friction with the interface records; for example, the mac address needed to be extended to a list.
Closes #776