Skip to content

Commit

Permalink
removed pyTibber dependency, watchdog improved
Browse files Browse the repository at this point in the history
  • Loading branch information
danielringch committed Feb 16, 2024
1 parent c35a0cd commit c36b38e
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 75 deletions.
Empty file modified .gitignore
100644 → 100755
Empty file.
Empty file modified LICENSE
100644 → 100755
Empty file.
Empty file modified README.md
100644 → 100755
Empty file.
4 changes: 4 additions & 0 deletions config/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ mqtt:
password: "password" # optional, only for auth via username + password
public_key: "path/to/cert/public/key/file.crt" # optional, only for auth via certificate
private_key: "path/to/cert/private/key/file.crt" # optional, only for auth via certificate
watchdog:
tolerance: 20 # maximum time between two live data packets (in seconds)
timeout: 5 # delay between losing live data and reconnect (in seconds)
maximum_timeout: 600 # maximum delay between losing live data and reconnect (in seconds)
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aiohttp
gql[websockets]
paho-mqtt
pytibber
pyyaml
39 changes: 18 additions & 21 deletions tibber2mqtt/helpers.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
import os
from functools import reduce

def get_argument(config: dict, keys: list, varname: str, optional=False):
try:
return os.environ[varname]
except:
def get_optional_argument(config: dict, *keys: str, varname: str = None): # type: ignore
if varname is not None:
try:
for key in keys:
dict = dict[key]
return str(dict)
return os.environ[varname]
except:
if optional:
return None
print(f'Error: Missing config entry or environment variable {varname}.')
exit()
pass

sub_config = config
try:
for key in keys:
sub_config = sub_config[key]
return sub_config
except:
return None

def get_recursive_key(dict, *keys, optional=False):
final_key = keys[-1]
for key in keys[:-1]:
dict = dict.get(key, {})
value = dict.get(final_key, None)
if optional or value:
return value
def get_argument(config: dict, *keys: str, varname: str = None): # type: ignore
raw_value = get_optional_argument(config, *keys, varname=varname)
if raw_value is None:
print(f'Error: Missing config entry or environment variable for {".".join(keys)}.')
exit()
else:
print(f'Key {".".join(keys)} not found in configuration.')
exit()
return raw_value
18 changes: 9 additions & 9 deletions tibber2mqtt/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import struct
import paho.mqtt.client as mqtt
from ssl import CERT_NONE
from helpers import get_recursive_key
from helpers import *
from logger import *

class Mqtt():
Expand All @@ -11,21 +11,21 @@ def __init__(self, name: str, config: dict):
self.__mqtt = mqtt.Client()
self.__mqtt.on_connect = self.__on_connect

ip, port = get_recursive_key(config, 'host').split(':')
ip, port = get_argument(config, 'host').split(':')

ca = get_recursive_key(config, 'ca', optional=True)
public_key = get_recursive_key(config, 'public_key', optional=True)
private_key = get_recursive_key(config, 'private_key', optional=True)
ca = get_optional_argument(config, 'ca')
public_key = get_optional_argument(config, 'public_key')
private_key = get_optional_argument(config, 'private_key')
if ca or public_key or private_key:
insecure = get_recursive_key(config, 'tls_insecure', optional=True) == True
insecure = get_optional_argument(config, 'tls_insecure') == True
self.__mqtt.tls_set(ca_certs=ca, certfile=public_key, keyfile=private_key, cert_reqs=CERT_NONE if insecure else None)

user = get_recursive_key(config, 'user', optional=True)
password = get_recursive_key(config, 'password', optional=True)
user = get_optional_argument(config, 'user')
password = get_optional_argument(config, 'password')
if user or password:
self.__mqtt.username_pw_set(user, password)

self.__topic = get_recursive_key(config, 'topic')
self.__topic = get_argument(config, 'topic')

self.__mqtt.connect(ip, int(port), 60)
self.__mqtt.loop_start()
Expand Down
10 changes: 5 additions & 5 deletions tibber2mqtt/tibber2mqtt.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import argparse, asyncio, os, yaml
from mqtt import Mqtt
from helpers import get_argument
from helpers import *
from tibberlive import Tibberlive
from watchdog import Watchdog
from logger import Logger, logger

__version__ = "0.1.0"
__version__ = "1.2.0"


async def main():
Expand All @@ -23,7 +23,7 @@ async def main():
exit()

try:
log_file = get_argument(config, ('log', 'path'), 'T2M_LOG_PATH', optional=True)
log_file = get_optional_argument(config, 'log', 'path', varname='T2M_LOG_PATH')
logger.add_file(log_file)
except Exception as e:
print(f'Failed to open log file {log_file}: {e}')
Expand All @@ -34,14 +34,14 @@ async def main():
mqtts.append(Mqtt(mqttname, mqttconfig))

tibber = None
watchdog = Watchdog()
watchdog = Watchdog(config)

while True:
if tibber is None:
tibber = Tibberlive(config.get('tibber', {}), mqtts)
watchdog.subscription_success(await tibber.start())
if not watchdog.check(tibber.last_data):
tibber.stop()
await tibber.stop()
tibber = None
await asyncio.sleep(2)

Expand Down
68 changes: 37 additions & 31 deletions tibber2mqtt/tibberlive.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import aiohttp, datetime, tibber, threading, asyncio
from enum import Enum
import aiohttp, datetime, asyncio

from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport

from helpers import get_argument
from mqtt import Mqtt
Expand All @@ -8,25 +10,21 @@
class Tibberlive:
def __init__(self, config: dict, mqtts: list):

home = get_argument(config, ('home'), "T2M_TIBBER_HOME")
self.__token = get_argument(config, ('token'), "T2M_TIBBER_TOKEN")
home = get_argument(config, 'home', varname="T2M_TIBBER_HOME")
self.__token = get_argument(config, 'token', varname="T2M_TIBBER_TOKEN")

self.__available_request = {
'query': '{ viewer { home(id: "%s") { features { realTimeConsumptionEnabled } } } }' % home
}
self.__subscription_request = {
'query': 'subscription{ liveMeasurement( homeId:"%s" ) { timestamp power } }' % home
'query': '{ viewer { home(id: "%s") { features { realTimeConsumptionEnabled } } websocketSubscriptionUrl } }' % home
}
self.__subscription_request = gql('subscription{ liveMeasurement( homeId:"%s" ) { timestamp power } }' % home)

self.__home = None
self.__client = None
self.__task = None

self.__mqtts = mqtts

self.__last_timestamp = datetime.datetime.fromtimestamp(0)

def __del__(self):
self.stop()

async def start(self):
logger.log(f'Subscribing to tibber live data.')
async with aiohttp.ClientSession() as session:
Expand All @@ -36,37 +34,45 @@ async def start(self):
logger.log(f'Subscription to tibber live data failed: {e}')
return False
available = response_json['data']['viewer']['home']['features']['realTimeConsumptionEnabled']
subscription_url = response_json['data']['viewer']['websocketSubscriptionUrl']
if not available:
logger.log('No tibber live data available .')
return False

transport = WebsocketsTransport(
url=subscription_url,
init_payload={'token': self.__token},
headers={'User-Agent': 'HomeAssistant/2023.2'})

tibber_connection = tibber.Tibber(self.__token, websession=session, user_agent="HomeAssistant/2023.2")
await tibber_connection.update_info()
self.__client = Client(transport=transport)
await self.__client.connect_async(reconnecting=True)

self.__home = tibber_connection.get_homes()[0]
try:
await self.__home.rt_subscribe(self.__power_callback)
except Exception as e:
logger.log(f'Subscription to tibber live data failed: {e}')
return False
self.__task = asyncio.create_task(self.__run())
self.__last_timestamp = datetime.datetime.now()
return True

def stop(self):
if self.__home is not None:
try:
self.__home.rt_unsubscribe()
except:
pass
async def stop(self):
if self.__task is not None:
self.__task.cancel()
self.__task = None
if self.__client is not None:
await self.__client.close_async()
self.__client = None


@property
def last_data(self):
return self.__last_timestamp

def __power_callback(self, data):
self.__last_timestamp = datetime.datetime.now()
power = data['data']['liveMeasurement']['power']
for mqtt in self.__mqtts:
mqtt.send(round(power + 0.5))
async def __run(self):
try:
async for response in self.__client.session.subscribe(self.__subscription_request):
self.__last_timestamp = datetime.datetime.now()
power = response['liveMeasurement']['power']
for mqtt in self.__mqtts:
mqtt.send(round(power + 0.5))
except Exception as e:
logger.log(f'Receiving live data failed: {e}')

async def __post(self, session, query):
headers = {
Expand Down
29 changes: 21 additions & 8 deletions tibber2mqtt/watchdog.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,46 @@
import datetime

from logger import *
from helpers import get_argument

class Watchdog:
def __init__(self):
self.__tolerance = datetime.timedelta(seconds=10)
def __init__(self, config: dict):
tolerance = int(get_argument(config, 'watchdog', 'tolerance'))
self.__tolerance = datetime.timedelta(seconds=tolerance)

timeout = int(get_argument(config, 'watchdog', 'timeout'))
self.__timeout = datetime.timedelta(seconds=timeout)
self.__current_timeout = self.__timeout

maximum_timeout = int(get_argument(config, 'watchdog', 'maximum_timeout'))
self.__maximum_timeout = datetime.timedelta(seconds=maximum_timeout)

self.__lost_at = None
self.__timeout = 5

def check(self, last_timestamp):
now = datetime.datetime.now()
if last_timestamp + self.__tolerance < now:
if self.__lost_at is None:
logger.log('Lost tibber live data.')
self.__lost_at = now
if self.__lost_at + datetime.timedelta(seconds=self.__timeout) < now:
return True
seconds_until_reconnect = (self.__current_timeout - (now - self.__lost_at)).total_seconds()
if seconds_until_reconnect <= 0:
return False
else:
logger.log(f'{round(seconds_until_reconnect)}s until reconnect.')
else:
self.__reset_timeout()
self.__lost_at = None
return True

def subscription_success(self, success):
self.__lost_at = datetime.datetime.now()
self.__lost_at = None
if success:
self.__reset_timeout()
else:
self.__timeout = min (self.__timeout * 2, 3600)
logger.log(f'Reconnect attempt in {self.__timeout} seconds.')
self.__timeout = min (self.__timeout * 2, self.__maximum_timeout)
logger.log(f'Reconnect attempt in {round(self.__timeout.total_seconds())} seconds.')

def __reset_timeout(self):
self.__timeout = 5
self.__current_timeout = self.__timeout

0 comments on commit c36b38e

Please sign in to comment.