diff --git a/fronius-smartmeter.py b/fronius-smartmeter.py index 966fb4d..bfa1f08 100644 --- a/fronius-smartmeter.py +++ b/fronius-smartmeter.py @@ -13,6 +13,8 @@ Used https://github.com/victronenergy/velib_python/blob/master/dbusdummyservice.py as basis for this service. Reading information from the Fronius Smart Meter via http REST API and puts the info on dbus. """ + +import contextlib try: import gobject except ImportError: @@ -32,10 +34,8 @@ except ImportError: import _thread as thread # for daemon = True / Python 3.x -# our own packages -sys.path.insert( - 1, os.path.join(os.path.sep, "data", "SetupHelper", "ext", "velib_python") -) +# use an established Victron service to maintain compatiblity +sys.path.insert(1, os.path.join('/opt/victronenergy/dbus-systemcalc-py', 'ext', 'velib_python')) from vedbus import VeDbusService, VeDbusItemImport log = logging.getLogger("DbusFroniusSmartMeter") @@ -47,7 +47,7 @@ def role_changed(self, path, val): if val not in self.allowed_roles: return False old, inst = self.get_role_instance() - self.settings["instance"] = "%s:%s" % (val, inst) + self.settings["instance"] = f"{val}:{inst}" return True def get_role_instance(self): @@ -72,13 +72,12 @@ def detect(self): except socket.timeout: continue info = json.loads(data) - sw = info.get("LoggerInfo", {}).get("SoftwareVersion", None) - if sw: + if sw := info.get("LoggerInfo", {}).get("SoftwareVersion", None): self._firmware = ".".join( str(s) for s in [sw["Major"], sw["Minor"], sw["Release"], sw["Build"]] ) - log.info("Found device @%s, firmware %s" % (addr[0], self._firmware)) + log.info(f"Found device @{addr[0]}, firmware {self._firmware}") return addr[0] return None @@ -92,31 +91,26 @@ def detect_dbus(self): return None for name in dbusConn.list_names(): if name.startswith("com.victronenergy.pvinverter."): - log.info("Getting IP from %s" % name) + log.info(f"Getting IP from {name}") conn = VeDbusItemImport(dbusConn, name, "/Mgmt/Connection").get_value() self._firmware = VeDbusItemImport( dbusConn, name, "/DataManagerVersion" ).get_value() - log.info("Connection Info: %s, Firmware %s" % (conn, self._firmware)) + log.info(f"Connection Info: {conn}, Firmware {self._firmware}") return conn.split(" ")[0] return None def __init__(self, servicename, deviceinstance, ip=None): self.settings = {"instance": "grid:%d" % deviceinstance} - self._latency = None self._firmware = "0.1" self._testdata = None + self._latency = None if ip == "test": self._testdata = "testdata/GetMeterRealtimeData.cgi" self._ip = ip or self.detect_dbus() or self.detect() - self._url = ( - "http://" - + self._ip - + "/solar_api/v1/GetMeterRealtimeData.cgi?" - + "Scope=Device&DeviceId=0&DataCollection=MeterRealtimeData" - ) + self._url = f"http://{self._ip}/solar_api/v1/GetMeterRealtimeData.cgi?Scope=Device&DeviceId=0&DataCollection=MeterRealtimeData" data = self._get_meter_data() log.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance)) @@ -125,7 +119,8 @@ def __init__(self, servicename, deviceinstance, ip=None): # Create the management objects, as specified in the ccgx dbus-api document self._dbusservice.add_path("/Mgmt/ProcessName", __file__) self._dbusservice.add_path( - "/Mgmt/ProcessVersion", "Running on Python " + platform.python_version() + "/Mgmt/ProcessVersion", + f"Running on Python {platform.python_version()}", ) self._dbusservice.add_path("/Mgmt/Connection", self._ip) @@ -204,7 +199,7 @@ def __init__(self, servicename, deviceinstance, ip=None): gobject.timeout_add(700, self._safe_update) def _handlechangedvalue(self, path, value): - log.debug("someone else updated %s to %s" % (path, value)) + log.debug(f"someone else updated {path} to {value}") return True # accept the change def _safe_update(self): @@ -212,7 +207,7 @@ def _safe_update(self): self._update() self._retries = 0 except Exception as e: - log.error("Error running update %s" % e) + log.error(f"Error running update {e}") self._retries += 1 self._failures += 1 if self._retries > 10: @@ -351,15 +346,12 @@ def main(): ) args = parser.parse_args() if args.ip: - log.info("User supplied IP: %s" % args.ip) + log.info(f"User supplied IP: {args.ip}") else: log.info("Auto detecting IP") - try: + with contextlib.suppress(NameError): thread.daemon = True # allow the program to quit - except NameError: - # Python 3 - pass from dbus.mainloop.glib import DBusGMainLoop diff --git a/version b/version index d4a29ee..b4ffd19 100644 --- a/version +++ b/version @@ -1 +1 @@ -v1.10 \ No newline at end of file +v1.11 \ No newline at end of file