-
Notifications
You must be signed in to change notification settings - Fork 18
/
example.py
116 lines (93 loc) · 3.41 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import time
import sys
import live555
import threading
# Shows how to use live555 module to pull frames from an RTSP/RTP
# source. Run this (likely first customizing the URL below:
# Example: python3 example.py 10.17.4.118 admin 10 out.264
if len(sys.argv) != 5:
print()
print('Usage: python3 example.py cameraIP username seconds fileOut')
print()
sys.exit(1)
cameraIP = sys.argv[1]
username = sys.argv[2]
seconds = float(sys.argv[3])
fileOut = sys.argv[4]
password = input("Enter password:")
# NOTE: the username & password, and the URL path, will vary from one
# camera to another! This URL path works with HikVision/Dahua NVR and pulls a set of DVR channels:
url = 'rtsp://{username}:{password}@{cameraIP}/Streaming/Channels/{channel}'
fOut = open(fileOut, 'wb')
def shutdownCallback():
# Note this should never actually be called because callbacks upon shutdown are disabled due to Python issue 23571:
# https://bugs.python.org/issue23571
print('shutdown callback')
def oneFrame(codecName, bytes, sec, usec, durUSec):
print('frame for %s: %d bytes' % (codecName, len(bytes)))
fOut.write(b'\0\0\0\1' + bytes)
def oneFrame2(codecName, bytes, sec, usec, durUSec):
print('frame (handle2) for %s: %d bytes' % (codecName, len(bytes)))
fOut.write(b'\0\0\0\1' + bytes)
# Starts pulling frames from the URL, with the provided callback:
useTCP = True
handle = live555.startRTSP(url.format(username=username, password=password, cameraIP=cameraIP, channel=102), oneFrame, shutdownCallback, useTCP)
print('got handle {}'.format(handle))
handle2 = live555.startRTSP(url.format(username=username, password=password, cameraIP=cameraIP, channel=202), oneFrame2, shutdownCallback, useTCP)
print('got handle2 {}'.format(handle2))
# Run Live555's event loop in a background thread:
t = threading.Thread(target=live555.runEventLoop, args=())
t.setDaemon(True)
t.start()
endTime = time.time() + seconds
while time.time() < endTime:
time.sleep(0.1)
def oneFrame3(codecName, bytes, sec, usec, durUSec):
print('frame (handle3) for %s: %d bytes' % (codecName, len(bytes)))
fOut.write(b'\0\0\0\1' + bytes)
handle3 = live555.startRTSP(url.format(username=username, password=password, cameraIP=cameraIP, channel=302), oneFrame3, shutdownCallback, useTCP)
print('got handle3: {}'.format(handle3))
print('stopping first handle')
try:
live555.stopRTSP(handle)
except live555.error as e:
print('live555 error received')
print('stopped first handle')
endTime = time.time() + seconds
while time.time() < endTime:
time.sleep(0.1)
print('stopping 2nd handle')
try:
live555.stopRTSP(handle2)
except live555.error as e:
print('live555 error received')
pass
print('stopped 2nd handle')
handle = live555.startRTSP(url.format(username=username, password=password, cameraIP=cameraIP, channel=102), oneFrame, shutdownCallback, useTCP)
print('got handle {}'.format(handle))
endTime = time.time() + seconds
while time.time() < endTime:
time.sleep(0.1)
print('stopping 3rd handle')
try:
live555.stopRTSP(handle3)
except live555.error as e:
print('live555 error received')
pass
print('stopped 3rd handle')
time.sleep(5)
print('slept after 3rd handle')
try:
live555.stopRTSP(handle)
except live555.error as e:
print('live555 error received')
pass
# Tell Live555's event loop to stop:
try:
live555.stopEventLoop()
except:
import traceback
print('exception received')
traceback.print_exc()
# Wait for the background thread to finish:
t.join()