diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 14e4982f..8f576beb 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -277,6 +277,8 @@ hidecallergraph hidecallgraph hideinitializer hlp +Hookimpl +hookspec hostname hpaulson hpp @@ -313,6 +315,7 @@ ints ip ipc ipp +isabstract isalnum isclass isdir diff --git a/src/fprime_gds/common/communication/adapters/base.py b/src/fprime_gds/common/communication/adapters/base.py index fc38d485..d2cbc2c7 100644 --- a/src/fprime_gds/common/communication/adapters/base.py +++ b/src/fprime_gds/common/communication/adapters/base.py @@ -9,7 +9,7 @@ class representing the core features of the adapter class that must be implement @author lestarch """ import abc - +from fprime_gds.plugin.definitions import gds_plugin_implementation class BaseAdapter(abc.ABC): """ @@ -46,72 +46,25 @@ def write(self, frame): :return: True if data sent through adapter, False otherwise """ - @classmethod - @abc.abstractmethod - def get_arguments(cls): - """ - Returns a set of arguments consumed by this adapter. This will be consumed by the CLI layer in order to provide - command line arguments to the user. Note: these should be globally unique args, e.g. --ip-address - :return: dictionary, keys of tuple of arg flags and value of list of other arguments to argparse's add_argument - """ +class NoneAdapter(BaseAdapter): + """ None adapter used to turn off the comm script """ @classmethod - @abc.abstractmethod - def check_arguments(cls, args): - """ - Code that should check arguments of this adapter. If there is a problem with this code, then a "ValueError" - should be raised describing the problem with these arguments. - - :param args: arguments as dictionary - """ + def get_name(cls): + """ Get name of the non-adapter """ + return "none" - @classmethod - def get_adapters(cls): - """ - Get all known adapters of this base class. These must be imported into the comm-layer to be available to the - system, however; they only need to be imported. Once imported this function will find them and make them - available to the comm-layer in the standard way. - - :return: list of all imported comm adapters. - """ - adapter_map = {} - for adapter in cls.__subclasses__(): - # Get two versions of names - adapter_name = adapter.__module__ - adapter_short = adapter_name.split(".")[-1] - # Check to use long or short name - if adapter_short not in adapter_map: - adapter_name = adapter_short - adapter_map[adapter_name] = adapter - return adapter_map - - @staticmethod - def process_arguments(clazz, args): - """ - Process arguments incoming from the command line and construct a dictionary of kwargs to supply to the chosen - adapter at creation time. This will allow the user to choose any imported adapters at runtime. + def read(self, timeout=0.500): + """ Raise exception if this is called""" + raise NotImplementedError() - :param args: arguments to process - :return: dictionary of constructor keyword arguments - """ - return { - value["dest"]: getattr(args, value["dest"]) - for value in clazz.get_arguments().values() - } + def write(self, frame): + """ Raise exception if this is called""" + raise NotImplementedError() @classmethod - def construct_adapter(cls, adapter_name, args): - """ - Constructs a new adapter, from the given adapter name and the given namespace of argument inputs. This is a - wrapper of "get_adapters" and "process_arguments" to help build a new, fully configured, adapter. This is a - factory method. - - :param adapter_name: name of the adapter to build - :param args: namespace of arg value to help build an adapter - :return: newly constructed adapter - """ - if adapter_name == "none": - return None - adapter = cls.get_adapters()[adapter_name] - return adapter(**cls.process_arguments(adapter, args)) + @gds_plugin_implementation + def register_communication_plugin(cls): + """ Register this as a plugin """ + return cls \ No newline at end of file diff --git a/src/fprime_gds/common/communication/adapters/ip.py b/src/fprime_gds/common/communication/adapters/ip.py index 71e73743..bdd89ef9 100644 --- a/src/fprime_gds/common/communication/adapters/ip.py +++ b/src/fprime_gds/common/communication/adapters/ip.py @@ -18,6 +18,8 @@ import fprime_gds.common.communication.adapters.base import fprime_gds.common.logger +from fprime_gds.plugin.definitions import gds_plugin_implementation + LOGGER = logging.getLogger("ip_adapter") @@ -114,7 +116,7 @@ def th_handler(self, handler): def write(self, frame): """ - Send a given framed bit of data by sending it out the serial interface. It will attempt to reconnect if there is + Send a given framed bit of data by sending it out the serial interface. It will attempt to reconnect if there was a problem previously. This function will return true on success, or false on error. :param frame: framed data packet to send out @@ -151,6 +153,11 @@ def th_alive(self, interval): self.write(IpAdapter.KEEPALIVE_DATA) time.sleep(interval) + @classmethod + def get_name(cls): + """ Get the name of this adapter """ + return "ip" + @classmethod def get_arguments(cls): """ @@ -163,13 +170,13 @@ def get_arguments(cls): "dest": "address", "type": str, "default": "0.0.0.0", - "help": "Address of the IP adapter server. Default: %(default)s", + "help": "Address of the IP adapter server.", }, ("--ip-port",): { "dest": "port", "type": int, "default": 50000, - "help": "Port of the IP adapter server. Default: %(default)s", + "help": "Port of the IP adapter server.", }, ("--ip-client",): { # dest is "server" since it is handled in BaseAdapter.construct_adapter and passed with the same @@ -181,6 +188,12 @@ def get_arguments(cls): }, } + @classmethod + @gds_plugin_implementation + def register_communication_plugin(cls): + """ Register this as a communication plugin """ + return cls + @classmethod def check_arguments(cls, args): """ diff --git a/src/fprime_gds/common/communication/adapters/uart.py b/src/fprime_gds/common/communication/adapters/uart.py index 56790dba..98a51fd8 100644 --- a/src/fprime_gds/common/communication/adapters/uart.py +++ b/src/fprime_gds/common/communication/adapters/uart.py @@ -15,6 +15,8 @@ import fprime_gds.common.communication.adapters.base +from fprime_gds.plugin.definitions import gds_plugin_implementation + LOGGER = logging.getLogger("serial_adapter") @@ -151,16 +153,27 @@ def get_arguments(cls): "dest": "device", "type": str, "default": default, - "help": "UART device representing the FSW. Default: %(default)s", + "help": "UART device representing the FSW.", }, ("--uart-baud",): { "dest": "baud", "type": int, "default": 9600, - "help": "Baud rate of the serial device. Default: %(default)s", + "help": "Baud rate of the serial device.", }, } + @classmethod + def get_name(cls): + """ Get name of the adapter """ + return "uart" + + @classmethod + @gds_plugin_implementation + def register_communication_plugin(cls): + """ Register this as a communication plugin """ + return cls + @classmethod def check_arguments(cls, args): """ diff --git a/src/fprime_gds/common/communication/checksum.py b/src/fprime_gds/common/communication/checksum.py index e3130dc5..002b381c 100644 --- a/src/fprime_gds/common/communication/checksum.py +++ b/src/fprime_gds/common/communication/checksum.py @@ -11,7 +11,6 @@ def crc_calculation(data: bytes): return zlib.crc32(data) & 0xFFFFFFFF -CHECKSUM_SELECTION = "crc32" CHECKSUM_MAPPING = { "fixed": lambda data: 0xCAFECAFE, "crc32": crc_calculation, @@ -19,8 +18,7 @@ def crc_calculation(data: bytes): } -def calculate_checksum(data: bytes): +def calculate_checksum(data: bytes, selected_checksum: str): """Calculates the checksum of bytes""" - selected_checksum = CHECKSUM_SELECTION hash_fn = CHECKSUM_MAPPING.get(selected_checksum, CHECKSUM_MAPPING.get("default")) return hash_fn(data) diff --git a/src/fprime_gds/common/communication/framing.py b/src/fprime_gds/common/communication/framing.py index 71dea6a6..66717772 100644 --- a/src/fprime_gds/common/communication/framing.py +++ b/src/fprime_gds/common/communication/framing.py @@ -16,7 +16,8 @@ import struct import sys -from .checksum import calculate_checksum +from .checksum import calculate_checksum, CHECKSUM_MAPPING +from fprime_gds.plugin.definitions import gds_plugin_implementation class FramerDeframer(abc.ABC): @@ -97,10 +98,11 @@ class FpFramerDeframer(FramerDeframer): HEADER_FORMAT = None START_TOKEN = None - def __init__(self): + def __init__(self, checksum_type): """Sets constants on construction.""" # Setup the constants as soon as possible. FpFramerDeframer.set_constants() + self.checksum = checksum_type @classmethod def set_constants(cls): @@ -134,7 +136,7 @@ def frame(self, data): FpFramerDeframer.HEADER_FORMAT, FpFramerDeframer.START_TOKEN, len(data) ) framed += data - framed += struct.pack(">I", calculate_checksum(framed)) + framed += struct.pack(">I", calculate_checksum(framed, self.checksum)) return framed def deframe(self, data, no_copy=False): @@ -176,7 +178,8 @@ def deframe(self, data, no_copy=False): ) # If the checksum is valid, return the packet. Otherwise continue to rotate if check == calculate_checksum( - data[: data_size + FpFramerDeframer.HEADER_SIZE] + data[: data_size + FpFramerDeframer.HEADER_SIZE], + self.checksum ): data = data[total_size:] return deframed, data, discarded @@ -192,6 +195,33 @@ def deframe(self, data, no_copy=False): return None, data, discarded return None, data, discarded + @classmethod + def get_name(cls): + """ Get the name of this plugin """ + return "fprime" + + @classmethod + def get_arguments(cls): + """ Get arguments for the framer/deframer """ + return {("--comm-checksum-type",): { + "dest": "checksum_type", + "action": "store", + "type": str, + "help": "Setup the checksum algorithm. [default: %(default)s]", + "choices": [ + item + for item in CHECKSUM_MAPPING.keys() + if item != "default" + ], + "default": "crc32", + }} + + @classmethod + @gds_plugin_implementation + def register_framing_plugin(cls): + """ Register a bad plugin """ + return cls + class TcpServerFramerDeframer(FramerDeframer): """ diff --git a/src/fprime_gds/executables/cli.py b/src/fprime_gds/executables/cli.py index dcf04128..f2ccbf60 100644 --- a/src/fprime_gds/executables/cli.py +++ b/src/fprime_gds/executables/cli.py @@ -23,12 +23,12 @@ # Required to set the checksum as a module variable import fprime_gds.common.communication.checksum import fprime_gds.common.logger -from fprime_gds.common.communication.adapters.base import BaseAdapter from fprime_gds.common.communication.adapters.ip import check_port from fprime_gds.common.pipeline.standard import StandardPipeline from fprime_gds.common.transport import ThreadedTCPSocketClient from fprime_gds.common.utils.config_manager import ConfigManager from fprime_gds.executables.utils import find_app, find_dict, get_artifacts_root +from fprime_gds.plugin.system import Plugins # Optional import: ZeroMQ. Requires package: pyzmq try: @@ -45,7 +45,6 @@ except ImportError: SerialAdapter = None - GUIS = ["none", "html"] @@ -85,7 +84,10 @@ def get_parser(self) -> argparse.ArgumentParser: Return: argparse parser for supplied arguments """ - parser = argparse.ArgumentParser(description=self.description, add_help=True) + parser = argparse.ArgumentParser( + description=self.description, add_help=True, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) for flags, keywords in self.get_arguments().items(): parser.add_argument(*flags, **keywords) return parser @@ -96,7 +98,7 @@ def reproduce_cli_args(self, args_ns): def flag_member(flags, argparse_inputs) -> Tuple[str, str]: """Get the best CLI flag and namespace member""" best_flag = ( - [flag for flag in flags if flag.startswith("--")] + list(flags) + [flag for flag in flags if flag.startswith("--")] + list(flags) )[0] member = argparse_inputs.get( "dest", re.sub(r"^-+", "", best_flag).replace("-", "_") @@ -117,7 +119,7 @@ def cli_arguments(flags, argparse_inputs) -> List[str]: # Handle arguments if (action == "store_true" and value) or ( - action == "store_false" and not value + action == "store_false" and not value ): return [best_flag] elif action != "store" or value is None: @@ -148,10 +150,10 @@ def handle_arguments(self, args, **kwargs): @staticmethod def parse_args( - parser_classes, - description="No tool description provided", - arguments=None, - **kwargs, + parser_classes, + description="No tool description provided", + arguments=None, + **kwargs, ): """Parse and post-process arguments @@ -234,7 +236,7 @@ def handle_arguments(self, args, **kwargs): raise Exception(msg) # Works for the old structure where the bin, lib, and dict directories live immediately under the platform elif len(child_directories) == 3 and set( - [path.name for path in child_directories] + [path.name for path in child_directories] ) == {"bin", "lib", "dict"}: args.deployment = detected_toolchain return args @@ -245,6 +247,75 @@ def handle_arguments(self, args, **kwargs): return args +class PluginArgumentParser(ParserBase): + """ Parser for arguments coming from plugins """ + DESCRIPTION = "Parse plugin CLI arguments and selections" + FPRIME_CHOICES = { + "framing": "fprime", + "communication": "ip", + } + + def __init__(self): + """ Initialize the plugin information for this parser """ + self._plugin_map = { + category: { + self.get_selection_name(selection): selection for selection in Plugins.system().get_selections(category) + } for category in Plugins.system().get_categories() + } + + def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]: + """ Return arguments to used in plugins """ + + arguments: Dict[Tuple[str, ...], Dict[str, Any]] = {} + for category, selections in self._plugin_map.items(): + arguments.update({ + (f"--{category}-selection",): { + "choices": [choice for choice in selections.keys()], + "help": f"Select {category} implementer.", + "default": self.FPRIME_CHOICES.get(category, list(selections.keys())[0]) + } + }) + for selection_name, selection in selections.items(): + arguments.update(self.get_selection_arguments(selection)) + return arguments + + def handle_arguments(self, args, **kwargs): + """ Handles the arguments """ + for category, selections in self._plugin_map.items(): + selection_string = getattr(args, f"{category}_selection") + selection_class = selections[selection_string] + filled_arguments = self.map_selection_arguments(args, selection_class) + selection_instance = selection_class(**filled_arguments) + setattr(args, f"{category}_selection_instance", selection_instance) + return args + + @staticmethod + def get_selection_name(selection): + """ Get the name of a selection """ + return selection.get_name() if hasattr(selection, "get_name") else selection.__name__ + + @staticmethod + def get_selection_arguments(selection) -> Dict[Tuple[str, ...], Dict[str, Any]]: + """ Get the name of a selection """ + return selection.get_arguments() if hasattr(selection, "get_arguments") else {} + + @staticmethod + def map_selection_arguments(args, selection) -> Dict[str, Any]: + """ Get the name of a selection """ + expected_args = PluginArgumentParser.get_selection_arguments(selection) + argument_destinations = [ + value["dest"] if "dest" in value else key[0].replace("--", "").replace("-", "_") + for key, value in expected_args.items() + ] + filled_arguments = { + destination: getattr(args, destination) for destination in argument_destinations + } + # Check arguments or yield a Value error + if hasattr(selection, "check_arguments"): + selection.check_arguments(filled_arguments) + return filled_arguments + + class CompositeParser(ParserBase): """Composite parser handles parsing as a composition of multiple other parsers""" @@ -286,49 +357,14 @@ def handle_arguments(self, args, **kwargs): return args -class CommAdapterParser(ParserBase): - """ - Handles parsing of all of the comm-layer arguments. This means selecting a comm adapter, and passing the arguments - required to setup that comm adapter. In addition, this parser uses the import parser to import modules such that a - user may import other adapter implementation files. - """ +class CommExtraParser(ParserBase): + """ Parses extra communication arguments """ - DESCRIPTION = "Process arguments needed to specify a comm-adapter" + DESCRIPTION = "Process arguments needed to specify arguments for communication" def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]: """Get arguments for the comm-layer parser""" - adapter_definition_dictionaries = BaseAdapter.get_adapters() - adapter_arguments = {} - for name, adapter in adapter_definition_dictionaries.items(): - adapter_arguments_callable = getattr(adapter, "get_arguments", None) - if not callable(adapter_arguments_callable): - print( - f"[WARNING] '{name}' does not have 'get_arguments' method, skipping.", - file=sys.stderr, - ) - continue - adapter_arguments.update(adapter.get_arguments()) com_arguments = { - ("--comm-adapter",): { - "dest": "adapter", - "action": "store", - "type": str, - "help": "Adapter for communicating to flight deployment. [default: %(default)s]", - "choices": ["none"] + list(adapter_definition_dictionaries), - "default": "ip", - }, - ("--comm-checksum-type",): { - "dest": "checksum_type", - "action": "store", - "type": str, - "help": "Setup the checksum algorithm. [default: %(default)s]", - "choices": [ - item - for item in fprime_gds.common.communication.checksum.CHECKSUM_MAPPING.keys() - if item != "default" - ], - "default": fprime_gds.common.communication.checksum.CHECKSUM_SELECTION, - }, ("--output-unframed-data",): { "dest": "output_unframed_data", "action": "store", @@ -339,17 +375,9 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]: "required": False, }, } - return {**adapter_arguments, **com_arguments} + return com_arguments def handle_arguments(self, args, **kwargs): - """ - Handle the input arguments for the parser. This will help setup the adapter with its expected arguments. - - :param args: parsed arguments in namespace format - :return: namespace with "comm_adapter" value added - """ - args.comm_adapter = BaseAdapter.construct_adapter(args.adapter, args) - fprime_gds.common.communication.checksum.CHECKSUM_SELECTION = args.checksum_type return args @@ -618,7 +646,7 @@ def pipeline_factory(args_ns, pipeline=None) -> StandardPipeline: class CommParser(CompositeParser): """Comm Executable Parser""" - CONSTITUENTS = [CommAdapterParser, MiddleWareParser, LogDeployParser] + CONSTITUENTS = [CommExtraParser, MiddleWareParser, LogDeployParser, PluginArgumentParser] def __init__(self): """Initialization""" diff --git a/src/fprime_gds/executables/comm.py b/src/fprime_gds/executables/comm.py index 1cabd25b..a2c9f74a 100644 --- a/src/fprime_gds/executables/comm.py +++ b/src/fprime_gds/executables/comm.py @@ -30,11 +30,9 @@ ZmqGround = None import fprime_gds.common.communication.adapters.base import fprime_gds.common.communication.adapters.ip -import fprime_gds.common.communication.checksum import fprime_gds.common.communication.ground import fprime_gds.common.logger import fprime_gds.executables.cli -from fprime_gds.common.communication.framing import FpFramerDeframer from fprime_gds.common.communication.updown import Downlinker, Uplinker # Uses non-standard PIP package pyserial, so test the waters before getting a hard-import crash @@ -58,12 +56,12 @@ def main(): fprime_gds.executables.cli.LogDeployParser, fprime_gds.executables.cli.MiddleWareParser, fprime_gds.executables.cli.CommParser, + fprime_gds.executables.cli.PluginArgumentParser, ], description="F prime communications layer.", client=True, ) - fprime_gds.common.communication.checksum = args.checksum_type - if args.comm_adapter == "none": + if args.communication_selection == "none": print("[ERROR] Comm adapter set to 'none'. Nothing to do but exit.", file=sys.stderr) sys.exit(-1) @@ -81,15 +79,15 @@ def main(): args.tts_addr, args.tts_port ) - adapter = args.comm_adapter + adapter = args.communication_selection_instance # Set the framing class used and pass it to the uplink and downlink component constructions giving each a separate # instantiation - framer_class = FpFramerDeframer + framer_instance = args.framing_selection_instance LOGGER.info( "Starting uplinker/downlinker connecting to FSW using %s with %s", - adapter, - framer_class.__name__, + args.communication_selection, + args.framing_selection ) discarded_file_handle = None try: @@ -108,9 +106,9 @@ def main(): discarded_file_handle_path, ) downlinker = Downlinker( - adapter, ground, framer_class(), discarded=discarded_file_handle + adapter, ground, framer_instance, discarded=discarded_file_handle ) - uplinker = Uplinker(adapter, ground, framer_class(), downlinker) + uplinker = Uplinker(adapter, ground, framer_instance, downlinker) # Open resources for the handlers on either side, this prepares the resources needed for reading/writing data ground.open() @@ -121,8 +119,8 @@ def main(): uplinker.start() LOGGER.debug("Uplinker and downlinker running") - # Wait for shutdown event in the form of a KeyboardInterrupt then stop the processing, close resources, and wait for - # everything to terminate as expected. + # Wait for shutdown event in the form of a KeyboardInterrupt then stop the processing, close resources, + # and wait for everything to terminate as expected. def shutdown(*_): """Shutdown function for signals""" uplinker.stop() diff --git a/src/fprime_gds/executables/run_deployment.py b/src/fprime_gds/executables/run_deployment.py index f0599505..29a631c0 100644 --- a/src/fprime_gds/executables/run_deployment.py +++ b/src/fprime_gds/executables/run_deployment.py @@ -13,6 +13,7 @@ GdsParser, ParserBase, StandardPipelineParser, + PluginArgumentParser ) from fprime_gds.executables.utils import AppWrapperException, run_wrapped_application @@ -27,7 +28,7 @@ def parse_args(): :return: parsed argument namespace """ # Get custom handlers for all executables we are running - arg_handlers = [StandardPipelineParser, GdsParser, BinaryDeployment, CommParser] + arg_handlers = [StandardPipelineParser, GdsParser, BinaryDeployment, CommParser, PluginArgumentParser] # Parse the arguments, and refine through all handlers args, parser = ParserBase.parse_args(arg_handlers, "Run F prime deployment and GDS") return args @@ -145,7 +146,7 @@ def launch_comm(parsed_args): arguments = CommParser().reproduce_cli_args(parsed_args) arguments = arguments + ["--log-directly"] if "--log-directly" not in arguments else arguments app_cmd = BASE_MODULE_ARGUMENTS + ["fprime_gds.executables.comm"] + arguments - return launch_process(app_cmd, name=f'comm[{parsed_args.adapter}] Application', launch_time=1) + return launch_process(app_cmd, name=f'comm[{parsed_args.communication_selection}] Application', launch_time=1) def main(): @@ -160,12 +161,12 @@ def main(): launchers.append(launch_tts) # Check if we are running with communications - if parsed_args.adapter != "none": + if parsed_args.communication_selection != "none": launchers.append(launch_comm) # Add app, if possible if parsed_args.app: - if parsed_args.adapter == "ip": + if parsed_args.communication_selection == "ip": launchers.append(launch_app) else: print("[WARNING] App cannot be auto-launched without IP adapter", file=sys.stderr) diff --git a/src/fprime_gds/plugin/__init__.py b/src/fprime_gds/plugin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/fprime_gds/plugin/definitions.py b/src/fprime_gds/plugin/definitions.py new file mode 100644 index 00000000..af07ae45 --- /dev/null +++ b/src/fprime_gds/plugin/definitions.py @@ -0,0 +1,49 @@ +""" fprime_gds.plugin.definitions: definitions of plugin specifications and decorators + +In order to define a plugin, an implementation decorator is used. Users can import `gds_plugin_implementation` from this +file to decorate functions that implement plugins. + +This file also defines the specifications for these implementations and users may inspect those specifications here. + +@author lestarch +""" +import pluggy +from typing import Type + +PROJECT_NAME = "fprime_gds" + +gds_plugin_specification = pluggy.HookspecMarker(PROJECT_NAME) +gds_plugin_implementation = pluggy.HookimplMarker(PROJECT_NAME) + + +@gds_plugin_specification +def register_framing_plugin() -> Type["FramerDeframer"]: + """Register a plugin to provide framing capabilities + + Plugin hook for registering a plugin that supplies a FramerDeframer implementation. Implementors of this hook must + return a non-abstract subclass of FramerDeframer. This class will be provided as a framing implementation option + that users may select via command line arguments. + + Note: users should return the class, not an instance of the class. Needed arguments for instantiation are + determined from class methods, solicited via the command line, and provided at construction time to the chosen + instantiation. + + Returns: + FramerDeframer subclass + """ + + +@gds_plugin_specification +def register_communication_plugin() -> Type["BaseAdapter"]: + """Register a communications adapter + + Plugin hook for registering a plugin that supplies an adapter to the communications interface (radio, uart, i2c, + etc). This interface is expected to read and write bytes from a wire and will be provided to the framing system. + + Note: users should return the class, not an instance of the class. Needed arguments for instantiation are + determined from class methods, solicited via the command line, and provided at construction time to the chosen + instantiation. + + Returns: + BaseAdapter subclass + """ diff --git a/src/fprime_gds/plugin/system.py b/src/fprime_gds/plugin/system.py new file mode 100644 index 00000000..10f9b9d4 --- /dev/null +++ b/src/fprime_gds/plugin/system.py @@ -0,0 +1,160 @@ +""" fprime_gds.plugin.system: implementation of plugins + +This file contains the implementation and registration of plugins for fprime_gds. Primarily, it defines the Plugins +class that handles plugins. Users can acquire the Plugin singleton with `Plugin.system()`. + +This file also imports and registers plugin implementations built-into fprime-gds. These plugins are not registered +using entrypoints. + +@author lestarch +""" +import inspect +import logging +import re +from typing import Iterable + +import pluggy + + +import fprime_gds.plugin.definitions as definitions + +# For automatic validation of plugins, each plugin class type must be imported here +import fprime_gds.common.communication.framing as framing + +import fprime_gds.common.communication.adapters.base as base +import fprime_gds.common.communication.adapters.ip as ip + +try: + import fprime_gds.common.communication.adapters.uart as uart +except ImportError: + uart = None + +PROJECT_NAME = definitions.PROJECT_NAME +LOGGER = logging.getLogger(__name__) + +_NAME_REGEX = re.compile(r"^register_(\w+)_plugin") +_TYPE_MAPPINGS = { + definitions.register_framing_plugin: framing.FramerDeframer, + definitions.register_communication_plugin: base.BaseAdapter, +} +_SUPPLIED_PLUGIN_MODULES_OR_CLASSES = [ + framing.FpFramerDeframer, + base.NoneAdapter, + ip.IpAdapter, +] +if uart is not None: + _SUPPLIED_PLUGIN_MODULES_OR_CLASSES.append(uart.SerialAdapter) + + +class PluginException(Exception): + pass + + +class InvalidCategoryException(PluginException): + pass + + +class Plugins(object): + """GDS plugin system providing a plugin Singleton for use across the GDS""" + + _singleton = None + + def __init__(self): + """Initialize the plugin system""" + self.manager = pluggy.PluginManager(PROJECT_NAME) + self.manager.add_hookspecs(definitions) + self.manager.load_setuptools_entrypoints(PROJECT_NAME) + for module in _SUPPLIED_PLUGIN_MODULES_OR_CLASSES: + self.manager.register(module) + + def get_selections(self, category) -> Iterable: + """Get available plugin selections + + Gets all plugin implementors of "category" by looking for register__plugin implementors. If such a + function does not exist then this results in an exception. + + Args: + category: category of the plugin requested + """ + plugin_function_name = f"register_{category}_plugin" + if not hasattr(definitions, plugin_function_name) or not hasattr( + self.manager.hook, plugin_function_name + ): + raise InvalidCategoryException(f"Invalid plugin category: {category}") + selections = getattr(self.manager.hook, plugin_function_name)() + return [ + selection + for selection in selections + if self.validate_selection(category, selection) + ] + + def get_categories(self): + """Get all plugin categories""" + specifications = _TYPE_MAPPINGS.keys() + matches = [ + _NAME_REGEX.match(specification.__name__) + for specification in specifications + ] + return [match.group(1) for match in matches if match] + + def register_plugin(self, module_or_class): + """Register a plugin directly + + Allows local registration of plugin implementations that are shipped as part of the GDS package. + + Args: + module_or_class: module or class that has plugin implementations + """ + self.manager.register(module_or_class) + + @staticmethod + def validate_selection(category, result): + """Validate the result of plugin hook + + Validates the result of a plugin hook call to ensure the result meets the expected properties for plugins of the + given category. Primarily this ensures that this plugin returns a concrete subclass of the expected type. + + Args: + category: category of plugin used + result: result from the plugin hook call + Return: + True when the plugin passes validation, False otherwise + """ + plugin_function_name = f"register_{category}_plugin" + assert hasattr( + definitions, plugin_function_name + ), "Plugin category failed pre-validation" + # Typing library not intended for introspection at runtime, thus we maintain a map of plugin specification + # functions to the types expected as a return value. When this is not found, plugins may continue without + # automatic validation. + try: + plugin_specification_function = getattr(definitions, plugin_function_name) + expected = _TYPE_MAPPINGS[plugin_specification_function] + + # Validate the result + if not issubclass(result, expected): + LOGGER.warning( + f"{result.__name__} is not a subclass of {expected.__name__}. Not registering." + ) + return False + elif inspect.isabstract(result): + LOGGER.warning( + f"{result.__name__} is an abstract class. Not registering." + ) + return False + except KeyError: + LOGGER.warning( + f"Plugin not registered for validation. Continuing without validation." + ) + return True + + @classmethod + def system(cls) -> "PluginSystem": + """Construct singleton if needed then return it""" + return cls._build_singleton() + + @classmethod + def _build_singleton(cls) -> "PluginSystem": + """Build a singleton for this class""" + cls._singleton = cls._singleton if cls._singleton is not None else cls() + return cls._singleton diff --git a/test/fprime_gds/test_plugins.py b/test/fprime_gds/test_plugins.py new file mode 100644 index 00000000..a69974eb --- /dev/null +++ b/test/fprime_gds/test_plugins.py @@ -0,0 +1,142 @@ +import pytest +from fprime_gds.common.communication.adapters.ip import IpAdapter +from fprime_gds.common.communication.adapters.base import NoneAdapter +from fprime_gds.common.communication.adapters.uart import SerialAdapter +from fprime_gds.common.communication.framing import FramerDeframer, FpFramerDeframer +from fprime_gds.executables.cli import ParserBase, PluginArgumentParser +from fprime_gds.plugin.definitions import gds_plugin_implementation +from fprime_gds.plugin.system import Plugins + + +class BadSuperClass(object): + """ A bad framing plugin (inappropriate parent) """ + + @classmethod + @gds_plugin_implementation + def register_framing_plugin(cls): + """ Register a bad plugin """ + return cls + + +class BadImplementation(FramerDeframer): + """ A bad framing plugin (inappropriate parent) """ + + @classmethod + @gds_plugin_implementation + def register_framing_plugin(cls): + """ Register a bad plugin """ + return cls + + +class Good(FramerDeframer): + """ A good framing plugin """ + + def frame(self, data): + pass + + def deframe(self, data, no_copy=False): + pass + + @classmethod + @gds_plugin_implementation + def register_framing_plugin(cls): + """ Register a bad plugin """ + return cls + + +class GoodWithArgs(FramerDeframer): + """ A good framing plugin """ + NAMED_PLUGIN_NAME = "good-with-args" + + def __init__(self, my_fancy_arg, fancy_2): + """ Fancy argument input processing """ + self.my_fancy_arg = my_fancy_arg + self.fancy_2 = fancy_2 + + def frame(self, data): + pass + + def deframe(self, data, no_copy=False): + pass + + @classmethod + def get_name(cls): + """ Return the name of this function """ + return cls.NAMED_PLUGIN_NAME + + @classmethod + def get_arguments(cls): + """ Return the name of this function """ + return { + ("--my-fancy-arg", ): { + "type": str, + "help": "Some help string" + }, + ("--my-fancy-arg-with-dest", ): { + "dest": "fancy_2", + "type": int, + "help": "Some help string" + }, + } + + @classmethod + @gds_plugin_implementation + def register_framing_plugin(cls): + """ Register a bad plugin """ + return cls + + +@pytest.fixture() +def plugins(): + """ Register test plugins as fixture for testing """ + system = Plugins() + system.register_plugin(BadSuperClass) + system.register_plugin(BadImplementation) + system.register_plugin(Good) + system.register_plugin(GoodWithArgs) + Plugins._singleton = system + return system + + +def test_base_plugin(plugins): + """ Tests good framing plugins are returned """ + plugin_options = plugins.get_selections("framing") + assert Good in plugin_options, "Good plugin not registered as expected" + + +def test_framing_builtin_plugins(plugins): + """ Tests good framing plugins are returned """ + plugin_options = plugins.get_selections("framing") + assert FpFramerDeframer in plugin_options, "FpFramerDeframer plugin not registered as expected" + + +def test_communication_builtin_plugins(plugins): + """ Tests good framing plugins are returned """ + plugin_options = plugins.get_selections("communication") + for expected in [IpAdapter, NoneAdapter, SerialAdapter]: + assert expected in plugin_options, f"{expected.__name__} plugin not registered as expected" + + +def test_plugin_categories(plugins): + """ Tests plugin categories """ + plugin_categories = plugins.get_categories() + assert sorted(plugin_categories) == sorted(["communication", "framing"]), "Detected plugin categories incorrect" + + +def test_plugin_validation(plugins): + """ Tests good framing plugins are returned """ + plugin_options = plugins.get_selections("framing") + assert BadSuperClass not in plugin_options, "Plugin with bad parent class not excluded as expected" + assert BadImplementation not in plugin_options, "Plugin with abstract implementation not excluded as expected" + + +def test_plugin_arguments(plugins): + """ Tests that arguments can be parsed and supplied to a plugin """ + a_string = "a_string" + a_number = "201" + to_parse = ["--framing", "good-with-args", "--my-fancy-arg", a_string, "--my-fancy-arg-with-dest", a_number] + args, _ = ParserBase.parse_args([PluginArgumentParser,], arguments=to_parse) + assert args.framing_selection == GoodWithArgs.NAMED_PLUGIN_NAME, "Improper framing selection" + assert isinstance(args.framing_selection_instance, GoodWithArgs), "Invalid instance created" + assert args.framing_selection_instance.my_fancy_arg == a_string, "String argument did not process" + assert args.framing_selection_instance.fancy_2 == int(a_number), "Integer argument did not process"