-
Notifications
You must be signed in to change notification settings - Fork 0
/
sensors.py
379 lines (320 loc) · 13.5 KB
/
sensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
"""
This module contains the abstract class for sensors,
and the implementations of different sensor classes inheriting this class.
"""
import sys
from abc import abstractmethod, ABC
from enum import Enum
from time import sleep
import serial
from modules.now_time import NowTime # pylint: disable=import-error
class SensorType(Enum):
    """
    Enumeration of the sensor kinds handled by this module.

    The value of every member is the lowercase name of the sensor as a string.
    """

    PARSIVEL = "parsivel"
    THIES = "thies"
class Sensor(ABC):
    """
    Abstract base class listing the operations every sensor type has to provide.

    Attributes:
    - sensor_type: type of the sensor

    Functions (all abstract except the constructor):
    - init_serial_connection: sets up the serial connection with the sensor
    - sensor_start_sequence: runs the startup sequence of the sensor
    - reset_sensor: resets the sensor
    - close_serial_connection: closes the serial connection
    - write: sends a message to the sensor
    - read: reads data from the sensor
    - get_type: returns the type of the sensor as a string
    """

    def __init__(self, sensor_type: SensorType):
        """
        Stores the sensor type on the instance.

        :param sensor_type: type of the sensor (enum)
        """
        self.sensor_type = sensor_type

    @abstractmethod
    def init_serial_connection(self, port: str, baud: int, logger):
        """
        Sets up the serial connection with the sensor.

        :param port: the port the sensor is connected to
        :param baud: the baudrate of the sensor
        :param logger: Logger for logging information and errors
        """

    @abstractmethod
    def sensor_start_sequence(self, config_dict, logger, include_in_log: bool):
        """
        Runs the startup sequence of the sensor.

        Implementations carry out the initialization steps needed to configure
        the sensor, using the settings found in `config_dict` where required.

        :param config_dict: Dictionary containing configuration parameters
                            for the sensor.
        :param logger: Logger for logging information and errors
        :param include_in_log: Whether the start sequence should be included in the log
        """

    @abstractmethod
    def reset_sensor(self, logger, factory_reset: bool):
        """
        Resets the sensor.

        :param logger: Logger for logging information
        :param factory_reset: Whether the factory reset should be performed
        """

    @abstractmethod
    def close_serial_connection(self):
        """
        Closes the serial connection with the sensor.
        """

    @abstractmethod
    def write(self, msg, logger):
        """
        Sends a message to the sensor.

        :param msg: Message for the sensor
        :param logger: Logger for errors
        """

    @abstractmethod
    def read(self, logger):
        """
        Reads data from the sensor.

        :param logger: Logger for errors
        """

    @abstractmethod
    def get_type(self) -> str:
        """
        Returns the type of the sensor as a string.

        :return: Type of the sensor as a string
        """
class Parsivel(Sensor):
    """
    Class inheriting Sensor and representing the Parsivel type sensor.

    Attributes:
    - serial_connection : the serial connection to the sensor; None until
      init_serial_connection has completed successfully
    """

    def __init__(self, sensor_type=SensorType.PARSIVEL):
        """
        Constructor for the parsivel type sensor.

        :param sensor_type: type of the sensor (enum)
        """
        super().__init__(sensor_type)
        self.serial_connection: serial.Serial = None

    def init_serial_connection(self, port: str, baud: int, logger):
        """
        Initializes the serial connection with the sensor.

        The connection is opened with a read timeout of 1 second. When opening
        fails, the error is logged and the process is terminated with exit
        status 1.

        :param port: the port where the parsivel sensor is connected to
        :param baud: the baudrate of the parsivel sensor
        :param logger: Logger for logging information and errors
        :raises SystemExit: when the serial connection cannot be opened
        """
        try:
            parsivel = serial.Serial(port, baud, timeout=1)  # Defines the serial port
            logger.info(msg=f'Connected to parsivel, via: {parsivel}')
        except Exception as e:  # pylint: disable=broad-except
            logger.error(msg=e)
            # A bare sys.exit() reports status 0 (success); use a non-zero
            # status so that a failed connection is visible to the caller
            # of the process.
            sys.exit(1)
        self.serial_connection = parsivel

    def sensor_start_sequence(self, config_dict, logger, include_in_log: bool):
        """
        Executes the startup sequence for the Parsivel sensor.

        Sends the station code and the sensor name from `config_dict` to the
        sensor, restarts it, and finally requests the user defined telegram.
        Fixed pauses are inserted between the commands, and the serial buffers
        are cleared at the end. The serial connection must have been
        initialized before calling this function.

        :param config_dict: Dictionary containing configuration parameters
                            for the sensor. Reads `station_code` and
                            `global_attrs` -> `sensor_name`.
        :param logger: Logger for logging information and errors
        :param include_in_log: Whether the start sequence should be included in the log
        """
        if include_in_log:
            logger.info(msg="Starting parsivel start sequence commands")

        self.serial_connection.reset_input_buffer()  # Flushes input buffer

        # Sets the name of the Parsivel, maximum 10 characters
        parsivel_set_station_code = (
            'CS/K/' + config_dict['station_code'] + '\r'
        ).encode('utf-8')
        self.write(parsivel_set_station_code, logger)
        sleep(1)

        # Sets the ID of the Parsivel, maximum 4 numerical characters
        parsivel_set_id = (
            'CS/J/' + config_dict['global_attrs']['sensor_name'] + '\r'
        ).encode('utf-8')
        self.write(parsivel_set_id, logger)
        sleep(2)

        parsivel_restart = 'CS/Z/1\r'.encode('utf-8')
        self.write(parsivel_restart, logger)  # resets rain amount
        sleep(10)

        # The Parsivel broadcasts the user defined telegram.
        parsivel_user_telegram = 'CS/M/M/1\r'.encode('utf-8')
        self.write(parsivel_user_telegram, logger)

        self.serial_connection.reset_input_buffer()
        self.serial_connection.reset_output_buffer()

    def reset_sensor(self, logger, factory_reset: bool):
        """
        Resets the Parsivel sensor.

        Sends the factory reset command when `factory_reset` is true and the
        restart command otherwise, then waits 5 seconds.

        :param logger: Logger for logging information
        :param factory_reset: Whether the factory reset should be performed
        """
        logger.info(msg="Resetting Parsivel")
        if factory_reset:
            parsivel_reset_code = 'CS/F/1\r'.encode('utf-8')
            self.write(parsivel_reset_code, logger)
        else:
            parsivel_restart = 'CS/Z/1\r'.encode('utf-8')  # restart
            self.write(parsivel_restart, logger)
        # Pause after either command before returning to the caller.
        sleep(5)

    def close_serial_connection(self):
        """
        Closes the serial connection, if one has been initialized.
        """
        if self.serial_connection is not None:
            self.serial_connection.close()

    def write(self, msg, logger):
        """
        Sends a message to the sensor.

        If the serial connection is initialized the message is written to it,
        otherwise an error is sent through the logger.

        :param msg: Message for the sensor (bytes)
        :param logger: Logger for errors
        :return: None
        """
        if self.serial_connection is None:
            logger.error(msg="serial_connection not initialized")
            return
        self.serial_connection.write(msg)

    def read(self, logger):
        """
        Reads lines from the sensor.

        If the serial connection is initialized, the 'CS/PA' command is sent
        and all lines read back from the serial connection are returned,
        otherwise an error is sent through the logger.

        :param logger: Logger for errors
        :return: List of lines, or None when the connection is not initialized
        """
        if self.serial_connection is None:
            logger.error(msg="serial_connection not initialized")
            return None
        self.write('CS/PA\r\n'.encode('ascii'), logger)
        return self.serial_connection.readlines()

    def get_type(self) -> str:
        """
        Returns the type of the sensor as a string.

        :return: Type of the sensor as a string
        """
        return self.sensor_type.value
class Thies(Sensor):
    """
    Class inheriting Sensor and representing the Thies type sensor.

    Attributes:
    - serial_connection : the serial connection to the sensor; None until
      init_serial_connection has completed successfully
    - thies_id : id for the specific Thies sensor, prefixed to every command
    """

    def __init__(self, sensor_type=SensorType.THIES, thies_id='00'):
        """
        Constructor for the thies type sensor.

        :param sensor_type: type of the sensor (enum)
        :param thies_id: id of the specific Thies sensor, used as the prefix
                         of every command sent to it
        """
        super().__init__(sensor_type)
        self.serial_connection: serial.Serial = None
        self.thies_id = thies_id

    def _command(self, body: str) -> bytes:
        """
        Builds the bytes of a command for this sensor.

        The command body is prefixed with a carriage return and the sensor id
        and terminated with a carriage return.

        :param body: the command without id and framing, e.g. 'KY00001'
        :return: the framed command encoded as utf-8
        """
        return f'\r{self.thies_id}{body}\r'.encode('utf-8')

    def init_serial_connection(self, port, baud, logger):
        """
        Initializes the serial connection for the thies sensor.

        The connection is opened with a read timeout of 5 seconds. When opening
        fails, the error is logged and the process is terminated with exit
        status 1.

        :param port: the port where the thies is connected to
        :param baud: the baudrate of the thies
        :param logger: the logger object
        :raises SystemExit: when the serial connection cannot be opened
        """
        try:
            thies = serial.Serial(port, baud, timeout=5)  # Defines the serial port
            logger.info(msg=f'Connected to thies, via: {thies}')
            self.serial_connection = thies
        except Exception as e:  # pylint: disable=broad-except
            logger.error(msg=e)
            # A bare sys.exit() reports status 0 (success); use a non-zero
            # status so that a failed connection is visible to the caller
            # of the process.
            sys.exit(1)

    def sensor_start_sequence(self, config_dict, logger, include_in_log: bool):
        """
        Sends the serial commands to the thies that change the necessary parameters.

        Places the sensor in config mode, turns off automatic mode, sets the
        hours, minutes and seconds of the sensor clock and leaves config mode
        again, pausing 1 second after every command. The serial buffers are
        cleared before and after the sequence. The serial connection must have
        been initialized before calling this function.

        :param config_dict: the configuration dictionary (not read by this
                            implementation)
        :param logger: the logger object
        :param include_in_log: whether the start sequence should be included in the log
        """
        self.serial_connection.reset_input_buffer()
        self.serial_connection.reset_output_buffer()

        if include_in_log:
            logger.info(msg="Starting thies start sequence commands")

        self.write(self._command('KY00001'), logger)  # place in config mode
        sleep(1)

        self.write(self._command('TM00000'), logger)  # turn off automatic mode
        sleep(1)

        # The current time is fetched anew for every part because a second
        # passes between the commands.
        # NOTE(review): assumes NowTime().time_list holds the hour, minute and
        # second as two-character strings, in that order - confirm.
        thies_set_hours = self._command('ZH000' + NowTime().time_list[0])
        self.write(thies_set_hours, logger)  # set hour
        sleep(1)

        thies_set_minutes = self._command('ZM000' + NowTime().time_list[1])
        self.write(thies_set_minutes, logger)  # set minutes
        sleep(1)

        thies_set_seconds = self._command('ZS000' + NowTime().time_list[2])
        self.write(thies_set_seconds, logger)  # set seconds
        sleep(1)

        self.write(self._command('KY00000'), logger)  # place out of config mode
        sleep(1)

        self.serial_connection.reset_input_buffer()
        self.serial_connection.reset_output_buffer()

    def reset_sensor(self, logger, factory_reset: bool):
        """
        Resets the thies sensor.

        Restarts the sensor, then resets the error counters and the
        precipitation quantity while the sensor is in config mode.

        :param logger: the logger object
        :param factory_reset: whether the factory reset should be performed
                              (not read by this implementation)
        """
        logger.info(msg="Resetting Thies")

        # place in config mode
        self.write(self._command('KY00001'), logger)
        sleep(1)

        # Restart the sensor
        self.write(self._command('RS00001'), logger)
        sleep(60)

        # Reset error counters
        self.write(self._command('RF00001'), logger)
        sleep(1)

        # Reset precipitation quantity and duration of quantity measurement
        self.write(self._command('RA00001'), logger)
        sleep(1)

        # Place out of config mode
        self.write(self._command('KY00000'), logger)
        sleep(1)

        logger.info(msg="Thies reset complete")

    def close_serial_connection(self):
        """
        Closes the serial connection, if one has been initialized.
        """
        if self.serial_connection is not None:
            self.serial_connection.close()

    def write(self, msg, logger):
        """
        Writes the message to the serial connection.

        When the serial connection is not initialized an error is sent through
        the logger instead.

        :param msg: The message to send over the serial connection (bytes)
        :param logger: the logger object
        """
        if self.serial_connection is None:
            logger.error(msg="serial_connection not initialized")
        else:
            self.serial_connection.write(msg)

    def read(self, logger):
        """
        Reads the data sent by the thies sensor.

        Waits 2 seconds, sends the 'TR00005' command and reads one line from
        the serial connection.

        :param logger: the logger object
        :return: the read line from the thies sensor decoded as utf-8 with its
                 last two bytes removed, or None when the serial connection is
                 not initialized
        """
        if self.serial_connection is None:
            logger.error(msg="serial_connection not initialized")
            return None

        sleep(2)  # Give sensor some time to create the telegram
        self.serial_connection.write(self._command('TR00005'))
        output = self.serial_connection.readline()
        # Drop the last two bytes of the line (presumably the line terminator)
        # before decoding.
        return output[:-2].decode("utf-8")

    def get_type(self) -> str:
        """
        Returns the type of the sensor as a string.

        :return: Type of the sensor as a string
        """
        return self.sensor_type.value