-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgetdata.py
133 lines (103 loc) · 3.82 KB
/
getdata.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from random import gauss, choice
from typing import Tuple, List, Optional
from cfg import *
import datetime
from datetime import datetime as dt
import requests
import json
def gen_random_measurement(
        mu: float, sigma: float, min_val: Optional[float],
        max_val: Optional[float]) -> Tuple[float, bool]:
    """Generate a random measurement together with a validity flag.

    The value is drawn from a Gaussian distribution and then clamped to
    the interval [min_val, max_val]. A bound given as None is not
    enforced. The flag is picked at random, independently of the value.

    Args:
        mu: mean of the Gaussian distribution.
        sigma: standard deviation of the Gaussian distribution (passed
            unchanged to random.gauss).
        min_val: lower bound of the value, or None for no lower bound.
        max_val: upper bound of the value, or None for no upper bound.

    Returns:
        Tuple (value, flag), where flag tells whether the measurement is
        valid or not.
    """
    # The flag is drawn before the value so the order of calls to the
    # random generator is unchanged (matters for seeded runs).
    flag = choice([True, False])
    value = gauss(mu, sigma)
    if min_val is not None and value < min_val:
        value = min_val
    # Each bound has its own None check: the upper clamp used to test
    # min_val, so it was skipped when only max_val was given and raised
    # TypeError (float > None) when only min_val was given.
    if max_val is not None and value > max_val:
        value = max_val
    return value, flag
# def rand(mu, sigma, min=False, max=False):
# '''
# Creates a random value from a single variable
# using a Gaussian distribution.
# Used for testing.
# '''
# stat = choice([0, 1]) # whether the value will be valid
# val = gauss(mu, sigma)
# if val < min:
# val = min
# elif val > max:
# val = max
# return val, stat
def gen_all_random_measurements() -> Tuple[List[float], List[bool]]:
    """Generate one random sample for every measured quantity.

    The quantities are generated in the order temp1, temp2, pres, hum,
    each without lower or upper bound, and the reception time is appended
    as the last element.

    Returns:
        Tuple (list_of_values, list_of_flags):
            list_of_values: [temp1, temp2, pres, hum, recep_time], where
                recep_time is the current time as a POSIX timestamp.
            list_of_flags: validity flag of each value, in the same
                order; the flag of the reception time is always True.
    """
    # (mu, sigma) of each quantity, in output order.
    gauss_params = ((20, 0.1), (21, 0.3), (1000, 0.8), (65, 3))

    list_of_values = []
    list_of_flags = []
    for mu, sigma in gauss_params:
        value, flag = gen_random_measurement(mu, sigma, None, None)
        list_of_values.append(value)
        list_of_flags.append(flag)

    # Reception time is taken after all values have been generated.
    list_of_values.append(dt.now().timestamp())
    list_of_flags.append(True)
    return list_of_values, list_of_flags
# def from_rand():
# # t1, stat_t1 = rand(mu=20, sigma=0.1, min=float(temp1.minVal), max=float(temp1.maxVal))
# t1, stat_t1 = rand(mu=20, sigma=0.1, min=0, max=10)
# t2, stat_t2 = rand(mu=21, sigma=0.3, min=0, max=10)
# p, stat_p = rand(mu=1000, sigma=0.8, min=0, max=10)
# u, stat_u = rand(mu=65, sigma=3, min=0, max=100)
# rec_time = datetime.datetime.now().timestamp()
# data = [t1, t2, p, u, rec_time]
# stat = [stat_t1, stat_t2, stat_p, stat_u, 0]
# return data, stat
def from_serial(serial):
    """
    Receives one line of data from a serial port.
    Waits until the port reports pending data, then reads one line.
    Data must be sent in the following format:
        temp1;temp2;pres;hum

    Args:
        serial (serial.Serial): Serial port which sends the data

    Returns:
        data (list): fields of the received line (strings, surrounding
            whitespace and line terminator removed) + reception time
            (POSIX timestamp)
        stat (list): status of each measure, as returned by check_data
    """
    import time  # local import: only needed by the polling loop below

    # Poll with a short sleep instead of spinning on `pass`, which kept a
    # CPU core fully busy while no data was available.
    while serial.inWaiting() == 0:
        time.sleep(0.01)

    rec_time = dt.now().timestamp()
    line = str(serial.readline(), encoding='utf-8')
    # Strip each field so the last one no longer carries the '\n' / '\r\n'
    # that terminates the line.
    list_of_values = [field.strip() for field in line.split(';')]
    list_of_values.append(rec_time)  # appends data reception time
    list_of_flags = check_data(list_of_values)
    return list_of_values, list_of_flags
def recv_from_thingspeak(channel, key):
    '''
    Receives data from a ThingSpeak™ channel.

    Requests the last two feed entries of the channel and uses the last
    entry of the response.

    Args:
        channel: identifier of the channel, inserted in the request URL
        key: API key, sent as the `api_key` query parameter

    Returns:
        data (list): field1..field4 of the entry as floats (temp1, temp2,
            pres, hum) + creation time of the entry (POSIX timestamp)
        stat (list): status of each measure, as returned by check_data

    Raises:
        IndexError: if the response contains no feed entry.
        requests.exceptions.Timeout: if the server does not answer in
            time.
    '''
    url = f'https://api.thingspeak.com/channels/{channel}/feeds.json?api_key={key}&results=2'
    # Without a timeout the call can block forever on a stalled connection.
    response = requests.get(url, timeout=10)
    # [-1] is the same entry as [1] when two entries are returned, and
    # still works when the channel holds a single entry.
    latest_feed = response.json()['feeds'][-1]

    # fields: temp1, temp2, pres, hum
    list_of_values = [float(latest_feed[f'field{i}']) for i in range(1, 5)]

    # 'created_at' ends with 'Z' (UTC). strptime returns a naive datetime,
    # which timestamp() would interpret as local time, so the UTC zone is
    # attached explicitly before converting.
    created_at = datetime.datetime.strptime(
        latest_feed['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    rec_time = created_at.replace(tzinfo=datetime.timezone.utc).timestamp()
    list_of_values.append(rec_time)

    list_of_flags = check_data(list_of_values)
    return list_of_values, list_of_flags
def from_rss():
    '''
    Placeholder for receiving data from an RSS feed.
    Not implemented yet: takes no argument and returns None.
    '''
    return None
def check_data(data):
    '''
    Gives the status of each measure.
    The content of `data` is currently not inspected: the result is
    always a new list of five zeros.
    '''
    status_count = 5
    return [0] * status_count