From 7e095978a384a4f5987d7cd00f41e446e2c43a21 Mon Sep 17 00:00:00 2001 From: Zimri Leisher Date: Mon, 27 Jan 2025 18:40:55 -0600 Subject: [PATCH] Add fprime param json to seq/dat file converter --- .../common/loaders/prm_json_loader.py | 85 +++++++ .../common/templates/prm_template.py | 81 ++++++ src/fprime_gds/common/tools/params.py | 233 ++++++++++++++++++ 3 files changed, 399 insertions(+) create mode 100644 src/fprime_gds/common/loaders/prm_json_loader.py create mode 100644 src/fprime_gds/common/templates/prm_template.py create mode 100644 src/fprime_gds/common/tools/params.py diff --git a/src/fprime_gds/common/loaders/prm_json_loader.py b/src/fprime_gds/common/loaders/prm_json_loader.py new file mode 100644 index 00000000..aa7407da --- /dev/null +++ b/src/fprime_gds/common/loaders/prm_json_loader.py @@ -0,0 +1,85 @@ +""" +prm_json_loader.py: + +Loads flight dictionary (JSON) and returns id and mnemonic based Python dictionaries of params + +@author zimri.leisher +""" + +from fprime_gds.common.templates.prm_template import PrmTemplate +from fprime_gds.common.loaders.json_loader import JsonLoader +from fprime_gds.common.data_types.exceptions import GdsDictionaryParsingException + + +class PrmJsonLoader(JsonLoader): + """Class to load parameters from json dictionaries""" + + PARAMS_FIELD = "parameters" + + ID = "id" + NAME = "name" + TYPE = "type" + DESC = "annotation" + DEFAULT = "default" + + + def construct_dicts(self, _): + """ + Constructs and returns python dictionaries keyed on id and name + + Args: + _: Unused argument (inherited) + Returns: + A tuple with two channel dictionaries (python type dict): + (id_dict, fqn_name_dict). The keys should be the channels' id and + fully qualified name fields respectively and the values should be PrmTemplate + objects. + """ + id_dict = {} + fqn_name_dict = {} + + if self.PARAMS_FIELD not in self.json_dict: + raise GdsDictionaryParsingException( + f"Ground Dictionary missing '{self.PARAMS_FIELD}' field: {str(self.json_file)}" + ) + + for prm_dict in self.json_dict[self.PARAMS_FIELD]: + # Create a channel template object + prm_temp = self.construct_template_from_dict(prm_dict) + + id_dict[prm_temp.get_id()] = prm_temp + fqn_name_dict[prm_temp.get_full_name()] = prm_temp + + return ( + dict(sorted(id_dict.items())), + dict(sorted(fqn_name_dict.items())), + self.get_versions(), + ) + + def construct_template_from_dict(self, prm_dict: dict) -> PrmTemplate: + try: + prm_id = prm_dict[self.ID] + # The below assignment also raises a ValueError if the name does not contain a '.' + qualified_component_name, prm_name = prm_dict[self.NAME].rsplit('.', 1) + if not qualified_component_name or not prm_name: + raise ValueError() + + type_obj = self.parse_type(prm_dict[self.TYPE]) + except ValueError as e: + raise GdsDictionaryParsingException( + f"Parameter dictionary entry malformed, expected name of the form '.' in : {str(prm_dict)}" + ) + except KeyError as e: + raise GdsDictionaryParsingException( + f"{str(e)} key missing from parameter dictionary entry or its associated type in the dictionary: {str(prm_dict)}" + ) + + prm_default_val = prm_dict.get(self.DEFAULT, None) + + return PrmTemplate( + prm_id, + prm_name, + qualified_component_name, + type_obj, + prm_default_val + ) \ No newline at end of file diff --git a/src/fprime_gds/common/templates/prm_template.py b/src/fprime_gds/common/templates/prm_template.py new file mode 100644 index 00000000..cedf09da --- /dev/null +++ b/src/fprime_gds/common/templates/prm_template.py @@ -0,0 +1,81 @@ +""" +@brief Params Template class + +Instances of this class describe a parameter of a component instance (not +including a specific value) + +@date Created January 27, 2025 +@author Zimri Leisher + +@bug Hopefully none +""" + +from fprime.common.models.serialize.type_base import BaseType +from fprime.common.models.serialize.type_exceptions import TypeMismatchException + +from . import data_template + + +class PrmTemplate(data_template.DataTemplate): + """Class for param templates that describe parameters of component instances""" + + def __init__( + self, + prm_id: int, + prm_name: str, + comp_name: str, + prm_type_obj: BaseType, + prm_default_val, + ): + """ + Constructor + + Args: + prm_id: the id of the parameter + prm_name: the name of the parameter + comp_name: the name of the component instance owning this parameter + prm_type_obj: the instance of BaseType corresponding to the type of this parameter + prm_default_val: the default value of this parameter, in raw JSON form + """ + super().__init__() + # Make sure correct types are passed + if not isinstance(prm_id, int): + raise TypeMismatchException(int, type(prm_id)) + + if not isinstance(prm_name, str): + raise TypeMismatchException(str, type(prm_name)) + + if not isinstance(comp_name, str): + raise TypeMismatchException(str, type(comp_name)) + + if not issubclass(prm_type_obj, BaseType): + raise TypeMismatchException(BaseType, prm_type_obj) + + # prm_default_val is an arbitrary type, likely a primitive or dict + + self.prm_id = prm_id + self.prm_name = prm_name + self.comp_name = comp_name + self.prm_type_obj = prm_type_obj + self.prm_default_val = prm_default_val + + def get_full_name(self): + """ + Get the full name of this param + + Returns: + The full name (component.param) for this param + """ + return f"{self.comp_name}.{self.prm_name}" + + def get_id(self): + return self.prm_id + + def get_name(self): + return self.prm_name + + def get_comp_name(self): + return self.comp_name + + def get_type_obj(self): + return self.prm_type_obj diff --git a/src/fprime_gds/common/tools/params.py b/src/fprime_gds/common/tools/params.py new file mode 100644 index 00000000..c587862f --- /dev/null +++ b/src/fprime_gds/common/tools/params.py @@ -0,0 +1,233 @@ +# author: zimri.leisher +# created on: Jan 27, 2025 + +# allow us to use bracketed types +from __future__ import annotations +import json as js +from pathlib import Path +from argparse import ArgumentParser +from fprime_gds.common.loaders.prm_json_loader import PrmJsonLoader +from fprime_gds.common.templates.prm_template import PrmTemplate +from fprime.common.models.serialize.type_base import BaseType +from fprime.common.models.serialize.array_type import ArrayType +from fprime.common.models.serialize.bool_type import BoolType +from fprime.common.models.serialize.enum_type import EnumType +from fprime.common.models.serialize.numerical_types import ( + F32Type, + F64Type, + I8Type, + I16Type, + I32Type, + I64Type, + U8Type, + U16Type, + U32Type, + U64Type, +) +from fprime.common.models.serialize.serializable_type import SerializableType +from fprime.common.models.serialize.string_type import StringType + + +def instantiate_prm_type(prm_val_json, prm_type): + prm_instance = prm_type() + if isinstance(prm_instance, BoolType): + value = str(prm_val_json).lower().strip() + if value in {"true", "yes"}: + av = True + elif value in {"false", "no"}: + av = False + else: + raise RuntimeError("Param value is not a valid boolean") + prm_instance.val = av + elif isinstance(prm_instance, EnumType): + prm_instance.val = prm_val_json + elif isinstance(prm_instance, (F64Type, F32Type)): + prm_instance.val = float(prm_val_json) + elif isinstance( + prm_instance, + (I64Type, U64Type, I32Type, U32Type, I16Type, U16Type, I8Type, U8Type), + ): + prm_instance.val = int(prm_val_json, 0) if isinstance(prm_val_json, str) else int(prm_val_json) + elif isinstance(prm_instance, StringType): + prm_instance.val = prm_val_json + elif isinstance(prm_instance, (ArrayType, SerializableType)): + prm_instance.val = prm_val_json + else: + raise RuntimeError( + "Param value could not be converted to type object" + ) + return prm_instance + + +def parsed_json_to_dat(templates_and_values: list[tuple[PrmTemplate, dict]]) -> bytes: + serialized = bytes() + for template_and_value in templates_and_values: + template, json_value = template_and_value + prm_instance = instantiate_prm_type(json_value, template.prm_type_obj) + + prm_instance_bytes = prm_instance.serialize() + + # delimiter + serialized += b"\xA5" + + record_size = 4 + len(prm_instance_bytes) + + # size of following data + serialized += record_size.to_bytes(length=4, byteorder="big") + # id of param + serialized += template.prm_id.to_bytes(length=4, byteorder="big") + # value of param + serialized += prm_instance_bytes + return serialized + + +def parsed_json_to_seq(templates_and_values: list[tuple[PrmTemplate, dict]], include_save=False) -> list[str]: + cmds = [] + cmds.append("; Autocoded sequence file from JSON") + for template_and_value in templates_and_values: + template, json_value = template_and_value + set_cmd_name = template.comp_name + "." + template.prm_name.upper() + "_PARAM_SET" + cmd = "R00:00:00 " + set_cmd_name + " " + str(json_value) + cmds.append(cmd) + if include_save: + save_cmd = template.comp_name + "." + template.prm_name.upper() + "_PARAM_SAVE" + cmds.append(save_cmd) + return cmds + + + +def parse_json(json, name_dict: dict[str, PrmTemplate], include_implicit_defaults=False) -> list[tuple[PrmTemplate, dict]]: + """ + json: the json object read from the .json file + name_dict: a dictionary of (fqn param name, PrmTemplate) pairs + include_implicit_defaults: whether or not to also include default values from the name dict + if no value was specified in the json + @return a list of tuple of param template and the intended param value (in form of json dict) + """ + # first, check the json for errors + for component_name in json: + for param_name in json[component_name]: + fqn_param_name = component_name + "." + param_name + param_temp: PrmTemplate = name_dict.get(fqn_param_name, None) + if not param_temp: + raise RuntimeError( + "Unable to find param " + + fqn_param_name + + " in dictionary" + ) + + # okay, now iterate over the dict + templates_to_values = [] + for fqn_param_name, prm_template in name_dict.items(): + + prm_val = None + + if include_implicit_defaults: + # there is a default value + prm_val = prm_template.prm_default_val + + comp_json = json.get(prm_template.comp_name, None) + if comp_json: + # if there is an entry for the component + if prm_template.prm_name in comp_json: + # if there is an entry for this param + # get the value + prm_val = comp_json[prm_template.prm_name] + + if not prm_val: + # not writing a val for this prm + continue + + templates_to_values.append((prm_template, prm_val)) + + return templates_to_values + + +def main(): + arg_parser = ArgumentParser() + subparsers = arg_parser.add_subparsers(dest="subcmd") + + + json_to_dat = subparsers.add_parser("json-to-dat", description="Compiles .json files into param DB .dat files") + json_to_dat.add_argument( + "json_file", type=Path, help="The .json file to turn into a .dat file" + ) + json_to_dat.add_argument( + "--dictionary", + "-d", + type=Path, + help="The dictionary file of the FSW", + required=True, + ) + json_to_dat.add_argument("--defaults", action="store_true", help="Whether or not to implicitly include default parameter values in the output") + json_to_dat.add_argument("--output", "-o", type=Path, help="The output file", default=None) + + + json_to_seq = subparsers.add_parser("json-to-seq", description="Converts .json files into command sequence .seq files") + json_to_seq.add_argument( + "json_file", type=Path, help="The .json file to turn into a .seq file" + ) + json_to_seq.add_argument( + "--dictionary", + "-d", + type=Path, + help="The dictionary file of the FSW", + required=True, + ) + json_to_seq.add_argument("--defaults", action="store_true", help="Whether or not to implicitly include default parameter values in the output") + json_to_seq.add_argument("--save", action="store_true", help="Whether or not to include the PRM_SAVE cmd in the output") + json_to_seq.add_argument("--output", "-o", type=Path, help="The output file", default=None) + + + args = arg_parser.parse_args() + + if not args.json_file.exists(): + print("Unable to find", args.json_file) + exit(1) + + if args.json_file.is_dir(): + print("json-file is a dir", args.json_file) + exit(1) + + if not args.dictionary.exists(): + print("Unable to find", args.dictionary) + exit(1) + + output_format = "dat" if args.subcmd == "json-to-dat" else "seq" + + # just compile the one file in place + if args.output is None: + output_path = args.json_file.with_suffix("." + output_format) + else: + output_path = args.output + + convert_json(args.json_file, args.dictionary, output_path, output_format, args.defaults, args.save) + + +def convert_json(json_file: Path, dictionary: Path, output: Path, output_format: str, implicit_defaults=False, include_save_cmd=False): + + print("Converting", json_file, "to", output, "(format: ." + output_format + ")") + output.parent.mkdir(parents=True, exist_ok=True) + + json = js.loads(json_file.read_text()) + + dict_parser = PrmJsonLoader(str(dictionary.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts( + str(dictionary.resolve()) + ) + + templates_to_values = parse_json(json, name_dict, implicit_defaults) + + if output_format == "dat": + serialized_values = parsed_json_to_dat(templates_to_values) + + print("Done, writing to", output.resolve()) + output.write_bytes(serialized_values) + elif output_format == "seq": + sequence_cmds = parsed_json_to_seq(templates_to_values, include_save_cmd) + print("Done, writing to", output.resolve()) + output.write_text("\n".join(sequence_cmds)) + + +if __name__ == "__main__": + main() \ No newline at end of file