forked from chenliny-zz/IoT
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdetector_tracking.py
151 lines (115 loc) · 4.19 KB
/
detector_tracking.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import cv2 as cv
import paho.mqtt.client as mqtt
import time
# MQTT settings: broker host name, broker port and the topic name
local_mqtt_host, mqtt_port, mqtt_topic = 'broker', 1883, 'faces'

# module-level flag indicating disconnect status (starts out cleared)
dc_flag = False
# create callback functions
def on_connect_local(local_client, userdata, flags, rc):
    """
    Callback invoked with the broker's acknowledgement of a connection attempt.

    A return code of 0 marks the client as connected by setting its
    ``connected_flag`` attribute to True; any other code leaves the flag
    untouched. The return code is printed in both cases.
    """
    if rc != 0:
        # anything other than 0 is reported as a failed connection
        print("Bad connection Returned code = ", rc)
        return
    local_client.connected_flag = True
    print("Connected OK returned code = ", rc)
def on_disconnect_local(local_client, userdata, rc):
    """
    Callback invoked when the local client is disconnected from the broker.

    Clears the client's ``connected_flag`` attribute so that it stops
    reporting a connection that no longer exists, then prints the return
    code that accompanied the disconnect.

    Args:
        local_client: the client object the callback is bound to.
        userdata: user data passed through by the client (unused).
        rc: return code describing the disconnect.
    """
    # keep the flag in sync with the real connection state
    local_client.connected_flag = False
    print("Disconnected due to ", rc)
# ---------------------------------------------------------------------------
# Start-up: connect the MQTT client, open the camera, and initialise the
# tracker from a bounding box the user draws on the first frame.
# ---------------------------------------------------------------------------

# initiate local client instance
local_client = mqtt.Client("detector")
# connection flag indicating connection status (set by on_connect_local)
local_client.connected_flag = False
# bind callback functions
local_client.on_connect = on_connect_local
local_client.on_disconnect = on_disconnect_local
# loop start
local_client.loop_start()
# make connection
local_client.connect(local_mqtt_host, mqtt_port, 60)
# block until the connect callback has flagged a successful connection;
# NOTE(review): there is no timeout here, so a broker that keeps refusing
# the connection makes this wait forever - confirm that is intended
while not local_client.connected_flag:
    print("Waiting in loop")
    time.sleep(1)

# 1 should correspond to /dev/video1, your USB camera.
# The 0 is reserved for the NX onboard camera or webcam on laptop
cap = cv.VideoCapture(0)
# request a 640x480 capture size (named constants instead of raw ids 3 and 4)
cap.set(cv.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv.CAP_PROP_FRAME_HEIGHT, 480)

# initiate object tracker
tracker = cv.TrackerCSRT_create()

# read initial frame for customized bounding box
ret, frame = cap.read()
if not ret:
    # without a first frame there is nothing to select a region on
    cap.release()
    raise RuntimeError('Could not read an initial frame from the camera')

# draw the prompt on a copy BEFORE opening the selector so the user actually
# sees it, and so the text does not end up in the frame the tracker learns from
initial_txt = 'Please draw a bounding box'
prompt_frame = frame.copy()
cv.putText(prompt_frame, initial_txt, (50, 50), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv.LINE_AA)
bbox = cv.selectROI('Tracking', prompt_frame, False)

# a cancelled selection yields an empty rectangle, which cannot be tracked
if bbox[2] <= 0 or bbox[3] <= 0:
    cap.release()
    cv.destroyAllWindows()
    raise RuntimeError('No bounding box was selected')

# initialize tracker using the bounding box on the clean frame
tracker.init(frame, bbox)
def drawBbox(frame, bbox):
    """
    Draw a rectangle on frame for the given bounding box.

    bbox is read by index as (x, y, width, height); every entry is converted
    to int before the rectangle is drawn in place on frame.
    """
    left, top, width, height = [int(bbox[k]) for k in range(4)]
    top_left = (left, top)
    bottom_right = (left + width, top + height)
    cv.rectangle(frame, top_left, bottom_right, (255, 0, 0), 2, 1)
# ---------------------------------------------------------------------------
# Capture loop: track the selected object frame by frame, publish a cropped
# PNG of it on every 5th frame until the target number of samples has been
# collected, then count down and exit. Pressing 'q' exits early.
# ---------------------------------------------------------------------------

# timestamps used for fps calculation
prev_frame_time = 0
new_frame_time = 0
# training samples stats and countdown
target_sample_num = 50
collected_num = 0
countdown = 5
# frame counter driving the "every Nth frame" decisions below
i = 0

try:
    while True:
        # read frame; stop when the camera no longer delivers one
        grabbed, frame = cap.read()
        if not grabbed:
            print("Failed to read frame from camera")
            break

        # get bbox and update tracker
        tracked, bbox = tracker.update(frame)

        # fps calculation (guarded so two identical timestamps cannot
        # raise ZeroDivisionError)
        new_frame_time = time.time()
        elapsed = new_frame_time - prev_frame_time
        fps = int(1.0 / elapsed) if elapsed > 0 else 0
        prev_frame_time = new_frame_time

        # Sample BEFORE any overlay is drawn so the crop contains only camera
        # pixels, and only while the tracker actually has the object.
        if tracked and i % 5 == 0 and collected_num < target_sample_num:
            ##### actual training samples with bounding box annotation #####
            ##### can be created here. object is cropped out here for demo #####
            x, y, w, h = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
            frame_h, frame_w = frame.shape[:2]
            # clamp to the frame: negative indices would wrap around when slicing
            x0, y0 = max(x, 0), max(y, 0)
            x1, y1 = min(x + w, frame_w), min(y + h, frame_h)
            if x1 > x0 and y1 > y0:
                # Extract object
                obj_extract = frame[y0:y1, x0:x1]
                # Encode extract to png
                encoded, png = cv.imencode('.png', obj_extract)
                if encoded:
                    # convert png extract to bytes (for messaging)
                    msg = png.tobytes()
                    local_client.publish(mqtt_topic, msg, qos=1, retain=False)
                    # count the sample once it has been handed to the client;
                    # without this the target is never reached
                    collected_num += 1

        if tracked:
            # draw bbox if tracking succeeded
            drawBbox(frame, bbox)
        else:
            # print missing if not
            cv.putText(frame, 'lost', (100, 145), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv.LINE_AA)

        # display status
        headline_txt = 'Capturing training samples...'
        fps_txt = f'Camera feed @ ~{fps} fps'
        status_txt = f'Collection Status: {collected_num}/{target_sample_num}'
        cv.putText(frame, headline_txt, (30, 20), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv.LINE_AA)
        cv.putText(frame, fps_txt, (30, 40), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv.LINE_AA)
        cv.putText(frame, status_txt, (30, 60), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv.LINE_AA)

        if collected_num >= target_sample_num:
            exit_txt = f'Session ending in {countdown}'
            cv.putText(frame, exit_txt, (30, 80), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv.LINE_AA)
            # the countdown ticks once every 10 frames, not once per second
            if i % 10 == 0:
                countdown -= 1
            if countdown < 0:
                break

        i += 1
        cv.imshow('img', frame)
        if cv.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # always release resources, even when the loop exits through an exception
    local_client.disconnect()
    # stop the background network thread started with loop_start()
    local_client.loop_stop()
    cap.release()
    cv.destroyAllWindows()