diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 4c93403a23..eb80e4e928 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,7 +1,3 @@ # As per https://docs.github.com/en/github/creating-cloning-and-archiving-repositories/about-code-owners#example-of-a-codeowners-file - -* @Torxed - -# Any PKGBUILD changes should tag grazzolini -/PKGBUILDs/ @grazzolini -/PKGBUILD @grazzolini +.github/* @Torxed +tests/qemu/* @Torxed \ No newline at end of file diff --git a/.github/workflows/iso-build.yaml b/.github/workflows/iso-build.yaml index b86c47ecfc..79222493be 100644 --- a/.github/workflows/iso-build.yaml +++ b/.github/workflows/iso-build.yaml @@ -28,7 +28,7 @@ jobs: steps: - uses: actions/checkout@v4 - run: pwd - - run: find . + - run: echo "Building for $GITHUB_ACTOR" - run: cat /etc/os-release - run: pacman-key --init - run: pacman --noconfirm -Sy archlinux-keyring @@ -37,3 +37,10 @@ jobs: with: name: Arch Live ISO path: /tmp/archlive/out/*.iso + call-workflow-qemu-test: + name: call qemu testbench + needs: build + permissions: + contents: read + pull-requests: write + uses: ./.github/workflows/qemu-tests.yaml \ No newline at end of file diff --git a/.github/workflows/qemu-tests.yaml b/.github/workflows/qemu-tests.yaml new file mode 100644 index 0000000000..2beeeb9d9b --- /dev/null +++ b/.github/workflows/qemu-tests.yaml @@ -0,0 +1,42 @@ +name: Runs Qemu test case pci_emulation + +# This job is callewd from iso-build +on: + workflow_call: + +jobs: + run-qemu: + name: qemu testbench + if: ${{ github.actor == 'Torxed' }} + runs-on: self-hosted + environment: testing + steps: + - uses: actions/checkout@v4 + - uses: actions/download-artifact@master + with: + name: Arch Live ISO + path: ./_work/iso/ + - run: echo '' > ./tests/qemu/serial.log + # Generate a .screenrc that will auto-run the test + # as well as record it for publishing using asciinema. + - run: | + cat <> .screenrc_test + startup_message off + chdir tests/qemu + screen 0 python run_test.py + split + focus down + resize 24 + screen 1 asciinema rec demo.cast --overwrite -c "tail -f serial.log" + EOF + - run: pwd + - run: cp /usr/share/ovmf/x64/OVMF_VARS.fd ./tests/qemu/OVMF_VARS.fd + - run: rm -f -- ./tests/qemu/archtest.img + - run: qemu-img create -f qcow2 ./tests/qemu/archtest.img 10G + - run: tree -L 2 + - run: cat .screenrc_test + - run: script -c "stty rows 24 && stty cols 80 && screen -c .screenrc_test" + - uses: actions/upload-artifact@v4 + with: + name: archtest-pci_emulation + path: ./tests/qemu/demo.cast diff --git a/.gitignore b/.gitignore index a6075cc4d9..f2f14f4374 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,8 @@ SAFETY_LOCK **/**.qcow2 **/**.log **/**.fd +**/**.cast +**/**.pid /test*.py **/archiso /guided.py diff --git a/tests/qemu/machines.py b/tests/qemu/machines.py new file mode 100644 index 0000000000..904a5fbc2c --- /dev/null +++ b/tests/qemu/machines.py @@ -0,0 +1,56 @@ +# flake8: noqa E131 + +import pathlib +from runners import TestLeaveAllDefault + +parameters = { + "pci_emulation" : { + 'test_class' : TestLeaveAllDefault, + 'arguments' : [ + '/usr/bin/qemu-system-x86_64', + '-name', '"archinstall-test"', + '-display', 'none', + '-monitor', 'none', + '-nographic', + '-m', '4096', + '-smp', '4,sockets=1,dies=1,cores=4,threads=1', + '-usb', '-device', 'usb-host,vendorid=0x1050,productid=0x0407', + '-pidfile', f'{pathlib.Path(__file__).parent}/archinstall-test.pid', + '-cpu', 'host,topoext,kvm=off,hv_relaxed,hv_spinlocks=0x1fff,hv_vapic,hv_time', + '-enable-kvm', + '-object', 'rng-random,filename=/dev/urandom,id=rng0', + '-device', 'virtio-rng-pci,rng=rng0', + '-usbdevice', 'mouse', + '-global', 'driver=cfi.pflash01,property=secure,value=on', + '-machine', 'type=q35,accel=kvm,kernel_irqchip=on', + '-smbios', '"type=0,vendor=American Megatrends Inc.,version=P4.60,date=08/03/2021,release=08.03.2021"', + '-smbios', '"type=1,manufacturer=Inet_AB,product=To Be Filled By O.E.M.,version=To Be Filled By O.E.M.,serial=245797,uuid=4154a2b8-7b7f-0000-0000-000000000000,sku=To Be Filled By O.E.M.,family=To Be Filled By O.E.M."', + '-smbios', '"type=2,manufacturer=ASRock,product=X570 Taichi,version=,serial=M80-D00000012340asset=,location="', + '-smbios', '"type=3,manufacturer=To Be Filled By O.E.M.,version=To Be Filled By O.E.M.,serial=214345,asset=To Be Filled By O.E.M.,sku=To Be Filled By O.E.M."', + '-smbios', '"type=4,sock_pfx=AM4,manufacturer=Advanced Micro Devices,, Inc.,version=AMD Ryzen 9 5900X 12-Core Processor,serial=Unknown,asset=Unknown,part=Unknown"', + '-smbios', '"type=17,loc_pfx=DIMM 1,bank=P0 CHANNEL A,manufacturer=Unknown,serial=00000000,asset=Not Specified,part=CMK64GX4M2E3200C16,speed=2133"', + '-smbios', '"type=17,loc_pfx=DIMM 1,bank=P0 CHANNEL B,manufacturer=Unknown,serial=00000000,asset=Not Specified,part=CMK64GX4M2E3200C16,speed=2133"', + '-device', 'intel-iommu,device-iotlb=on,caching-mode=on', + '-device', 'pcie-root-port,port=0xe,chassis=8,id=pci.8,bus=pcie.0,multifunction=on,addr=0x6', + '-device', 'virtio-keyboard-pci,id=input1,bus=pci.8,addr=0x0', + '-device', 'pcie-root-port,port=0xf,chassis=9,id=pci.9,bus=pcie.0,addr=0x6.0x1', + '-device', 'pcie-root-port,port=0x11,chassis=13,id=pci.13,bus=pcie.0,addr=0x6.0x3', + '-drive', 'if=pflash,format=raw,readonly=on,file=/usr/share/ovmf/x64/OVMF_CODE.secboot.fd', + '-drive', f'if=pflash,format=raw,file={pathlib.Path(__file__).parent}/OVMF_VARS.fd', + # '-tpmdev', 'passthrough,id=tpm0,path=/dev/tpm0,cancel-path=/tmp/foo-cancel2', + # '-device', 'tpm-tis,tpmdev=tpm0', + '-object', 'iothread,id=iothread1', + '-device', 'virtio-scsi-pci,bus=pcie.0,id=scsi2,addr=0x8', + '-device', 'virtio-scsi-pci,iothread=iothread1,id=scsi0,num_queues=8,bus=pci.13,addr=0x0', + '-device', '"scsi-hd,serial=S5GBAD12345ABCE,drive=libvirt-1-format,bus=scsi2.0,id=scsi0-0-0-0,channel=0,scsi-id=0,lun=0,device_id=drive-scsi0-0-0-0,bootindex=2,write-cache=on"', + '-blockdev', '\'{"driver":"file","filename":"./archtest.img","aio":"threads","node-name":"libvirt-1-storage","cache":{"direct":true,"no-flush":false},"auto-read-only":true,"discard":"unmap"}\'', + '-blockdev', '\'{"node-name":"libvirt-1-format","read-only":false,"discard":"unmap","cache":{"direct":true,"no-flush":false},"driver":"qcow2","file":"libvirt-1-storage","backing":null}\'', + '-device', 'pcie-root-port,multifunction=on,bus=pcie.0,id=port9-0,addr=0x9,chassis=0', + '-device', 'virtio-net-pci,mac=FE:00:00:00:00:01,id=network0,netdev=network0.0,status=on,bus=port9-0', + '-netdev', 'tap,ifname=tap0,id=network0.0,script=no,downscript=no', + '-audiodev', 'pipewire,id=win11', + '-device', 'ich9-intel-hda,id=sound0,bus=pcie.0,addr=0x1b', + '-device', 'hda-micro,audiodev=win11', + ] + } +} \ No newline at end of file diff --git a/tests/qemu/run_test.py b/tests/qemu/run_test.py new file mode 100644 index 0000000000..d52932aecc --- /dev/null +++ b/tests/qemu/run_test.py @@ -0,0 +1,259 @@ +import time +import asyncio +import threading +import pathlib +import socket +import select +import logging +import sys +from subprocess import Popen, PIPE, STDOUT +from qemu.qmp import QMPClient +from machines import parameters + +logger = logging.getLogger("archtest") +logger.setLevel(logging.INFO) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.INFO) +logger.addHandler(handler) + +class QMPClientMonitor: + def __init__(self, name: str, qmp_socket): + self.qmp = QMPClient(name) + self.qmp.logger = logger + self.qmp_socket = qmp_socket + self.loop = None + + async def watch_events(self): + try: + async for event in self.qmp.events: + print(f"QMP Event: {event['event']}") + except asyncio.CancelledError: + return + + async def run(self): + self.loop = asyncio.get_event_loop() + await self.qmp.connect(self.qmp_socket) + + asyncio.create_task(self.watch_events()) + + await self.qmp.runstate_changed() + try: + await self.qmp.disconnect() + except: + pass + +class SerialMonitor(threading.Thread): + def __init__(self, profile, QMP, serial_socket_path, test_case): + self.profile = profile + self.QMP = QMP + self.serial_socket_path = serial_socket_path + self.test_case = test_case(serial_monitor=self) + + threading.Thread.__init__(self) + self.start() + + async def edit_boot(self): + logger.info("Adding 'console=tty0 console=ttyS0,115200' to default boot option") + + # https://github.com/coreos/qemu/blob/master/qmp-commands.hx + await self.QMP.qmp.execute_msg( + self.QMP.qmp.make_execute_msg( + 'send-key', + arguments={ + 'keys': [ + { + "type": "qcode", + "data": "e" + } + ] + } + ) + ) + + await asyncio.sleep(1) + await self.QMP.qmp.execute_msg( + self.QMP.qmp.make_execute_msg( + 'send-key', + arguments={ + 'keys': [ + { + "type": "qcode", + "data": "end" + } + ] + } + ) + ) + await asyncio.sleep(1) + + keys = [] + # https://gist.github.com/mvidner/8939289 + keys.append({"type": "qcode", "data": "spc"}) + for character in list('console=tty0 console=ttyS0,115200'): + if character.isupper(): + keys.append({"type": "qcode", "data": 'caps_lock'}) + keys.append({"type": "qcode", "data": character.lower().replace('=', 'equal').replace(',', 'comma').replace(' ', 'spc')}) + if character.isupper(): + keys.append({"type": "qcode", "data": 'caps_lock'}) + + await self.QMP.qmp.execute_msg( + self.QMP.qmp.make_execute_msg( + 'send-key', + arguments={ + 'keys': keys + } + ) + ) + + await asyncio.sleep(1) + await self.QMP.qmp.execute_msg( + self.QMP.qmp.make_execute_msg( + 'send-key', + arguments={ + 'keys': [ + { + "type": "qcode", + "data": "kp_enter" + } + ] + } + ) + ) + + async def login_root(self): + self.client_socket.send(b'root\015') + + async def run_archinstall(self): + # self.client_socket.send(b'tput cols\015') + # self.client_socket.send(b'tput lines\015') + + # For some reason, while running in this test mode, + # building archinstall fails randomly with "No such file or directory"." + # So a safe bet is to just re-run it manually before starting. + self.client_socket.send(b'python -m build --verbose --wheel --no-isolation\015') + time.sleep(1) + self.client_socket.send(b'pip install dist/archinstall*.whl --break-system-packages\015') + time.sleep(1) + self.client_socket.send(b'archinstall\015') + + def run(self): + self.client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.client_socket.connect(str(self.serial_socket_path)) + + alive = True + entered_test_case = False + # The output of serial.log can be displayed with: + # tail -f serial.log + # Or record with: + # asciinema rec demo.cast -c "tail -f serial.log" + with open('serial.log', 'wb') as fh: + while alive and self.test_case.exit_code == -1: + r, w, x = select.select([self.client_socket], [], [], 0.2) + for fd in r: + if (output := self.client_socket.recv(8192)): + fh.write(output) + fh.flush() + + # This block should be moved into the test class + if b'Boot in' in output and entered_test_case is False: + logger.info("Found boot prompt") + asyncio.run_coroutine_threadsafe(self.edit_boot(), loop=self.QMP.loop) + elif b'archiso login:' in output and entered_test_case is False: + logger.info("Found login prompt") + asyncio.run_coroutine_threadsafe(self.login_root(), loop=self.QMP.loop) + elif b'Type archinstall to launch the installer.' in output and entered_test_case is False: + logger.info("Found archinstall start point") + asyncio.run_coroutine_threadsafe(self.run_archinstall(), loop=self.QMP.loop) + entered_test_case = True + # ------- + elif entered_test_case: + self.test_case.feed(output) + + else: + self.client_socket.close() + alive = False + break + time.sleep(0.025) + + print(f"Serial died: {alive}, {self.test_case.exit_code}") + +class QemuSession(threading.Thread): + def __init__(self, cmd, qmp_socket, serial_socket): + self.cmd = cmd + self.qmp_socket = qmp_socket + self.serial_socket = serial_socket + + threading.Thread.__init__(self) + self.start() + + def run(self): + self.handle = Popen( + ' '.join(self.cmd), + stdout=PIPE, + stderr=STDOUT, + stdin=PIPE, + shell=True, + cwd=str(pathlib.Path(__file__).parent), + pass_fds=[self.qmp_socket.fileno(), self.serial_socket.fileno()] + ) + + # Run the qemu process until complete. + # And deal with the different buffers accordingly + while self.handle.poll() is None: + r, w, x = select.select([self.handle.stdout.fileno(), self.handle.stdout.fileno()], [], [], 0.2) + for fd in r: + if fd == self.handle.stdout.fileno(): + if (output := self.handle.stdout.read()): + print(output) + # elif fd == self.handle.stderr.fileno(): + # if (output := self.handle.stderr.read()): + # print(output) + # No exit signal yet + time.sleep(0.25) + + r, w, x = select.select([self.handle.stdout.fileno(), self.handle.stdout.fileno()], [], [], 0.2) + for fd in r: + if fd == self.handle.stdout.fileno(): + if (output := self.handle.stdout.read()): + print(output) + # elif fd == self.handle.stderr.fileno(): + # if (output := self.handle.stderr.read()): + # print(output) + + self.handle.stdin.close() + self.handle.stdout.close() + # self.handle.stderr.close() + logger.warning("Qemu closed..") + + +# .. todo:: +# Needs a bit of more work to allow for multiple runners and test benches. +for profile in parameters: + qmp_socket_path = pathlib.Path(__file__).parent / "qmp.socket" + serial_socket_path = pathlib.Path(__file__).parent / "serial.socket" + + qmp_socket_path.unlink(missing_ok=True) + serial_socket_path.unlink(missing_ok=True) + + logger.info(f"Creating serial ttyS0 and QMP sockets for use in Qemu") + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as qmp_socket: + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as serial_socket: + qmp_socket.bind(str(qmp_socket_path)) + serial_socket.bind(str(serial_socket_path)) + qmp_socket.listen(2) + serial_socket.listen(2) + + args = parameters[profile]['arguments'] + [ + '-chardev', f'socket,id=qmp1,fd={qmp_socket.fileno()},server=on,wait=off', + '-chardev', f'socket,id=serial1,fd={serial_socket.fileno()},server=on,wait=on', + '-mon', f'chardev=qmp1,mode=control,pretty=off', + '-serial', f"chardev:serial1", + '-drive', f'file=$(ls -t ../../_work/iso/archlinux-*-x86_64.iso | head -n 1),media=cdrom,cache=none,id=cdrom0,index=0' + ] + + logger.info(f"Spawning Qemu test profile {profile}") + session = QemuSession(args, qmp_socket, serial_socket) + monitor = QMPClientMonitor(profile, str(qmp_socket_path)) + serial = SerialMonitor(profile, monitor, serial_socket_path, test_case=parameters[profile]['test_class']) + + asyncio.run(monitor.run()) \ No newline at end of file diff --git a/tests/qemu/runners.py b/tests/qemu/runners.py new file mode 100644 index 0000000000..16515b1a33 --- /dev/null +++ b/tests/qemu/runners.py @@ -0,0 +1,176 @@ +import threading +import time +import os +import logging +import asyncio +from testbase import TestBase + +logger = logging.getLogger("archtest") + +class TestLeaveAllDefault(TestBase): + def __init__(self, serial_monitor): + super().__init__(serial_monitor) + self.running_test = False + self.installation_complete = False + self.installation_successful = False + self.fix_post_install_boot = False + self.entered_disk_enc_pw = False + + self.exit_code = -1 + + threading.Thread.__init__(self) + self.start() + + def run(self): + while True: + if b'Set/Modify the below options' in self.buffer and self.running_test is False: + # This is our entrypoint, from here on we'll have to fly blind because + # the menu system uses ANSI/VT100 escape codes to put the arrow and stuff. + # So we can't check for `> Archinstall langauge` for instance. + # So lets fire away our test sequence and monitor for some break-point values. + logger.info(f"Running through a typical installation scenario accepting all defaults.") + self.running_test = True + time.sleep(1) + + logger.info(f"Selecting a good regional mirror") + # Navigate to Mirror selection + self.send_key('arrow_down', delay=0.2) + self.send_key('enter', delay=0.2) + self.send_key('enter', delay=0.2) + time.sleep(2) # Let the UI buffer complete before we start searching + # Search for a known mirror region with good speed relative to source of execution + self.send_key('/', delay=0.2) + self.send_string('Sweden', delay=0.2) + self.send_key('enter', delay=0.2) + # Exit the mirror selection screen + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('enter', delay=0.2) + # Enter 'locale' option + logger.info(f"Setting a keyboard locale to something known") + self.send_key('enter', delay=0.2) + # Enter keyboard layout + self.send_key('enter', delay=0.2) + time.sleep(1) # Let the UI buffer complete again before searching + self.send_key('/', delay=0.2) + time.sleep(0.5) + self.send_string('sv-latin1', delay=0.2) + self.send_key('enter', delay=0.2) + # Exit locale + logger.info(f"Selecting the one virtual harddrive") + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('enter', delay=0.2) + # Enter disk config + self.send_key('enter', delay=0.2) + # Enter partitioning + self.send_key('enter', delay=0.2) + # Use best effort + self.send_key('enter', delay=0.2) + # Select second option, which should be Virtio Block Device + self.send_key('arrow_down', delay=0.2) + self.send_key('enter', delay=0.2) + # Select btrfs + logger.info(f"Selecting btrfs + subvolumes + compression") + self.send_key('enter', delay=0.2) + # Use subvolumes + self.send_key('enter', delay=0.2) + # Use compression + self.send_key('enter', delay=0.2) + # Exit partitioning + logger.info(f"Setting disk encryption password") + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('enter', delay=0.2) + # Enter disk encryption + self.send_key('enter', delay=0.2) + # Select encryption type + self.send_key('enter', delay=0.2) + # Select LUKS + self.send_key('enter', delay=0.2) + # Enter password selection + self.send_key('arrow_down', delay=0.2) + self.send_key('enter', delay=0.2) + # Enter a test password + self.send_string('test', delay=0.2) + self.send_key('enter', delay=0.2) + self.send_string('test', delay=0.2) + self.send_key('enter', delay=0.2) + # Select partition + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('enter', delay=0.2) + # Select the default partition + self.send_key('enter', delay=0.2) + # Exit encryption menu + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('enter', delay=0.2) + # Skip some entries in the main menu + logger.info(f"Setting a root password") + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) # Move to "root password" + self.send_key('enter', delay=0.2) + # Enter a test root password + self.send_string('test', delay=0.2) + self.send_key('enter', delay=0.2) + self.send_string('test', delay=0.2) + self.send_key('enter', delay=0.2) + # Skip some entries + logger.info(f"Adding nano as additional password") + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) # Additional packages + self.send_key('enter', delay=0.2) + # Enter a test package that is reliable + self.send_string('nano', delay=0.2) + self.send_key('enter', delay=0.2) + logger.info(f"Setting network config") + # Configure networking + time.sleep(1) + self.send_key('enter', delay=0.2) + self.send_key('enter', delay=0.2) + time.sleep(2) + # Proceed to installation + logger.info(f"Proceeding to install") + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) + self.send_key('arrow_down', delay=0.2) # Install + self.send_key('enter', delay=0.2) + # Proceed to install + self.send_key('enter', delay=0.2) + elif b'Would you like to chroot' in self.buffer and self.installation_complete is False: + logger.info(f"Installation appears to have completed") + self.installation_complete = True + self.send_key('arrow_down', delay=0.2) # No + self.send_key('enter', delay=0.2) + elif b'Installation completed without any errors' in self.buffer and self.installation_successful is False: + logger.info(f"Installation was successful, rebooting") + self.installation_successful = True + self.send_string('reboot', delay=0.2) + self.send_key('enter', delay=0.2) + elif b'Arch Linux (linux-fallback)' in self.buffer and self.installation_successful is True and self.fix_post_install_boot is False: + logger.info(f"Found linux-fallback, assuming bootloader needs adjusting for serial output") + self.fix_post_install_boot = True + asyncio.run_coroutine_threadsafe(self.serial_monitor.edit_boot(), loop=self.serial_monitor.QMP.loop) + elif b'A password is required to access the root volume' in self.buffer and self.installation_successful is True and self.fix_post_install_boot is True and self.entered_disk_enc_pw is False: + logger.info(f"Found disk encryption password, supplying it") + self.entered_disk_enc_pw = True + self.send_string('test', delay=0.2) + self.send_key('enter', delay=0.2) + elif b'archlinux login:' in self.buffer[-500:] and self.installation_successful and self.entered_disk_enc_pw: + logger.info("Installation successful, for real!") + self.exit_code = 0 + break + + time.sleep(0.25) + + os.system('pkill tail') + exit(self.exit_code) \ No newline at end of file diff --git a/tests/qemu/testbase.py b/tests/qemu/testbase.py new file mode 100644 index 0000000000..e7a4760f64 --- /dev/null +++ b/tests/qemu/testbase.py @@ -0,0 +1,73 @@ +import enum +import time +import threading +import select +import logging +import socket + +logger = logging.getLogger("archtest") + +class Keyboard(enum.Enum): + # https://vt100.net/docs/vt100-ug/chapter3.html + # https://espterm.github.io/docs/VT100%20escape%20codes.html + arrow_up = '\033[A' + arrow_down = '\033[B' + arrow_right = '\033[C' + arrow_left = '\033[D' + enter = '\015' + forward_slash = '\057' + escape = '\033' + +class TestBase(threading.Thread): + def __init__(self, serial_monitor): + self.serial_monitor = serial_monitor + self.buffer = b'' + + async def _send_key(self, key): + pass # This is for the QMP socket if needed + + def send_string(self, chars, delay=None): + logger.debug(f"Sending string: {chars.encode('UTF-8')}") + + while len(select.select([self.serial_monitor.client_socket], [], [], 0.2)[0]): + # Waiting for read buffer to finish to not collide + time.sleep(0.02) + + if len(select.select([], [self.serial_monitor.client_socket], [], 0.2)[1]): + # asyncio.run_coroutine_threadsafe(self._send_key(vt100_key), loop=self.serial_monitor.QMP.loop) + self.serial_monitor.client_socket.send(chars.encode('UTF-8'), socket.MSG_WAITALL) # flags=socket.MSG_WAITALL + # os.fsync(self.serial_monitor.client_socket.fileno()) + else: + logger.error(f"Could not send key, serial was not in a write state!") + + if delay: + time.sleep(delay) + + def send_key(self, key, delay=None): + try: + vt100_key = Keyboard[key].value.encode('UTF-8') + except: + # logger.warning(f"Could not convert key: {key}") + vt100_key = key.encode('UTF-8') + + logger.debug(f"Sending key: {key} \033[0;32m({vt100_key})\033[0m") + + while len(select.select([self.serial_monitor.client_socket], [], [], 0.2)[0]): + # Waiting for read buffer to finish to not collide + time.sleep(0.02) + + if len(select.select([], [self.serial_monitor.client_socket], [], 0.2)[1]): + # asyncio.run_coroutine_threadsafe(self._send_key(vt100_key), loop=self.serial_monitor.QMP.loop) + self.serial_monitor.client_socket.send(vt100_key, socket.MSG_WAITALL) # flags=socket.MSG_WAITALL + # os.fsync(self.serial_monitor.client_socket.fileno()) + else: + logger.error(f"Could not send key, serial was not in a write state!") + + if delay: + time.sleep(delay) + + # Flushing the serial buffer, as it can quickly become frozen + # self.serial_monitor.client_socket.sendall(b"") + + def feed(self, line): + self.buffer += line \ No newline at end of file