-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathvideo.py
103 lines (92 loc) · 4.33 KB
/
video.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python
from flask import (
render_template, Response, Blueprint, request, session, current_app
)
import time
from .camera import Camera
from .auth import login_required
from .db import get_db_by_config,get_db
from . import history_records
from . import intruding_records
bp = Blueprint('video', __name__)
'''
一些需要全局使用的变量
'''
box_selection=[0,0,0,0]
old_box_selection=[0,0,0,0]
image_width=1
image_height=1
@bp.route('/', methods=('GET', 'POST'))
@login_required
def video_html():
'''返回监控界面'''
session.setdefault('ip', "0")
session.setdefault('camera_id', '0')
session.setdefault('task', "face_recognition")
session.setdefault('interval', 5)
session.setdefault('threshold',0.6)
global box_selection,old_box_selection
box_selection=old_box_selection
if request.method == 'POST':
if request.form['form_type'] == 'box_selection':
box_selection=[int(int(x)*image_width/900) for x in request.form['box_selection'].split('_')]
whether_update=False
if ((box_selection==[0,0,0,0]) or (not (box_selection==old_box_selection))):
whether_update=True
old_box_selection=box_selection
records_feed(whether_update)
elif request.form['form_type'] == "ip":
session['ip']=request.form['ip']
session['camera_id'] = request.form['camera_id']
elif request.form['form_type'] == 'task':
session['task'] = request.form['task']
elif request.form['form_type'] == 'interval':
session['interval'] = request.form['interval']
elif request.form['form_type'] == 'threshold_select':
session['threshold'] = request.form['threshold_select']
return render_template('video.html',
ip=session.get("ip"), camera_id = session.get("camera_id"),
task=session.get('task'), interval=session.get('interval'),
threshold=session.get('threshold'))
def gen(camera,config,user_id, camera_id,process,interval):
'''camera视频生成器'''
global image_width
global image_height
while True:
time.sleep(0.01) # 每个0.01s推送一帧视频
frame, criminal_ids,enter_items_label,leave_items_label,image_width,image_height = camera.get_frame(process=process)
db = get_db_by_config(config)
for criminal_id in criminal_ids:
history_records.produce_record(db, criminal_id=criminal_id, user_id=user_id,
camera_id=camera_id,interval=interval)
for enter_item in enter_items_label:
intruding_records.produce_record(db,item=enter_item.split('_')[0],item_id=int(enter_item.split('_')[1]),
user_id=user_id,camera_id=camera_id)
for leave_item in leave_items_label:
intruding_records.add_leave_time(db,item=leave_item.split('_')[0],item_id=int(leave_item.split('_')[1]))
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@bp.route('/video_feed')
def video_feed():
'''返回监控界面中的视频部分'''
camera = Camera(ip=session.get('ip'))
user_id = session.get('user_id')
threshold=float(session.get('threshold'))
process = {session.get('task'):1,'box':box_selection,'threshold':threshold}#获取图像的处理方式
if not camera.has_opened():#如果打开失败
session['ip'] = "0"
camera = Camera("0")
return Response(gen(camera,config=current_app.config['DATABASE'], user_id=user_id,
camera_id=session['camera_id'], process=process, interval=session.get('interval')),
mimetype='multipart/x-mixed-replace; boundary=frame')
@bp.route('/records_feed')
def records_feed(whether_update=False):
'''返回监控界面的警示记录部分'''
user_id = session.get("user_id")
task=session.get('task')
if task=='face_recognition':
return Response(history_records.RecordsGenerator(user_id=user_id,db_config=current_app.config['DATABASE']),
mimetype='text/event-stream')
elif task=='object_track':
return Response(intruding_records.RecordsGenerator(user_id=user_id,db_config=current_app.config['DATABASE'],whether_update=whether_update),
mimetype='text/event-stream')