From 0fc2e5023dc0b640038281fe12fb4b0654669ad7 Mon Sep 17 00:00:00 2001 From: Antoine van Gelder Date: Wed, 3 May 2023 16:02:01 +0200 Subject: [PATCH] host: add usb1 bulk transfer backend --- host/pygreat/comms.py | 26 +- host/pygreat/comms_backends/usb1.py | 522 ++++++++++++++++++++++++++++ 2 files changed, 544 insertions(+), 4 deletions(-) create mode 100644 host/pygreat/comms_backends/usb1.py diff --git a/host/pygreat/comms.py b/host/pygreat/comms.py index 44d81f9..5e5f583 100644 --- a/host/pygreat/comms.py +++ b/host/pygreat/comms.py @@ -77,17 +77,27 @@ class CommsBackend(object): @classmethod def from_device_uri(cls, **device_uri): - """ Creates a CommsBackend object apporpriate for the given device_uri. """ + """ Creates a CommsBackend object appropriate for the given device_uri. """ - #FIXME: implment this properly - - from .comms_backends.usb import USBCommsBackend + #FIXME: implement this properly # TODO: handle providing board "URIs", like "usb://1234abcd/?param=value", # and automatic resolution to a backend? # TODO: support more than USB + # Use the usb1 backend if this is a Cynthion. + # TODO use cynthion vendor/product id and interface protocol field + # TODO we should probably rather run through the backends and return + # the backend that can successfully match the device_identifiers + if 'idVendor' in device_uri and 'idProduct' in device_uri: + idVendor = device_uri['idVendor'] + idProduct = device_uri['idProduct'] + if idVendor == 0x1d50 and idProduct == 0x615b: + from .comms_backends.usb1 import USB1CommsBackend + return USB1CommsBackend(**device_uri) + # XXX: for now, just create a USB backend, as that's all we support + from .comms_backends.usb import USBCommsBackend return USBCommsBackend(**device_uri) @@ -579,6 +589,14 @@ def pack(cls, format_string, *args): @classmethod @memoize_with_lru_cache(maxsize=128) def unpack(cls, format_string, raw_bytes): + try: + return cls._unpack(cls, format_string, raw_bytes) + except Exception as e: + import logging + logging.error(f"Failed to unpack struct from raw bytes: {len(raw_bytes)} bytes => {raw_bytes}") + raise e + + def _unpack(cls, format_string, raw_bytes): """ Extended version of struct.unpack() for libgreat communciations. Accepts mostly the same arguments as struct.pack, with the following diff --git a/host/pygreat/comms_backends/usb1.py b/host/pygreat/comms_backends/usb1.py new file mode 100644 index 0000000..d56cd97 --- /dev/null +++ b/host/pygreat/comms_backends/usb1.py @@ -0,0 +1,522 @@ +# +# This file is part of libgreat +# + +""" +Module containing the definitions necessary to communicate with libgreat +devices over USB. +""" + +from __future__ import absolute_import +from future import utils as future_utils + +import sys + +import usb1 +import time +import errno +import array +import atexit +import struct +import platform +import logging + +from ..comms import CommsBackend +from ..errors import DeviceNotFoundError + +class USB1CommsBackend(CommsBackend): + """ + Class representing an abstract communications channel used to + connect with a libgreat board. + """ + + """ Class variable that stores our global libusb library context. """ + context = None + + """ Class variable that stores any devices that still need to be closed at program exit. """ + devices = [] + + # TODO this should be defined by the board! + """ The maximum input/output buffer size for libgreat commands. """ + LIBGREAT_MAX_COMMAND_SIZE = 1024 + + """ Bulk OUT endpoint address for libgreat interfaces. """ + LIBGREAT_BULK_OUT_ENDPOINT_ADDRESS = 0x02 + + """ Bulk IN endpoint address for libgreat interfaces. """ + LIBGREAT_BULK_IN_ENDPOINT_ADDRESS = 0x81 + + """ Bulk OUT endpoint number for libgreat interfaces. """ + LIBGREAT_BULK_OUT_ENDPOINT_NUMBER = LIBGREAT_BULK_OUT_ENDPOINT_ADDRESS + + """ Bulk IN endpoint number for libgreat interfaces. """ + LIBGREAT_BULK_IN_ENDPOINT_NUMBER = LIBGREAT_BULK_IN_ENDPOINT_ADDRESS & 0x7f + + """ The configuration number for libgreat interfaces. """ + LIBGREAT_CONFIGURATION = 1 + + """ The interface number for the libgreat command interface. """ + LIBGREAT_COMMAND_INTERFACE = 0 + + """ The request number for issuing vendor-request encapsulated libgreat commands. """ + LIBGREAT_REQUEST_NUMBER = 0x65 + + """ + Constant value provided to libgreat vendor requests to indicate that the command + should execute normally. + """ + LIBGREAT_VALUE_EXECUTE = 0 + + """ + Constant value provided to the libgreat vendor requests indicating that the active + command should be cancelled. + """ + LIBGREAT_VALUE_CANCEL = 0xDEAD + + """ + Constant size of errors returned by libgreat commands on failure. + """ + LIBGREAT_ERRNO_SIZE = 4 + + """ + A flag passed to command execution that indicates we expect no response, and don't need to wait + for anything more than the initial ACK. + """ + LIBGREAT_FLAG_SKIP_RESPONSE = (1 << 0) + + """ + A flag passed to command execution that indicates that the host should re-use all of the in-arguments + from a previous iteration. See execute_raw_command for documentation. + """ + LIBGREAT_FLAG_REPEAT_LAST = (1 << 1) + + + @classmethod + def _get_libusb_context(cls): + """ Retrieves the libusb context we'll use to fetch libusb device instances. """ + + # If we don't have a libusb context, create one. + if cls.context is None: + cls.context = usb1.USBContext().__enter__() + atexit.register(cls._destroy_libusb_context) + + return cls.context + + + @classmethod + def _destroy_libusb_context(cls): + """ Destroys our libusb context on closing our python instance. """ + for device in cls.devices: + device.close() + cls.context.close() + cls.context = None + + + @classmethod + def _device_matches_identifiers(cls, device, device_identifiers): + def _getInterfaceSubClass(cls, device): + for configuration in device: + if configuration.getConfigurationValue() == cls.LIBGREAT_CONFIGURATION: + try: + interface = configuration[cls.LIBGREAT_COMMAND_INTERFACE] + except IndexError: + return None + return interface[0].getSubClass() + return None + + def _getInterfaceProtocol(cls, device): + for configuration in device: + if configuration.getConfigurationValue() == cls.LIBGREAT_CONFIGURATION: + try: + interface = configuration[cls.LIBGREAT_COMMAND_INTERFACE] + except IndexError: + return None + return interface[0].getProtocol() + return None + + property_fetchers = { + 'idVendor': device.getVendorID, + 'idProduct': device.getProductID, + 'serial_number': device.getSerialNumber, + 'bus': device.getBusNumber, + 'address': device.getDeviceAddress, + 'interface_subclass': lambda: _getInterfaceSubClass(cls, device), + 'interface_protocol': lambda: _getInterfaceProtocol(cls, device), + } + + # Check for a match on each of our properties. + for identifier, fetcher in property_fetchers.items(): + + # If we have a constraint on the given identifier, check to make sure + # it matches our requested value... + if identifier in device_identifiers: + if device_identifiers[identifier] != fetcher(): + + # ... and return False if it doesn't. + return False + + # If we didn't fail to meet any constraints, return True. + return True + + + @classmethod + def _find_device(cls, device_identifiers, find_all=False): + """ Finds a USB device by its identifiers. See __init__ for their definitions. """ + + matching_devices = [] + context = cls._get_libusb_context() + + + # Search our list of devices until we find one that matches each of our identifiers. + for device in context.getDeviceList(): + + # If it matches all of our properties, add it to our list. + if cls._device_matches_identifiers(device, device_identifiers): + matching_devices.append(device) + + # If we have find_all, return all relevant devices... + if find_all: + return matching_devices + + # otherwise, return the first found device, or None if we haven't found any. + elif matching_devices: + return matching_devices[0] + else: + return None + + + # TODO: handle providing board "URIs", like "usb;serial_number=0x123", + # and automatic resolution to a backend? + + def __init__(self, **device_identifiers): + """ + Instantiates a new comms connection to a libgreat device; by default connects + to the first available board. + + Accepts the same arguments as pyusb's usb.find() method, allowing narrowing + to a more specific board by serial number. + """ + + # The usb1.UsbDevice device. + self.device = None + + # Zero pad serial numbers to 32 characters to match those + # provided by the USB descriptors. + if 'serial_number' in device_identifiers and len(device_identifiers['serial_number']) < 32: + device_identifiers['serial_number'] = device_identifiers['serial_number'].zfill(32) + + # Connect to the device that matches our identifiers. + device = self._find_device(device_identifiers) + + # If we couldn't find a board, bail out early. + if device is None: + raise DeviceNotFoundError() + + # Open the device, and get a libusb device handle. + self.device = device.open() + USB1CommsBackend.devices.append(self.device) + + # For now, supported boards provide a single configuration, so we + # can accept the first configuration provided. If the device isn't + # already configured, apply that configuration. + # + # Note that we can't universally apply configurations, as e.g. linux + # doesn't support this, and macOS considers setting the device's configuration + # grabbing an exclusive hold on the device. Both set the configuration for us, + # so this is skipped. + if not self.device.getConfiguration(): + self.device.setConfiguration(self.LIBGREAT_CONFIGURATION) + + # Start off with no knowledge of the device's state. + self._last_command_arguments = None + self._have_exclusive_access = False + + # Run the parent initialization. + super(USB1CommsBackend, self).__init__(**device_identifiers) + + + def _hold_libgreat_interface(self, timeout=1000): + """ + Ensures we have exclusive access to the USB interface used by libgreat. + + This is used internally both to get long-term exclusive access and to + take temporary exclusive access (e.g. during a libgreat command). + + parameters: + timeout -- the maximum amount of time we should wait before + """ + + # If we already have exclusive access, there's no need to grab it again. + if self._have_exclusive_access: + return + + # If timeout is none, set it to a microsecond. + if timeout is None: + timeout = 0.001 + + # Claim the first interface on the device, which we consider the standard + # interface used by libgreat. + timeout = time.time() + (timeout / 1000) + + while True: + try: + self.device.claimInterface(self.LIBGREAT_COMMAND_INTERFACE) + return + + # If the interface is already in use, repeat until we get the interface, or we time out. + except (usb1.USBErrorAccess, usb1.USBErrorBusy): + pass + + if time.time() > timeout: + raise IOError("timed out trying to claim access to a libgreat device!") + + + def _release_libgreat_interface(self, maintain_exclusivity=True): + """ + Releases our hold on the USB interface used by libgreat, allowing others + to use it. + + Parameters: + maintain_exclusivity -- if true, we won't release an interface held for + long-term exclusive access + """ + + # If we have exclusive access to the device, there's no need to release anything. + if maintain_exclusivity and self._have_exclusive_access: + return + + # Release our control over interface #0. + self.device.releaseInterface(self.LIBGREAT_COMMAND_INTERFACE) + + + def get_exclusive_access(self): + """ + Take (and hold) exclusive access to the device. Enables optimizations, + as we can make assumptions base on our holding of the device. + """ + + self._hold_libgreat_interface() + self._have_exclusive_access = True + + + def release_exclusive_access(self): + """ + Release exclusive access to the device. + """ + + if not self._have_exclusive_access: + return + + self._release_libgreat_interface(maintain_exclusivity=False) + self._have_exclusive_access = False + + + @staticmethod + def _build_command_prelude(class_number, verb): + """Builds a libgreat command prelude, which identifies the command + being executed to libgreat. + """ + return struct.pack(" self.LIBGREAT_MAX_COMMAND_SIZE: + raise ValueError("Command payload is too long!") + + # Otherwise, just send the prelude. + else: + to_send = prelude + + # If our max response is zero, never bother reading a response. + skip_reading_response = (max_response_length == 0) + + # To save on the overall number of command transactions, the backend provides an optimization + # that allows us to skip the "send" phase if the class, verb, and data are the same as the immediately + # preceeding call. Check to see if we can use that optimization. + use_repeat_optimization = False #self._have_exclusive_access and self._check_for_repeat(class_number, verb, data) + + # TODO: upgrade this to be able to not block? + try: + # If we're not using the repeat-optimization, send the in-arguments to the device. + if not use_repeat_optimization: + + # Set the FLAG_SKIP_RESPONSE flag if we don't expect a response back from the device. + flags = self.LIBGREAT_FLAG_SKIP_RESPONSE if skip_reading_response else 0 + + self.device.controlWrite(usb1.TYPE_VENDOR | usb1.RECIPIENT_ENDPOINT, + self.LIBGREAT_REQUEST_NUMBER, self.LIBGREAT_VALUE_EXECUTE, flags, to_send, timeout) + + # If we're skipping reading a response, return immediately. + if skip_reading_response: + return None + + + # Set the FLAG_REPEAT_LAST if we're using our repeat-last optimization. + flags = self.LIBGREAT_FLAG_REPEAT_LAST if use_repeat_optimization else 0 + + # Truncate our maximum, if necessary. + if max_response_length > 4096: + max_response_length = self.LIBGREAT_MAX_COMMAND_SIZE + + # ... and read any response the device has prepared for us. + response = self.device.controlRead(usb1.TYPE_VENDOR | usb1.RECIPIENT_ENDPOINT, + self.LIBGREAT_REQUEST_NUMBER, self.LIBGREAT_VALUE_EXECUTE, flags, max_response_length, comms_timeout) + response = bytes(response) + + # If we were passed an encoding, attempt to decode the response data. + if encoding and response: + response = response.decode(encoding, errors='ignore') + + # Return the device's response. + return response + + except Exception as e: + + # Abort the command, and grab the last error number, if possible. + error_number = self.abort_command() + + # If we got a pipe error, this indicates the device issued a realerror, + # and we should convert this into a failed command error. + is_signaled_error = isinstance(e, usb1.USBErrorPipe) + + # If this was an error raised on the device side, covert it to a CommandFailureError. + if is_signaled_error and rephrase_errors: + future_utils.raise_from(self._exception_for_command_failure(error_number, pretty_name), None) + else: + raise + finally: + + # Always release the libgreat interface before we return. + self._release_libgreat_interface() + + + def abort_command(self, timeout=1000, retry_delay=1): + """ Aborts execution of a current libgreat command. Used for error handling. + + Returns: + the last error code returned by a command; only meaningful if + """ + + # Invalidate any existing knowledge of the device's state. + self._last_command_arguments = None + + # Create a quick function to issue the abort request. + execute_abort = lambda device : device.controlRead( + usb1.TYPE_VENDOR | usb1.RECIPIENT_ENDPOINT, + self.LIBGREAT_REQUEST_NUMBER, + self.LIBGREAT_VALUE_CANCEL, + 0, + self.LIBGREAT_ERRNO_SIZE, + timeout + ) + + # And try executing the abort progressively, multiple times. + try: + result = execute_abort(self.device) + except: + if retry_delay: + time.sleep(retry_delay) + result = execute_abort(self.device) + else: + raise + + # Parse the value returned from the request, which may be an error code. + return struct.unpack("