diff --git a/.vscode/settings.json b/.vscode/settings.json
index 6ad68f1..28dda2d 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -5,5 +5,5 @@
"python.defaultInterpreterPath": "python3",
"modulename": "${workspaceFolderBasename}",
"distname": "${workspaceFolderBasename}",
- "moduleversion": "1.0.0"
+ "moduleversion": "1.0.1"
}
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 7ab08bf..ec63c44 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -198,7 +198,7 @@
"Sort Imports",
"Format",
"Pylint",
- //"Test",
+ "Test",
"Build",
"Sphinx HTML"
],
diff --git a/README.md b/README.md
index 93bbab9..10feec0 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,9 @@ pyubxutils
[Current Status](#currentstatus) |
[Installation](#installation) |
[ubxsimulator](#ubxsimulator) |
+[ubxsave CLI](#ubxsave) |
+[ubxload CLI](#ubxload) |
+[ubxbase CLI](#ubxbase) |
[ubxsetrate CLI](#ubxsetrate) |
[ubxcompare CLI](#ubxcompare) |
[Graphical Client](#gui) |
@@ -16,6 +19,7 @@ pyubxutils is an original series of Python u-blox ™ UBX © protocol utility cl
1. [`ubxsimulator`](#ubxsimulator) utility. This provides a basic simulation of a GNSS receiver serial stream by generating synthetic UBX or NMEA messages based on parameters defined in a json configuration file.
1. [`ubxsave`](#ubxsave) CLI utility. This saves a complete set of configuration data from any Generation 9+ u-blox device (e.g. NEO-M9N or ZED-F9P) to a file. The file can then be reloaded to any compatible device using the `ubxload` utility.
1. [`ubxload`](#ubxload) CLI utility. This reads a file containing binary configuration data and loads it into any compatible Generation 9+ u-blox device (e.g. NEO-M9N or ZED-F9P).
+1. [`ubxbase`](#ubxbase) CLI utility. A utility which configures compatible u-blox GNSS receivers (e.g. ZED-F9P) as RTK base stations, using either Fixed or Survey-In timing modes.
1. [`ubxsetrate`](#ubxsetrate) CLI utility. A simple utility which sets NMEA or UBX message rates on u-blox GNSS receivers.
1. [`ubxcompare`](#ubxcompare) CLI utility. Utility for comparing two or more u-blox config files in either text (\*.txt) or binary (\*.ubx) format. Output files from the `ubxsave` utility can be used as input files.
@@ -118,7 +122,7 @@ Command line arguments can be stored in a configuration file and invoked using t
*GENERATION 9+ DEVICES ONLY (e.g. NEO-M9N or ZED-F9P)*
```
-class pyubxutils.ubxconfig.UBXSaver(file, stream, **kwargs)
+class pyubxutils.ubxsave.UBXSaver(file, stream, **kwargs)
```
CLI utility which saves Generation 9+ UBX device configuration data to a file. `ubxsave` polls configuration data via the device's serial port using a series of CFG-VALGET poll messages. It parses the responses to these polls, converts them to CFG-VALSET command messages and saves these to a binary file. This binary file can then be loaded into any compatible UBX device (e.g. via the `ubxload` utility) to restore the saved configuration.
@@ -145,7 +149,7 @@ ubxsave -h
*GENERATION 9+ DEVICES ONLY (e.g. NEO-M9N or ZED-F9P)*
```
-class pyubxutils.ubxconfig.UBXLoader(file, stream, **kwargs)
+class pyubxutils.ubxload.UBXLoader(file, stream, **kwargs)
```
CLI utility which loads UBX configuration (CFG-VALSET) data from a binary file (e.g. one created by the `ubxsave` utility) and loads it into the volatile memory (RAM) of a compatible Generation 9+ UBX device via its serial port. It then awaits acknowledgements to this data and reports any errors.
@@ -162,11 +166,34 @@ For help and full list of optional arguments, type:
ubxload -h
```
+---
+## ubxbase CLI
+
+*RTK-COMPATIBLE GENERATION 9+ DEVICES ONLY (e.g. ZED-F9P)*
+
+```
+class pyubxutils.ubxbase.UBXBase(file, stream, **kwargs)
+```
+
+CLI utility which configures a compatible u-blox receiver to RTK base station mode, using either Fixed or Survey-In timing modes.
+
+### CLI Usage:
+
+```shell
+ubxbase -P /dev/ttyACM0 --timemode 2 --fixedpos 37.2334512,-115.8151357,18226.4 --postype 1 --acclimit 10 --waittime 5
+```
+
+For help and full list of optional arguments, type:
+
+```shell
+ubxbase -h
+```
+
---
## ubxsetrate CLI
```
-class pyubxutils.ubxconfig.UBXSetRate(**kwargs)
+class pyubxutils.ubxsetrate.UBXSetRate(**kwargs)
```
A simple CLI utility to set NMEA or UBX message rates on u-blox receivers via a serial port.
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index c38bf88..84428cc 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,5 +1,11 @@
# pyubxutils Release Notes
+### RELEASE 1.0.1
+
+ENHANCEMENTS:
+
+1. Add `ubxbase` utility to configure compatible u-blox receiver (e.g. ZED-F9P) to Base Station mode. Type `ubxbase -h` for help.
+
### RELEASE 1.0.0
CHANGES:
diff --git a/docs/pyubxutils.rst b/docs/pyubxutils.rst
index 2de02a3..d4561f1 100644
--- a/docs/pyubxutils.rst
+++ b/docs/pyubxutils.rst
@@ -28,6 +28,14 @@ pyubxutils.helpers module
:undoc-members:
:show-inheritance:
+pyubxutils.ubxbase module
+-------------------------
+
+.. automodule:: pyubxutils.ubxbase
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
pyubxutils.ubxcompare module
----------------------------
diff --git a/pyproject.toml b/pyproject.toml
index 2aaf5dc..e43cede 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ name = "pyubxutils"
authors = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }]
maintainers = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }]
description = "UBX Protocol Command Line Utilities"
-version = "1.0.0"
+version = "1.0.1"
license = { file = "LICENSE" }
readme = "README.md"
requires-python = ">=3.9"
@@ -41,6 +41,7 @@ ubxsave = "pyubxutils.ubxsave:main"
ubxload = "pyubxutils.ubxload:main"
ubxsimulator = "pyubxutils.ubxsimulator_cli:main"
ubxcompare = "pyubxutils.ubxcompare:main"
+ubxbase = "pyubxutils.ubxbase:main"
[project.urls]
homepage = "https://github.com/semuconsulting/pyubxutils"
diff --git a/src/pyubxutils/_version.py b/src/pyubxutils/_version.py
index 650023f..ee3accc 100644
--- a/src/pyubxutils/_version.py
+++ b/src/pyubxutils/_version.py
@@ -8,4 +8,4 @@
:license: BSD 3-Clause
"""
-__version__ = "1.0.0"
+__version__ = "1.0.1"
diff --git a/src/pyubxutils/helpers.py b/src/pyubxutils/helpers.py
index 1f49cf8..b06dca2 100644
--- a/src/pyubxutils/helpers.py
+++ b/src/pyubxutils/helpers.py
@@ -13,6 +13,7 @@
import logging
import logging.handlers
from argparse import ArgumentParser
+from math import trunc
from os import getenv
from pyubxutils.globals import (
@@ -172,3 +173,35 @@ def progbar(i: int, lim: int, inc: int = 50):
f"{int(pct*100/inc):02}% " + "\u2593" * pct + "\u2591" * (inc - pct),
end="\r",
)
+
+
+def h2sphp(val: float) -> tuple:
+ """
+ Split height in cm into standard (cm) and high (mm * 10)
+ precision components.
+
+ e.g. 123456.78 -> 123456, 78
+
+ :param val: decimal lat/lon value
+ :return: tuple of integers
+ :rtype: tuple
+ """
+
+ sp = trunc(val)
+ hp = int(round((val - sp) * 100, 0))
+ return sp, hp
+
+
+def ll2sphp(val: float) -> tuple:
+ """
+ Split lat/lon into standard (1-7 dp) and high (8-9 dp)
+ precision components.
+
+ e.g. 51.123456789 -> 511234567, 89
+
+ :param val: decimal height value in cm
+ :return: tuple of integers
+ :rtype: tuple
+ """
+
+ return h2sphp(val * 1e7)
diff --git a/src/pyubxutils/ubxbase.py b/src/pyubxutils/ubxbase.py
new file mode 100644
index 0000000..c895877
--- /dev/null
+++ b/src/pyubxutils/ubxbase.py
@@ -0,0 +1,477 @@
+"""
+ubxbase.py
+
+NB: ONLY FOR GENERATION 9+ UBX RTK DEVICES e.g. ZED-F9n
+
+This command line utility configures an RTK-compatible u-blox
+GNSS receiver to operate in Base Station mode.
+
+RTCM 1006 (Antenna Reference Data) is only output if the base
+station is active, so receipt of this message type is used as
+a configuration success criterion.
+
+Created on 06 Jan 2023
+
+:author: semuadmin
+:copyright: SEMU Consulting © 2023
+:license: BSD 3-Clause
+"""
+
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError
+from datetime import datetime, timedelta
+from logging import getLogger
+from queue import Queue
+from threading import Event, Thread
+from time import sleep
+
+from pyubx2 import (
+ ERR_IGNORE,
+ NMEA_PROTOCOL,
+ RTCM3_PROTOCOL,
+ UBX_PROTOCOL,
+ UBXMessage,
+ UBXMessageError,
+ UBXParseError,
+ UBXReader,
+)
+from serial import Serial
+
+from pyubxutils._version import __version__ as VERSION
+from pyubxutils.globals import EPILOG, VERBOSITY_HIGH
+from pyubxutils.helpers import h2sphp, ll2sphp, progbar, set_common_args
+
+ACK = "ACK-ACK"
+NAK = "ACK-NAK"
+DEFAULT_ACCURACY = 100
+DEFAULT_DURATION = 60
+DEFAULT_POS = (0.0, 0.0, 0.0)
+POS_ECEF = 0
+POS_LLH = 1
+TMODE_DISABLED = 0
+TMODE_SVIN = 1
+TMODE_FIXED = 2
+WAITTIME = 5 # wait time for acknowledgements
+WRITETIME = 0.02 # wait time between writes
+
+
+class UBXBase:
+ """UBX Base Station Configuration Class."""
+
+ def __init__(self, stream: object, **kwargs):
+ """
+ Constructor.
+
+ :param object file: input file
+ :param object stream: output serial stream
+ """
+
+ self.logger = getLogger(__name__)
+ self._stream = stream
+
+ self._port = kwargs["port"]
+ self._porttype = kwargs.get("porttype", "USB")
+ self._timemode = int(kwargs.get("timemode", TMODE_DISABLED))
+ self._acclimit = int(kwargs.get("acclimit", DEFAULT_ACCURACY))
+ self._duration = int(kwargs.get("duration", DEFAULT_DURATION))
+ self._postype = int(kwargs.get("postype", POS_LLH))
+ self._waittime = int(kwargs.get("waittime", WAITTIME))
+ self._fixedpos = kwargs.get("fixedpos", DEFAULT_POS)
+ self._ubxreader = UBXReader(
+ self._stream,
+ protfilter=NMEA_PROTOCOL | UBX_PROTOCOL | RTCM3_PROTOCOL,
+ quitonerror=ERR_IGNORE,
+ )
+ self._out_queue = Queue()
+ self._stop_event = Event()
+ self._last_ack = datetime.now()
+ self._read_thread = Thread(
+ target=self._read_data,
+ daemon=True,
+ args=(
+ stream,
+ self._ubxreader,
+ self._out_queue,
+ self._stop_event,
+ ),
+ )
+ self._svin_elapsed = 0
+ self._msg_ack = 0
+ self._msg_nak = 0
+ self._msg_write = 0
+ self._msg_load = 0 # if self._timemode == TMODE_DISABLED else 1
+ self._msg_rtcm = 0
+
+ def _config_svin(self, queue: Queue) -> int:
+ """
+ Configure Survey-In mode with specified accuracy limit.
+ """
+
+ print(
+ f"Survey-in duration {self._duration}s, accuracy limit {self._acclimit}cm"
+ )
+ self._waittime += self._duration
+ layers = 1
+ transaction = 0
+ acclimit = int(round(self._acclimit * 100, 0)) # mm * 10
+ cfg_data = [
+ ("CFG_TMODE_MODE", self._timemode),
+ ("CFG_TMODE_SVIN_ACC_LIMIT", acclimit),
+ ("CFG_TMODE_SVIN_MIN_DUR", self._duration),
+ # (f"CFG_MSGOUT_UBX_NAV_SVIN_{port_type}", 1),
+ ]
+ ubx = UBXMessage.config_set(layers, transaction, cfg_data)
+ queue.put(ubx)
+ self._msg_load += 1
+ return 1
+
+ def _config_fixed(self, queue: Queue) -> int:
+ """
+ Configure Fixed mode with specified coordinates.
+ """
+
+ print(
+ f"Fixed position format {('ECEF','LLH')[self._postype]}, {self._fixedpos}, "
+ f"accuracy limit {self._acclimit}cm"
+ )
+ layers = 1
+ transaction = 0
+ acclimit = int(round(self._acclimit * 100, 0)) # mm * 10
+ x, y, z = self._fixedpos
+ if self._postype == POS_ECEF:
+ x, xhp = h2sphp(x)
+ y, yhp = h2sphp(y)
+ z, zhp = h2sphp(z)
+ cfg_data = [
+ ("CFG_TMODE_MODE", self._timemode),
+ ("CFG_TMODE_POS_TYPE", self._postype),
+ ("CFG_TMODE_FIXED_POS_ACC", acclimit), # mm * 10
+ ("CFG_TMODE_ECEF_X", x), # cm
+ ("CFG_TMODE_ECEF_X_HP", xhp), # mm * 10
+ ("CFG_TMODE_ECEF_Y", y), # cm
+ ("CFG_TMODE_ECEF_Y_HP", yhp), # mm * 10
+ ("CFG_TMODE_ECEF_Z", z), # cm
+ ("CFG_TMODE_ECEF_Z_HP", zhp), # mm * 10
+ ]
+ else:
+ x, xhp = ll2sphp(x)
+ y, yhp = ll2sphp(y)
+ z, zhp = h2sphp(z)
+ cfg_data = [
+ ("CFG_TMODE_MODE", self._timemode),
+ ("CFG_TMODE_POS_TYPE", self._postype),
+ ("CFG_TMODE_FIXED_POS_ACC", acclimit), # mm * 10
+ ("CFG_TMODE_LAT", x),
+ ("CFG_TMODE_LAT_HP", xhp),
+ ("CFG_TMODE_LON", y),
+ ("CFG_TMODE_LON_HP", yhp),
+ ("CFG_TMODE_HEIGHT", z), # cm
+ ("CFG_TMODE_HEIGHT_HP", zhp), # mm * 10
+ ]
+ ubx = UBXMessage.config_set(layers, transaction, cfg_data)
+ queue.put(ubx)
+ self._msg_load += 1
+ return 1
+
+ def _config_disabled(self, queue: Queue) -> int:
+ """
+ Configure Fixed mode with specified coordinates.
+ """
+
+ tmode = TMODE_DISABLED
+ layers = 1
+ transaction = 0
+ cfg_data = [
+ ("CFG_TMODE_MODE", tmode),
+ ]
+ ubx = UBXMessage.config_set(layers, transaction, cfg_data)
+ queue.put(ubx)
+ self._msg_load += 1
+ return 1
+
+ def _config_output(
+ self, queue: Queue, rate: int = 1, port_type: str = "USB"
+ ) -> int:
+ """
+ Configure which RTCM3 and UBX messages to output.
+ """
+
+ print(f"{('Disabling','Enabling')[rate]} output messages")
+ layers = 1 # 1 = RAM, 2 = BBR, 4 = Flash (can be OR'd)
+ transaction = 0
+ cfg_data = (
+ [(f"CFG_MSGOUT_UBX_NAV_SVIN_{port_type}", 1)]
+ if self._timemode == TMODE_SVIN
+ else []
+ )
+ for rtcm_type in (
+ "1006",
+ "1077",
+ "1087",
+ "1097",
+ "1127",
+ "1230",
+ ):
+ cfg = f"CFG_MSGOUT_RTCM_3X_TYPE{rtcm_type}_{port_type}"
+ cfg_data.append([cfg, rate])
+
+ ubx = UBXMessage.config_set(layers, transaction, cfg_data)
+ queue.put(ubx)
+ self._msg_load += 1
+ return 1
+
+ def _read_data(
+ self,
+ stream: object,
+ ubr: UBXReader,
+ queue: Queue,
+ stop: Event,
+ ):
+ """
+ THREADED
+ Read incoming UBX and RTCM data from device.
+ """
+ # pylint: disable=broad-except
+
+ # read until expected no of acknowledgements has been received
+ # or waittime has been exceeded.
+ while not stop.is_set():
+ try:
+ (_, parsed_data) = ubr.read()
+ if parsed_data is not None:
+ if (
+ parsed_data.identity in (ACK, NAK)
+ and parsed_data.clsID == 6 # CFG
+ and parsed_data.msgID == 138 # CFG-VALSET
+ ):
+ self._last_ack = datetime.now()
+ if parsed_data.identity == ACK:
+ self._msg_ack += 1
+ else:
+ self._msg_nak += 1
+ self.logger.debug(
+ f"ACKNOWLEDGEMENT {self._msg_ack + self._msg_nak} - {parsed_data}"
+ )
+ if (
+ self._timemode in (TMODE_FIXED, TMODE_SVIN)
+ and parsed_data.identity == "1006"
+ ):
+ self._msg_rtcm += 1
+ self.logger.debug(
+ f"RTCM 1006 ACTIVE BASE {self._msg_rtcm} - {parsed_data}"
+ )
+ if (
+ self._timemode == TMODE_SVIN
+ and parsed_data.identity == "NAV-SVIN"
+ ):
+ self._svin_elapsed = parsed_data.dur
+ self.logger.debug(f"UBX NAV-SVIN - {parsed_data}")
+
+ # send config message(s) to receiver
+ if not queue.empty():
+ i = 0
+ while not queue.empty():
+ i += 1
+ parsed_data = queue.get()
+ self._msg_write += 1
+ self.logger.debug(
+ f"WRITE {self._msg_write} {parsed_data.identity}"
+ )
+ stream.write(parsed_data.serialize())
+ sleep(WRITETIME)
+ queue.task_done()
+
+ except (UBXMessageError, UBXParseError):
+ continue
+ except Exception as err:
+ if not stop.is_set():
+ print(f"Something went wrong {err}")
+ continue
+
+ def run(self):
+ """
+ Run configuration load routines.
+ """
+
+ rc = 0
+ if self._timemode in (TMODE_FIXED, TMODE_SVIN):
+ print(
+ f"Configuring device at port {self._port} as base station using "
+ f"{('disabled', 'survey-in', 'fixed')[self._timemode]} timing mode"
+ )
+ else:
+ print(f"Configuring device at port {self._port} to disable base station")
+
+ start = datetime.now()
+ self._read_thread.start()
+ if self._timemode == TMODE_SVIN:
+ rc = self._config_svin(self._out_queue)
+ elif self._timemode == TMODE_FIXED:
+ rc = self._config_fixed(self._out_queue)
+ else:
+ rc = self._config_disabled(self._out_queue)
+ if rc:
+ rc = self._config_output(
+ self._out_queue,
+ rate=self._timemode != TMODE_DISABLED,
+ port_type=self._porttype,
+ )
+
+ # loop until waittime / survey duration expired or user presses Ctrl-C
+ i = 0
+ while datetime.now() < start + timedelta(seconds=self._waittime):
+ try:
+ if self._timemode == TMODE_SVIN:
+ i = self._svin_elapsed # from NAV-SVIN message
+ wt = self._duration
+ else:
+ i += 1
+ wt = self._waittime
+ progbar(i, wt, wt)
+ sleep(1)
+ except KeyboardInterrupt: # capture Ctrl-C
+ print("\nTerminated by user. Configuration may be incomplete.")
+ break
+
+ self._stop_event.set()
+ self._read_thread.join()
+
+ if self._msg_ack == self._msg_load and (
+ (self._timemode in (TMODE_FIXED, TMODE_SVIN) and self._msg_rtcm > 1)
+ or (self._timemode == TMODE_DISABLED and self._msg_rtcm == 0)
+ ):
+ print(
+ "Configuration successful. "
+ f"{self._msg_ack} configuration messages acknowledged. "
+ f"{self._msg_rtcm} RTCM 1006 (active base) messages confirmed."
+ )
+ rc = 1
+ else:
+ print(
+ "\nConfiguration unsuccessful. "
+ f"{self._msg_load} configuration messages sent, "
+ f"{self._msg_ack} acknowledged, {self._msg_nak} rejected, "
+ f"{self._msg_rtcm} RTCM 1006 (active base) messages received."
+ )
+ if not self._msg_rtcm:
+ print(
+ f"Consider increasing accuracy limit to >{self._acclimit}cm "
+ f"or increasing survey duration to >{self._duration} seconds."
+ )
+ if self._msg_nak:
+ print("Check device supports base station configuration.")
+ rc = 0
+
+ return rc
+
+
+def main():
+ """
+ CLI Entry point.
+
+ :param: as per UBXBase constructor.
+ """
+
+ def duration_in_range(value):
+ val = int(value)
+ if not 0 < val <= 3600:
+ raise ArgumentTypeError(f"{value} must be between 0 and 3600")
+ return val
+
+ def fixedpos_in_range(value):
+ try:
+ val = value.split(",")
+ val = [float(val[i]) for i in range(len(val))]
+ except (TypeError, ValueError):
+ val = ()
+ if len(val) != 3:
+ raise ArgumentTypeError(
+ f"{value} must be in format (lat,lon,height) or (X,Y,Z)"
+ )
+ return val
+
+ ap = ArgumentParser(epilog=EPILOG, formatter_class=ArgumentDefaultsHelpFormatter)
+ ap.add_argument("-V", "--version", action="version", version="%(prog)s " + VERSION)
+ ap.add_argument("-P", "--port", required=True, help="Serial port")
+ ap.add_argument(
+ "--baudrate",
+ required=False,
+ help="Serial baud rate",
+ type=int,
+ choices=[4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800],
+ default=38400,
+ )
+ ap.add_argument(
+ "--timeout",
+ required=False,
+ help="Serial timeout in seconds",
+ type=float,
+ default=3.0,
+ )
+ ap.add_argument(
+ "--portype",
+ required=False,
+ help="Serial port type",
+ type=str,
+ choices=("USB", "UART1", "UART2", "I2C"),
+ default="USB",
+ )
+ ap.add_argument(
+ "--timemode",
+ required=False,
+ help=(
+ f"Timing mode {TMODE_DISABLED} = disabled, "
+ f"{TMODE_SVIN} = survey-in, {TMODE_FIXED} = fixed"
+ ),
+ type=int,
+ choices=(TMODE_DISABLED, TMODE_SVIN, TMODE_FIXED),
+ default=TMODE_SVIN,
+ )
+ ap.add_argument(
+ "--acclimit",
+ required=False,
+ help="Accuracy limit in cm",
+ type=float,
+ default=DEFAULT_ACCURACY,
+ )
+ ap.add_argument(
+ "--duration",
+ required=False,
+ help="Survey-In duration in seconds",
+ type=duration_in_range,
+ default=DEFAULT_DURATION,
+ )
+ ap.add_argument(
+ "--postype",
+ required=False,
+ help=f"Position fixed reference type {POS_ECEF} = ECEF, {POS_LLH} = LLH",
+ type=int,
+ choices=(POS_ECEF, POS_LLH),
+ default=POS_LLH,
+ )
+ ap.add_argument(
+ "--fixedpos",
+ required=False,
+ help="Fixed reference position in either LLH (lat, lon, height in cm) \
+ or ECEF (X, Y, Z in cm) format",
+ type=fixedpos_in_range,
+ default=DEFAULT_POS,
+ )
+ ap.add_argument(
+ "--waittime",
+ required=False,
+ help="Response wait time in seconds",
+ type=float,
+ default=WAITTIME,
+ )
+
+ kwargs = set_common_args("ubxload", ap, logdefault=VERBOSITY_HIGH)
+
+ with Serial(
+ kwargs.get("port"), kwargs.pop("baudrate"), timeout=kwargs.pop("timeout")
+ ) as serial_stream:
+ ubl = UBXBase(serial_stream, **kwargs)
+ ubl.run()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/pyubxutils/ubxcompare.py b/src/pyubxutils/ubxcompare.py
index 6262f1c..14f6b72 100644
--- a/src/pyubxutils/ubxcompare.py
+++ b/src/pyubxutils/ubxcompare.py
@@ -75,7 +75,7 @@ def __init__(self, infiles: str, form: int = FORMAT_TXT, diffsonly: bool = True)
fcount += 1
self.parse_file(cfgdict, file.strip(), fcount, form)
- self.logger.info(
+ print(
f"{fcount} files processed, list of {'differences in' if diffsonly else 'all'}"
" config keys and their values follows: ",
)
@@ -90,7 +90,7 @@ def __init__(self, infiles: str, form: int = FORMAT_TXT, diffsonly: bool = True)
if (diffsonly and diff) or not diffsonly:
print(f"{key} ({'DIFFS!' if diff else None}); {str(vals).strip('{}')}")
- self.logger.info(f"Total config keys: {kcount}. Total differences: {dcount}.")
+ print(f"Total config keys: {kcount}. Total differences: {dcount}.")
def parse_line(self, line: str) -> UBXMessage:
"""
@@ -177,9 +177,9 @@ def parse_file(self, cfgdict: dict, filename: str, fileno: int, form: int):
self.get_attrs(cfgdict, str(parsed), fileno)
i += 1
except Exception as err:
- self.logger.error(f"ERROR parsing {filename}! \n{err}")
+ print(f"ERROR parsing {filename}! \n{err}")
- self.logger.info(f"\n{i} configuration commands processed in {filename}")
+ print(f"\n{i} configuration commands processed in {filename}")
def main():
diff --git a/src/pyubxutils/ubxload.py b/src/pyubxutils/ubxload.py
index f79461a..d453e70 100644
--- a/src/pyubxutils/ubxload.py
+++ b/src/pyubxutils/ubxload.py
@@ -169,7 +169,7 @@ def run(self):
"""
rc = 1
- self.logger.info(
+ print(
f"Loading configuration from {self._filename} to {self._stream.port}. "
"Press Ctrl-C to terminate early.",
)
@@ -182,35 +182,29 @@ def run(self):
try:
sleep(1)
except KeyboardInterrupt: # capture Ctrl-C
- self.logger.warning(
- "Terminated by user. Configuration may be incomplete."
- )
+ print("Terminated by user. Configuration may be incomplete.")
self._stop_event.set()
self._read_thread.join()
if self._msg_ack == self._msg_load:
- self.logger.info(
+ print(
"Configuration successfully loaded. "
f"{self._msg_load} CFG-VALSET messages sent and acknowledged."
)
else:
null = self._msg_load - self._msg_ack - self._msg_nak
rc = 0
- self.logger.warning(
+ print(
"Configuration may be incomplete. "
f"{self._msg_load} CFG-VALSET messages sent, "
f"{self._msg_ack} acknowledged, {self._msg_nak} rejected, "
f"{null} null responses."
)
if null:
- self.logger.warning(
- f"Consider increasing waittime to >{self._waittime} seconds."
- )
+ print(f"Consider increasing waittime to >{self._waittime} seconds.")
if self._msg_nak:
- self.logger.warning(
- "Check device is compatible with this saved configuration."
- )
+ print("Check device is compatible with this saved configuration.")
return rc
diff --git a/src/pyubxutils/ubxsave.py b/src/pyubxutils/ubxsave.py
index 191f156..a001663 100644
--- a/src/pyubxutils/ubxsave.py
+++ b/src/pyubxutils/ubxsave.py
@@ -203,7 +203,7 @@ def run(self):
"""
rc = 1
- self.logger.info(
+ print(
f"Saving configuration from {self._stream.port} to {self._file.name}. "
"Press Ctrl-C to terminate early."
)
@@ -242,16 +242,16 @@ def run(self):
except KeyboardInterrupt: # capture Ctrl-C
self._stop_event.set()
- self.logger.warning("Terminated by user. Configuration may be incomplete.")
+ print("Terminated by user. Configuration may be incomplete.")
if self._msg_rcvd == self._cfgkeys:
- self.logger.info(
+ print(
"Configuration successfully saved. "
f"{self._msg_save} CFG-VALSET messages saved to {self._file.name}"
)
else:
rc = 0
- self.logger.warning(
+ print(
"Configuration may not be successfully saved. "
f"{self._msg_sent} CFG-VALGET polls sent to {self._stream.port}, "
f"{self._msg_rcvd} CFG-VALGET responses received, "
diff --git a/src/pyubxutils/ubxsetrate.py b/src/pyubxutils/ubxsetrate.py
index be98e9b..03f5eef 100644
--- a/src/pyubxutils/ubxsetrate.py
+++ b/src/pyubxutils/ubxsetrate.py
@@ -110,9 +110,7 @@ def apply(self):
"""
try:
- self.logger.info(
- f"Opening serial port {self._port} @ {self._baudrate} baud ..."
- )
+ print(f"Opening serial port {self._port} @ {self._baudrate} baud ...")
self._serialOut = Serial(self._port, self._baudrate, timeout=self._timeout)
if self._msgClass == ALLNMEA: # all available NMEA messages
@@ -142,7 +140,7 @@ def apply(self):
raise err from err
finally:
if self._serialOut is not None:
- self.logger.info("Configuration message(s) sent.")
+ print("Configuration message(s) sent.")
self._serialOut.close()
def _sendmsg(self, msgClass: int, msgID: int):
@@ -166,7 +164,7 @@ def _sendmsg(self, msgClass: int, msgID: int):
rateSPI=self._rate,
)
- self.logger.info(f"Sending configuration message {msg}...")
+ print(f"Sending configuration message {msg}...")
self._serialOut.write(msg.serialize())
diff --git a/tests/test_static.py b/tests/test_static.py
index e7c87d7..9fc7687 100644
--- a/tests/test_static.py
+++ b/tests/test_static.py
@@ -12,6 +12,7 @@
from pathlib import Path
import unittest
+from pyubxutils.helpers import h2sphp, ll2sphp
class StaticTest(unittest.TestCase):
@@ -24,6 +25,16 @@ def tearDown(self):
# self.streamNAV.close()
pass
+ def testll2sphp(self):
+ res = ll2sphp(45.123456789)
+ self.assertEqual(res, (451234567, 89))
+ res = ll2sphp(45.123456789012)
+ self.assertEqual(res, (451234567, 89))
+
+ def testh2sphp(self):
+ res = h2sphp(1234567.89)
+ self.assertEqual(res, (1234567, 89))
+
if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName']