-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
361 lines (274 loc) · 11.3 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
from copy import deepcopy
import json
import time
import os
import subprocess
import requests
from urllib.parse import urlparse
import os
from flask import jsonify, abort
def wlog(level="notice", msg=""):
    """Print one log line to stdout: the level, a single space, then the message.

    Args:
        level: label printed first (defaults to ``"notice"``).
        msg: text printed after the level (defaults to an empty string).
    """
    print(level, msg, sep=" ", end="\n")
def get_server_details():
    """Return the API server address and port read from the environment.

    Reads ``SERVER_ADDRESS`` and ``SERVER_PORT`` from ``os.environ``.

    Returns:
        tuple: ``(SERVER_ADDRESS, SERVER_PORT)`` exactly as stored in the
        environment (both strings).

    Raises:
        SystemExit: with exit status 1 when either variable is not set.
    """
    try:
        server_address = os.environ['SERVER_ADDRESS']
        server_port = os.environ['SERVER_PORT']
    except KeyError as e:
        # Only a missing variable is expected here; anything else is a real
        # bug and should surface instead of being reported as "not set".
        wlog("error",
             f"required env variables are not set, check .env file (missing {e})")
        # Raise SystemExit directly: the bare exit() helper comes from the
        # site module and would report success (status 0) to the caller.
        raise SystemExit(1)
    return server_address, server_port
def get_env(env_var):
    """Return the value of environment variable ``env_var``.

    Args:
        env_var: name of the environment variable to look up.

    Returns:
        The variable's value. When the variable is missing, an error response
        is sent through ``return_response``; ``False`` is returned only if
        that call hands control back.
    """
    try:
        return os.environ[env_var]
    except KeyError:
        # Catch only the "variable is missing" case so unrelated errors are
        # not misreported as a missing variable.
        return_response(
            {"status": "error", "message": f"ENV: {env_var} not found. exiting..."})
        return False
def load_json_structure(path):
    """Load and parse the JSON document stored at ``path``.

    Args:
        path: location of the JSON file, read through ``read_file``.

    Returns:
        The parsed JSON value, or ``False`` when the parsed value is empty
        (for example ``{}`` or ``[]``). On a read or parse failure an error
        response is sent through ``return_response``; ``False`` is returned
        only if that call hands control back.
    """
    raw = read_file(path)
    if raw is False:
        # read_file signals failure with False; report that directly instead
        # of letting json.loads fail with an unrelated TypeError.
        wlog("error", f"Unable to open file {path}, file could not be read")
        return_response(
            {"status": "error",
             "message": f"Unable to open file {path}, file could not be read"})
        return False
    try:
        file_data = json.loads(raw)
    except ValueError as e:
        # json.JSONDecodeError is a ValueError subclass.
        wlog("error", f"Unable to open file {path}, {e}")
        return_response(
            {"status": "error", "message": f"Unable to open file {path}, {e}"})
        return False
    if not file_data:
        return False
    return file_data
def read_file(filename):
    """Return the full text content of ``filename``.

    Any exception raised while opening or reading is logged through ``wlog``
    and reported to the caller as ``False``.
    """
    try:
        with open(filename, "r") as handle:
            return handle.read()
    except Exception as err:
        wlog("error", str(err))
    return False
def write_file(filename, data):
    """Write ``data`` to ``filename``, replacing any existing content.

    Returns ``True`` on success. Any exception is logged through ``wlog`` and
    reported to the caller as ``False``.
    """
    succeeded = True
    try:
        with open(filename, "w") as handle:
            handle.write(data)
    except Exception as err:
        wlog("error", str(err))
        succeeded = False
    return succeeded
def run_sox(sox_input, filename):
    """Run ``sox <sox_input> <filename>`` and report a failure to the client.

    Args:
        sox_input: pre-built argument string placed right after ``sox``.
        filename: final argument appended to the command line.

    A non-zero exit status, or a failure to launch the command, is logged and
    turned into an error response through ``return_response``.
    """
    # NOTE(security): the command goes through the shell because sox_input is
    # an already-assembled argument string. Callers must not place untrusted
    # text in either argument.
    cmd = f"sox {sox_input} {filename}"
    wlog("info", "Running sox command : " + cmd)
    try:
        status = os.system(cmd)
    except (OSError, ValueError) as e:
        wlog("error", f"concat file, check sox: {e}")
        status = -1
    # os.system does not raise when the command itself fails, so the exit
    # status has to be checked explicitly.
    if status != 0:
        wlog("error", f"concat file, check sox (exit status {status})")
        return_response(
            {"status": "error", "message": "concat file, check sox"}
        )
def download_file(url, dir, timeout=30):
    """Download ``url`` into directory ``dir`` and return the saved path.

    The file name is the last component of the URL path.

    Args:
        url: address to fetch with an HTTP GET.
        dir: directory the file is written into.
        timeout: seconds passed to ``requests.get`` so a stalled server cannot
            block the caller forever.

    Returns:
        str: path of the written file, or ``""`` when the server did not
        answer with status 200. Network and file-system errors are logged and
        turned into an error response through ``return_response``.
    """
    path = ""
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=timeout) as r:
            if r.status_code == 200:
                path = os.path.join(dir,
                                    os.path.basename(urlparse(url).path))
                with open(path, 'wb') as f:
                    # Iterating the response directly yields 128-byte chunks;
                    # ask for larger ones to cut down the number of writes.
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            else:
                wlog("error",
                     f"Unable to download file {url}, HTTP status {r.status_code}")
    except (requests.RequestException, OSError) as e:
        wlog("error", f"Unable to download file {url}, {e}")
        return_response(
            {"status": "error", "message": f"Unable to download file {url}, {e}"})
    return path
def get_duration(filename):
    """Return the duration of an audio file in milliseconds.

    Args:
        filename: local path, or a string starting with ``http``, in which
            case the file is first downloaded into ``/tmp``.

    Returns:
        int: duration in milliseconds as reported by ``soxi -D``.
        ``0.0`` when the file does not exist or soxi printed nothing usable.

    Raises:
        SystemExit: with exit status 1 when the soxi command cannot be run.
    """
    # if file is a url, download it
    if filename.startswith("http"):
        wlog("info", f"Downloading file {filename}")
        filename = download_file(filename, "/tmp")
    wlog("info", f"Getting duration of file {filename}")
    if not os.path.exists(filename):
        wlog("error", f"Audiofile does not exists {filename}")
        return 0.0

    cmd = ['timeout', '5s', 'soxi', '-D', filename]
    duration = ""
    try:
        wlog("running command", "timeout 5s soxi -D "+filename)
        # soxi occasionally prints nothing; give it one more attempt.
        for _ in range(2):
            duration = subprocess.run(
                cmd, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
            if duration:
                break
        wlog("command output: ", duration)
        wlog("command completed", " timeout 5s soxi -D "+filename)
    except Exception as e:
        wlog("error", f"unable to check the duration of file check sox: {e}")
        # Raise SystemExit directly: the bare exit() helper comes from the
        # site module and would report success (status 0).
        raise SystemExit(1)

    try:
        return int(float(duration)*1000)
    except ValueError:
        # Empty or non-numeric soxi output (for example after the 5s timeout)
        # is treated like a missing file instead of crashing the caller.
        wlog("error",
             f"soxi returned no usable duration for {filename}: {duration!r}")
        return 0.0
def remove_file(path):
    """Delete the file at ``path``.

    Returns ``True`` when the file was removed. Any exception is logged
    through ``wlog`` and reported to the caller as ``False``.
    """
    removed = True
    try:
        os.remove(path)
    except Exception as err:
        wlog("error", f"Unable to remove file {path}, {err}")
        removed = False
    return removed
def elements_validate_default_helper(request, required_keys, class_key):
    """Check ``request["default_elements"][class_key]`` for the required keys.

    Nothing is checked when ``default_elements`` or ``class_key`` is absent.
    Each missing key triggers an error response through ``return_response``.

    Returns:
        dict: always an empty dict when control reaches the end.
    """
    if "default_elements" not in request:
        return {}
    if class_key not in request['default_elements']:
        return {}
    element = request['default_elements'][class_key]
    for required in required_keys:
        if required in element:
            continue
        listed = ','.join(required_keys)
        return_response(
            {"status": "error", "message": f"Required keys {listed} not found in request -> default_elements -> {class_key}"})
    return {}
def elements_validate_global_helper(request, required_keys, class_key):
    """Check ``request["global_elements"][class_key]`` for the required keys.

    Nothing is checked when ``global_elements`` or ``class_key`` is absent.
    Each missing key triggers an error response through ``return_response``.

    Returns:
        dict: always an empty dict when control reaches the end.
    """
    if "global_elements" not in request:
        return {}
    if class_key not in request['global_elements']:
        return {}
    element = request['global_elements'][class_key]
    for required in required_keys:
        if required in element:
            continue
        listed = ','.join(required_keys)
        return_response(
            {"status": "error", "message": f"Required keys {listed} not found in request -> global_elements -> {class_key}"})
    return {}
def elements_validate_slides_helper(request, required_keys, class_key):
    """Check ``class_key`` in every slide of ``request["slides"]``.

    Slides without ``class_key`` are skipped. Each missing required key
    triggers an error response through ``return_response``.

    Returns:
        dict: always an empty dict when control reaches the end.
    """
    if "slides" not in request:
        return {}
    for slide in request["slides"]:
        if class_key not in slide:
            continue
        element = slide[class_key]
        for required in required_keys:
            if required in element:
                continue
            listed = ','.join(required_keys)
            return_response(
                {"status": "error", "message": f"Required keys {listed} not found in request -> slides -> {class_key}"})
    return {}
return {}
def get_templates_from_req(req):
    """Collect the distinct template names used by the slides.

    Reads ``slide['template']['value']`` from every slide that has a
    ``template`` entry and keeps the first occurrence of each value, in slide
    order.
    """
    unique_names = []
    for slide in req['slides']:
        if 'template' not in slide:
            continue
        name = slide['template']['value']
        if name in unique_names:
            continue
        unique_names.append(name)
    return unique_names
def validate_templates(templates, template_path):
    """Verify that every template has both its snippet and placeholder file.

    For each name the files ``<template_path>template-snippets/<name>.html``
    and ``<template_path>template-placeholders/<name>.json`` must exist. An
    empty template list or a missing file triggers an error response through
    ``return_response``.

    Returns:
        dict: ``{"status": "success"}`` when control reaches the end.
    """
    if not templates:
        return_response(
            {"status": "error", "message": "pass atleast 1 template"})
    snippet_dir = f"{template_path}template-snippets/"
    placeholder_dir = f"{template_path}template-placeholders/"
    for template in templates:
        snippet_file = os.path.join(snippet_dir, f"{template}.html")
        placeholder_file = os.path.join(placeholder_dir, f"{template}.json")
        both_present = os.path.exists(snippet_file) and os.path.exists(placeholder_file)
        if not both_present:
            return_response(
                {"status": "error", "message": f"template {template} does not exist."})
    return {"status": "success"}
def return_response(response_data):
    """Print ``response_data`` as JSON, then abort the request with it.

    The HTTP status is ``response_data["status_code"]`` when that key is
    present and 400 otherwise. The JSON response is handed to Flask's
    ``abort``.
    """
    print(json.dumps(response_data))
    response = jsonify(response_data)
    response.status_code = (
        response_data["status_code"] if "status_code" in response_data else 400
    )
    abort(response)
def is_valid_url(url):
    """Return ``True`` when ``url`` parses with both a scheme and a netloc.

    Any exception raised while parsing yields ``False``.
    """
    try:
        parts = urlparse(url)
    except Exception:
        return False
    return bool(parts.scheme and parts.netloc)
def apply_placeholders(slide, snippet_html, placeholders, template_path=""):
    """Fill a template snippet with the values of one slide.

    Static placeholders are always applied. Dynamic placeholders (described
    in the file as overlays such as text, image, video) are applied only when
    ``placeholders`` has a ``"dynamic"`` entry.

    Returns:
        The snippet HTML after both substitution passes.
    """
    rendered = apply_static_placeholders(slide, snippet_html, placeholders)
    if "dynamic" not in placeholders:
        return rendered
    return apply_dynamic_placeholders(
        slide, rendered, placeholders, template_path)
def apply_static_placeholders(slide, snippet_html, placeholders):
    """Replace every static placeholder token found in ``snippet_html``.

    For each ``token -> spec`` pair in ``placeholders["static"]`` whose token
    occurs in the snippet:
      1. split ``spec["mapping"]`` on "." (e.g. "heading.value") and resolve
         it against ``slide`` with ``placeholders_mapper``;
      2. if the resolved value is falsy, use ``spec["default"]`` when present
         and an empty string otherwise;
      3. replace the token with ``str(value)``.

    Returns:
        The snippet HTML with the substitutions applied.
    """
    for token, spec in placeholders["static"].items():
        if token not in snippet_html:
            continue
        # heading.value = [heading][value]
        path = spec["mapping"].split(".")
        value = placeholders_mapper(path, slide)
        if not value:
            # Any falsy value counts as "not supplied": use the default from
            # the snippet json when there is one, else an empty string.
            value = spec['default'] if "default" in spec else ""
        snippet_html = snippet_html.replace(token, str(value))
    return snippet_html
def apply_dynamic_placeholders(slide, snippet_html, placeholders, template_path):
'''
Build the HTML for the overlay elements of a slide and insert it into
snippet_html in place of the "__overlay_elements_html__" token.

For every element in slide["overlay"], each key of placeholders["dynamic"]
is compared with element["type"]. On a match:
a. the element snippet "<template_path>element-snippets/<type>.html" is
loaded with read_file
b. for each placeholder key that occurs in that snippet, its "mapping" is
split on "." and resolved with placeholders_mapper against {key: element}
c. the key is replaced in the element snippet by str() of the resolved value
The element data is appended to the accumulated overlay HTML, preceded by
two newlines.

Returns snippet_html after the overlay token has been replaced.

NOTE(review): indentation was lost in this copy of the file, so the exact
nesting of the last statements inside the loops cannot be confirmed here;
check against the original before changing the logic.
'''
# token in the slide snippet that receives all overlay element markup
overlay_elements_placeholder = "__overlay_elements_html__"
overlay_elements_html = ""
for element in slide["overlay"]:
for key, placeholder in placeholders["dynamic"].items():
if key == element["type"]:
# if element type matches with placeholder type
# e.g. slide["overlay"][0]["type"] = placeholder["dynamic"]["text"]
# NOTE(review): read_file returns False on failure, which the membership
# test below does not guard against
element_snippet = read_file(os.path.join(
f"{template_path}element-snippets/", f"{element['type']}.html"))
element_final_data = ""
for element_key, element_placeholder in placeholder.items():
if element_key in element_snippet:
mapping_keys = element_placeholder["mapping"].split(
".")
# the element is wrapped under its type key before the mapping is resolved
placeholder_value = placeholders_mapper(
mapping_keys, {key: element})
element_snippet = element_snippet.replace(
element_key, str(placeholder_value))
element_final_data = element_snippet
overlay_elements_html += "\n\n"+element_final_data
snippet_html = snippet_html.replace(
overlay_elements_placeholder, overlay_elements_html)
return snippet_html
def placeholders_mapper(mapping_keys, slide):
    """Resolve a key path such as ``["heading", "value"]`` inside ``slide``.

    Args:
        mapping_keys: keys to follow, outermost first.
        slide: nested dict the path is resolved against.

    Returns:
        The value stored at ``slide[k0][k1]...``, whatever its type.
        ``False`` when a key on the path is missing, or when the path tries
        to descend into a value that is not a dict.
    """
    current = slide
    for key in mapping_keys:
        # A non-dict value with keys still left means the path does not
        # exist; never fall back to looking the key up in a parent dict.
        if not isinstance(current, dict) or key not in current:
            # user has not passed the param, caller applies its default
            wlog("notice", f"Key {key} not found in mapping data {current}")
            return False
        current = current[key]
    return current
def get_element_by_type(slide, mapping_keys, type):
    """Pick the element of a list-valued slide entry that has the given type.

    The first mapping key names the slide entry (a list of elements, e.g.
    text or overlay). Every item is compared on ``item['type']``; when
    several match, the last one is kept.

    Returns:
        tuple: ``(element, remaining_keys)``. ``element`` is ``{}`` when the
        entry is absent or nothing matches; ``remaining_keys`` is
        ``mapping_keys`` without its first item.
    """
    element_name = mapping_keys[0]
    candidates = slide[element_name] if element_name in slide else ()
    selected_element = {}
    for candidate in candidates:
        if type == candidate['type']:
            selected_element = candidate
    # the first mapping key is the element name, the rest addresses its inside
    return selected_element, mapping_keys[1:]
def is_record_video(request):
    """Tell whether ``request["settings"]["record_video"]`` is set to True.

    Returns ``False`` when ``settings`` or ``record_video`` is absent, or
    when the stored value does not compare equal to ``True``.
    """
    if "settings" not in request:
        return False
    settings = request["settings"]
    if "record_video" not in settings:
        return False
    return bool(settings["record_video"] == True)
def create_dirs():
    """Create the working directories the application needs.

    Currently only ``data/path/media_cache``, relative to the current working
    directory. Existing directories are left alone; a failure to create one
    is logged and does not stop the remaining ones.
    """
    dirs = ["data/path/media_cache"]
    wlog("info", "Creating dirs")
    for path in dirs:
        try:
            # Same effect as `mkdir -p`, without spawning a shell.
            os.makedirs(path, exist_ok=True)
        except OSError as e:
            wlog("error", f"Unable to create dir {path}, {e}")