-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
62 lines (55 loc) · 1.72 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import boto3, os, cv2
def video_id_check(conn, video_uid):
with conn.cursor() as cursor:
sql = f"SELECT id FROM video_video WHERE url LIKE %s"
cursor.execute(sql, ('%' + video_uid + '%',))
video_id = cursor.fetchone()[0]
if not video_id:
return []
return video_id
def camera_id_check(conn, camera_id, camera_url):
with conn.cursor() as cursor:
sql = f"SELECT id FROM camera_camera WHERE stream_url LIKE %s"
cursor.execute(sql, (camera_url,))
res = cursor.fetchone()[0]
if not res or (res != int(camera_id)):
return []
return camera_id
def upload_to_s3(bucket, filename):
# check the video file is exists
if not os.path.exists(filename):
print(f"File not found : {filename}")
return {
"status": False
}
s3 = boto3.client(
's3',
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY'),
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY')
)
key = f"clip/{filename[:-4]}"
try:
res = s3.upload_file(
filename,
bucket,
key,
ExtraArgs={
"ContentType": "video/mp4"
}
)
return {
"status": True,
"file_url": f"https://{bucket}.s3.amazonaws.com/{key}"
}
except Exception as e:
print(e)
return {
"status": False
}
def video_length_in_seconds(video_path):
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) # Frames per second
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count / fps
cap.release()
return round(duration)