Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Typing Part 3: events.py #3352

Merged
merged 4 commits into from
Nov 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions frigate/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import multiprocessing as mp
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event
from multiprocessing.synchronize import Event as MpEvent
import os
import signal
import sys
Expand Down Expand Up @@ -38,10 +38,10 @@

class FrigateApp:
def __init__(self) -> None:
self.stop_event: Event = mp.Event()
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, Event] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.plus_api = PlusApi()
Expand Down
47 changes: 30 additions & 17 deletions frigate/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,55 @@
from frigate.config import EventsConfig, FrigateConfig, RecordConfig
from frigate.const import CLIPS_DIR
from frigate.models import Event
from frigate.types import CameraMetricsTypes

from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Dict

logger = logging.getLogger(__name__)


def should_insert_db(prev_event, current_event):
def should_insert_db(prev_event: Event, current_event: Event) -> bool:
"""If current event has new clip or snapshot."""
return (not prev_event["has_clip"] and not prev_event["has_snapshot"]) and (
current_event["has_clip"] or current_event["has_snapshot"]
)


def should_update_db(prev_event, current_event):
def should_update_db(prev_event: Event, current_event: Event) -> bool:
"""If current_event has updated fields and (clip or snapshot)."""
return (current_event["has_clip"] or current_event["has_snapshot"]) and (
prev_event["top_score"] != current_event["top_score"]
or prev_event["entered_zones"] != current_event["entered_zones"]
or prev_event["thumbnail"] != current_event["thumbnail"]
or prev_event["has_clip"] != current_event["has_clip"]
or prev_event["has_snapshot"] != current_event["has_snapshot"]
)
if current_event["has_clip"] or current_event["has_snapshot"]:
if (
prev_event["top_score"] != current_event["top_score"]
or prev_event["entered_zones"] != current_event["entered_zones"]
or prev_event["thumbnail"] != current_event["thumbnail"]
or prev_event["has_clip"] != current_event["has_clip"]
or prev_event["has_snapshot"] != current_event["has_snapshot"]
):
return True
return False


class EventProcessor(threading.Thread):
def __init__(
self, config, camera_processes, event_queue, event_processed_queue, stop_event
self,
config: FrigateConfig,
camera_processes: dict[str, CameraMetricsTypes],
event_queue: Queue,
event_processed_queue: Queue,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "event_processor"
self.config = config
self.camera_processes = camera_processes
self.cached_clips = {}
self.event_queue = event_queue
self.event_processed_queue = event_processed_queue
self.events_in_process = {}
self.events_in_process: Dict[str, Event] = {}
self.stop_event = stop_event

def run(self):
def run(self) -> None:
# set an end_time on events without an end_time on startup
Event.update(end_time=Event.start_time + 30).where(
Event.end_time == None
Expand Down Expand Up @@ -147,14 +159,15 @@ def run(self):


class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event):
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "event_cleanup"
self.config = config
self.stop_event = stop_event
self.camera_keys = list(self.config.cameras.keys())

def expire(self, media_type):
def expire(self, media_type: str) -> None:
# TODO: Refactor media_type to enum
## Expire events from unlisted cameras based on the global config
if media_type == "clips":
retain_config = self.config.record.events.retain
Expand Down Expand Up @@ -253,7 +266,7 @@ def expire(self, media_type):
)
update_query.execute()

def purge_duplicates(self):
def purge_duplicates(self) -> None:
duplicate_query = """with grouped_events as (
select id,
label,
Expand Down Expand Up @@ -287,7 +300,7 @@ def purge_duplicates(self):
.execute()
)

def run(self):
def run(self) -> None:
# only expire events every 5 minutes
while not self.stop_event.wait(300):
self.expire("clips")
Expand Down
3 changes: 3 additions & 0 deletions frigate/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ disallow_untyped_calls = false
[mypy-frigate.const]
ignore_errors = false

[mypy-frigate.events]
ignore_errors = false

[mypy-frigate.log]
ignore_errors = false

Expand Down
4 changes: 2 additions & 2 deletions frigate/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import requests
from typing import Optional, Any
from paho.mqtt.client import Client
from multiprocessing.synchronize import Event
from multiprocessing.synchronize import Event as MpEvent

from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
Expand Down Expand Up @@ -148,7 +148,7 @@ def __init__(
stats_tracking: StatsTrackingTypes,
mqtt_client: Client,
topic_prefix: str,
stop_event: Event,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "frigate_stats_emitter"
Expand Down
4 changes: 2 additions & 2 deletions frigate/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

from frigate.object_detection import ObjectDetectProcess
from frigate.util import restart_frigate
from multiprocessing.synchronize import Event
from multiprocessing.synchronize import Event as MpEvent

logger = logging.getLogger(__name__)


class FrigateWatchdog(threading.Thread):
def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: Event):
def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "frigate_watchdog"
self.detectors = detectors
Expand Down