forked from hou-yz/MultiviewX
-
Notifications
You must be signed in to change notification settings - Fork 0
/
generateAnnotation.py
180 lines (149 loc) · 7.54 KB
/
generateAnnotation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import os
import re
import json
import cv2
from PIL import Image
import concurrent.futures
import datasetParameters
from unitConversion import *
def read_pom(fpath):
    """Parse a POM rectangles file into nested dicts.

    Returns {positionID: {cam: [left, top, right, bottom]}}; a rectangle
    marked 'notvisible' is stored as [-1, -1, -1, -1]. Only lines that
    contain the token 'RECTANGLE' are considered.
    """
    head_re = re.compile(r'(\d+) (\d+)')
    full_re = re.compile(r'(\d+) (\d+) ([-\d]+) ([-\d]+) (\d+) (\d+)')
    rectangles = {}
    with open(fpath, 'r') as handle:
        for raw in handle:
            if 'RECTANGLE' not in raw:
                continue
            cam_id, pos_id = (int(g) for g in head_re.search(raw).groups())
            per_cam = rectangles.setdefault(pos_id, {})
            if 'notvisible' in raw:
                per_cam[cam_id] = [-1, -1, -1, -1]
            else:
                # Visible box: re-parse the full line for the four corners.
                cam_id, pos_id, *bbox = (int(g) for g in full_re.search(raw).groups())
                rectangles[pos_id][cam_id] = bbox
    return rectangles
def read_gt(cam):
    """Load the matched 3-D ground truth for camera ``cam`` (0-based).

    Reads ``matchings/Camera{cam+1}_3d.txt`` and returns an int array of
    shape (N, 3) with columns [frame, personID, positionID]. Rows whose
    foot point lies outside the ground-plane grid are dropped.
    """
    # The last coordinate pair is the position of the feet.
    # coordinate array ... [1723.596 471.1655] [1421.27 544.6304]
    #visualize_foot_image = points_2d[points_2d[:, 0] == 0, -2:]
    gt_3d = np.loadtxt(f'matchings/Camera{cam + 1}_3d.txt')
    #print(gt_3d)
    # Drop people whose foot point is not inside the grid
    # (0..MAP_WIDTH on x, 0..MAP_HEIGHT on y; globals from the wildcard import).
    gt_3d = gt_3d[np.where(np.logical_and(gt_3d[:, -3] >= 0, gt_3d[:, -3] <= MAP_WIDTH))[0], :]
    gt_3d = gt_3d[np.where(np.logical_and(gt_3d[:, -2] >= 0, gt_3d[:, -2] <= MAP_HEIGHT))[0], :]
    frame, pid = gt_3d[:, 0], gt_3d[:, 1]
    # Columns [-3, -2) hold the (x, y) world coordinates of the feet.
    foot_3d_coord = gt_3d[:, -3:-1].transpose()
    #print(foot_3d_coord)
    # Convert world coordinates to flat grid position IDs (unitConversion helper).
    pos = get_pos_from_worldcoord(foot_3d_coord)
    return np.stack([frame, pid, pos], axis=1).astype(int)
def create_pid_annotation(pid, pos, bbox_by_pos_cam):
    """Build the annotation dict for one person at one grid position.

    Returns {'personID', 'positionID', 'views': [...]} with one view entry
    per camera. Assumes the per-position dict is keyed by consecutive
    camera indices 0..N-1 — TODO confirm against read_pom output.
    """
    cam_boxes = bbox_by_pos_cam[pos]
    views = [
        {'viewNum': cam_idx,
         'xmin': int(cam_boxes[cam_idx][0]), 'ymin': int(cam_boxes[cam_idx][1]),
         'xmax': int(cam_boxes[cam_idx][2]), 'ymax': int(cam_boxes[cam_idx][3])}
        for cam_idx in range(len(cam_boxes))
    ]
    return {'personID': int(pid), 'positionID': int(pos), 'views': views}
def process_image(image, annotations, annotationPath, cam, frame):
    """Draw bounding boxes and person IDs on one camera image and save it
    as ``bbox_cam{cam+1}_frame{frame}.png``.

    Args:
        image: image file name; the frame index is the part of the stem
            after the last '_'.
        annotations: per-person dicts produced by create_pid_annotation.
        annotationPath: directory containing this camera's images.
        cam: 0-based camera index selecting which view's bbox to draw.
        frame: only the image whose encoded index equals this is processed;
            everything else is a no-op.
    """
    img_file_name = os.path.splitext(image)[0]
    img_frame_index = int(img_file_name.split('_')[-1])
    if img_frame_index == frame:
        print(f"try to open: {os.path.join(annotationPath, image)} ")
        img = Image.open(os.path.join(annotationPath, image))
        # PIL loads RGB; OpenCV drawing primitives expect BGR.
        img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
        for anno in annotations:
            pid = anno['personID']
            anno = anno['views'][cam]
            bbox = tuple([anno['xmin'], anno['ymin'], anno['xmax'], anno['ymax']])
            # [-1, -1, ...] marks "not visible in this camera" (see read_pom).
            if bbox[0] == -1 and bbox[1] == -1:
                continue
            cv2.rectangle(img, bbox[:2], bbox[2:], (0, 255, 0), 2)
            # Person ID at the box centre, corner coordinates at the corners.
            cv2.putText(img, str(pid), ((bbox[:2][0]+bbox[2:][0])//2, (bbox[:2][1]+bbox[2:][1])//2), cv2.FONT_HERSHEY_SIMPLEX, 1,
                        (0, 255, 255), 2)
            cv2.putText(img, str(bbox[:2]), tuple(bbox[:2]), cv2.FONT_HERSHEY_SIMPLEX, 1,
                        (255, 255, 255), 2)
            cv2.putText(img, str(bbox[2:]), tuple(bbox[2:]), cv2.FONT_HERSHEY_SIMPLEX, 1,
                        (255, 255, 255), 2)
        # Back to RGB so PIL writes correct colours.
        img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        img.save(f'bbox_cam{cam + 1}_frame{frame}.png')
def process_frame(frame, gts, pids_dict, bbox_by_pos_cam, lastFrameIndex, NUM_CAM):
    """Write the annotation JSON for one frame and optionally render
    per-camera preview images for it.

    Args:
        frame: frame index to process.
        gts: int array of [frame, personID, positionID] rows (all frames).
        pids_dict: shared mapping original personID -> densely renumbered ID;
            mutated in place. NOTE(review): when this function runs in
            multiple threads (see annotate), the check-then-insert below is
            not atomic and could assign duplicate IDs — confirm intent.
        bbox_by_pos_cam: {positionID: {cam: bbox}} from read_pom().
        lastFrameIndex: previews are rendered for frames <= this index
            (0 disables previews entirely).
        NUM_CAM: number of cameras.
    """
    #print(f"try to process frames")
    DATASET_NAME = datasetParameters.DATASET_NAME
    # Select only this frame's ground-truth rows.
    gts_frame = gts[gts[:, 0] == frame, :]
    annotations = []
    for i in range(gts_frame.shape[0]):
        pid, pos = gts_frame[i, 1:]
        if pid not in pids_dict:
            # Renumber person IDs densely, in order of first appearance.
            pids_dict[pid] = len(pids_dict)
        annotations.append(create_pid_annotation(pids_dict[pid], pos, bbox_by_pos_cam))
    with open(os.path.join(DATASET_NAME, 'annotations_positions/{:05d}.json'.format(frame)), 'w') as fp:
        json.dump(annotations, fp, indent=4)
    # Fan out preview rendering across all images of every camera; only the
    # image matching `frame` actually gets drawn (process_image filters).
    if lastFrameIndex != 0 and frame <= lastFrameIndex:
        for cam in range(NUM_CAM):
            annotationPath = os.path.join(DATASET_NAME, 'Image_subsets', f'C{cam + 1}')
            imageLists = os.listdir(annotationPath)
            with concurrent.futures.ThreadPoolExecutor() as executor:
                executor.map(process_image, imageLists, [annotations]*len(imageLists), [annotationPath]*len(imageLists), [cam]*len(imageLists), [frame]*len(imageLists))
def create_gif(cam, previewCount):
    """Assemble this camera's per-frame preview PNGs into an animated GIF
    (``cam{cam+1}_frames.gif``, 100 ms per frame, looping forever)."""
    stills = [Image.open(f"bbox_cam{cam + 1}_frame{frame}.png")
              for frame in range(previewCount)]
    first, rest = stills[0], stills[1:]
    first.save(f"cam{cam + 1}_frames.gif", format="GIF", append_images=rest,
               save_all=True, duration=100, loop=0)
def fallback(previewCount):
    """Single-threaded annotation pipeline, used when multiprocessing is
    disabled or the worker budget is smaller than the camera count.

    Args:
        previewCount: number of leading frames to render as preview images
            and assemble into per-camera GIFs; 0 disables previews.
    """
    lastFrameIndex = previewCount - 1
    DATASET_NAME = datasetParameters.DATASET_NAME
    bbox_by_pos_cam = read_pom(os.path.join(DATASET_NAME, 'rectangles.pom'))
    # NUM_CAM is a module-level global (presumably via the wildcard
    # import of unitConversion) — TODO confirm.
    gts = []
    for cam in range(NUM_CAM):
        gts.append(read_gt(cam))
    gts = np.concatenate(gts, axis=0)
    gts = np.unique(gts, axis=0)
    print(f'average persons per frame: {gts.shape[0] / len(np.unique(gts[:, 0]))}')
    pids_dict = {}
    os.makedirs(os.path.join(DATASET_NAME, 'annotations_positions'), exist_ok=True)
    for frame in np.unique(gts[:, 0]):
        # BUG FIX: process_frame takes 6 arguments; the original call omitted
        # NUM_CAM and raised TypeError on every frame.
        process_frame(frame, gts, pids_dict, bbox_by_pos_cam, lastFrameIndex, NUM_CAM)
    if previewCount != 0:
        # Reuse create_gif instead of duplicating its GIF-assembly loop inline.
        for cam in range(NUM_CAM):
            create_gif(cam, previewCount)
def annotate(previewCount, threadCount=None, DisableMultiprocessing=False):
    """Generate per-frame annotation JSONs (and optional preview GIFs).

    Args:
        previewCount: number of leading frames to render as previews;
            0 disables previews.
        threadCount: worker budget; if NUM_CAM exceeds it the single-threaded
            fallback() is used instead. BUG FIX: this previously had no
            default, so the script entry point `annotate(0)` raised
            TypeError — it now defaults to NUM_CAM so the threaded path runs.
        DisableMultiprocessing: force the single-threaded fallback.
    """
    if threadCount is None:
        threadCount = NUM_CAM
    if DisableMultiprocessing or NUM_CAM > threadCount:
        fallback(previewCount)
    else:
        try:
            lastFrameIndex = previewCount - 1
            DATASET_NAME = datasetParameters.DATASET_NAME
            bbox_by_pos_cam = read_pom(os.path.join(DATASET_NAME, 'rectangles.pom'))
            # Load every camera's ground truth concurrently.
            gts = []
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_gt = {executor.submit(read_gt, cam): cam for cam in range(NUM_CAM)}
                for future in concurrent.futures.as_completed(future_to_gt):
                    gts.append(future.result())
            gts = np.concatenate(gts, axis=0)
            gts = np.unique(gts, axis=0)
            print(f'average persons per frame: {gts.shape[0] / len(np.unique(gts[:, 0]))}')
            # NOTE(review): pids_dict is mutated concurrently by the
            # process_frame workers below; the check-then-insert there is not
            # atomic, so duplicate dense IDs are possible — consider a lock.
            pids_dict = {}
            os.makedirs(os.path.join(DATASET_NAME, 'annotations_positions'), exist_ok=True)
            with concurrent.futures.ThreadPoolExecutor() as executor:
                frame_futures = [
                    executor.submit(process_frame, frame, gts, pids_dict,
                                    bbox_by_pos_cam, lastFrameIndex, NUM_CAM)
                    for frame in np.unique(gts[:, 0])
                ]
                for future in concurrent.futures.as_completed(frame_futures):
                    # BUG FIX: worker exceptions were silently dropped before;
                    # re-raise so the except below triggers the fallback.
                    future.result()
            if previewCount != 0:
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    for cam in range(NUM_CAM):
                        executor.submit(create_gif, cam, previewCount)
        except Exception as e:
            # Best-effort recovery: log, then retry single-threaded.
            print(f"Exception occurred: {str(e)}")
            fallback(previewCount)
if __name__ == '__main__':
    # BUG FIX: annotate() requires a threadCount; the original `annotate(0)`
    # raised TypeError. Pass NUM_CAM so the threaded path is eligible; 0
    # disables preview generation.
    annotate(0, NUM_CAM)