-
Notifications
You must be signed in to change notification settings - Fork 0
/
environmental_model.py
86 lines (67 loc) · 2.27 KB
/
environmental_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import socketio
import file_controller as fc
from .image_processor import np_cvt_base64img
class EnvironmentalModel:
    """Read-only wrapper around environmental model data.

    The data is either supplied directly as a list or, when a string is
    given, loaded through ``fc.read_json``.
    """

    def __init__(self, data):
        """Store *data*, loading it first when a string is given.

        Args:
            data: A list that is kept as-is, or a string that is passed to
                ``fc.read_json`` and replaced by whatever that returns.

        Raises:
            TypeError: If *data* is neither a string nor a list.
        """
        if isinstance(data, str):
            data = fc.read_json(data)
        elif not isinstance(data, list):
            raise TypeError('data\'s type have to string or list.')
        self._data = data

    def __str__(self):
        """Return the textual form of the wrapped data."""
        # __str__ must return a str; handing back the raw container made
        # str(model) / print(model) raise TypeError.
        return str(self._data)

    def __repr__(self):
        """Return a debugging representation that names the class."""
        # Same contract as __str__: the result has to be a real string.
        return f'{type(self).__name__}({self._data!r})'

    def get(self, index):
        """Return the element of the wrapped data stored at *index*."""
        return self._data[index]
class EnvironmentalModelSocket:
    """Context-manager wrapper around a ``socketio.Client`` connection."""

    def __init__(self, host, port):
        """Create the client and connect it to ``http://<host>:<port>``."""
        self._socket = socketio.Client()
        endpoint = 'http://{}:{}'.format(host, port)
        self._socket.connect(endpoint)
        print('Environmental model socket is connecting.')

    def __enter__(self):
        """Return this wrapper so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, type, value, trace):
        """Close the socket when the ``with`` block ends."""
        self.close()

    def close(self):
        """Announce the shutdown and disconnect the underlying client."""
        print('Environmental model socket is closing.')
        self._socket.disconnect()

    def send(self, model):
        """Emit *model* on the ``receiveEnvironmentalModel`` event."""
        event = 'receiveEnvironmentalModel'
        self._socket.emit(event, model)
# Shared module-level socket used by create_environmental_model() and
# disconnect_environmental_model_socket(). Constructing it calls connect(),
# so importing this module opens the connection.
# NOTE(review): host and port are hard-coded here — confirm this is intended.
_MODEL_SOCKET = EnvironmentalModelSocket('120.125.83.10', '8090')
def create_environmental_model(file_path, image, resolution, bboxes):
    """Build an environmental model, save it as JSON and send it out.

    The model holds the encoded image, its height and width (taken from
    ``image.shape``), the given resolution and one entry per bounding box.
    It is written with ``fc.write_json`` and then emitted through the
    module-level ``_MODEL_SOCKET``.

    Args:
        file_path: Destination handed to ``fc.write_json``.
        image: Object exposing ``shape``; encoded with ``np_cvt_base64img``.
        resolution: Stored in the model unchanged.
        bboxes: Iterable of boxes exposing ``clsName``, ``confidence``,
            ``distance``, ``angle`` and ``coordinates`` with ``lt``, ``rt``,
            ``lb`` and ``rb`` corners that provide ``_asdict()``.
    """

    def _stringify_values(mapping):
        # Keys are kept as they are; every value is converted with str().
        return {key: str(value) for key, value in mapping.items()}

    def _describe(bbox):
        # Corners are read first, in lt/rt/lb/rb order, before the scalars.
        corners = {
            corner: _stringify_values(
                dict(getattr(bbox.coordinates, corner)._asdict())
            )
            for corner in ('lt', 'rt', 'lb', 'rb')
        }
        return {
            'class': bbox.clsName,
            'confidence': str(bbox.confidence),
            'distance': str(bbox.distance),
            'angle': str(bbox.angle),
            'coordinate': corners,
        }

    model = {
        'image': np_cvt_base64img(image),
        'height': image.shape[0],
        'width': image.shape[1],
        'resolution': resolution,
        'obstacles': [_describe(bbox) for bbox in bboxes],
    }

    fc.write_json(file_path, model)
    _MODEL_SOCKET.send(model)
def disconnect_environmental_model_socket():
    """Close the module-level ``_MODEL_SOCKET`` connection."""
    _MODEL_SOCKET.close()