diff --git a/pyroengine/engine.py b/pyroengine/engine.py index 0010a624..7e71f3bb 100644 --- a/pyroengine/engine.py +++ b/pyroengine/engine.py @@ -6,15 +6,16 @@ import glob import io import json +import logging import os import shutil -import signal import time from collections import deque -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Optional, Tuple +import cv2 # type: ignore[import-untyped] import numpy as np from PIL import Image from pyroclient import client @@ -23,28 +24,44 @@ from pyroengine.utils import box_iou, nms -from .logger_config import logger -from .sensors import ReolinkCamera from .vision import Classifier __all__ = ["Engine"] +logging.basicConfig(format="%(asctime)s | %(levelname)s: %(message)s", level=logging.INFO, force=True) -def handler(signum, frame): - raise TimeoutError("Heartbeat check timed out") +def is_day_time(cache, frame, strategy, delta=0): + """This function allows to know if it is daytime or not. We have two strategies. + The first one is to take the current time and compare it to the sunset time. + The second is to see if we have a color image. The ir cameras switch to ir mode at night and + therefore produce black and white images. This function can use one or more strategies depending on the use case. -def heartbeat_with_timeout(api_instance, cam_id, timeout=1): - signal.signal(signal.SIGALRM, handler) - signal.alarm(timeout) - try: - api_instance.heartbeat(cam_id) - except TimeoutError: - logger.warning(f"Heartbeat check timed out for {cam_id}") - except ConnectionError: - logger.warning(f"Unable to reach the pyro-api with {cam_id}") - finally: - signal.alarm(0) + Args: + cache (Path): cache folder where sunset_sunrise.txt is located + frame (PIL image): frame to analyze with ir strategy + strategy (str): Strategy to define day time [None, time, ir or both] + delta (int): delta before and after sunset / sunrise in sec + + Returns: + bool: is day time + """ + is_day = True + if strategy in ["both", "time"]: + with open(cache.joinpath("sunset_sunrise.txt")) as f: + lines = f.readlines() + sunrise = datetime.strptime(lines[0][:-1], "%H:%M") + sunset = datetime.strptime(lines[1][:-1], "%H:%M") + now = datetime.strptime(datetime.now().isoformat().split("T")[1][:5], "%H:%M") + if (now - sunrise).total_seconds() < -delta or (sunset - now).total_seconds() < -delta: + is_day = False + + if strategy in ["both", "ir"]: + frame = np.array(frame) + if np.max(frame[:, :, 0] - frame[:, :, 1]) == 0: + is_day = False + + return is_day class Engine: @@ -64,7 +81,6 @@ class Engine: frame_saving_period: Send one frame over N to the api for our dataset cache_size: maximum number of alerts to save in cache day_time_strategy: strategy to define if it's daytime - save_captured_frames: save all captured frames for debugging kwargs: keyword args of Classifier Examples: @@ -73,16 +89,17 @@ class Engine: >>> "cam_id_1": {'login':'log1', 'password':'pwd1'}, >>> "cam_id_2": {'login':'log2', 'password':'pwd2'}, >>> } - >>> pyroEngine = Engine(None, 0.25, 'https://api.pyronear.org', cam_creds, 48.88, 2.38) + >>> pyroEngine = Engine("data/model.onnx", 0.25, 'https://api.pyronear.org', cam_creds, 48.88, 2.38) """ def __init__( self, - model_path: Optional[str] = None, - conf_thresh: float = 0.15, - api_host: Optional[str] = None, + model_path: Optional[str] = "data/model.onnx", + conf_thresh: float = 0.25, + api_url: Optional[str] = None, cam_creds: Optional[Dict[str, Dict[str, str]]] = None, - external_sources: Optional[bool] = False, + latitude: Optional[float] = None, + longitude: Optional[float] = None, nb_consecutive_frames: int = 4, frame_size: Optional[Tuple[int, int]] = None, cache_backup_period: int = 60, @@ -92,25 +109,24 @@ def __init__( backup_size: int = 30, jpeg_quality: int = 80, day_time_strategy: Optional[str] = None, - save_captured_frames: Optional[bool] = False, **kwargs: Any, ) -> None: """Init engine""" # Engine Setup - self.model = Classifier(model_path=model_path, conf=0.05) - self.conf_thresh = conf_thresh - self.external_sources = external_sources + self.model = Classifier(model_path) + self.conf_thresh = conf_thresh # API Setup - if isinstance(api_host, str): - assert isinstance(cam_creds, dict) - + if isinstance(api_url, str): + assert isinstance(latitude, float) and isinstance(longitude, float) and isinstance(cam_creds, dict) + self.latitude = latitude + self.longitude = longitude self.api_client = {} - if isinstance(api_host, str) and isinstance(cam_creds, dict): + if isinstance(api_url, str) and isinstance(cam_creds, dict): # Instantiate clients for each camera - for _id, camera_token in cam_creds.items(): - self.api_client[_id] = client.Client(camera_token, api_host) + for _id, vals in cam_creds.items(): + self.api_client[_id] = client.Client(api_url, vals["login"], vals["password"]) # Cache & relaxation self.frame_saving_period = frame_saving_period @@ -119,28 +135,28 @@ def __init__( self.jpeg_quality = jpeg_quality self.cache_backup_period = cache_backup_period self.day_time_strategy = day_time_strategy - self.save_captured_frames = save_captured_frames # Local backup self._backup_size = backup_size # Var initialization self._states: Dict[str, Dict[str, Any]] = { - "-1": {"last_predictions": deque([], self.nb_consecutive_frames), "ongoing": False}, + "-1": {"last_predictions": deque([], self.nb_consecutive_frames), "frame_count": 0, "ongoing": False}, } if isinstance(cam_creds, dict): for cam_id in cam_creds: self._states[cam_id] = { "last_predictions": deque([], self.nb_consecutive_frames), + "frame_count": 0, "ongoing": False, } - self.occlusion_masks: Dict[str, Optional[np.ndarray]] = {"-1": None} + self.occlusion_masks = {"-1": None} if isinstance(cam_creds, dict): for cam_id in cam_creds: mask_file = cache_folder + "/occlusion_masks/" + cam_id + ".jpg" if os.path.isfile(mask_file): - self.occlusion_masks[cam_id] = np.array(Image.open(mask_file).convert(("L"))) + self.occlusion_masks[cam_id] = cv2.imread(mask_file, 0) else: self.occlusion_masks[cam_id] = None @@ -149,7 +165,7 @@ def __init__( self._cache = Path(cache_folder) # with Docker, the path has to be a bind volume assert self._cache.is_dir() self._load_cache() - self.last_cache_dump = datetime.now(timezone.utc) + self.last_cache_dump = datetime.utcnow() def clear_cache(self) -> None: """Clear local cache""" @@ -177,9 +193,8 @@ def _dump_cache(self) -> None: { "frame_path": str(self._cache.joinpath(f"pending_frame{idx}.jpg")), "cam_id": info["cam_id"], - "pose_id": info["pose_id"], "ts": info["ts"], - "bboxes": info["bboxes"], + "localization": info["localization"], } ) @@ -198,15 +213,7 @@ def _load_cache(self) -> None: for entry in data: # Open image frame = Image.open(entry["frame_path"], mode="r") - self._alerts.append( - { - "frame": frame, - "cam_id": entry["cam_id"], - "pose_id": entry["pose_id"], - "bboxes": entry["bboxes"], - "ts": entry["ts"], - } - ) + self._alerts.append({"frame": frame, "cam_id": entry["cam_id"], "ts": entry["ts"]}) def heartbeat(self, cam_id: str) -> Response: """Updates last ping of device""" @@ -232,31 +239,24 @@ def _update_states(self, frame: Image.Image, preds: np.ndarray, cam_key: str) -> # Get the best ones if boxes.shape[0]: best_boxes = nms(boxes) - # We keep only detections with at least two boxes above conf_th - detections = boxes[boxes[:, -1] > self.conf_thresh, :] - ious_detections = box_iou(best_boxes[:, :4], detections[:, :4]) - strong_detection = np.sum(ious_detections > 0, 0) > 1 - best_boxes = best_boxes[strong_detection, :] - if best_boxes.shape[0]: - ious = box_iou(best_boxes[:, :4], boxes[:, :4]) - - best_boxes_scores = np.array([sum(boxes[iou > 0, 4]) for iou in ious.T]) - combine_predictions = best_boxes[best_boxes_scores > conf_th, :] - conf = np.max(best_boxes_scores) / (self.nb_consecutive_frames + 1) # memory + preds - if len(combine_predictions): - - # send only preds boxes that match combine_predictions - ious = box_iou(combine_predictions[:, :4], preds[:, :4]) - iou_match = [np.max(iou) > 0 for iou in ious] - output_predictions = preds[iou_match, :] + ious = box_iou(best_boxes[:, :4], boxes[:, :4]) + best_boxes_scores = np.array([sum(boxes[iou > 0, 4]) for iou in ious.T]) + combine_predictions = best_boxes[best_boxes_scores > conf_th, :] + conf = np.max(best_boxes_scores) / (self.nb_consecutive_frames + 1) # memory + preds + + if len(combine_predictions): + + # send only preds boxes that match combine_predictions + ious = box_iou(combine_predictions[:, :4], preds[:, :4]) + iou_match = [np.max(iou) > 0 for iou in ious] + output_predictions = preds[iou_match, :] # Limit bbox size for api output_predictions = np.round(output_predictions, 3) # max 3 digit output_predictions = output_predictions[:5, :] # max 5 bbox - output_predictions_tuples = [tuple(row) for row in output_predictions] self._states[cam_key]["last_predictions"].append( - (frame, preds, output_predictions_tuples, datetime.now(timezone.utc).isoformat(), False) + (frame, preds, output_predictions.tolist(), datetime.utcnow().isoformat(), False) ) # update state @@ -267,135 +267,164 @@ def _update_states(self, frame: Image.Image, preds: np.ndarray, cam_key: str) -> return conf - def predict(self, frame: Image.Image, cam_id: Optional[str] = None, pose_id: Optional[int] = None) -> float: + def predict(self, frame: Image.Image, cam_id: Optional[str] = None) -> float: """Computes the confidence that the image contains wildfire cues Args: frame: a PIL image cam_id: the name of the camera that sent this image - pose_id (int) : position of the camera, for ptz camera Returns: the predicted confidence """ # Heartbeat - if len(self.api_client) > 0 and isinstance(cam_id, str) and self.external_sources: - heartbeat_with_timeout(self, cam_id, timeout=1) + if len(self.api_client) > 0 and isinstance(cam_id, str): + try: + self.heartbeat(cam_id) + except ConnectionError: + logging.warning(f"Unable to reach the pyro-api with {cam_id}") cam_key = cam_id or "-1" # Reduce image size to save bandwidth if isinstance(self.frame_size, tuple): - frame = frame.resize(self.frame_size[::-1], getattr(Image, "BILINEAR")) - - # Inference with ONNX - preds = self.model(frame.convert("RGB"), self.occlusion_masks[cam_key]) - - conf = self._update_states(frame, preds, cam_key) - - if self.save_captured_frames: - self._local_backup(frame, cam_id, pose_id, is_alert=False) + frame_resize = frame.resize(self.frame_size[::-1], getattr(Image, "BILINEAR")) + else: + frame_resize = frame + + if is_day_time(self._cache, frame, self.day_time_strategy): + # Inference with ONNX + preds = self.model(frame.convert("RGB"), self.occlusion_masks[cam_key]) + conf = self._update_states(frame_resize, preds, cam_key) + + # Log analysis result + device_str = f"Camera '{cam_id}' - " if isinstance(cam_id, str) else "" + pred_str = "Wildfire detected" if conf > self.conf_thresh else "No wildfire" + logging.info(f"{device_str}{pred_str} (confidence: {conf:.2%})") + + # Alert + if conf > self.conf_thresh and len(self.api_client) > 0 and isinstance(cam_id, str): + # Save the alert in cache to avoid connection issues + for idx, (frame, preds, localization, ts, is_staged) in enumerate( + self._states[cam_key]["last_predictions"] + ): + if not is_staged: + self._stage_alert(frame, cam_id, ts, localization) + self._states[cam_key]["last_predictions"][idx] = frame, preds, localization, ts, True - # Log analysis result - device_str = f"Camera '{cam_id}' - " if isinstance(cam_id, str) else "" - pred_str = "Wildfire detected" if conf > self.conf_thresh else "No wildfire" - logger.info(f"{device_str}{pred_str} (confidence: {conf:.2%})") + else: + conf = 0 # return default value - # Alert - if conf > self.conf_thresh and len(self.api_client) > 0 and isinstance(cam_id, str): - # Save the alert in cache to avoid connection issues - for idx, (frame, preds, bboxes, ts, is_staged) in enumerate(self._states[cam_key]["last_predictions"]): - if not is_staged: - self._stage_alert(frame, cam_id, pose_id, ts, bboxes) - self._states[cam_key]["last_predictions"][idx] = frame, preds, bboxes, ts, True + # Uploading pending alerts + if len(self._alerts) > 0: + self._process_alerts() # Check if it's time to backup pending alerts - ts = datetime.now(timezone.utc) + ts = datetime.utcnow() if ts > self.last_cache_dump + timedelta(minutes=self.cache_backup_period): self._dump_cache() self.last_cache_dump = ts + # save frame + if len(self.api_client) > 0 and isinstance(self.frame_saving_period, int) and isinstance(cam_id, str): + self._states[cam_key]["frame_count"] += 1 + if self._states[cam_key]["frame_count"] == self.frame_saving_period: + # Save frame on device + self._local_backup(frame_resize, cam_id, is_alert=False) + # Send frame to the api + stream = io.BytesIO() + frame_resize.save(stream, format="JPEG", quality=self.jpeg_quality) + try: + self._upload_frame(cam_id, stream.getvalue()) + # Reset frame counter + self._states[cam_key]["frame_count"] = 0 + except ConnectionError: + stream.seek(0) # "Rewind" the stream to the beginning so we can read its content + return float(conf) - def _stage_alert( - self, frame: Image.Image, cam_id: Optional[str], pose_id: Optional[int], ts: int, bboxes: list - ) -> None: - # Store information in the queue + def _upload_frame(self, cam_id: str, media_data: bytes) -> Response: + """Save frame""" + logging.info(f"Camera '{cam_id}' - Uploading media...") + # Create a media + response = self.api_client[cam_id].create_media_from_device() + if response.status_code // 100 == 2: + media = response.json() + # Upload media + self.api_client[cam_id].upload_media(media_id=media["id"], media_data=media_data) + return response + + def _stage_alert(self, frame: Image.Image, cam_id: str, ts: int, localization: list) -> None: + # Store information in the queue self._alerts.append( { "frame": frame, "cam_id": cam_id, - "pose_id": pose_id, "ts": ts, - "bboxes": bboxes, + "media_id": None, + "alert_id": None, + "localization": localization, } ) - def _process_alerts(self, cameras: Optional[List[ReolinkCamera]] = None) -> None: + def _process_alerts(self) -> None: for _ in range(len(self._alerts)): # try to upload the oldest element frame_info = self._alerts[0] cam_id = frame_info["cam_id"] - pose_id = frame_info["pose_id"] - azimuth = 0 - logger.info(f"Camera '{cam_id}' - Process detection from {frame_info['ts']}...") + logging.info(f"Camera '{cam_id}' - Sending alert from {frame_info['ts']}...") # Save alert on device - if not self.external_sources: - self._local_backup(frame_info["frame"], cam_id, pose_id) + self._local_backup(frame_info["frame"], cam_id, is_alert=True) try: - # Detection creation + # Media creation + if not isinstance(self._alerts[0]["media_id"], int): + self._alerts[0]["media_id"] = self.api_client[cam_id].create_media_from_device().json()["id"] + + # Alert creation + if not isinstance(self._alerts[0]["alert_id"], int): + self._alerts[0]["alert_id"] = ( + self.api_client[cam_id] + .send_alert_from_device( + lat=self.latitude, + lon=self.longitude, + media_id=self._alerts[0]["media_id"], + localization=self._alerts[0]["localization"], + ) + .json()["id"] + ) + + # Media upload stream = io.BytesIO() frame_info["frame"].save(stream, format="JPEG", quality=self.jpeg_quality) - if cameras is not None: - for camera in cameras: - if camera.ip_address == cam_id: - azimuth = ( - camera.cam_azimuths[pose_id - 1] if pose_id is not None else camera.cam_azimuths[0] - ) - break - - bboxes = self._alerts[0]["bboxes"] - logger.info(f"Azimuth : {azimuth} , bboxes : {bboxes}") - if len(bboxes) != 0: - response = self.api_client[cam_id].create_detection(stream.getvalue(), azimuth, bboxes) - # Force a KeyError if the request failed - detection_id = response.json().get("id") - if detection_id is None: - print(response.json()) - raise KeyError(f"Missing 'id' in response from camera '{cam_id}'") # Clear - else: - logger.info(f"Camera '{cam_id}' - detection created") - + response = self.api_client[cam_id].upload_media( + self._alerts[0]["media_id"], + media_data=stream.getvalue(), + ) + # Force a KeyError if the request failed + response.json()["id"] + # Clear self._alerts.popleft() + logging.info(f"Camera '{cam_id}' - alert sent") stream.seek(0) # "Rewind" the stream to the beginning so we can read its content - except (KeyError, ConnectionError) as e: - logger.exception(f"Camera '{cam_id}' - unable to upload cache") - logger.exception(e) - break - except Exception as e: - logger.exception(f"Camera '{cam_id}' - unable to create detection") - logger.exception(e) + except (KeyError, ConnectionError): + logging.warning(f"Camera '{cam_id}' - unable to upload cache") break - def _local_backup( - self, img: Image.Image, cam_id: Optional[str], pose_id: Optional[int], is_alert: bool = True - ) -> None: + def _local_backup(self, img: Image.Image, cam_id: str, is_alert: bool = False) -> None: """Save image on device Args: img (Image.Image): Image to save cam_id (str): camera id (ip address) - pose_id (int) : position of the camera, for ptz camera - is_alert (bool): is the frame an alert ? + is_alert (bool, optional): is alert or backup frame. Defaults to False. """ - folder = "alerts" if is_alert else "save" - backup_cache = self._cache.joinpath(f"backup/{folder}/") + backup_cache = self._cache.joinpath("backup/alerts/") if is_alert else self._cache.joinpath("backup/frames/") self._clean_local_backup(backup_cache) # Dump old cache - backup_cache = backup_cache.joinpath(f"{time.strftime('%Y%m%d')}/{cam_id}/{pose_id}") + backup_cache = backup_cache.joinpath(f"{time.strftime('%Y%m%d')}/{cam_id}") backup_cache.mkdir(parents=True, exist_ok=True) - file = backup_cache.joinpath(f"{time.strftime('%Y%m%d-%H%M%S')}.jpg") + file = backup_cache.joinpath(f"{time.strftime('%Y%m%d-%H%S')}.jpg") img.save(file) def _clean_local_backup(self, backup_cache) -> None: