From 1237f6c6daaff823f86e8e2f6778f7b940c8f374 Mon Sep 17 00:00:00 2001 From: semuadmin <28569967+semuadmin@users.noreply.github.com> Date: Fri, 6 Dec 2024 13:21:59 +0000 Subject: [PATCH] add ubxbase utility --- .vscode/settings.json | 2 +- .vscode/tasks.json | 2 +- README.md | 33 ++- RELEASE_NOTES.md | 6 + docs/pyubxutils.rst | 8 + pyproject.toml | 3 +- src/pyubxutils/_version.py | 2 +- src/pyubxutils/helpers.py | 33 +++ src/pyubxutils/ubxbase.py | 477 +++++++++++++++++++++++++++++++++++ src/pyubxutils/ubxcompare.py | 8 +- src/pyubxutils/ubxload.py | 18 +- src/pyubxutils/ubxsave.py | 8 +- src/pyubxutils/ubxsetrate.py | 8 +- tests/test_static.py | 11 + 14 files changed, 587 insertions(+), 32 deletions(-) create mode 100644 src/pyubxutils/ubxbase.py diff --git a/.vscode/settings.json b/.vscode/settings.json index 6ad68f1..28dda2d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,5 +5,5 @@ "python.defaultInterpreterPath": "python3", "modulename": "${workspaceFolderBasename}", "distname": "${workspaceFolderBasename}", - "moduleversion": "1.0.0" + "moduleversion": "1.0.1" } \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 7ab08bf..ec63c44 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -198,7 +198,7 @@ "Sort Imports", "Format", "Pylint", - //"Test", + "Test", "Build", "Sphinx HTML" ], diff --git a/README.md b/README.md index 93bbab9..10feec0 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,9 @@ pyubxutils [Current Status](#currentstatus) | [Installation](#installation) | [ubxsimulator](#ubxsimulator) | +[ubxsave CLI](#ubxsave) | +[ubxload CLI](#ubxload) | +[ubxbase CLI](#ubxbase) | [ubxsetrate CLI](#ubxsetrate) | [ubxcompare CLI](#ubxcompare) | [Graphical Client](#gui) | @@ -16,6 +19,7 @@ pyubxutils is an original series of Python u-blox ™ UBX © protocol utility cl 1. [`ubxsimulator`](#ubxsimulator) utility. This provides a basic simulation of a GNSS receiver serial stream by generating synthetic UBX or NMEA messages based on parameters defined in a json configuration file. 1. [`ubxsave`](#ubxsave) CLI utility. This saves a complete set of configuration data from any Generation 9+ u-blox device (e.g. NEO-M9N or ZED-F9P) to a file. The file can then be reloaded to any compatible device using the `ubxload` utility. 1. [`ubxload`](#ubxload) CLI utility. This reads a file containing binary configuration data and loads it into any compatible Generation 9+ u-blox device (e.g. NEO-M9N or ZED-F9P). +1. [`ubxbase`](#ubxbase) CLI utility. A utility which configures compatible u-blox GNSS receivers (e.g. ZED-F9P) as RTK base stations, using either Fixed or Survey-In timing modes. 1. [`ubxsetrate`](#ubxsetrate) CLI utility. A simple utility which sets NMEA or UBX message rates on u-blox GNSS receivers. 1. [`ubxcompare`](#ubxcompare) CLI utility. Utility for comparing two or more u-blox config files in either text (\*.txt) or binary (\*.ubx) format. Output files from the `ubxsave` utility can be used as input files. @@ -118,7 +122,7 @@ Command line arguments can be stored in a configuration file and invoked using t *GENERATION 9+ DEVICES ONLY (e.g. NEO-M9N or ZED-F9P)* ``` -class pyubxutils.ubxconfig.UBXSaver(file, stream, **kwargs) +class pyubxutils.ubxsave.UBXSaver(file, stream, **kwargs) ``` CLI utility which saves Generation 9+ UBX device configuration data to a file. `ubxsave` polls configuration data via the device's serial port using a series of CFG-VALGET poll messages. It parses the responses to these polls, converts them to CFG-VALSET command messages and saves these to a binary file. This binary file can then be loaded into any compatible UBX device (e.g. via the `ubxload` utility) to restore the saved configuration. @@ -145,7 +149,7 @@ ubxsave -h *GENERATION 9+ DEVICES ONLY (e.g. NEO-M9N or ZED-F9P)* ``` -class pyubxutils.ubxconfig.UBXLoader(file, stream, **kwargs) +class pyubxutils.ubxload.UBXLoader(file, stream, **kwargs) ``` CLI utility which loads UBX configuration (CFG-VALSET) data from a binary file (e.g. one created by the `ubxsave` utility) and loads it into the volatile memory (RAM) of a compatible Generation 9+ UBX device via its serial port. It then awaits acknowledgements to this data and reports any errors. @@ -162,11 +166,34 @@ For help and full list of optional arguments, type: ubxload -h ``` +--- +## ubxbase CLI + +*RTK-COMPATIBLE GENERATION 9+ DEVICES ONLY (e.g. ZED-F9P)* + +``` +class pyubxutils.ubxbase.UBXBase(file, stream, **kwargs) +``` + +CLI utility which configures a compatible u-blox receiver to RTK base station mode, using either Fixed or Survey-In timing modes. + +### CLI Usage: + +```shell +ubxbase -P /dev/ttyACM0 --timemode 2 --fixedpos 37.2334512,-115.8151357,18226.4 --postype 1 --acclimit 10 --waittime 5 +``` + +For help and full list of optional arguments, type: + +```shell +ubxbase -h +``` + --- ## ubxsetrate CLI ``` -class pyubxutils.ubxconfig.UBXSetRate(**kwargs) +class pyubxutils.ubxsetrate.UBXSetRate(**kwargs) ``` A simple CLI utility to set NMEA or UBX message rates on u-blox receivers via a serial port. diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c38bf88..84428cc 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,11 @@ # pyubxutils Release Notes +### RELEASE 1.0.1 + +ENHANCEMENTS: + +1. Add `ubxbase` utility to configure compatible u-blox receiver (e.g. ZED-F9P) to Base Station mode. Type `ubxbase -h` for help. + ### RELEASE 1.0.0 CHANGES: diff --git a/docs/pyubxutils.rst b/docs/pyubxutils.rst index 2de02a3..d4561f1 100644 --- a/docs/pyubxutils.rst +++ b/docs/pyubxutils.rst @@ -28,6 +28,14 @@ pyubxutils.helpers module :undoc-members: :show-inheritance: +pyubxutils.ubxbase module +------------------------- + +.. automodule:: pyubxutils.ubxbase + :members: + :undoc-members: + :show-inheritance: + pyubxutils.ubxcompare module ---------------------------- diff --git a/pyproject.toml b/pyproject.toml index 2aaf5dc..e43cede 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyubxutils" authors = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] maintainers = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] description = "UBX Protocol Command Line Utilities" -version = "1.0.0" +version = "1.0.1" license = { file = "LICENSE" } readme = "README.md" requires-python = ">=3.9" @@ -41,6 +41,7 @@ ubxsave = "pyubxutils.ubxsave:main" ubxload = "pyubxutils.ubxload:main" ubxsimulator = "pyubxutils.ubxsimulator_cli:main" ubxcompare = "pyubxutils.ubxcompare:main" +ubxbase = "pyubxutils.ubxbase:main" [project.urls] homepage = "https://github.com/semuconsulting/pyubxutils" diff --git a/src/pyubxutils/_version.py b/src/pyubxutils/_version.py index 650023f..ee3accc 100644 --- a/src/pyubxutils/_version.py +++ b/src/pyubxutils/_version.py @@ -8,4 +8,4 @@ :license: BSD 3-Clause """ -__version__ = "1.0.0" +__version__ = "1.0.1" diff --git a/src/pyubxutils/helpers.py b/src/pyubxutils/helpers.py index 1f49cf8..b06dca2 100644 --- a/src/pyubxutils/helpers.py +++ b/src/pyubxutils/helpers.py @@ -13,6 +13,7 @@ import logging import logging.handlers from argparse import ArgumentParser +from math import trunc from os import getenv from pyubxutils.globals import ( @@ -172,3 +173,35 @@ def progbar(i: int, lim: int, inc: int = 50): f"{int(pct*100/inc):02}% " + "\u2593" * pct + "\u2591" * (inc - pct), end="\r", ) + + +def h2sphp(val: float) -> tuple: + """ + Split height in cm into standard (cm) and high (mm * 10) + precision components. + + e.g. 123456.78 -> 123456, 78 + + :param val: decimal lat/lon value + :return: tuple of integers + :rtype: tuple + """ + + sp = trunc(val) + hp = int(round((val - sp) * 100, 0)) + return sp, hp + + +def ll2sphp(val: float) -> tuple: + """ + Split lat/lon into standard (1-7 dp) and high (8-9 dp) + precision components. + + e.g. 51.123456789 -> 511234567, 89 + + :param val: decimal height value in cm + :return: tuple of integers + :rtype: tuple + """ + + return h2sphp(val * 1e7) diff --git a/src/pyubxutils/ubxbase.py b/src/pyubxutils/ubxbase.py new file mode 100644 index 0000000..c895877 --- /dev/null +++ b/src/pyubxutils/ubxbase.py @@ -0,0 +1,477 @@ +""" +ubxbase.py + +NB: ONLY FOR GENERATION 9+ UBX RTK DEVICES e.g. ZED-F9n + +This command line utility configures an RTK-compatible u-blox +GNSS receiver to operate in Base Station mode. + +RTCM 1006 (Antenna Reference Data) is only output if the base +station is active, so receipt of this message type is used as +a configuration success criterion. + +Created on 06 Jan 2023 + +:author: semuadmin +:copyright: SEMU Consulting © 2023 +:license: BSD 3-Clause +""" + +from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError +from datetime import datetime, timedelta +from logging import getLogger +from queue import Queue +from threading import Event, Thread +from time import sleep + +from pyubx2 import ( + ERR_IGNORE, + NMEA_PROTOCOL, + RTCM3_PROTOCOL, + UBX_PROTOCOL, + UBXMessage, + UBXMessageError, + UBXParseError, + UBXReader, +) +from serial import Serial + +from pyubxutils._version import __version__ as VERSION +from pyubxutils.globals import EPILOG, VERBOSITY_HIGH +from pyubxutils.helpers import h2sphp, ll2sphp, progbar, set_common_args + +ACK = "ACK-ACK" +NAK = "ACK-NAK" +DEFAULT_ACCURACY = 100 +DEFAULT_DURATION = 60 +DEFAULT_POS = (0.0, 0.0, 0.0) +POS_ECEF = 0 +POS_LLH = 1 +TMODE_DISABLED = 0 +TMODE_SVIN = 1 +TMODE_FIXED = 2 +WAITTIME = 5 # wait time for acknowledgements +WRITETIME = 0.02 # wait time between writes + + +class UBXBase: + """UBX Base Station Configuration Class.""" + + def __init__(self, stream: object, **kwargs): + """ + Constructor. + + :param object file: input file + :param object stream: output serial stream + """ + + self.logger = getLogger(__name__) + self._stream = stream + + self._port = kwargs["port"] + self._porttype = kwargs.get("porttype", "USB") + self._timemode = int(kwargs.get("timemode", TMODE_DISABLED)) + self._acclimit = int(kwargs.get("acclimit", DEFAULT_ACCURACY)) + self._duration = int(kwargs.get("duration", DEFAULT_DURATION)) + self._postype = int(kwargs.get("postype", POS_LLH)) + self._waittime = int(kwargs.get("waittime", WAITTIME)) + self._fixedpos = kwargs.get("fixedpos", DEFAULT_POS) + self._ubxreader = UBXReader( + self._stream, + protfilter=NMEA_PROTOCOL | UBX_PROTOCOL | RTCM3_PROTOCOL, + quitonerror=ERR_IGNORE, + ) + self._out_queue = Queue() + self._stop_event = Event() + self._last_ack = datetime.now() + self._read_thread = Thread( + target=self._read_data, + daemon=True, + args=( + stream, + self._ubxreader, + self._out_queue, + self._stop_event, + ), + ) + self._svin_elapsed = 0 + self._msg_ack = 0 + self._msg_nak = 0 + self._msg_write = 0 + self._msg_load = 0 # if self._timemode == TMODE_DISABLED else 1 + self._msg_rtcm = 0 + + def _config_svin(self, queue: Queue) -> int: + """ + Configure Survey-In mode with specified accuracy limit. + """ + + print( + f"Survey-in duration {self._duration}s, accuracy limit {self._acclimit}cm" + ) + self._waittime += self._duration + layers = 1 + transaction = 0 + acclimit = int(round(self._acclimit * 100, 0)) # mm * 10 + cfg_data = [ + ("CFG_TMODE_MODE", self._timemode), + ("CFG_TMODE_SVIN_ACC_LIMIT", acclimit), + ("CFG_TMODE_SVIN_MIN_DUR", self._duration), + # (f"CFG_MSGOUT_UBX_NAV_SVIN_{port_type}", 1), + ] + ubx = UBXMessage.config_set(layers, transaction, cfg_data) + queue.put(ubx) + self._msg_load += 1 + return 1 + + def _config_fixed(self, queue: Queue) -> int: + """ + Configure Fixed mode with specified coordinates. + """ + + print( + f"Fixed position format {('ECEF','LLH')[self._postype]}, {self._fixedpos}, " + f"accuracy limit {self._acclimit}cm" + ) + layers = 1 + transaction = 0 + acclimit = int(round(self._acclimit * 100, 0)) # mm * 10 + x, y, z = self._fixedpos + if self._postype == POS_ECEF: + x, xhp = h2sphp(x) + y, yhp = h2sphp(y) + z, zhp = h2sphp(z) + cfg_data = [ + ("CFG_TMODE_MODE", self._timemode), + ("CFG_TMODE_POS_TYPE", self._postype), + ("CFG_TMODE_FIXED_POS_ACC", acclimit), # mm * 10 + ("CFG_TMODE_ECEF_X", x), # cm + ("CFG_TMODE_ECEF_X_HP", xhp), # mm * 10 + ("CFG_TMODE_ECEF_Y", y), # cm + ("CFG_TMODE_ECEF_Y_HP", yhp), # mm * 10 + ("CFG_TMODE_ECEF_Z", z), # cm + ("CFG_TMODE_ECEF_Z_HP", zhp), # mm * 10 + ] + else: + x, xhp = ll2sphp(x) + y, yhp = ll2sphp(y) + z, zhp = h2sphp(z) + cfg_data = [ + ("CFG_TMODE_MODE", self._timemode), + ("CFG_TMODE_POS_TYPE", self._postype), + ("CFG_TMODE_FIXED_POS_ACC", acclimit), # mm * 10 + ("CFG_TMODE_LAT", x), + ("CFG_TMODE_LAT_HP", xhp), + ("CFG_TMODE_LON", y), + ("CFG_TMODE_LON_HP", yhp), + ("CFG_TMODE_HEIGHT", z), # cm + ("CFG_TMODE_HEIGHT_HP", zhp), # mm * 10 + ] + ubx = UBXMessage.config_set(layers, transaction, cfg_data) + queue.put(ubx) + self._msg_load += 1 + return 1 + + def _config_disabled(self, queue: Queue) -> int: + """ + Configure Fixed mode with specified coordinates. + """ + + tmode = TMODE_DISABLED + layers = 1 + transaction = 0 + cfg_data = [ + ("CFG_TMODE_MODE", tmode), + ] + ubx = UBXMessage.config_set(layers, transaction, cfg_data) + queue.put(ubx) + self._msg_load += 1 + return 1 + + def _config_output( + self, queue: Queue, rate: int = 1, port_type: str = "USB" + ) -> int: + """ + Configure which RTCM3 and UBX messages to output. + """ + + print(f"{('Disabling','Enabling')[rate]} output messages") + layers = 1 # 1 = RAM, 2 = BBR, 4 = Flash (can be OR'd) + transaction = 0 + cfg_data = ( + [(f"CFG_MSGOUT_UBX_NAV_SVIN_{port_type}", 1)] + if self._timemode == TMODE_SVIN + else [] + ) + for rtcm_type in ( + "1006", + "1077", + "1087", + "1097", + "1127", + "1230", + ): + cfg = f"CFG_MSGOUT_RTCM_3X_TYPE{rtcm_type}_{port_type}" + cfg_data.append([cfg, rate]) + + ubx = UBXMessage.config_set(layers, transaction, cfg_data) + queue.put(ubx) + self._msg_load += 1 + return 1 + + def _read_data( + self, + stream: object, + ubr: UBXReader, + queue: Queue, + stop: Event, + ): + """ + THREADED + Read incoming UBX and RTCM data from device. + """ + # pylint: disable=broad-except + + # read until expected no of acknowledgements has been received + # or waittime has been exceeded. + while not stop.is_set(): + try: + (_, parsed_data) = ubr.read() + if parsed_data is not None: + if ( + parsed_data.identity in (ACK, NAK) + and parsed_data.clsID == 6 # CFG + and parsed_data.msgID == 138 # CFG-VALSET + ): + self._last_ack = datetime.now() + if parsed_data.identity == ACK: + self._msg_ack += 1 + else: + self._msg_nak += 1 + self.logger.debug( + f"ACKNOWLEDGEMENT {self._msg_ack + self._msg_nak} - {parsed_data}" + ) + if ( + self._timemode in (TMODE_FIXED, TMODE_SVIN) + and parsed_data.identity == "1006" + ): + self._msg_rtcm += 1 + self.logger.debug( + f"RTCM 1006 ACTIVE BASE {self._msg_rtcm} - {parsed_data}" + ) + if ( + self._timemode == TMODE_SVIN + and parsed_data.identity == "NAV-SVIN" + ): + self._svin_elapsed = parsed_data.dur + self.logger.debug(f"UBX NAV-SVIN - {parsed_data}") + + # send config message(s) to receiver + if not queue.empty(): + i = 0 + while not queue.empty(): + i += 1 + parsed_data = queue.get() + self._msg_write += 1 + self.logger.debug( + f"WRITE {self._msg_write} {parsed_data.identity}" + ) + stream.write(parsed_data.serialize()) + sleep(WRITETIME) + queue.task_done() + + except (UBXMessageError, UBXParseError): + continue + except Exception as err: + if not stop.is_set(): + print(f"Something went wrong {err}") + continue + + def run(self): + """ + Run configuration load routines. + """ + + rc = 0 + if self._timemode in (TMODE_FIXED, TMODE_SVIN): + print( + f"Configuring device at port {self._port} as base station using " + f"{('disabled', 'survey-in', 'fixed')[self._timemode]} timing mode" + ) + else: + print(f"Configuring device at port {self._port} to disable base station") + + start = datetime.now() + self._read_thread.start() + if self._timemode == TMODE_SVIN: + rc = self._config_svin(self._out_queue) + elif self._timemode == TMODE_FIXED: + rc = self._config_fixed(self._out_queue) + else: + rc = self._config_disabled(self._out_queue) + if rc: + rc = self._config_output( + self._out_queue, + rate=self._timemode != TMODE_DISABLED, + port_type=self._porttype, + ) + + # loop until waittime / survey duration expired or user presses Ctrl-C + i = 0 + while datetime.now() < start + timedelta(seconds=self._waittime): + try: + if self._timemode == TMODE_SVIN: + i = self._svin_elapsed # from NAV-SVIN message + wt = self._duration + else: + i += 1 + wt = self._waittime + progbar(i, wt, wt) + sleep(1) + except KeyboardInterrupt: # capture Ctrl-C + print("\nTerminated by user. Configuration may be incomplete.") + break + + self._stop_event.set() + self._read_thread.join() + + if self._msg_ack == self._msg_load and ( + (self._timemode in (TMODE_FIXED, TMODE_SVIN) and self._msg_rtcm > 1) + or (self._timemode == TMODE_DISABLED and self._msg_rtcm == 0) + ): + print( + "Configuration successful. " + f"{self._msg_ack} configuration messages acknowledged. " + f"{self._msg_rtcm} RTCM 1006 (active base) messages confirmed." + ) + rc = 1 + else: + print( + "\nConfiguration unsuccessful. " + f"{self._msg_load} configuration messages sent, " + f"{self._msg_ack} acknowledged, {self._msg_nak} rejected, " + f"{self._msg_rtcm} RTCM 1006 (active base) messages received." + ) + if not self._msg_rtcm: + print( + f"Consider increasing accuracy limit to >{self._acclimit}cm " + f"or increasing survey duration to >{self._duration} seconds." + ) + if self._msg_nak: + print("Check device supports base station configuration.") + rc = 0 + + return rc + + +def main(): + """ + CLI Entry point. + + :param: as per UBXBase constructor. + """ + + def duration_in_range(value): + val = int(value) + if not 0 < val <= 3600: + raise ArgumentTypeError(f"{value} must be between 0 and 3600") + return val + + def fixedpos_in_range(value): + try: + val = value.split(",") + val = [float(val[i]) for i in range(len(val))] + except (TypeError, ValueError): + val = () + if len(val) != 3: + raise ArgumentTypeError( + f"{value} must be in format (lat,lon,height) or (X,Y,Z)" + ) + return val + + ap = ArgumentParser(epilog=EPILOG, formatter_class=ArgumentDefaultsHelpFormatter) + ap.add_argument("-V", "--version", action="version", version="%(prog)s " + VERSION) + ap.add_argument("-P", "--port", required=True, help="Serial port") + ap.add_argument( + "--baudrate", + required=False, + help="Serial baud rate", + type=int, + choices=[4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800], + default=38400, + ) + ap.add_argument( + "--timeout", + required=False, + help="Serial timeout in seconds", + type=float, + default=3.0, + ) + ap.add_argument( + "--portype", + required=False, + help="Serial port type", + type=str, + choices=("USB", "UART1", "UART2", "I2C"), + default="USB", + ) + ap.add_argument( + "--timemode", + required=False, + help=( + f"Timing mode {TMODE_DISABLED} = disabled, " + f"{TMODE_SVIN} = survey-in, {TMODE_FIXED} = fixed" + ), + type=int, + choices=(TMODE_DISABLED, TMODE_SVIN, TMODE_FIXED), + default=TMODE_SVIN, + ) + ap.add_argument( + "--acclimit", + required=False, + help="Accuracy limit in cm", + type=float, + default=DEFAULT_ACCURACY, + ) + ap.add_argument( + "--duration", + required=False, + help="Survey-In duration in seconds", + type=duration_in_range, + default=DEFAULT_DURATION, + ) + ap.add_argument( + "--postype", + required=False, + help=f"Position fixed reference type {POS_ECEF} = ECEF, {POS_LLH} = LLH", + type=int, + choices=(POS_ECEF, POS_LLH), + default=POS_LLH, + ) + ap.add_argument( + "--fixedpos", + required=False, + help="Fixed reference position in either LLH (lat, lon, height in cm) \ + or ECEF (X, Y, Z in cm) format", + type=fixedpos_in_range, + default=DEFAULT_POS, + ) + ap.add_argument( + "--waittime", + required=False, + help="Response wait time in seconds", + type=float, + default=WAITTIME, + ) + + kwargs = set_common_args("ubxload", ap, logdefault=VERBOSITY_HIGH) + + with Serial( + kwargs.get("port"), kwargs.pop("baudrate"), timeout=kwargs.pop("timeout") + ) as serial_stream: + ubl = UBXBase(serial_stream, **kwargs) + ubl.run() + + +if __name__ == "__main__": + main() diff --git a/src/pyubxutils/ubxcompare.py b/src/pyubxutils/ubxcompare.py index 6262f1c..14f6b72 100644 --- a/src/pyubxutils/ubxcompare.py +++ b/src/pyubxutils/ubxcompare.py @@ -75,7 +75,7 @@ def __init__(self, infiles: str, form: int = FORMAT_TXT, diffsonly: bool = True) fcount += 1 self.parse_file(cfgdict, file.strip(), fcount, form) - self.logger.info( + print( f"{fcount} files processed, list of {'differences in' if diffsonly else 'all'}" " config keys and their values follows: ", ) @@ -90,7 +90,7 @@ def __init__(self, infiles: str, form: int = FORMAT_TXT, diffsonly: bool = True) if (diffsonly and diff) or not diffsonly: print(f"{key} ({'DIFFS!' if diff else None}); {str(vals).strip('{}')}") - self.logger.info(f"Total config keys: {kcount}. Total differences: {dcount}.") + print(f"Total config keys: {kcount}. Total differences: {dcount}.") def parse_line(self, line: str) -> UBXMessage: """ @@ -177,9 +177,9 @@ def parse_file(self, cfgdict: dict, filename: str, fileno: int, form: int): self.get_attrs(cfgdict, str(parsed), fileno) i += 1 except Exception as err: - self.logger.error(f"ERROR parsing {filename}! \n{err}") + print(f"ERROR parsing {filename}! \n{err}") - self.logger.info(f"\n{i} configuration commands processed in {filename}") + print(f"\n{i} configuration commands processed in {filename}") def main(): diff --git a/src/pyubxutils/ubxload.py b/src/pyubxutils/ubxload.py index f79461a..d453e70 100644 --- a/src/pyubxutils/ubxload.py +++ b/src/pyubxutils/ubxload.py @@ -169,7 +169,7 @@ def run(self): """ rc = 1 - self.logger.info( + print( f"Loading configuration from {self._filename} to {self._stream.port}. " "Press Ctrl-C to terminate early.", ) @@ -182,35 +182,29 @@ def run(self): try: sleep(1) except KeyboardInterrupt: # capture Ctrl-C - self.logger.warning( - "Terminated by user. Configuration may be incomplete." - ) + print("Terminated by user. Configuration may be incomplete.") self._stop_event.set() self._read_thread.join() if self._msg_ack == self._msg_load: - self.logger.info( + print( "Configuration successfully loaded. " f"{self._msg_load} CFG-VALSET messages sent and acknowledged." ) else: null = self._msg_load - self._msg_ack - self._msg_nak rc = 0 - self.logger.warning( + print( "Configuration may be incomplete. " f"{self._msg_load} CFG-VALSET messages sent, " f"{self._msg_ack} acknowledged, {self._msg_nak} rejected, " f"{null} null responses." ) if null: - self.logger.warning( - f"Consider increasing waittime to >{self._waittime} seconds." - ) + print(f"Consider increasing waittime to >{self._waittime} seconds.") if self._msg_nak: - self.logger.warning( - "Check device is compatible with this saved configuration." - ) + print("Check device is compatible with this saved configuration.") return rc diff --git a/src/pyubxutils/ubxsave.py b/src/pyubxutils/ubxsave.py index 191f156..a001663 100644 --- a/src/pyubxutils/ubxsave.py +++ b/src/pyubxutils/ubxsave.py @@ -203,7 +203,7 @@ def run(self): """ rc = 1 - self.logger.info( + print( f"Saving configuration from {self._stream.port} to {self._file.name}. " "Press Ctrl-C to terminate early." ) @@ -242,16 +242,16 @@ def run(self): except KeyboardInterrupt: # capture Ctrl-C self._stop_event.set() - self.logger.warning("Terminated by user. Configuration may be incomplete.") + print("Terminated by user. Configuration may be incomplete.") if self._msg_rcvd == self._cfgkeys: - self.logger.info( + print( "Configuration successfully saved. " f"{self._msg_save} CFG-VALSET messages saved to {self._file.name}" ) else: rc = 0 - self.logger.warning( + print( "Configuration may not be successfully saved. " f"{self._msg_sent} CFG-VALGET polls sent to {self._stream.port}, " f"{self._msg_rcvd} CFG-VALGET responses received, " diff --git a/src/pyubxutils/ubxsetrate.py b/src/pyubxutils/ubxsetrate.py index be98e9b..03f5eef 100644 --- a/src/pyubxutils/ubxsetrate.py +++ b/src/pyubxutils/ubxsetrate.py @@ -110,9 +110,7 @@ def apply(self): """ try: - self.logger.info( - f"Opening serial port {self._port} @ {self._baudrate} baud ..." - ) + print(f"Opening serial port {self._port} @ {self._baudrate} baud ...") self._serialOut = Serial(self._port, self._baudrate, timeout=self._timeout) if self._msgClass == ALLNMEA: # all available NMEA messages @@ -142,7 +140,7 @@ def apply(self): raise err from err finally: if self._serialOut is not None: - self.logger.info("Configuration message(s) sent.") + print("Configuration message(s) sent.") self._serialOut.close() def _sendmsg(self, msgClass: int, msgID: int): @@ -166,7 +164,7 @@ def _sendmsg(self, msgClass: int, msgID: int): rateSPI=self._rate, ) - self.logger.info(f"Sending configuration message {msg}...") + print(f"Sending configuration message {msg}...") self._serialOut.write(msg.serialize()) diff --git a/tests/test_static.py b/tests/test_static.py index e7c87d7..9fc7687 100644 --- a/tests/test_static.py +++ b/tests/test_static.py @@ -12,6 +12,7 @@ from pathlib import Path import unittest +from pyubxutils.helpers import h2sphp, ll2sphp class StaticTest(unittest.TestCase): @@ -24,6 +25,16 @@ def tearDown(self): # self.streamNAV.close() pass + def testll2sphp(self): + res = ll2sphp(45.123456789) + self.assertEqual(res, (451234567, 89)) + res = ll2sphp(45.123456789012) + self.assertEqual(res, (451234567, 89)) + + def testh2sphp(self): + res = h2sphp(1234567.89) + self.assertEqual(res, (1234567, 89)) + if __name__ == "__main__": # import sys;sys.argv = ['', 'Test.testName']