Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port virtualbox scripts to VBoxManage CLI #625

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 148 additions & 59 deletions virtualbox/vbox-adapter-check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,148 @@
import sys
import textwrap
import argparse
import virtualbox
from virtualbox.library import NetworkAttachmentType as NetType
import subprocess
import re
import time
import gi
gi.require_version('Notify', '0.7')
from gi.repository import Notify

DYNAMIC_VM_NAME = '.dynamic'
DISABLED_ADAPTER_TYPE = NetType.host_only
ALLOWED_ADAPTER_TYPES = (NetType.host_only, NetType.internal, NetType.null)

ENABLED_STRS = ('Disabled','Enabled ')

def check_and_disable_internet_access(session, machine_name, max_adapters, skip_disabled, do_not_modify):
"""
Checks if a VM's network adapter is set to an internet-accessible mode
and disables it if necessary, showing a warning popup.
Args:
session: The session of the virtual machine to check.
"""
adapters_with_internet = []
for i in range(max_adapters):
adapter = session.machine.get_network_adapter(i)

if skip_disabled and not adapter.enabled:
continue

print(f"{machine_name} {i+1}: {ENABLED_STRS[adapter.enabled]} {adapter.attachment_type}")

if DYNAMIC_VM_NAME in machine_name and adapter.attachment_type not in ALLOWED_ADAPTER_TYPES:
adapters_with_internet.append(i)
if not do_not_modify:
# Disable the adapter
adapter.attachment_type = DISABLED_ADAPTER_TYPE

if adapters_with_internet:
adapters_str = ", ".join(str(i+1) for i in adapters_with_internet)
if do_not_modify:
message = f"{machine_name} may be connected to the internet on adapter(s): {adapters_str}. Please double check your VMs settings."
DISABLED_ADAPTER_TYPE = "hostonly"
ALLOWED_ADAPTER_TYPES = ("hostonly", "intnet", "none")

# cmd is an array of string arguments to pass
def run_vboxmanage(cmd):
"""Runs a VBoxManage command and returns the output."""
try:
result = subprocess.run(["VBoxManage"] + cmd, capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
# exit code is an error
print(f"Error running VBoxManage command: {e} ({e.stderr})")
raise Exception(f"Error running VBoxManage command: {e}")

def get_vm_state(machine_guid):
"""Gets the VM state using 'VBoxManage showvminfo'."""
vminfo = run_vboxmanage(["showvminfo", machine_guid, "--machinereadable"])
for line in vminfo.splitlines():
if line.startswith("VMState"):
return line.split("=")[1].strip('"')
raise Exception(f"Could not start VM '{machine_guid}'")

def ensure_hostonlyif_exists():
"""Gets the name of, or creates a new hostonlyif"""
try:
# Find existing hostonlyif
hostonlyifs_output = run_vboxmanage(["list", "hostonlyifs"])
for line in hostonlyifs_output.splitlines():
if line.startswith("Name:"):
hostonlyif_name = line.split(":")[1].strip()
print(f"Found existing hostonlyif {hostonlyif_name}")
return hostonlyif_name

# No host-only interface found, create one
print("No host-only interface found. Creating one...")
run_vboxmanage(["hostonlyif", "create"]) # Create a host-only interface
hostonlyifs_output = run_vboxmanage(["list", "hostonlyifs"]) # Get the updated list
for line in hostonlyifs_output.splitlines():
if line.startswith("Name:"):
hostonlyif_name = line.split(":")[1].strip()
print(f"Created hostonlyif {hostonlyif_name}")
return hostonlyif_name
print("Failed to create new hostonlyif. Exiting...")
raise Exception("Failed to create new hostonlyif.")
except Exception as e:
print(f"Error getting host-only interface name: {e}")
raise Exception("Failed to verify host-only interface exists")

def ensure_vm_shutdown(machine_guid):
"""Checks if the VM is running and shuts it down if it is."""
try:
vm_state = get_vm_state(machine_guid)
if vm_state != "poweroff":
print(f"VM {machine_guid} is not powered off. Shutting down VM...")
run_vboxmanage(["controlvm", machine_guid, "poweroff"])

# Wait for VM to shut down (up to 1 minute)
timeout = 60 # seconds
check_interval = 5 # seconds
start_time = time.time()
while time.time() - start_time < timeout:
vm_state = get_vm_state(machine_guid)
if vm_state == "poweroff":
print(f"VM {machine_guid} is shut down (status: {vm_state}).")
time.sleep(5) # wait a bit to be careful and avoid any weird races
return
time.sleep(check_interval)
print("Timeout waiting for VM to shut down. Exiting...")
raise TimeoutError("VM did not shut down within the timeout period.")
else:
message = f"{machine_name} may be connected to the internet on adapter(s): {adapters_str}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings."

# Show notification using PyGObject
Notify.init("VirtualBox adapter check")
notification = Notify.Notification.new(f"INTERNET IN VM: {machine_name}", message, "dialog-error")
# Set highest priority
notification.set_urgency(2)
notification.show()

session.machine.save_settings()
session.unlock_machine()
print(f"VM {machine_guid} is already shut down (state: {vm_state}).")
return
except Exception as e:
print(f"Error checking VM state: {e}")
raise Exception(f"Could not ensure '{machine_guid}' shutdown")

def get_dynamic_vm_uuids():
"""Gets the machine UUID(s) for a given VM name using 'VBoxManage list vms'."""
dynamic_machine_guids = []
try:
vms_output = run_vboxmanage(["list", "vms"])
pattern = r'"(.*?)" \{(.*?)\}'
matches = re.findall(pattern, vms_output)
if matches:
for match in matches:
vm_name = match[0]
machine_guid = match[1]
if DYNAMIC_VM_NAME in vm_name:
dynamic_machine_guids.append((vm_name, machine_guid))
except Exception as e:
print(f"Error finding dynamic machines UUIDs: {e}")
raise Exception(f"Error finding dynamic machines UUIDs: {e}")
return dynamic_machine_guids

def change_network_adapters_to_hostonly(machine_guid, vm_name, hostonly_ifname, do_not_modify):
"""Verify all adapters are in an allowed configuration. Must be poweredoff"""
try:
# gather adapters in incorrect configurations
nics_with_internet = []
vminfo = run_vboxmanage(["showvminfo", machine_guid, "--machinereadable"])
for nic_number, nic_value in re.findall("^nic(\d+)=\"(\S+)\"", vminfo, flags=re.M):
if nic_value not in ALLOWED_ADAPTER_TYPES:
nics_with_internet.append(f"nic{nic_number}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is clear for the user to use the adapter number without prepending nic:

Suggested change
nics_with_internet.append(f"nic{nic_number}")
nics_with_internet.append(nic_number)


# modify the invalid adapters if allowed
if nics_with_internet:
for nic in nics_with_internet:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This displays a notification per enabled adapter, which is IMO confusing when several adapters are enabled. The previous version was concatenating them and displaying a single notification, what is also the idea behind saving the adapters with internet into a list.

if do_not_modify:
message = f"{vm_name} may be connected to the internet on adapter(s): {nic}. Please double check your VMs settings."
else:
message = f"{vm_name} may be connected to the internet on adapter(s): {nic}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings."
# different commands are necessary if the machine is running.
if get_vm_state(machine_guid) == "poweroff":
run_vboxmanage(["modifyvm", machine_guid, f"--{nic}", DISABLED_ADAPTER_TYPE])
print(f"Set VM {nic} to hostonly")
else:
run_vboxmanage(["controlvm", machine_guid, nic, "hostonly", hostonly_ifname])
print(f"Set VM {nic} to hostonly")
Comment on lines +126 to +131
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the duplicated print, allowing us to keep the message consist in all cases. I also think we should include the VM name in the message as at the moment I find it difficult to understand in the output:

Suggested change
if get_vm_state(machine_guid) == "poweroff":
run_vboxmanage(["modifyvm", machine_guid, f"--{nic}", DISABLED_ADAPTER_TYPE])
print(f"Set VM {nic} to hostonly")
else:
run_vboxmanage(["controlvm", machine_guid, nic, "hostonly", hostonly_ifname])
print(f"Set VM {nic} to hostonly")
if get_vm_state(machine_guid) == "poweroff":
run_vboxmanage(["modifyvm", machine_guid, f"--{nic}", DISABLED_ADAPTER_TYPE])
else:
run_vboxmanage(["controlvm", machine_guid, nic, "hostonly", hostonly_ifname])
print(f"Set VM {vm_name} adaper {nic} to hostonly")


# Show notification using PyGObject
Notify.init("VirtualBox adapter check")
notification = Notify.Notification.new(f"INTERNET IN VM: {vm_name}", message, "dialog-error")
# Set highest priority
notification.set_urgency(2)
notification.show()
print(f"{vm_name} network configuration not ok, sent notifaction")
return
else:
print(f"{vm_name} network configuration is ok")
return

except Exception as e:
print(f"Error changing network adapters: {e}")
raise Exception("Failed to verify VM adapter configuration")

def main(argv=None):
if argv is None:
Expand All @@ -66,12 +158,6 @@ def main(argv=None):
# Print status of all internet adapters without modifying any of them
vbox-adapter-check.vm --do_not_modify
# Print status of enabled internet adapters and disabled the enabled adapters with internet access in VMs with {DYNAMIC_VM_NAME} in the name
vbox-adapter-check.vm --skip_disabled
# # Print status of enabled internet adapters without modifying any of them
vbox-adapter-check.vm --skip_disabled --do_not_modify
"""
)
parser = argparse.ArgumentParser(
Expand All @@ -80,15 +166,18 @@ def main(argv=None):
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--do_not_modify", action="store_true", help="Only print the status of the internet adapters without modifying them.")
parser.add_argument("--skip_disabled", action="store_true", help="Skip the disabled adapters.")
args = parser.parse_args(args=argv)

vbox = virtualbox.VirtualBox()
for machine in vbox.machines:
session = machine.create_session()
max_adapters = vbox.system_properties.get_max_network_adapters(machine.chipset_type)
check_and_disable_internet_access(session, machine.name, max_adapters, args.skip_disabled, args.do_not_modify)

try:
hostonly_ifname = ensure_hostonlyif_exists()
dynamic_machine_guids = get_dynamic_vm_uuids()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original version was checking the status of all VMs, but only modified the ones with .dynamic in the name. This version checks only the .dynamic VMs. I think it is useful to be able to display the status of all of them (this could be under an argument).

if len(dynamic_machine_guids) > 0:
for vm_name, machine_guid in dynamic_machine_guids:
change_network_adapters_to_hostonly(machine_guid, vm_name, hostonly_ifname, args.do_not_modify)
else:
print(f"[Warning ⚠️] No Dynamic VMs found")
except Exception as e:
print(f"Error verifying dynamic VM hostonly configuration: {e}")

if __name__ == "__main__":
main()
main()
115 changes: 80 additions & 35 deletions virtualbox/vbox-clean-snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,94 @@
import sys
import argparse
import textwrap
import virtualbox
from virtualbox.library import MachineState


TO_DELETE = []


def get_snapshots_to_delete(snapshot, protected_snapshots):
for child in snapshot.children:
get_snapshots_to_delete(child, protected_snapshots)
snapshot_name = snapshot.name.lower()
for protected_str in protected_snapshots:
if protected_str.lower() in snapshot_name:
return
TO_DELETE.append((snapshot.name, snapshot.id_p))

import subprocess
import re

def run_vboxmanage(cmd):
"""Runs a VBoxManage command and returns the output."""
try:
result = subprocess.run(["VBoxManage"] + cmd, capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
# exit code is an error
print(f"Error running VBoxManage command: {e} ({e.stderr})")
raise Exception(f"Error running VBoxManage command")

def get_vm_state(vm_name):
"""Gets the VM state using 'VBoxManage showvminfo'."""
vminfo = run_vboxmanage(["showvminfo", vm_name, "--machinereadable"])
for line in vminfo.splitlines():
if line.startswith("VMState"):
return line.split("=")[1].strip('"')
raise Exception(f"Could not get VM state for '{vm_name}'")

def get_snapshot_children(vm_name, root_snapshot_name, protected_snapshots):
"""Gets the children of a snapshot using 'VBoxManage showvminfo'.

Args:
vm_name: The name of the VM.
snapshot_name: The name of the snapshot.

Returns:
A list of snapshot names that are children of the given snapshot.
"""
try:
vminfo = run_vboxmanage(["showvminfo", vm_name, "--machinereadable"])
# Find all snapshot names
snapshot_regex = rf'(SnapshotName(?:-\d+)*)=\"(.*?)\"'
snapshots = re.findall(snapshot_regex, vminfo, flags=re.M)

children = []

# find the root SnapshotName by matching the name
root_snapshotid = None
for snapshotid, snapshot_name in snapshots:
if snapshot_name.lower() == root_snapshot_name.lower() and (not any(p.lower() in snapshot_name.lower() for p in protected_snapshots)):
root_snapshotid = snapshotid

if not root_snapshotid:
print("Failed to find root snapshot")
raise Exception(f"Failed to find root snapshot {snapshot_name}")

# children of that snapshot share the same prefix id
dependant_child = False
for snapshotid, snapshot_name in snapshots:
if snapshotid.startswith(root_snapshotid):
if not any(p.lower() in snapshot_name.lower() for p in protected_snapshots):
children.append((snapshotid, snapshot_name))
else:
dependant_child = True

# remove the root snapshot if any children are protected OR it's the current snapshot
if dependant_child:
print("Root snapshot cannot be deleted as a child snapshot is protected")
children = [snapshot for snapshot in children if snapshot[0] != root_snapshotid]
return children
except Exception as e:
print(f"Error getting snapshot children: {e}")
raise Exception(f"Could not get snapshot children for '{vm_name}'")

def delete_snapshot_and_children(vm_name, snapshot_name, protected_snapshots):
vbox = virtualbox.VirtualBox()
vm = vbox.find_machine(vm_name)
snapshot = vm.find_snapshot(snapshot_name)
get_snapshots_to_delete(snapshot, protected_snapshots)
TO_DELETE = get_snapshot_children(vm_name, snapshot_name, protected_snapshots)

if TO_DELETE:
print(f"\nCleaning {vm_name} 🫧 Snapshots to delete:")
for name, _ in TO_DELETE:
print(f" {name}")
for snapshotid, snapshot_name in TO_DELETE:
print(f" {snapshot_name}")

if vm.state not in (MachineState.powered_off, MachineState.saved):
print(f"\nVM state: {vm.state}\n⚠️ Snapshot deleting is slower in a running VM and may fail in a changing state")
vm_state = get_vm_state(vm_name)
if vm_state not in ("poweroff", "saved"):
print(f"\nVM state: {vm_state}\n⚠️ Snapshot deleting is slower in a running VM and may fail in a changing state")

answer = input("\nConfirm deletion ('y'):")
if answer.lower() == "y":
print("\nDeleting... (this may take some time, go for an 🍦!)")
session = vm.create_session()
for name, uuid in TO_DELETE:
for snapshotid, snapshot_name in TO_DELETE[::-1]: # delete in reverse order to avoid issues with child snapshots
try:
progress = session.machine.delete_snapshot(uuid)
progress.wait_for_completion(-1)
print(f" 🫧 DELETED '{name}'")
run_vboxmanage(["snapshot", vm_name, "delete", snapshot_name])
print(f" 🫧 DELETED '{snapshot_name}'")
except Exception as e:
print(f" ❌ ERROR '{name}': {e}")
session.unlock_machine()
print(f" ❌ ERROR '{snapshot_name}': {e}")
else:
print(f"\n{vm_name} is clean 🫧")

Expand All @@ -67,10 +112,10 @@ def main(argv=None):

# Delete the 'CLEAN with IDA 8.4' children snapshots recursively skipping the ones that include 'clean' or 'done' in the name (case insensitive) in the 'FLARE-VM.20240604' VM
# NOTE: the 'CLEAN with IDA 8.4' root snapshot is skipped in this case
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot 'CLEAN with IDA 8.4'
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot CLEAN with IDA 8.4

# Delete the 'Snapshot 3' snapshot and its children recursively skipping the ones that include 'clean' or 'done' in the name (case insensitive) in the 'FLARE-VM.20240604' VM
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot 'Snapshot 3'
vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot Snapshot 3

# Delete all snapshots in the 'FLARE-VM.20240604' VM
vbox-clean-snapshots.py FLARE-VM.20240604 --protected_snapshots ""
Expand All @@ -84,7 +129,7 @@ def main(argv=None):
parser.add_argument("vm_name", help="Name of the VM to clean up")
parser.add_argument("--root_snapshot", default="", help="Snapshot to delete (and its children recursively). Leave empty to clean all snapshots in the VM.")
parser.add_argument(
"--protected_snapshots",
"--protected_snapshots",
default="clean,done",
type=lambda s: s.split(","),
help='Comma-separated list of strings. Snapshots with any of the strings included in the name (case insensitive) are not deleted. Default: "clean,done"',
Expand All @@ -95,4 +140,4 @@ def main(argv=None):


if __name__ == "__main__":
main()
main()
Loading