-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.py
109 lines (86 loc) · 3.12 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from collections import deque
from dataclasses import dataclass
from os import setsid, killpg
from signal import SIGTERM
from subprocess import DEVNULL, PIPE, Popen
from typing import Callable, List

import cv2

from camera_detection import Camera
from config import FRAME_HISTORY_LENGTH, WIDTH, HEIGHT
class MODE:
    """Namespace of string identifiers for the toggleable modes.

    The values are plain strings; Core registers one mode under each of them.
    """

    # Modes with a toggle handler registered in Core.__init__.
    FREEZE = "freeze"

    # Modes registered without a custom toggle handler.
    STUTTER = "stutter"
    ARTIFACTS = "artifacts"

    # Recording and replaying of frames.
    RECORD = "record"
    LOOP = "loop"

    # Preview of the output camera.
    VIEW = "view"
@dataclass
class Mode:
    """A single toggleable mode: its name, its trigger key and its state.

    Fields:
        name: identifier of the mode.
        key: key associated with the mode (presumably the keyboard key that
            toggles it; confirm against the input handling code).
        on_toggle: hook that receives the state the mode is about to switch
            to and returns the state it should actually end up in. The
            default accepts whatever was requested.
        active: whether the mode is currently switched on.
    """

    name: str
    key: str
    # Receives the requested next state, returns the state that is applied.
    on_toggle: Callable[[bool], bool] = lambda requested: requested
    active: bool = False
class Core:
    """Shared state of the camera pipeline: frame buffers, modes and devices.

    Every mode is registered in ``__init__`` together with an optional toggle
    handler. A handler receives the requested next state and returns the state
    that should actually be applied, so it can veto an activation by returning
    ``False``.

    The frame buffers are presumably filled by a capture loop outside this
    class; nothing here appends to them.
    """

    # Immutable defaults may live on the class. Mutable containers must not:
    # a class-level list/dict/deque would be shared by every Core instance,
    # so those are only declared here and created per instance in __init__.
    frame_count = 0
    frozen_frame = None
    loop_frames_offset = None
    ffplay_process = None

    # todo: mostly unused - remove? or use for another artifact?
    last_frames: deque
    recorded_frames: list
    loop_frames: list
    modes: dict

    available_input_cameras: List[Camera]
    input_camera: Camera
    input_video_capture: cv2.VideoCapture = None
    output_camera: Camera

    def __init__(self, input_cameras, output_camera):
        """Create fresh per-instance state and register all modes.

        Args:
            input_cameras: cameras that may be selected as input.
            output_camera: camera the preview (VIEW mode) plays back from.
        """
        # Per-instance containers; see the note on the class attributes.
        self.last_frames = deque(maxlen=FRAME_HISTORY_LENGTH)
        self.recorded_frames = []
        self.loop_frames = []
        self.modes = {}

        self.available_input_cameras = input_cameras
        self.output_camera = output_camera

        self.register_mode(MODE.FREEZE, "f", self.store_frozen_frame)
        self.register_mode(MODE.STUTTER, "s")
        self.register_mode(MODE.ARTIFACTS, "a")
        self.register_mode(MODE.RECORD, "r", self.manage_recorded_frames)
        self.register_mode(MODE.LOOP, "l", self.manage_loop)
        self.register_mode(MODE.VIEW, "v", self.manage_playback)

    def register_mode(self, name, key, on_toggle=lambda x: x, active=False):
        """Store a new Mode under ``name``, replacing any previous one.

        Args:
            name: identifier used as the key in ``self.modes``.
            key: key associated with the mode.
            on_toggle: toggle handler; defaults to accepting the requested state.
            active: initial state of the mode.
        """
        self.modes[name] = Mode(name, key, on_toggle, active)

    def store_frozen_frame(self, active):
        """Toggle handler for FREEZE: remember or drop the frozen frame.

        Returns:
            The state to apply. Activation is refused (``False``) when no
            frame has been buffered yet, because there is nothing to freeze.
        """
        if active:
            if not self.last_frames:
                # Indexing an empty deque would raise IndexError; veto the
                # activation instead, the same way manage_loop does.
                return False
            self.frozen_frame = self.last_frames[-1]
        else:
            self.frozen_frame = None
        return active

    def manage_recorded_frames(self, active):
        """Toggle handler for RECORD: hand the recording over when it stops.

        When recording is switched off, the recorded frames become the loop
        frames and the recording buffer is reset to a new empty list.
        """
        if not active:
            self.loop_frames = self.recorded_frames
            self.recorded_frames = []
        return active

    def manage_loop(self, active):
        """Toggle handler for LOOP: start looping only if frames exist.

        Returns:
            ``False`` when activation is requested but there are no loop
            frames; otherwise the requested state.
        """
        if active and not self.loop_frames:
            return False
        if active:
            # Remember the frame counter at the moment the loop was started.
            self.loop_frames_offset = self.frame_count
        return active

    def manage_playback(self, active):
        """Toggle handler for VIEW: start or stop an ffplay preview.

        Activation launches ``/usr/bin/ffplay`` on the output camera's path in
        its own session. Deactivation sends SIGTERM to that process group if
        the process is still running.
        """
        if active:
            self.ffplay_process = Popen(
                ["/usr/bin/ffplay", self.output_camera.path],
                # Nothing ever reads ffplay's output. With PIPE the child
                # would block once the pipe buffer is full, so discard it.
                stdout=DEVNULL,
                stderr=DEVNULL,
                shell=False,
                # New session => the child's pid is also its process group id,
                # which is what killpg() below relies on.
                preexec_fn=setsid,
            )
            return active

        process = self.ffplay_process
        # Skip the signal when playback was never started or ffplay has
        # already exited (e.g. its window was closed); killpg would otherwise
        # fail on a missing process / process group.
        if process is not None and process.poll() is None:
            killpg(process.pid, SIGTERM)
        return active

    def open_input_camera(self, camera: Camera):
        """Switch the input capture to ``camera``.

        Any previously opened capture is released first. The device index is
        parsed from the camera path, which therefore has to look like
        ``/dev/video<N>``; anything else makes ``int()`` raise ValueError.
        """
        if self.input_video_capture is not None:
            self.input_video_capture.release()
        self.input_video_capture = cv2.VideoCapture(
            int(camera.path.replace("/dev/video", ""))
        )
        # Request the configured capture resolution.
        self.input_video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
        self.input_video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
        self.input_camera = camera