Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-bc committed Mar 11, 2024
1 parent 1aa36bb commit 26b87f2
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 56 deletions.
41 changes: 29 additions & 12 deletions src/fprime_gds/common/zmq_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
@author lestarch
"""

import logging
import struct
from typing import Tuple
Expand All @@ -24,6 +25,7 @@

LOGGER = logging.getLogger("transport")


class ZmqWrapper(object):
"""Handler for ZMQ functions for use in other objects"""

Expand All @@ -50,7 +52,9 @@ def configure(self, transport_url: Tuple[str], sub_topic: bytes, pub_topic: byte
sub_topic: subscription topic used to filter incoming messages
pub_topic: publication topic supplied for remote subscription filters
"""
assert len(transport_url) == 2, f"Must supply a pair of URLs for ZeroMQ not '{transport_url}'"
assert (
len(transport_url) == 2
), f"Must supply a pair of URLs for ZeroMQ not '{transport_url}'"
self.pub_topic = pub_topic
self.sub_topic = sub_topic
self.transport_url = transport_url
Expand Down Expand Up @@ -78,8 +82,12 @@ def connect_outgoing(self):
The connection is made using self.transport_url, and as such, this must be configured before running. This is
intended to be called on the sending thread.
"""
assert self.transport_url is not None and len(self.transport_url) == 2, "Must configure before connecting"
assert self.zmq_socket_outgoing is None, "Cannot connect outgoing multiple times"
assert (
self.transport_url is not None and len(self.transport_url) == 2
), "Must configure before connecting"
assert (
self.zmq_socket_outgoing is None
), "Cannot connect outgoing multiple times"
assert self.pub_topic is not None, "Must configure sockets before connecting"
self.zmq_socket_outgoing = self.context.socket(zmq.PUB)
self.zmq_socket_outgoing.setsockopt(zmq.SNDHWM, 0)
Expand All @@ -93,7 +101,7 @@ def connect_outgoing(self):
self.zmq_socket_outgoing.connect(self.transport_url[0])

def connect_incoming(self):
""" Sets up a ZeroMQ connection for incoming data
"""Sets up a ZeroMQ connection for incoming data
ZeroMQ allows multiple connections to a single endpoint. This only affects incoming connections as sockets must
be created on their owning threads. This will connect the ZeroMQ topology and if self.server is set, will bind
Expand All @@ -102,8 +110,12 @@ def connect_incoming(self):
The connection is made using self.transport_url, and as such, this must be configured before running. This is
intended to be called on the receiving thread.
"""
assert self.transport_url is not None and len(self.transport_url) == 2, "Must configure before connecting"
assert self.zmq_socket_incoming is None, "Cannot connect incoming multiple times"
assert (
self.transport_url is not None and len(self.transport_url) == 2
), "Must configure before connecting"
assert (
self.zmq_socket_incoming is None
), "Cannot connect incoming multiple times"
assert self.sub_topic is not None, "Must configure sockets before connecting"
self.zmq_socket_incoming = self.context.socket(zmq.SUB)
self.zmq_socket_incoming.setsockopt(zmq.RCVHWM, 0)
Expand All @@ -125,7 +137,7 @@ def disconnect_incoming(self):
self.zmq_socket_incoming.close()

def terminate(self):
""" Terminate the ZeroMQ context"""
"""Terminate the ZeroMQ context"""
self.context.term()

def recv(self, timeout=None):
Expand Down Expand Up @@ -162,11 +174,14 @@ def __init__(self):
self.zmq = ZmqWrapper()

def connect(
self, transport_url: Tuple[str], sub_routing: RoutingTag, pub_routing: RoutingTag
self,
transport_url: Tuple[str],
sub_routing: RoutingTag,
pub_routing: RoutingTag,
):
"""Connects to the ZeroMQ network"""
self.zmq.configure(transport_url, sub_routing.value, pub_routing.value)
self.zmq.connect_outgoing() # Outgoing socket, for clients, exists on the current thread
self.zmq.connect_outgoing() # Outgoing socket, for clients, exists on the current thread
super().connect(transport_url, sub_routing, pub_routing)

def disconnect(self):
Expand All @@ -176,16 +191,18 @@ def disconnect(self):

def send(self, data):
"""Send data via ZeroMQ"""
if data[:4] == b'ZZZZ':
if data[:4] == b"ZZZZ":
data = data[4:]
self.zmq.send(data) # Must strip out ZZZZ as that is a ThreadedTcpServer only property
self.zmq.send(
data
) # Must strip out ZZZZ as that is a ThreadedTcpServer only property

def recv(self, timeout=None):
"""Receives data from ZeroMQ"""
return self.zmq.recv(timeout)

def recv_thread(self):
""" Overrides the recv_thread method
"""Overrides the recv_thread method
Overrides the recv_thread method of the superclass such that the ZeroMQ socket may be created/destroyed
before/after the main recv loop.
Expand Down
81 changes: 52 additions & 29 deletions src/fprime_gds/executables/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
@author mstarch
"""

import argparse
import datetime
import errno
Expand Down Expand Up @@ -85,8 +86,9 @@ def get_parser(self) -> argparse.ArgumentParser:
argparse parser for supplied arguments
"""
parser = argparse.ArgumentParser(
description=self.description, add_help=True,
formatter_class=argparse.ArgumentDefaultsHelpFormatter
description=self.description,
add_help=True,
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
for flags, keywords in self.get_arguments().items():
parser.add_argument(*flags, **keywords)
Expand All @@ -98,7 +100,7 @@ def reproduce_cli_args(self, args_ns):
def flag_member(flags, argparse_inputs) -> Tuple[str, str]:
"""Get the best CLI flag and namespace member"""
best_flag = (
[flag for flag in flags if flag.startswith("--")] + list(flags)
[flag for flag in flags if flag.startswith("--")] + list(flags)
)[0]
member = argparse_inputs.get(
"dest", re.sub(r"^-+", "", best_flag).replace("-", "_")
Expand All @@ -119,7 +121,7 @@ def cli_arguments(flags, argparse_inputs) -> List[str]:

# Handle arguments
if (action == "store_true" and value) or (
action == "store_false" and not value
action == "store_false" and not value
):
return [best_flag]
elif action != "store" or value is None:
Expand Down Expand Up @@ -150,10 +152,10 @@ def handle_arguments(self, args, **kwargs):

@staticmethod
def parse_args(
parser_classes,
description="No tool description provided",
arguments=None,
**kwargs,
parser_classes,
description="No tool description provided",
arguments=None,
**kwargs,
):
"""Parse and post-process arguments
Expand Down Expand Up @@ -236,7 +238,7 @@ def handle_arguments(self, args, **kwargs):
raise Exception(msg)
# Works for the old structure where the bin, lib, and dict directories live immediately under the platform
elif len(child_directories) == 3 and set(
[path.name for path in child_directories]
[path.name for path in child_directories]
) == {"bin", "lib", "dict"}:
args.deployment = detected_toolchain
return args
Expand All @@ -248,39 +250,46 @@ def handle_arguments(self, args, **kwargs):


class PluginArgumentParser(ParserBase):
""" Parser for arguments coming from plugins """
"""Parser for arguments coming from plugins"""

DESCRIPTION = "Parse plugin CLI arguments and selections"
FPRIME_CHOICES = {
"framing": "fprime",
"communication": "ip",
}

def __init__(self):
""" Initialize the plugin information for this parser """
"""Initialize the plugin information for this parser"""
self._plugin_map = {
category: {
self.get_selection_name(selection): selection for selection in Plugins.system().get_selections(category)
} for category in Plugins.system().get_categories()
self.get_selection_name(selection): selection
for selection in Plugins.system().get_selections(category)
}
for category in Plugins.system().get_categories()
}

def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
""" Return arguments to used in plugins """
"""Return arguments to used in plugins"""

arguments: Dict[Tuple[str, ...], Dict[str, Any]] = {}
for category, selections in self._plugin_map.items():
arguments.update({
(f"--{category}-selection",): {
"choices": [choice for choice in selections.keys()],
"help": f"Select {category} implementer.",
"default": self.FPRIME_CHOICES.get(category, list(selections.keys())[0])
arguments.update(
{
(f"--{category}-selection",): {
"choices": [choice for choice in selections.keys()],
"help": f"Select {category} implementer.",
"default": self.FPRIME_CHOICES.get(
category, list(selections.keys())[0]
),
}
}
})
)
for selection_name, selection in selections.items():
arguments.update(self.get_selection_arguments(selection))
return arguments

def handle_arguments(self, args, **kwargs):
""" Handles the arguments """
"""Handles the arguments"""
for category, selections in self._plugin_map.items():
selection_string = getattr(args, f"{category}_selection")
selection_class = selections[selection_string]
Expand All @@ -291,24 +300,33 @@ def handle_arguments(self, args, **kwargs):

@staticmethod
def get_selection_name(selection):
""" Get the name of a selection """
return selection.get_name() if hasattr(selection, "get_name") else selection.__name__
"""Get the name of a selection"""
return (
selection.get_name()
if hasattr(selection, "get_name")
else selection.__name__
)

@staticmethod
def get_selection_arguments(selection) -> Dict[Tuple[str, ...], Dict[str, Any]]:
""" Get the name of a selection """
"""Get the name of a selection"""
return selection.get_arguments() if hasattr(selection, "get_arguments") else {}

@staticmethod
def map_selection_arguments(args, selection) -> Dict[str, Any]:
""" Get the name of a selection """
"""Get the name of a selection"""
expected_args = PluginArgumentParser.get_selection_arguments(selection)
argument_destinations = [
value["dest"] if "dest" in value else key[0].replace("--", "").replace("-", "_")
(
value["dest"]
if "dest" in value
else key[0].replace("--", "").replace("-", "_")
)
for key, value in expected_args.items()
]
filled_arguments = {
destination: getattr(args, destination) for destination in argument_destinations
destination: getattr(args, destination)
for destination in argument_destinations
}
# Check arguments or yield a Value error
if hasattr(selection, "check_arguments"):
Expand Down Expand Up @@ -358,7 +376,7 @@ def handle_arguments(self, args, **kwargs):


class CommExtraParser(ParserBase):
""" Parses extra communication arguments """
"""Parses extra communication arguments"""

DESCRIPTION = "Process arguments needed to specify arguments for communication"

Expand Down Expand Up @@ -640,7 +658,12 @@ def pipeline_factory(args_ns, pipeline=None) -> StandardPipeline:
class CommParser(CompositeParser):
"""Comm Executable Parser"""

CONSTITUENTS = [CommExtraParser, MiddleWareParser, LogDeployParser, PluginArgumentParser]
CONSTITUENTS = [
CommExtraParser,
MiddleWareParser,
LogDeployParser,
PluginArgumentParser,
]

def __init__(self):
"""Initialization"""
Expand Down
8 changes: 5 additions & 3 deletions src/fprime_gds/executables/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
@author lestarch
"""


import logging
import signal
import sys
Expand Down Expand Up @@ -62,7 +61,10 @@ def main():
client=True,
)
if args.communication_selection == "none":
print("[ERROR] Comm adapter set to 'none'. Nothing to do but exit.", file=sys.stderr)
print(
"[ERROR] Comm adapter set to 'none'. Nothing to do but exit.",
file=sys.stderr,
)
sys.exit(-1)

# Create the handling components for either side of this script, adapter for hardware, and ground for the GDS side
Expand All @@ -84,7 +86,7 @@ def main():
LOGGER.info(
"Starting uplinker/downlinker connecting to FSW using %s with %s",
args.communication_selection,
args.framing_selection
args.framing_selection,
)
discarded_file_handle = None
try:
Expand Down
Loading

0 comments on commit 26b87f2

Please sign in to comment.