-
Notifications
You must be signed in to change notification settings - Fork 11
/
socket.py
134 lines (111 loc) · 5.36 KB
/
socket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import asyncio
import json
import logging
import ssl
from qolsys.actions import QolsysAction
from qolsys.actions import QolsysActionInfo
from qolsys.events import QolsysEvent
from qolsys.exceptions import UnknownQolsysEventException
from qolsys.exceptions import UnknownQolsysSensorException
from qolsys.utils import LoggerCallback
# Module-level logger; QolsysSocket falls back to it when no logger is given.
LOGGER = logging.getLogger(__name__)
class QolsysSocket(object):
    """Asynchronous TLS client for the socket exposed by a Qolsys panel.

    The socket keeps a single connection open (see ``listen``), forwards
    every parsed ``QolsysEvent`` to ``callback``, and lets callers push
    actions to the panel with ``send``. ``keep_alive`` periodically writes
    a newline so the connection is not dropped for inactivity.

    Args:
        hostname: Host name or address of the panel.
        port: TCP port of the panel; defaults to 12345.
        token: Token injected in every action sent; defaults to ``''``.
        logger: Logger to use; defaults to the module-level ``LOGGER``.
        callback: Awaitable callable invoked with each parsed event.
        connected_callback: Awaitable callable invoked (without arguments)
            once a connection is established and the initial info action
            has been sent.
        disconnected_callback: Awaitable callable invoked (without
            arguments) every time a connection attempt ends, whether or
            not the connection was actually established.
        keep_alive: Seconds between two keep-alive writes; defaults to 240.
    """

    def __init__(self, hostname: str, port: int = None, token: str = None,
                 logger=None, callback: callable = None,
                 connected_callback: callable = None,
                 disconnected_callback: callable = None,
                 keep_alive: int = None) -> None:
        self._hostname = hostname
        self._port = port or 12345
        self._token = token or ''

        self._logger = logger or LOGGER
        self._callback = callback or LoggerCallback()
        self._connected_callback = (
            connected_callback or LoggerCallback('Connected callback'))
        self._disconnected_callback = (
            disconnected_callback or LoggerCallback('Disconnected callback'))

        # 4mn, since the panel generally timeouts at 5mn
        self._keep_alive = keep_alive or 60 * 4

        # Stream writer of the current connection, None while disconnected.
        self._writer = None
        # True while listen() should keep (re)connecting.
        self._listen = False

    def create_tasks(self, event_loop):
        """Schedule the background coroutines on ``event_loop``.

        Returns:
            A dict with the ``'listen'`` and ``'keep_alive'`` tasks.
        """
        return {
            'listen': event_loop.create_task(self.listen()),
            'keep_alive': event_loop.create_task(self.keep_alive()),
        }

    async def send(self, action: 'QolsysAction'):
        """Send ``action`` to the panel over the current connection.

        Raises:
            Exception: If there is currently no open connection.
        """
        writer = self._writer
        if writer is None:
            raise Exception('No writer')

        # Build the payload once; it is both logged and written.
        # NOTE(review): the payload presumably embeds the token, which
        # therefore ends up in debug logs - confirm this is acceptable.
        payload = action.with_token(self._token)
        self._logger.debug(f'Sending: {payload}')

        writer.write(payload.encode())
        await writer.drain()

    async def keep_alive(self):
        """Write a newline to the panel every ``keep_alive`` seconds.

        Runs until cancelled. Nothing is written while disconnected.
        """
        while 'we need to keep the connection alive':
            # Work on a local reference: listen() may reset self._writer
            # while we are suspended in drain().
            writer = self._writer
            if writer is not None:
                self._logger.debug('Sending keep-alive')
                try:
                    writer.write('\n'.encode())
                    await writer.drain()
                except Exception:
                    # A failed write must not kill this task, otherwise no
                    # keep-alive would be sent after listen() reconnects.
                    self._logger.exception('unable to send keep-alive')

            await asyncio.sleep(self._keep_alive)

    async def listen(self):
        """Connect to the panel and dispatch its events until cancelled.

        On errors the connection is re-established with an exponential
        back-off (1s doubling up to 60s). When the panel closes the
        connection cleanly, the reconnection is immediate. Cancelling the
        task stops the loop after the connection has been torn down.
        """
        # Hostname checking and certificate verification are disabled
        # (presumably because the panel's certificate cannot be validated).
        # check_hostname has to be turned off before verify_mode is relaxed.
        context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        server = (self._hostname, self._port)

        self._listen = True
        delay_reconnect = 0
        while self._listen:
            writer = None
            try:
                self._logger.info('Establishing connection to '
                                  f'{server[0]}:{server[1]}')
                reader, writer = await asyncio.open_connection(
                    *server, ssl=context, server_hostname='')
                self._writer = writer

                await self.send(QolsysActionInfo())
                await self._connected_callback()
                # The connection is healthy again: reset the back-off.
                delay_reconnect = 0

                await self._read_events(reader)

            except asyncio.exceptions.CancelledError:
                self._listen = False
                self._logger.info('listening cancelled')

            except Exception:
                # Only regular errors trigger a reconnection; things like
                # KeyboardInterrupt or SystemExit are left to propagate.
                delay_reconnect = min(delay_reconnect * 2 or 1, 60)
                self._logger.exception('error while listening')

            finally:
                await self._close_connection(writer)

            if self._listen and delay_reconnect:
                self._logger.info(f'sleeping {delay_reconnect} second(s) before reconnecting')
                await asyncio.sleep(delay_reconnect)

    async def _read_events(self, reader):
        """Read lines from ``reader`` and dispatch events until EOF."""
        while 'there is content to read':
            line = await reader.readline()
            if not line:
                self._logger.info('Connection closed by the panel, exiting to reset the connection')
                return

            line = line.decode().rstrip('\n')
            self._logger.debug(f"Data received (len: {len(line)}): {line}")

            if line == 'ACK':
                # This is an ACK to a command we sent, we can ignore
                self._logger.debug('ACK - ignoring.')
                continue

            try:
                # We try to parse the event to one of our event classes
                event = QolsysEvent.from_json(line)
            except json.decoder.JSONDecodeError:
                self._logger.debug(f'Data is not JSON: {line}')
                continue
            except UnknownQolsysEventException:
                self._logger.debug(f'Unknown Qolsys event: {line}')
                continue
            except UnknownQolsysSensorException:
                self._logger.debug(f'Unknown sensor in Qolsys event: {line}')
                continue

            try:
                await self._callback(event)
            except Exception:
                # A failing callback must not break the connection, but a
                # cancellation (not an Exception) still has to go through.
                self._logger.exception(f'Error calling callback for event: {line}')

    async def _close_connection(self, writer):
        """Run the disconnected callback and close ``writer`` if any."""
        try:
            await self._disconnected_callback()
        except Exception:
            self._logger.exception('Error calling disconnected callback')
        finally:
            # Always release the connection, whatever the callback did.
            self._writer = None
            if writer is not None:
                writer.close()

        if writer is None:
            return

        try:
            await writer.wait_closed()
        except Exception:
            self._logger.exception(
                'unable to wait for writer to '
                'be fully closed; this might not be an issue if '
                'the connection was closed on the other side')