-
Notifications
You must be signed in to change notification settings - Fork 1
/
webcam.py
114 lines (75 loc) · 2.91 KB
/
webcam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import time
import cv2
import imutils
import platform
import numpy as np
from threading import Thread
from queue import Queue
class Streamer:
    """Grab frames from an OpenCV capture device on a background thread.

    Frames read by the worker thread are pushed onto a bounded queue and
    consumed through :meth:`read` / :meth:`bytescode`.  The instance can
    also be used as a context manager; leaving the ``with`` block releases
    the capture device.
    """

    # Pause used by the worker when there is nothing to do, so that it does
    # not spin at full CPU while idle or while the device returns no frames.
    _IDLE_SLEEP = 0.01

    def __init__(self):
        # Turn on OpenCL acceleration when the OpenCV build offers it.
        if cv2.ocl.haveOpenCL():
            cv2.ocl.setUseOpenCL(True)
        print('[wandlab] ', 'OpenCL : ', cv2.ocl.haveOpenCL())

        self.capture = None   # cv2.VideoCapture, created by run()
        self.thread = None    # worker thread, created on the first run()

        # Requested capture size; also the size of the blank frame.
        self.width = 640
        self.height = 360

        # When true, bytescode() draws an FPS overlay on the frame.
        self.stat = False

        # Timestamps used by fps() to measure the gap between two calls.
        self.current_time = time.time()
        self.preview_time = time.time()
        self.sec = 0

        self.Q = Queue(maxsize=128)
        self.started = False

    def __enter__(self):
        """Return the streamer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Release the capture device, if one was ever opened.

        The three parameters follow the context-manager protocol and are
        optional so that a direct ``streamer.__exit__()`` call keeps working.
        Nothing is returned, so exceptions raised inside the ``with`` block
        are not suppressed.
        """
        print('* streamer class exit')
        if self.capture is not None:
            self.capture.release()

    def run(self, src=0):
        """Open capture source *src* and enable frame grabbing.

        Any previously opened capture is stopped first.  The worker thread
        is created only once and is reused by later calls.
        """
        self.stop()

        if platform.system() == 'Windows':
            # On Windows the DirectShow backend is requested explicitly.
            self.capture = cv2.VideoCapture(src, cv2.CAP_DSHOW)
        else:
            self.capture = cv2.VideoCapture(src)

        self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
        self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)

        if self.thread is None:
            self.thread = Thread(target=self.update, args=())
            # NOTE: the worker is non-daemon and update() never returns, so
            # it keeps the interpreter alive until the process is killed.
            self.thread.daemon = False
            self.thread.start()

        self.started = True

    def stop(self):
        """Disable grabbing, release the capture and drop queued frames."""
        self.started = False
        if self.capture is not None:
            self.capture.release()
        self.clear()

    def update(self):
        """Worker loop: read frames and queue them while ``started`` is set.

        ``Queue.put`` blocks when the queue is full, which throttles the
        loop to the consumer's pace.
        """
        while True:
            if not self.started:
                # Idle: sleep instead of busy-waiting on the flag.
                time.sleep(self._IDLE_SLEEP)
                continue

            grabbed, frame = self.capture.read()
            if grabbed:
                self.Q.put(frame)
            else:
                # No frame was delivered; back off before retrying.
                time.sleep(self._IDLE_SLEEP)

    def clear(self):
        """Discard every queued frame."""
        with self.Q.mutex:
            self.Q.queue.clear()
            # Wake any producer blocked in put() on a previously full queue;
            # without this it would keep waiting even though there is room.
            self.Q.not_full.notify_all()

    def read(self):
        """Return the next queued frame, blocking until one is available."""
        return self.Q.get()

    def blank(self):
        """Return a ``height x width x 3`` uint8 array filled with ones."""
        return np.ones(shape=[self.height, self.width, 3], dtype=np.uint8)

    def bytescode(self):
        """Return the next frame encoded as JPEG bytes.

        A blank frame is used when no capture has been created yet or the
        capture is not opened.  When ``stat`` is true an FPS label is drawn
        in the top-left corner before encoding.
        """
        if self.capture is None or not self.capture.isOpened():
            frame = self.blank()
        else:
            frame = imutils.resize(self.read(), width=int(self.width))

            if self.stat:
                cv2.rectangle(frame, (0, 0), (120, 30), (0, 0, 0), -1)
                fps = 'FPS : ' + str(self.fps())
                cv2.putText(frame, fps, (10, 20), cv2.FONT_HERSHEY_PLAIN,
                            1, (0, 0, 255), 1, cv2.LINE_AA)

        return cv2.imencode('.jpg', frame)[1].tobytes()

    def fps(self):
        """Return the call rate derived from the time since the last call.

        The value is ``1 / elapsed`` rounded to one decimal, or ``1`` when
        no time has elapsed.
        """
        self.current_time = time.time()
        self.sec = self.current_time - self.preview_time
        self.preview_time = self.current_time

        if self.sec > 0:
            fps = round(1 / (self.sec), 1)
        else:
            fps = 1

        return fps