diff --git a/README.md b/README.md index 65b83ae..1edf09a 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ pocs sensor monitor weather --endpoint http://localhost:8080 The web service will serve the weather data as json. The data can be accessed by going to the `/weather` endpoint. For example, if the web service is running on `localhost` on port `8080` then the weather data can be accessed at -`http://localhost:8000/weather`. +`http://localhost:8080/weather`. The [httpie](https://httpie.io/) is installed with this package and can be used to read the weather data from the command line: @@ -84,7 +84,7 @@ http :8080/weather ### Starting The `aag-weather` command line tool can be used to read the weather data from -the CloudWatcher and store it in a file. The `aag-weather` command line tool +the CloudWatcher and store it in a csv file. The `aag-weather` command line tool can be run with: ```bash diff --git a/src/aag/weather.py b/src/aag/weather.py index c638263..1c002dd 100644 --- a/src/aag/weather.py +++ b/src/aag/weather.py @@ -11,7 +11,7 @@ from rich import print from aag.commands import WeatherCommand, WeatherResponseCodes -from aag.settings import WeatherSettings, WhichUnits +from aag.settings import WeatherSettings, WhichUnits, Thresholds class CloudSensor(object): @@ -40,12 +40,12 @@ def __init__(self, connect: bool = True, **kwargs): # Set up a queue for readings self.readings = deque(maxlen=self.config.num_readings) - self.name = 'CloudWatcher' - self.firmware = None - self.serial_number = None - self.has_anemometer = False + self.name: str = 'CloudWatcher' + self.firmware: str | None = None + self.serial_number: str | None = None + self.has_anemometer: bool = False - self._is_connected = False + self._is_connected: bool = False if connect: self._is_connected = self.connect() @@ -55,8 +55,30 @@ def is_connected(self) -> bool: """ Is the sensor connected?""" return self._is_connected - def connect(self) -> bool: - """ Connect to the sensor. """ + @property + def thresholds(self) -> Thresholds: + """Thresholds for the safety checks.""" + return self.config.thresholds + + @property + def status(self) -> dict: + """Returns the most recent reading and safety value.""" + return self.readings[-1] + + @property + def is_safe(self) -> bool: + """Is the sensor safe?""" + return self.status['is_safe'] + + def connect(self, raise_exceptions: bool = True) -> bool: + """ Connect to the sensor. + + Args: + raise_exceptions: Whether to raise exceptions, default True. + + Returns: + True if connected, False otherwise. + """ try: # Initialize and get static values. self.name = self.query(WeatherCommand.GET_INTERNAL_NAME) @@ -71,6 +93,9 @@ def connect(self) -> bool: self._is_connected = True except Exception as e: + print(f'[red]Unable to connect to weather sensor. Check the port. {e}') + if raise_exceptions: + raise e self._is_connected = False return self._is_connected @@ -92,17 +117,17 @@ def capture(self, callback: Callable | None = None, units: WhichUnits = 'none') except KeyboardInterrupt: pass - def get_reading(self, enqueue: bool = True, units: WhichUnits = 'none') -> dict: + def get_reading(self, units: WhichUnits = 'none') -> dict: """ Get a single reading of all values. Args: - enqueue: Whether to add the reading to the queue, default True. - units: The units to return the reading in, default 'none'. + units: The astropy units to return the reading in, default 'none', + can be 'metric' or 'imperial'. Returns: A dictionary of readings. """ - readings = { + reading = { 'timestamp': datetime.now().isoformat(), 'ambient_temp': self.get_ambient_temperature(), 'sky_temp': self.get_sky_temperature(), @@ -112,22 +137,72 @@ def get_reading(self, enqueue: bool = True, units: WhichUnits = 'none') -> dict: **{f'error_{i}': err for i, err in enumerate(self.get_errors())} } + # Add the safety values. + reading = self.get_safe_reading(reading) + + # Add astropy units if requested. if units != 'none': # First make them metric units. - readings['ambient_temp'] *= u.Celsius - readings['sky_temp'] *= u.Celsius - readings['wind_speed'] *= u.m / u.s - readings['pwm'] *= u.percent + reading['ambient_temp'] *= u.Celsius + reading['sky_temp'] *= u.Celsius + reading['wind_speed'] *= u.m / u.s + reading['pwm'] *= u.percent # Then convert if needed. if units == 'imperial': - readings['ambient_temp'] = readings['ambient_temp'].to(u.imperial.deg_F, equivalencies=u.temperature()) - readings['sky_temp'] = readings['sky_temp'].to(u.imperial.deg_F, equivalencies=u.temperature()) - readings['wind_speed'] = readings['wind_speed'].to(u.imperial.mile / u.hour) + reading['ambient_temp'] = reading['ambient_temp'].to(u.imperial.deg_F, equivalencies=u.temperature()) + reading['sky_temp'] = reading['sky_temp'].to(u.imperial.deg_F, equivalencies=u.temperature()) + reading['wind_speed'] = reading['wind_speed'].to(u.imperial.mile / u.hour) + + self.readings.append(reading) + + return reading - if enqueue: - self.readings.append(readings) + def get_safe_reading(self, reading: dict) -> dict: + """ Checks the reading against the thresholds. - return readings + Args: + reading: The reading to check. + + Returns: + The reading with the safety values added. + """ + reading['cloud_condition'] = 'unknown' + temp_diff = reading['sky_temp'] - reading['ambient_temp'] + if temp_diff >= self.thresholds.very_cloudy: + reading['cloud_condition'] = 'very cloudy' + elif temp_diff >= self.thresholds.cloudy: + reading['cloud_condition'] = 'cloudy' + elif temp_diff < self.thresholds.cloudy: + reading['cloud_condition'] = 'clear' + + reading['wind_condition'] = 'unknown' + if reading['wind_speed'] is not None: + if reading['wind_speed'] >= self.thresholds.very_gusty: + reading['wind_condition'] = 'very gusty' + elif reading['wind_speed'] >= self.thresholds.gusty: + reading['wind_condition'] = 'gusty' + elif reading['wind_speed'] >= self.thresholds.very_windy: + reading['wind_condition'] = 'very windy' + elif reading['wind_speed'] >= self.thresholds.windy: + reading['wind_condition'] = 'windy' + elif reading['wind_speed'] < self.thresholds.windy: + reading['wind_condition'] = 'calm' + + reading['rain_condition'] = 'unknown' + if reading['rain_frequency'] <= self.thresholds.rainy: + reading['rain_condition'] = 'rainy' + elif reading['rain_frequency'] <= self.thresholds.wet: + reading['rain_condition'] = 'wet' + elif reading['rain_frequency'] > self.thresholds.wet: + reading['rain_condition'] = 'dry' + + reading['cloud_safe'] = True if reading['cloud_condition'] == 'clear' else False + reading['wind_safe'] = True if reading['wind_condition'] == 'calm' else False + reading['rain_safe'] = True if reading['rain_condition'] == 'dry' else False + + reading['is_safe'] = True if reading['cloud_safe'] and reading['wind_safe'] and reading['rain_safe'] else False + + return reading def get_errors(self) -> list[int]: """Gets the number of internal errors diff --git a/tests/test_weather.py b/tests/test_weather.py index d42dd82..4e6779a 100644 --- a/tests/test_weather.py +++ b/tests/test_weather.py @@ -9,7 +9,7 @@ def test_create_sensor(): sensor = CloudSensor(connect=False) assert isinstance(sensor, CloudSensor) assert not sensor.is_connected - assert sensor.connect() is False + assert sensor.connect(raise_exceptions=False) is False def test_bad_port(): @@ -18,3 +18,85 @@ def test_bad_port(): # Should raise an exception with pytest.raises(Exception): CloudSensor(connect=False) + + +def test_connect_loop(): + with pytest.raises(Exception): + CloudSensor(connect=True, serial_port='loop://') + + sensor = CloudSensor(connect=False, serial_port='loop://') + is_connected = sensor.connect(raise_exceptions=False) + assert isinstance(sensor, CloudSensor) + assert is_connected is False + assert sensor.is_connected is False + + +def test_get_safe_reading(): + os.environ['AAG_SERIAL_PORT'] = 'loop://' + sensor = CloudSensor(connect=False) + assert isinstance(sensor, CloudSensor) + assert not sensor.is_connected + assert sensor.connect(raise_exceptions=False) is False + + # Make a fake reading entry. + reading = { + 'wind_speed': 10, + 'ambient_temp': 20, + 'sky_temp': 10, + 'timestamp': '2021-01-01T00:00:00', + 'rain_frequency': 2500, + 'pwm': 0, + } + + # Check safety. + reading = sensor.get_safe_reading(reading=reading) + assert reading['is_safe'] is False + assert reading['cloud_safe'] is False + assert reading['cloud_condition'] == 'very cloudy' + + # Make safe + reading['ambient_temp'] = 20 + reading['sky_temp'] = -20 + print(reading) + reading = sensor.get_safe_reading(reading=reading) + print(reading) + assert reading['is_safe'] is True + assert reading['cloud_safe'] is True + assert reading['cloud_condition'] == 'clear' + + # Make windy + reading['wind_speed'] = 51 + reading = sensor.get_safe_reading(reading=reading) + assert reading['is_safe'] is False + assert reading['wind_safe'] is False + assert reading['wind_condition'] == 'windy' + + reading['wind_speed'] = 76 + reading = sensor.get_safe_reading(reading=reading) + assert reading['wind_condition'] == 'very windy' + + reading['wind_speed'] = 101 + reading = sensor.get_safe_reading(reading=reading) + assert reading['wind_condition'] == 'gusty' + + reading['wind_speed'] = 126 + reading = sensor.get_safe_reading(reading=reading) + assert reading['wind_condition'] == 'very gusty' + + # Make rainy + reading['rain_frequency'] = 2000 + reading = sensor.get_safe_reading(reading=reading) + assert reading['is_safe'] is False + assert reading['rain_safe'] is False + assert reading['rain_condition'] == 'wet' + + reading['rain_frequency'] = 1700 + reading = sensor.get_safe_reading(reading=reading) + assert reading['is_safe'] is False + assert reading['rain_safe'] is False + assert reading['rain_condition'] == 'rainy' + + # Make dry + reading['rain_frequency'] = 2300 + reading = sensor.get_safe_reading(reading=reading) + assert reading['rain_condition'] == 'dry'