Skip to content

Commit

Permalink
- Finished on/off: #12 (function & test)
Browse files Browse the repository at this point in the history
- Finished hue discovery: #11
- Implemented (test missing): #9
- Implemented (test missing): #10
  • Loading branch information
En3rGy committed May 10, 2022
1 parent 30af9c4 commit 9756644
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 99 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Der Code des Bausteins befindet sich in der hslz Datei oder auf [github](https:/
- Abh. von der Aktion wird die jew. rid über die Device id herausgesucht
- Der Status wird über den Event Stream ausgelesen
- Nur ein Baustein verbindet sich mit der Hue Bridge, die übrigen nutzen diese Verbindung über [HS Instanzen](http://127.0.0.1/doc_extra/de/commloginst.html)

- Die eventstream Verbindung läuft in einem eigenen Thread in einer `While true`-Schleife

## Validation & Verification
- Unit tests are performed.
Expand Down
182 changes: 84 additions & 98 deletions src/tst_Hue_Group.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@
import threading


# import httplib
# import re
# import struct
# import hashlib


class hsl20_4:
LOGGING_NONE = 0

Expand Down Expand Up @@ -136,6 +130,9 @@ def __init__(self, homeserver_context):
#### Own written code can be placed after this commentblock . Do not change or delete commentblock! ####
###################################################################################################!!!##

eventstream_is_connected = False # type: bool
bridge_ip = str() # type: str

def log_msg(self, text):
self.DEBUG.add_message("14100: " + str(text))

Expand All @@ -162,13 +159,10 @@ def hex2int(self, msg):
return int(val)

def discover_hue(self):
"""
# type: () -> str

:rtype: urlparse
"""
# mDNS query request msg from application
MCAST_PORT = 5353
MCAST_GRP = '224.0.0.251'
mcast_port = 5353
mcast_grp = '224.0.0.251'

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 255)
Expand All @@ -191,7 +185,7 @@ def discover_hue(self):
query_msg = query_header + search

try:
sock.sendto(query_msg, (MCAST_GRP, MCAST_PORT))
sock.sendto(query_msg, (mcast_grp, mcast_port))
except socket.error as e:
self.log_data("Error", "discover: " + str(e))
sock.close()
Expand Down Expand Up @@ -222,15 +216,6 @@ def discover_hue(self):
add_rec_idx = ans_idx + 12 + ans_rd_length
add_records = data[add_rec_idx:]

# print('\nHeader: ')
# print(":".join("{:02x}".format(ord(c)) for c in header))
# print('\nQuery: ')
# print(":".join("{:02x}".format(ord(c)) for c in data[12:12+len(search)]))
# print('\nAnswer: ')
# print(":".join("{:02x}".format(ord(c)) for c in answers))
# print("\nAdd records:")
# print(":".join("{:02x}".format(ord(c)) for c in add_records))

# process additional records
ar_offset = 0
for i in range(ar_count):
Expand All @@ -239,7 +224,6 @@ def discover_hue(self):
# print(":".join("{:02x}".format(ord(c)) for c in ar_type))
ar_length = add_records[ar_offset + 10: ar_offset + 12]
ar_length = self.hex2int(ar_length)
# self.log_data("Addition record no. " + str(i), (":".join("{:02x}".format(ord(c)) for c in add_records[ar_offset:ar_offset + 12 + ar_length])))

if ar_type == "\x00\x01": # Type A, get IP & Port
ar_ip = add_records[ar_offset + 12:ar_offset + 12 + ar_length]
Expand All @@ -249,18 +233,18 @@ def discover_hue(self):
ip4 = self.hex2int(ar_ip[3])

ip = str(ip1) + "." + str(ip2) + "." + str(ip3) + "." + str(ip4)
self.log_data("Discovered IP", ip)
self.log_msg("Bridge IP (auto) is " + ip)
self.bridge_ip = ip
return ip

ar_offset = ar_offset + 12 + ar_length

return

except socket.timeout:
self.log_msg("Discovery timeout")
break

sock.close()
return str()

# get general web request
def get_data(self, api_cmd):
Expand All @@ -269,7 +253,6 @@ def get_data(self, api_cmd):
:rtype: {string, string}
"""
api_path = 'https://' + self.bridge_ip + '/clip/v2/resource/' + api_cmd
data = ""
url_parsed = urlparse.urlparse(api_path)
headers = {'Host': url_parsed.hostname, "hue-application-key": self._get_input_value(self.PIN_I_HUE_KEY)}

Expand All @@ -287,7 +270,8 @@ def get_data(self, api_cmd):
print data["data"]
else:
data = {'data': "debug", 'status': str(200)}
self.DEBUG.add_message("14100: Hue bridge response code: " + data["status"])

self.log_msg("In get_data, Hue bridge response code for '" + api_cmd + "' is " + data["status"])

except Exception as e:
self.log_data("Error", "getData: " + str(e))
Expand All @@ -296,22 +280,12 @@ def get_data(self, api_cmd):

return data

def http_put(self, payload, device_rid):
# type: (str, str) -> {str, int}

if device_rid not in self.devices:
return {'data': 'Device id not registered', 'status': 0}
def http_put(self, device_rid, api_path, payload):
# type: (str, str, str) -> {str, int}

device_type = self.devices[device_rid]["type"]

api_path = "https://" + self.bridge_ip
if device_type == "grouped_light":
api_path = api_path + '/clip/v2/resource/grouped_light/'
elif type == "light":
api_path = api_path + '/clip/v2/resource/light/'
api_path = "https://" + self.bridge_ip + '/clip/v2/resource/' + api_path + "/" + device_rid
# todo add szene, zone, etc.

api_path = api_path + device_rid
url_parsed = urlparse.urlparse(api_path)
headers = {"Host": url_parsed.hostname,
"Content-type": 'application/json',
Expand Down Expand Up @@ -364,7 +338,10 @@ def register_devices(self):

for data_set in data:
device_id = data_set["id"]
self.devices[device_id] = {}

if device_id not in self.devices:
self.devices[device_id] = {}

self.devices[device_id]["type"] = item_type
if item_type == "light" or item_type == "scene" or item_type == "room" or item_type == "zone":
self.devices[device_id]["name"] = data_set["metadata"]["name"]
Expand All @@ -390,17 +367,23 @@ def register_devices(self):
return True

def register_eventstream(self):
thread.start_new_thread(self.eventstream, ())
if not self.eventstream_is_connected:
thread.start_new_thread(self.eventstream, ())

def eventstream(self):
self.eventstream_is_connected = True
while True:
self.log_msg("Connecting to eventstream...")
api_path = 'https://' + self.bridge_ip + '/eventstream/clip/v2'
if not self.bridge_ip:
time.sleep(5)
continue

self.log_msg("Connecting to eventstream.")

api_path = 'https://' + str(self.bridge_ip) + '/eventstream/clip/v2'
url_parsed = urlparse.urlparse(api_path)

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_NONE)
# sock.settimeout(5)

try:
sock.bind(('', 0))
Expand All @@ -410,31 +393,34 @@ def eventstream(self):
sock.send("hue-application-key: " + str(self._get_input_value(self.PIN_I_HUE_KEY)) + "\r\n")
sock.send("Accept: text/event-stream\r\n\r\n")

except socket.error as e:
self.log_data("Error", "discover: " + str(e))
except Exception as e:
self.log_data("Error", "In eventstream, " + str(e))
sock.close()

while True:
# todo check / handle "normal" disconnects
try:
data = sock.recv(1024)
data = data[data.find("data: ") + 6:]
data = str(data).replace("\r", '').replace("\n", '')
self.process_json(data)

if self.stop_eventstream:
if self.eventstream_stop:
sock.close()
self.log_msg("Stopping eventstream on request")
self.eventstream_is_connected = False
return

except socket.timeout:
self.log_msg("Eventstream timeout")
break

except Exception as e:
self.log_msg(str(e))
self.log_msg("In 'eventstream', " + str(e))
break

print("Exit thread")
# gently disconnect and wait for re-connection
sock.close()
return str()
self.log_msg("Disconnected from hue eventstream, waiting to reconnect.")
time.sleep(4)

self.eventstream_is_connected = False

def process_json(self, msg):
# type: (str) -> None
Expand Down Expand Up @@ -494,16 +480,25 @@ def get_rig(self, service):
def set_on(self, set_on):
# type: (bool) -> bool

rid = self.get_rig("light")
rid = str(self._get_input_value(self.PIN_I_ITM_IDX))
if not rid:
self.log_msg("In set_on, rid for type 'light' not found.")
return False

if rid not in self.devices:
self.log_msg("In set_on, rid not registered.")
return False

if "light" not in self.devices[rid]:
self.log_msg("In set_on, type 'light' not registered for " + rid + ".")
return False

if set_on:
payload = '{"on":{"on":true}}'
else:
payload = '{"on":{"on":false}}'

ret = self.http_put(payload, rid)
ret = self.http_put(self.devices[rid]["light"], "light", payload)
return ret["status"] == 200

def set_dimming(self, brightness):
Expand All @@ -519,7 +514,7 @@ def set_dimming(self, brightness):
return False

payload = '{"dimming":{"brightness":' + str(brightness) + '}}'
ret = self.http_put(payload, rid)
ret = self.http_put(rid, "light", payload)
return ret["status"] == 200

def set_scene(self, scene):
Expand Down Expand Up @@ -616,17 +611,21 @@ def do_dim(self):

def on_init(self):
self.DEBUG = self.FRAMEWORK.create_debug_section()
self.g_out_sbc = {} # type: {int: Any}
self.debug = False # type: bool
self.bridge_ip = self._get_input_value(self.PIN_I_SHUEIP) # type: str
self.stop_eventstream = False # type: bool
self.g_out_sbc = {} # type: {int, object}
self.debug = False # type: bool

if str(self._get_input_value(self.PIN_I_SHUEIP)):
self.bridge_ip = str(self._get_input_value(self.PIN_I_SHUEIP))
self.log_msg("Bridge IP (manual) is " + self.bridge_ip)

self.eventstream_stop = False # type: bool
self.devices = {} # type: {str: {str: str}}

self.discover_hue()
self.register_devices()
self.register_eventstream()

def on_input_value(self, index, value):
res = False

# Process State
itm_idx = str(self._get_input_value(self.PIN_I_ITM_IDX))
Expand Down Expand Up @@ -741,18 +740,20 @@ def setUp(self):
self.cred = json.load(f)

self.dummy = HueGroup_14100_14100(0)
self.dummy.on_init()

self.dummy.debug_input_value[self.dummy.PIN_I_SHUEIP] = self.cred["PIN_I_SHUEIP"]
# self.dummy.debug_input_value[self.dummy.PIN_I_SHUEIP] = self.cred["PIN_I_SHUEIP"]
self.dummy.debug_input_value[self.dummy.PIN_I_HUE_KEY] = self.cred["PIN_I_SUSER"]

self.dummy.debug_input_value[self.dummy.PIN_I_ITM_IDX] = self.cred["hue_device_id"]
self.dummy.devices[self.cred["hue_device_id"]] = {}
self.dummy.devices[self.cred["hue_device_id"]]["light"] = self.cred["hue_light_id"]
# self.dummy.devices[self.cred["hue_device_id"]] = {}
# self.dummy.devices[self.cred["hue_device_id"]]["light"] = self.cred["hue_light_id"]
self.dummy.debug_rid = self.cred["hue_light_id"]

self.dummy.bridge_ip = self.cred["PIN_I_SHUEIP"]
self.dummy.stop_eventstream = True
# self.dummy.bridge_ip = self.cred["PIN_I_SHUEIP"]
self.dummy.eventstream_stop = False

self.dummy.on_init()


def tearDown(selfself):
print("\n###tearDown")
Expand All @@ -779,20 +780,27 @@ def test_get_data(self):
print(data["data"])
self.assertTrue("id" in data["data"])

def test_set_on(self):
print("###test_set_on")
def test_12_set_on(self):
print("###test_12_set_on")
self.dummy.eventstream_stop = True
self.assertTrue(self.dummy.set_on(False))
time.sleep(3)
self.assertTrue(self.dummy.set_on(True))

self.dummy.stop_eventstream = False
self.dummy.register_eventstream()
time.sleep(3)
self.assertTrue(self.dummy.set_on(False))
self.dummy.on_input_value(self.dummy.PIN_I_BONOFF, False)
time.sleep(3)
self.dummy.stop_eventstream = True
self.assertFalse(self.dummy.debug_output_value[self.dummy.PIN_O_BSTATUSONOFF])

# def test_eventstream(self):
# self.dummy.stop_eventstream = False
# self.dummy.register_eventstream()

def test_print_devices(self):
print("###test_print_devices")
def test_08_print_devices(self):
print("###test_08_print_devices")
res = self.dummy.register_devices()
self.assertTrue(res)

Expand Down Expand Up @@ -841,29 +849,7 @@ def test_dimming(self):
# time.sleep(3)
# self.dummy.on_input_value(self.dummy.PIN_I_BONOFF, 0)
# self.assertEqual(0, self.dummy.debug_output_value[self.dummy.PIN_O_BSTATUSONOFF])
#
# def test_read_json(self):
# self.dummy.debug = True
# self.dummy.curr_bri = 204
#
# api_url = "192.168.0.10"
# api_port = "80"
# api_user = "debug"
# group = "1"
# light = 3
#
# retGroups = {
# "data": '{"1": {"name": "Wohnzimmer", "lights": ["3", "5"], "state": {"any_on": true, "all_on": true}, "action": {"on": true, "hue": 5226, "colormode": "hs", "effect": "none", "alert": "none", "xy": [0.4779, 0.3823], "bri": 204, "ct": 399, "sat": 121}, "recycle": false, "sensors": [], "type": "Room", "class": "Living room"}, "3": {"name": "Thilo", "lights": [], "state": {"any_on": false, "all_on": false}, "action": {"on": false, "alert": "none"}, "recycle": false, "sensors": [], "type": "Room", "class": "Kids bedroom"}, "2": {"name": "Bad OG", "lights": ["4"], "state": {"any_on": false, "all_on": false}, "action": {"on": false, "hue": 8402, "colormode": "xy", "effect": "none", "alert": "select", "xy": [0.4575, 0.4099], "bri": 254, "ct": 366, "sat": 140}, "recycle": false, "sensors": [], "type": "Room", "class": "Bathroom"}, "5": {"name": "Nora", "lights": ["7", "8"], "state": {"any_on": false, "all_on": false}, "action": {"on": false, "bri": 1, "alert": "select"}, "recycle": false, "sensors": [], "type": "Room", "class": "Kids bedroom"}, "4": {"name": "Flur DG", "lights": ["6"], "state": {"any_on": false, "all_on": false}, "action": {"on": false, "bri": 254, "alert": "select"}, "recycle": false, "sensors": [], "type": "Room", "class": "Hallway"}, "7": {"name": "TV", "lights": ["3"], "state": {"any_on": true, "all_on": true}, "action": {"on": true, "hue": 5226, "colormode": "hs", "effect": "none", "alert": "select", "xy": [0.4779, 0.3823], "bri": 204, "ct": 399, "sat": 121}, "recycle": false, "sensors": [], "type": "Zone", "class": "Downstairs"}, "6": {"name": "Garage", "lights": ["9"], "state": {"any_on": true, "all_on": true}, "action": {"on": true, "hue": 0, "colormode": "ct", "effect": "none", "alert": "select", "xy": [0.3805, 0.3769], "bri": 254, "ct": 370, "sat": 0}, "recycle": false, "sensors": [], "type": "Room", "class": "Carport"}}'}
# # retLights = dummy.getData(api_url, api_port, api_user, "lights")
# ret = self.dummy.read_json(retGroups["data"], light)
# res = '{"1": {"name": "Wohnzimmer", "lights": ["3", "5"], "state": {"any_on": true, "all_on": true}, "recycle": false, "action": {"on": true, "hue": 5226, "colormode": "hs", "effect": "none", "alert": "none", "xy": [0.4779, 0.3823], "bri": 204, "sat": 121, "ct": 399}, "sensors": [], "type": "Room", "class": "Living room"}, "3": {"name": "Thilo", "lights": [], "state": {"any_on": false, "all_on": false}, "recycle": false, "action": {"on": false, "alert": "none"}, "sensors": [], "type": "Room", "class": "Kids bedroom"}, "2": {"name": "Bad OG", "lights": ["4"], "state": {"any_on": false, "all_on": false}, "recycle": false, "action": {"on": false, "hue": 8402, "colormode": "xy", "effect": "none", "alert": "select", "xy": [0.4575, 0.4099], "bri": 254, "sat": 140, "ct": 366}, "sensors": [], "type": "Room", "class": "Bathroom"}, "5": {"name": "Nora", "lights": ["7", "8"], "state": {"any_on": false, "all_on": false}, "recycle": false, "action": {"on": false, "bri": 1, "alert": "select"}, "sensors": [], "type": "Room", "class": "Kids bedroom"}, "4": {"name": "Flur DG", "lights": ["6"], "state": {"any_on": false, "all_on": false}, "recycle": false, "action": {"on": false, "bri": 254, "alert": "select"}, "sensors": [], "type": "Room", "class": "Hallway"}, "7": {"name": "TV", "lights": ["3"], "state": {"any_on": true, "all_on": true}, "recycle": false, "action": {"on": true, "hue": 5226, "colormode": "hs", "effect": "none", "alert": "select", "xy": [0.4779, 0.3823], "bri": 204, "sat": 121, "ct": 399}, "sensors": [], "type": "Zone", "class": "Downstairs"}, "6": {"name": "Garage", "lights": ["9"], "state": {"any_on": true, "all_on": true}, "recycle": false, "action": {"on": true, "hue": 0, "colormode": "ct", "effect": "none", "alert": "select", "xy": [0.3805, 0.3769], "bri": 254, "sat": 0, "ct": 370}, "sensors": [], "type": "Room", "class": "Carport"}}'
# self.assertEqual(res, ret)
# # ret = dummy.readLightsJson(retLights["data"], light)
#
# # sScene = "mYJ1jB9LmBAG6yN"
# # sScene = "qhfcIHIJ9JuYK19"
# # nHueCol = 24381
#
##
# def test_dim(self):
# self.dummy.debug = True
# self.dummy.curr_bri = 255
Expand Down

0 comments on commit 9756644

Please sign in to comment.