-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
77 lines (54 loc) · 1.57 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import asyncio
import json
from serial.tools import list_ports
from websockets.server import serve, WebSocketServerProtocol
from config import config
from data_import import Data, DataImport
data = Data()
data_import: DataImport = None
graphs = []
def get_input_mode():
if data_import is None:
return None
return data_import.input_mode
def set_input_mode(mode):
global data_import
if get_input_mode() == mode:
return
if data_import is not None:
data_import.close()
if mode is not None:
data_import = DataImport(mode, data)
def get_status_code():
if data_import is None:
return 0
return 2 if data_import.connected() else 1
def create_message():
ports = sorted([port.device for port in list_ports.comports()])
return {
'ports': ports,
'statusCode': get_status_code(),
'graphData': data.get_graph_data(graphs),
}
def create_init_message():
return {
**create_message(),
'init': True,
'inputMode': get_input_mode(),
}
def read_message(msg):
global graphs
data: dict = json.loads(msg)
set_input_mode(data.get('inputMode', None))
graphs = data.get('graphs', [])
async def handler(websocket: WebSocketServerProtocol):
await websocket.send(json.dumps(create_init_message()))
async for message in websocket:
read_message(message)
await websocket.send(json.dumps(create_message()))
async def main():
print('Starting websocket server...')
async with serve(handler, 'localhost', 3001):
print('Websocket server running on port 3001\n')
await asyncio.Future() # run forever
asyncio.run(main())