-
Notifications
You must be signed in to change notification settings - Fork 2
/
server.py
73 lines (56 loc) · 2.17 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import json
from asyncio import Event
from aiohttp import web
import errors
from lock import Lock
routes = web.RouteTableDef()
class Server:
def __init__(self, port: int):
self.port: int = port
async def start(self):
app = web.Application()
app.add_routes(routes)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, port=self.port)
await site.start()
await Event().wait()
@routes.post('/info')
async def info(request):
try:
data = await request.json()
sn = data['sn']
timeout = int(data.get('timeout', 10))
except (json.decoder.JSONDecodeError, AttributeError, KeyError):
return web.json_response({'error': 'Invalid input'}, status=400)
try:
lock = await Lock.create(sn, bytearray(), timeout)
level = await lock.get_battery()
except errors.DeviceNotFoundError:
return web.json_response({'error': 'Device not found'}, status=400)
return web.json_response({'battery': level})
@routes.post('/do')
async def do(request):
try:
data = await request.json()
action = data['action']
sn = data['sn']
sign_key = data['sign_key']
timeout = int(data.get('timeout', 10))
except (json.decoder.JSONDecodeError, AttributeError, KeyError):
return web.json_response({'error': 'Invalid input'}, status=400)
try:
lock = await Lock.create(sn, bytearray.fromhex(sign_key), timeout)
except ValueError:
return web.json_response({'error': 'Sign key is not a hexadecimal string'}, status=400)
except errors.DeviceNotFoundError:
return web.json_response({'error': 'Device not found'}, status=400)
if action == 'lock':
await lock.lock()
elif action == 'unlock':
await lock.unlock()
elif action == 'temp_unlock':
await lock.temp_unlock()
else:
return web.json_response({'error': 'Unknown action to do'}, status=400)
return web.json_response({'success': True})