-
Notifications
You must be signed in to change notification settings - Fork 14
/
main.py
146 lines (114 loc) · 5.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
'''
Main entry point, used to start instances of the full system.
'''
from arlo import Arlo
from subprocess import call
from shared_variables import Shared_Variables
from camera_handler import *
from vizualise import Vizualise
from vizualise_detections import Vizualise_Detections
from obj_detection import Obj_Detection
from threading import Thread
from pyfiglet import Figlet
import os, re
from monitor import *
"""
Change These Variables, some functions won't work while running in docker!
"""
# Camera sources handed to create_instances: int for webcams, str for rtsp.
# When use_arlo is True, Arlo stream urls are appended to this list at startup.
source_list = [0]
# Set both of these to False in order to use the default image sizes.
UTILIZE_ALL_MONITOR_SPACE = True # Set this to maximize the visual size of all images
VISUALIZATION_OFFSET_AND_SIZE = False # If UTILIZE_ALL_MONITOR_SPACE is False, this value is passed as the window position/size instead; set to False if you want all windows to load on top of each other with the camera default size.
MONITOR_INDEX_LIST = False # Fill this list with the index numbers of the monitors to use, e.g. [1, 2, 3]; passed on to split_monitors
SHOW_ALL_PERSONS = False # Decide if you want to show all cropped images of detected persons
use_arlo = False # Set to True if you want to scan for and use arlo cameras!
USERNAME = '' # Arlo login username, only used when use_arlo is True
PASSWORD = '' # Arlo login password, only used when use_arlo is True
DETECTION_OPTIMIZE_SIZE = [640,480] # Set size of images before prediction, set to None if you want to use full size images
"""
----- PROGRAM CODE -----
"""
# cgroup file of the current process; is_docker inspects it by default.
path = "/proc/" + str(os.getpid()) + "/cgroup"

# Compiled once. Written as a raw string so that the regex escapes (\d, \w)
# are not interpreted as (invalid) string escape sequences.
_DOCKER_CGROUP_RE = re.compile(r"\d+:[\w=]+:/docker(-[ce]e)?/\w+")


def is_docker(cgroup_path=None):
    '''
    Tell whether this process appears to be running inside a docker container.

    The cgroup file is read line by line and each line is matched against a
    pattern of the form "<digits>:<name>:/docker/<id>" (also accepting
    "/docker-ce/" and "/docker-ee/").

    @param cgroup_path file to inspect, defaults to the module level path
    @return True if any line matches, False if no line matches or the file
            does not exist
    '''
    if cgroup_path is None:
        cgroup_path = path

    # No cgroup file at all means there is nothing to detect.
    if not os.path.isfile(cgroup_path):
        return False

    with open(cgroup_path) as cgroup_file:
        return any(_DOCKER_CGROUP_RE.match(line) for line in cgroup_file)
def create_instances(source_list, monitor_list):
    '''
    Create instances of system: one camera stream, one object detection and
    one visualisation per entry in source_list.

    @param source_list list of sources, int for webcams and str for rtsp
    @param monitor_list monitor information handed on to split_monitors,
                        only used when not running in docker
    '''
    camera_amount = len(source_list)
    shared_variables = Shared_Variables(amount=camera_amount)

    # Monitor boxes are only computed outside docker. Default to None so the
    # visualisation choice below never reads an unbound name when this
    # function is called in docker with UTILIZE_ALL_MONITOR_SPACE still set.
    monitor_box_list = None
    if not is_docker():
        monitor_box_list = split_monitors(monitor_list, source_list, MONITOR_INDEX_LIST)

    # Start show all detection crop images thread
    if SHOW_ALL_PERSONS:
        Vizualise_Detections(shared_variables=shared_variables).start()

    for i, source in enumerate(source_list):
        # Create camera stream
        Camera(id=i, cam_id=source, shared_variables=shared_variables,
               DETECTION_OPTIMIZE_SIZE=DETECTION_OPTIMIZE_SIZE).start()

        # Start object detection on camera
        Obj_Detection(id=i, shared_variables=shared_variables,
                      DETECTION_OPTIMIZE_SIZE=DETECTION_OPTIMIZE_SIZE).start()

        # Set visualization mode; without monitor boxes fall through to the
        # fixed offset/size or the default placement.
        if UTILIZE_ALL_MONITOR_SPACE and monitor_box_list is not None:
            Vizualise(id=i, shared_variables=shared_variables, pos=monitor_box_list[i]).start()
        elif VISUALIZATION_OFFSET_AND_SIZE:
            Vizualise(id=i, shared_variables=shared_variables, pos=VISUALIZATION_OFFSET_AND_SIZE).start()
        else:
            Vizualise(id=i, shared_variables=shared_variables).start()

        print("Created instance ", i," with camera, detection and vizualisation.")
# Main start here: print the banner, collect monitors and camera sources,
# then start one instance per source.
if __name__ == "__main__":
    f = Figlet(font='slant')
    print(f.renderText('Number of Persons in Room'))
    print("Please hold on while we receive cameras and start detection threads...")

    if is_docker():
        print("Running on docker!")
        # No monitor information is collected in docker, so every
        # monitor-based visualisation setting is switched off here.
        UTILIZE_ALL_MONITOR_SPACE = False
        VISUALIZATION_OFFSET_AND_SIZE = False
        MONITOR_INDEX_LIST = False
        monitor_list = None
    else:
        print("Collecting Monitor information...")
        print("----- Print Monitor List -----")
        monitor_list = detect_monitors()
        print(monitor_list)
        print("----- Monitor List End -----")

    print("Setting up source_list...")
    print("Adding local cameras...")

    if use_arlo:
        print("Waiting for Arlo cameras...")
        try:
            # Instantiating the Arlo object automatically calls Login(),
            # which returns an oAuth token that gets cached.
            # Subsequent successful calls to login will update the oAuth token.
            arlo = Arlo(USERNAME, PASSWORD)
            # At this point you're logged into Arlo.

            # Get the list of devices and filter on device type to only get the cameras.
            # This will return an array which includes all of the camera's associated metadata.
            cameras = arlo.GetDevices('camera')

            # Get the list of devices and filter on device type to only get the basestation.
            # This will return an array which includes all of the basestation's associated metadata.
            basestations = arlo.GetDevices('basestation')

            # Open the event stream to act as a keep-alive for our stream.
            arlo.Subscribe(basestations[0])

            # Send the command to start a stream for every camera and
            # register the returned stream url as a source.
            for cam in cameras:
                url = arlo.StartStream(basestations[0], cam)
                source_list.append(url)
        except Exception as e:
            # Arlo is optional: report the problem and carry on with the
            # sources collected so far.
            print(e)

    print("Done with collecting cameras...")
    print("----- Print source list -----")
    for address in source_list:
        print(address)
    print("----- Source list end -----")

    print("Creating all instances... this might take a while!")
    create_instances(source_list, monitor_list)