-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDevcom.py
122 lines (94 loc) · 3.69 KB
/
Devcom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import random
import datetime
import uuid
# OBJECTIVES TODO:
# 1) Read the code and understand it.
# 2) Complete the SyncService implementation.
# Note that the SyncService.onMessage and SyncService.__init__ function signatures must not be altered.
# Keys of the `data` payload carried by every record: Device.obtainData fills
# one value per key and assertEquivalent compares records key by key.
_DATA_KEYS = ["a","b","c"]
class Device:
    """Simulated, deliberately unreliable device that talks to a SyncService.

    The device produces records, asks the service for the records it is
    missing, and applies the updates it receives. Each of these steps is
    randomly skipped to simulate failures the service has to cope with.
    """

    def __init__(self, id):
        # NOTE: `id` shadows the builtin; the parameter name is kept so that
        # existing callers (positional or keyword) keep working.
        self._id = id
        # Local copy of the shared record log, rebuilt from `update` messages.
        self.records = []
        # Every record this device has produced, in production order.
        self.sent = []

    def obtainData(self) -> dict:
        """Returns a single new datapoint from the device.

        Identified by type `record`. `timestamp` records when the record
        was sent and `dev_id` is the device id.
        `data` is the data collected by the device.

        Returns an empty dict when there is no new data (about 40% of calls).
        """
        if random.random() < 0.4:
            # Sometimes there's no new data
            return {}
        rec = {
            'type': 'record',
            'timestamp': datetime.datetime.now().isoformat(),
            'dev_id': self._id,
            'data': {
                kee: str(uuid.uuid4())
                for kee in _DATA_KEYS
            },
        }
        self.sent.append(rec)
        return rec

    def probe(self) -> dict:
        """Returns a probe request to be sent to the SyncService.

        Identified by type `probe`. `from` is the index number from which
        the device is asking for the data, i.e. the number of records the
        device already holds.

        Returns an empty dict when the device "forgets" to probe (about 50%
        of calls).
        """
        if random.random() < 0.5:
            # Sometimes the device forgets to probe the SyncService
            return {}
        return {'type': 'probe', 'dev_id': self._id, 'from': len(self.records)}

    def onMessage(self, data: dict):
        """Receives updates from the server.

        An `update` message carries `from` (the index its payload starts at)
        and `data` (the list of records from that index onwards). The local
        log is truncated at `from` and the payload is appended.

        Messages are silently dropped about 60% of the time. Empty or missing
        messages, non-`update` messages, and updates whose `from` lies outside
        the local log are ignored.
        """
        if random.random() < 0.6:
            # Sometimes devices make mistakes. Let's hope the SyncService
            # handles such failures.
            return
        # A reply may be None or empty (the service returns nothing when it
        # handles a `record`), so it must not be indexed blindly.
        if not data or data.get('type') != 'update':
            return
        _from = data['from']
        # An update starting past the end would leave a gap; a negative index
        # would silently slice from the end of the log. Ignore both.
        if not 0 <= _from <= len(self.records):
            return
        # In-place truncate-and-append: avoids copying the whole log on every
        # update, which made long runs quadratic.
        self.records[_from:] = data['data']
class SyncService:
    """Central append-only record log shared by all devices.

    Devices push `record` messages, which are appended to the log in arrival
    order, and send `probe` messages to fetch the part of the log they are
    missing.
    """

    def __init__(self):
        # Every record received so far, in arrival order.
        self.records = []

    def onMessage(self, data: dict):
        """Handle messages received from devices.

        Return the desired information in the correct format (type
        `update`, see Device.onMessage and testSyncing to understand format
        intricacies) in response to a `probe`.
        No return value required on handling a `record`.

        Devices sometimes send an empty message (no new data, or a forgotten
        probe). Those are answered with ``{'type': 'noop'}`` so that the reply
        can still be handed to Device.onMessage, which reads its `type`.

        Raises:
            NotImplementedError: for a non-empty message of an unknown type.
        """
        if not data:
            # Nothing to store and nothing to answer; reply with a typed
            # no-op rather than None so the receiver can inspect `type`.
            return {'type': 'noop'}

        msg_type = data['type']
        if msg_type == 'record':
            self.records.append(data)
            return None
        if msg_type == 'probe':
            from_index = data['from']
            # Everything the probing device does not have yet.
            return {
                'type': 'update',
                'from': from_index,
                'data': self.records[from_index:],
            }
        raise NotImplementedError(f"unsupported message type: {msg_type!r}")

    def testSyncing(self, num_rounds: int = int(1e6), num_devices: int = 10):
        """Simulate devices exchanging messages with a fresh SyncService.

        Runs `num_rounds` rounds in which every device may emit a record and
        may probe, then keeps probing until every device holds the complete
        server log, and finally checks that all logs agree with what the
        devices actually sent. `self` is not used; a new service is created.

        Raises:
            AssertionError: if a replicated record differs from the one sent,
                or if a sent record never reached the log.
        """
        devices = [Device(f"dev_{i}") for i in range(num_devices)]
        syn = SyncService()

        for _ in range(num_rounds):
            for _dev in devices:
                syn.onMessage(_dev.obtainData())
                _dev.onMessage(syn.onMessage(_dev.probe()))

        # Drain phase: no new records are produced, so the log length is
        # fixed. Waiting for every device to reach it (rather than merely
        # agreeing with each other) guarantees the whole log is verified.
        total = len(syn.records)
        while any(len(_dev.records) != total for _dev in devices):
            for _dev in devices:
                _dev.onMessage(syn.onMessage(_dev.probe()))

        # ver_start[k] is the index of the next not-yet-verified record
        # sent by device k.
        ver_start = [0] * len(devices)
        for i, rec in enumerate(syn.records):
            _dev_idx = int(rec['dev_id'].split("_")[-1])
            assertEquivalent(rec, devices[_dev_idx].sent[ver_start[_dev_idx]])
            for _dev in devices:
                assertEquivalent(rec, _dev.records[i])
            ver_start[_dev_idx] += 1

        # Every record a device sent must have made it into the log.
        assert ver_start == [len(_dev.sent) for _dev in devices]
def assertEquivalent(d1: dict, d2: dict):
    """Assert that two records describe the same datapoint.

    Compares the originating device, the timestamp, and every payload value
    listed in `_DATA_KEYS`. Fails with AssertionError on the first mismatch.
    """
    for field in ('dev_id', 'timestamp'):
        assert d1[field] == d2[field]
    assert all(d1['data'][kee] == d2['data'][kee] for kee in _DATA_KEYS)