Replies: 2 comments
-
Hi @Aikawa24, thanks for raising this topic. Please don't get me wrong, but since I don't have your hardware on hand and can't reproduce the problem, I would move this from an issue to a discussion. In general: is any error logged when it stops after 4 hours? Fabian |
Beta Was this translation helpful? Give feedback.
-
I have not managed to get it to log. I have now reinstalled the 3EM and used your default script, and it does not hang. |
Beta Was this translation helpful? Give feedback.
-
Hi, I've adapted your script to read JSON data from a Tasmota 12.4 device with a Hichi IR read head.
I have no Python skills and changed only the parts needed to get the JSON data.
It works, but after 4 hours, or sometimes after 2 days, I get no more updates: the last value hangs and I have to restart the script to make it work again.
Maybe you can look at it and find an error in my changes?
Thank you very much.
`#!/usr/bin/env python
import platform
import logging
import sys
import os

# Python 2 imports the legacy ``gobject`` module; Python 3 imports GLib from
# gi.repository under the same local name, so the rest of the script can call
# ``gobject.timeout_add`` / ``gobject.MainLoop`` either way.
if sys.version_info.major == 2:
    import gobject
else:
    from gi.repository import GLib as gobject
import sys
import time
import requests  # for http GET

# Make the velib_python helpers (vedbus) importable.
# NOTE: the second argument is an absolute path, so os.path.join() returns it
# unchanged and the dirname(__file__) part has no effect on the result.
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python'))
from vedbus import VeDbusService
class DbusMT681Service:
    """Publish readings fetched from an HTTP/JSON endpoint as a D-Bus grid meter.

    The constructor registers the D-Bus paths and two GLib timers: one that
    polls the endpoint (``_update``) and one that writes a periodic
    "sign of life" entry to the log (``_signOfLife``). The timers only fire
    while a GLib main loop is running.
    """

    # Seconds to wait for the HTTP endpoint. Without a timeout a stalled
    # connection blocks requests.get() forever; because _update() runs inside
    # the main loop, that freezes every further update.
    REQUEST_TIMEOUT = 5

    # Key of the meter object inside the 'StatusSNS' section of the response.
    METER_KEY = 'mt681'

    # Fixed voltage published for every phase; also used to derive the phase
    # currents from the phase powers.
    NOMINAL_VOLTAGE = 230

    def __init__(self, servicename, paths, productname='MT681', connection='MT681 JSON service'):
        """Create the D-Bus service, register all paths and start the timers.

        servicename -- D-Bus service name prefix.
        paths       -- dict mapping a D-Bus path to a dict with the keys
                       'initial' (start value) and 'textformat' (callback
                       used as gettextcallback).
        productname -- value published on /ProductName.
        connection  -- value published on /Mgmt/Connection.

        Raises whatever _getSerial() raises, because the serial number is
        fetched from the device during construction.
        """
        deviceinstance = 42
        customname = 'MT681-Netz'

        self._dbusservice = VeDbusService("{}.http{:02d}".format(servicename, deviceinstance))
        self._paths = paths

        logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance))

        # Create the management objects, as specified in the ccgx dbus-api document
        self._dbusservice.add_path('/Mgmt/ProcessName', __file__)
        self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version())
        self._dbusservice.add_path('/Mgmt/Connection', connection)

        # Create the mandatory objects
        self._dbusservice.add_path('/DeviceInstance', deviceinstance)
        self._dbusservice.add_path('/ProductId', 0xFFFF)  # original note: id assigned by Victron Support from SDM630v2.py
        self._dbusservice.add_path('/ProductName', productname)
        self._dbusservice.add_path('/CustomName', customname)
        self._dbusservice.add_path('/Connected', 1)
        self._dbusservice.add_path('/Role', 'grid')
        self._dbusservice.add_path('/Latency', None)
        self._dbusservice.add_path('/FirmwareVersion', 0.1)
        self._dbusservice.add_path('/HardwareVersion', 0)
        self._dbusservice.add_path('/Position', 0)  # original note: normally only needed for pvinverter
        self._dbusservice.add_path('/Serial', self._getSerial())
        self._dbusservice.add_path('/UpdateIndex', 0)
        # Original note: dummy path so VRM detects us as a PV-inverter.
        # NOTE(review): the role above is 'grid' - confirm this path is still wanted.
        self._dbusservice.add_path('/StatusCode', 0)

        # add path values to dbus
        for path, settings in self._paths.items():
            self._dbusservice.add_path(
                path, settings['initial'], gettextcallback=settings['textformat'],
                writeable=True, onchangecallback=self._handlechangedvalue)

        # timestamp (time.time()) of the last successful _update() run
        self._lastUpdate = 0

        # poll the device every 250 ms
        gobject.timeout_add(250, self._update)

        # log a sign of life; the interval getter returns minutes,
        # timeout_add expects milliseconds
        gobject.timeout_add(self._getSignOfLifeInterval() * 60 * 1000, self._signOfLife)

    def _getSerial(self):
        """Return the 'Meter_id' reported by the device.

        Raises ValueError when the response has no non-empty
        StatusSNS/<METER_KEY>/Meter_id entry.
        """
        meter_data = self._getData()
        try:
            serial = meter_data['StatusSNS'][self.METER_KEY]['Meter_id']
        except (KeyError, TypeError):
            serial = None
        if not serial:
            raise ValueError("Response does not contain 'Meter_id' attribute")
        return serial

    def _getSignOfLifeInterval(self):
        """Return the interval between sign-of-life log entries, in minutes."""
        value = 1
        if not value:
            value = 0
        return int(value)

    def _getData(self):
        """Fetch the status document from the device and return it as parsed JSON.

        Raises ConnectionError when the HTTP response is falsy (requests
        treats 4xx/5xx status codes as falsy) and ValueError when the body
        decodes to an empty document. Exceptions raised by requests itself
        (including timeouts) are passed through.
        """
        URL = "http://192.168.100.23/cm?cmnd=status%208"
        meter_r = requests.get(url=URL, timeout=self.REQUEST_TIMEOUT)

        # check for response
        if not meter_r:
            raise ConnectionError("No response from mt681 - %s" % (URL))

        meter_data = meter_r.json()

        # check for Json
        if not meter_data:
            raise ValueError("Converting response to JSON failed")

        return meter_data

    def _signOfLife(self):
        """Log the time of the last update and the last published power value.

        Returns True so that the GLib timer keeps calling it.
        """
        logging.info("--- Start: sign of life ---")
        logging.info("Last _update() call: %s" % (self._lastUpdate))
        logging.info("Last '/Ac/Power': %s" % (self._dbusservice['/Ac/Power']))
        logging.info("--- End: sign of life ---")
        return True

    def _update(self):
        """Poll the device once and publish the readings on D-Bus.

        Any exception is logged and swallowed. The method always returns
        True, because a falsy return value would remove the GLib timer.
        """
        try:
            # get data from Device
            meter = self._getData()['StatusSNS'][self.METER_KEY]

            # Read every value before writing anything, so that a missing key
            # cannot leave the D-Bus paths half-updated.
            total_power = meter['Power_curr']
            phase_power = [meter['Power_p%d' % phase] for phase in (1, 2, 3)]
            # NOTE(review): 'Total_out' is published as Forward and 'Total_in'
            # as Reverse, exactly as in the original script - confirm this
            # matches the meter's naming.
            energy_forward = meter['Total_out']
            energy_reverse = meter['Total_in']

            service = self._dbusservice
            service['/Ac/Power'] = total_power
            for phase, power in zip((1, 2, 3), phase_power):
                service['/Ac/L%d/Voltage' % phase] = self.NOMINAL_VOLTAGE
                service['/Ac/L%d/Power' % phase] = power
                # The original read the currents from an 'mt682' object while
                # every other value came from 'mt681'; the same meter object
                # is used here.
                service['/Ac/L%d/Current' % phase] = power / self.NOMINAL_VOLTAGE
            service['/Ac/Energy/Forward'] = energy_forward
            service['/Ac/Energy/Reverse'] = energy_reverse

            # logging
            logging.debug("House Consumption (/Ac/Power): %s" % (service['/Ac/Power']))
            logging.debug("---")

            # increment UpdateIndex - to show that new data is available;
            # the index wraps from 255 back to 0
            service['/UpdateIndex'] = (service['/UpdateIndex'] + 1) % 256

            # update lastupdate vars
            self._lastUpdate = time.time()
        except Exception as e:
            logging.critical('Error at %s', '_update', exc_info=e)

        # return true, otherwise add_timeout will be removed from GObject -
        # see docs http://library.isr.ist.utl.pt/docs/pygtk2reference/gobject-functions.html#function-gobject--timeout-add
        return True

    def _handlechangedvalue(self, path, value):
        """Log a value change made from outside and accept it by returning True."""
        logging.debug("someone else updated %s to %s" % (path, value))
        return True  # accept the change
def main():
    """Set up D-Bus, create the meter service and run the GLib main loop.

    Any exception raised during setup or while the loop runs is logged as
    critical; the function then returns.
    """
    # The root logger defaults to WARNING with no handler configured, so the
    # info-level "sign of life" messages would never be written without this.
    logging.basicConfig(
        format='%(asctime)s %(levelname)s %(message)s',
        level=logging.INFO)

    def _formatter(digits, unit):
        """Return a gettextcallback that renders a value rounded to *digits* plus *unit*."""
        return lambda p, v: (str(round(v, digits)) + unit)

    try:
        logging.info("Start")

        from dbus.mainloop.glib import DBusGMainLoop
        # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
        DBusGMainLoop(set_as_default=True)

        # formatting
        _kwh = _formatter(2, 'KWh')
        _a = _formatter(1, 'A')
        _w = _formatter(1, 'W')
        _v = _formatter(1, 'V')

        # start our main-service
        pvac_output = DbusMT681Service(
            servicename='com.victronenergy.grid',
            paths={
                '/Ac/Energy/Forward': {'initial': 0, 'textformat': _kwh},  # energy bought from the grid
                '/Ac/Energy/Reverse': {'initial': 0, 'textformat': _kwh},  # energy sold to the grid
                '/Ac/Power': {'initial': 0, 'textformat': _w},
                '/Ac/Current': {'initial': 0, 'textformat': _a},
                '/Ac/Voltage': {'initial': 0, 'textformat': _v},
                '/Ac/L1/Voltage': {'initial': 0, 'textformat': _v},
                '/Ac/L2/Voltage': {'initial': 0, 'textformat': _v},
                '/Ac/L3/Voltage': {'initial': 0, 'textformat': _v},
                '/Ac/L1/Current': {'initial': 0, 'textformat': _a},
                '/Ac/L2/Current': {'initial': 0, 'textformat': _a},
                '/Ac/L3/Current': {'initial': 0, 'textformat': _a},
                '/Ac/L1/Power': {'initial': 0, 'textformat': _w},
                '/Ac/L2/Power': {'initial': 0, 'textformat': _w},
                '/Ac/L3/Power': {'initial': 0, 'textformat': _w},
            })

        # The service only registers timers with timeout_add(); they fire
        # solely while a main loop is running, so block here in the loop.
        logging.info('Connected to dbus, and switching over to gobject.MainLoop() (= event based)')
        mainloop = gobject.MainLoop()
        mainloop.run()
    except Exception as e:
        logging.critical('Error at %s', 'main', exc_info=e)
# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
`
Beta Was this translation helpful? Give feedback.
All reactions