-
Notifications
You must be signed in to change notification settings - Fork 0
/
AutomationManager.py
105 lines (90 loc) · 3.97 KB
/
AutomationManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import configparser
from threading import Thread
import xmltodict
from lifxlan import LifxLAN
import textdistance
class AutomationManager:
    """Route parsed commands to the home-automation devices in the setup XML.

    Devices are described in an XML file whose path comes from the
    ``[Automation]`` section of the config file.  Only devices whose ``type``
    is ``"LIFX"`` currently have a handler.
    """

    def __init__(self, logger, config_file, display_man):
        """Load the configuration and, if enabled, discover LIFX lights.

        Args:
            logger: Object exposing ``log(message, level)``.
            config_file: Path to an INI file with an ``[Automation]`` section.
            display_man: Display manager; stored on the instance, not used
                by this class itself.
        """
        self.l = logger
        self.config = configparser.ConfigParser()
        self.config.read(config_file)
        self.parse_config()
        self.display_man = display_man

        # Always define these so later code never hits an AttributeError
        # when LIFX support is switched off.
        self.lifx = None
        self.lifx_devices = []

        if self.enable_lifx:
            self.lifx = LifxLAN()
            self.lifx_devices = self.lifx.get_lights()
            discovered_macs = {light.mac_addr for light in self.lifx_devices}
            # Make sure every LIFX device in the XML file has a discovered
            # light that matches it.
            for device in self._devices():
                if device["type"] != "LIFX":
                    continue
                if device["MAC"] not in discovered_macs:
                    dev_name = device["MAC"]
                    self.l.log(f"Couldn't find MAC match for {dev_name}",
                               "DEBUG")

    def parse_config(self):
        """Read the ``[Automation]`` settings and parse the device setup XML.

        Sets ``device_setup`` (the parsed XML), ``enable_lifx`` and
        ``imperfect_matching``.  The two flags are true only when the config
        value is exactly the string ``"True"``.
        """
        section = self.config["Automation"]
        setup_filename = section["setup_filename"]
        # Use a context manager so the file handle is closed promptly.
        with open(setup_filename, "r") as setup_file:
            info_xml = setup_file.read()
        self.device_setup = xmltodict.parse(info_xml)
        self.enable_lifx = section["enable_lifx"] == "True"
        self.imperfect_matching = \
            section["allow_imperfect_matching"] == "True"

    def _devices(self):
        """Return the configured devices as a list.

        xmltodict yields a single mapping (not a list) when the XML holds
        exactly one ``<device>`` element, and ``None`` for an empty
        ``<devices/>`` element; both are normalised here so callers can
        always iterate over device mappings.
        """
        root = self.device_setup["devices"]
        if not root:
            return []
        entries = root.get("device")
        if not entries:
            return []
        if isinstance(entries, dict):
            return [entries]
        return list(entries)

    def handle_command(self, command):
        """Handle ``command`` on a background thread and return immediately.

        Args:
            command: Mapping with at least an ``"object"`` key (device name);
                the LIFX handler also reads ``"state"``.
        """
        async_thread = Thread(target=self.handle_command_async,
                              args=(command,))
        async_thread.start()

    def handle_command_async(self, command):
        """Find the device named by ``command["object"]`` and dispatch to it.

        An exact name match always wins.  Otherwise, if imperfect matching is
        enabled, the device whose name has the highest Jaro-Winkler
        similarity to the requested name is used; if it is disabled (or no
        devices are configured) the failure is logged.
        """
        target = command["object"]
        devices = self._devices()

        # Perfect match?
        for device in devices:
            if device["name"] == target:
                self.find_handler(device, command)
                return

        # We can only get here if we didn't find a perfect name match.
        if not self.imperfect_matching or not devices:
            self.l.log(f"No suitable device found for object {target}",
                       "RUN")
            return

        # Maybe some text algorithms geek can tell me a better
        # text similarity algorithm to use here?
        # max() keeps the first device on ties, so the earliest configured
        # device wins among equally similar names.
        closest = max(
            devices,
            key=lambda dev: textdistance.jaro_winkler(dev["name"], target),
        )
        self.find_handler(closest, command)

    def find_handler(self, device, command):
        """Dispatch ``command`` to the handler for ``device["type"]``.

        Logs a message when the device type has no handler.
        """
        if device["type"] == "LIFX":
            self.handle_lifx(device, command)
        else:
            dev_name = device["name"]
            self.l.log(f"No Handler found for {dev_name}",
                       "RUN")

    def handle_lifx(self, device, command):
        """Switch the LIFX light matching ``device["MAC"]`` on or off.

        ``command["state"]`` must be ``"1"`` (on) or ``"0"`` (off); any other
        value is logged and ignored.  A message is also logged when LIFX
        support is disabled or no discovered light has the device's MAC.
        """
        if not self.enable_lifx:
            self.l.log("LIFX not enabled but LIFX device requested",
                       "DEBUG")
            return

        dev_name = device["name"]
        target_mac = device["MAC"]
        # Match against the *requested* device only, so the command never
        # lands on a different configured light.
        for lifx_dev in self.lifx_devices:
            if lifx_dev.mac_addr != target_mac:
                continue
            state = command["state"]
            if state == "0":
                lifx_dev.set_power("off")
            elif state == "1":
                lifx_dev.set_power("on")
            else:
                self.l.log(f"Invalid state for LIFX {dev_name}",
                           "DEBUG")
            return

        self.l.log(f"Should have found match for {dev_name}, but didn't!",
                   "RUN")