-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhmi.py
363 lines (321 loc) · 14.3 KB
/
hmi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
import time
import utils.cv
from config import MultifileConfig as Config
from program import Program
from coderbot import CoderBot
from PIL import Image
import StringIO
from re import findall
import errno
import json
from flask import Flask, render_template, request, send_file, redirect, Response, make_response, jsonify, url_for, abort, session
from flask.ext.babel import Babel
# Fallback locations, used when the loaded configuration does not define
# 'record_path' / 'config_path'.
RECORD_PATH = 'DCIM'
CONFIG_PATH = '.'
DEFAULT_CONFIG_PATH = 'coderbot.cfg'

# The Flask application; an empty static_url_path is passed for static files.
app = Flask(__name__, static_url_path='')
# NOTE(review): debug mode is on and the secret key is hard-coded while the
# server listens on 0.0.0.0 (see run_server) -- confirm this is acceptable.
app.debug = True
app.secret_key = 'CoderBot v4.0'
# Program currently held by the /program API; none at start-up.
app.program = None
babel = Babel(app)
# Template filter extracting the single-quoted elements of a string
@app.template_filter('quoted')
def quoted(s):
    """Return the substrings of ``str(s)`` that are enclosed in single quotes.

    The result is a list of the quoted texts (quotes removed), or ``None``
    when the string contains no quoted element.
    """
    matches = findall('\'([^\']*)\'', str(s))
    return matches if matches else None
def run_server(prog=None):
    """Load the default configuration, create the bot and serve the web UI.

    prog -- optional callable, invoked without arguments once ``app.bot``
            exists and before the web server starts.

    The call blocks while the Flask server runs. Whatever happens after the
    bot has been created (including a failure inside ``prog``), the shutdown
    flag is raised and ``app.bot.shutdown()`` is called before returning or
    propagating the exception.
    """
    app.shutdown_requested = False
    Config().load(DEFAULT_CONFIG_PATH)
    app.bot = CoderBot()
    try:
        app.bot.camera.setDCIMpath(Config().get('record_path', RECORD_PATH))
        # TODO : Launch the Config().get('program_launch_at_start', None) program
        if prog is not None:
            prog()
        app.run(host='0.0.0.0', port=Config().get('listen_port', 8080), use_reloader=False, threaded=True)
    finally:
        # The video stream generators poll this flag to end their loop
        app.shutdown_requested = True
        app.bot.shutdown()
@babel.localeselector
def get_locale():
    """Select the interface language for the current request.

    The best match between the browser's accepted languages and the
    supported ones is used, with English as the fallback. The bot's sound
    module is switched to the same language before it is returned.
    """
    supported = ['en', 'fr', 'it']
    # Prefer the language announced by the browser
    language = request.accept_languages.best_match(supported)
    if language is None:
        # Nothing supported was requested: fall back to English
        language = 'en'
    # Keep the speech synthesis in the language shown by the interface
    app.bot.sound.language(language)
    return language
@app.context_processor
def utility_processor():
    """Make ``get_locale`` available under that name in the template context."""
    return {'get_locale': get_locale}
######################################################################################################
# Routes definition
# Paths for the HMI pages
@app.route('/')
def handle_home():
    """Redirect the site root to the control page."""
    landing_page = '/control.html'
    return redirect(landing_page)
@app.route('/<filename>.html')
def handle_template(filename):
    """Render the HTML page named *filename*.

    filename -- page name taken from the URL, without the '.html' suffix.

    When a non-admin user is recorded in the session but only two
    configurations are loaded, the user's '<username>.cfg' file is loaded
    first if it exists. The 'gallery' page additionally receives the sorted
    list of (file name, 'picture' | 'video') pairs found in the record
    directory; every other page is rendered from '<filename>.html' with the
    current configuration.
    """
    from os import listdir
    from os.path import isfile, join, splitext

    # Session is loaded but not the user's config file
    if len(Config()._configs) == 2 and 'username' in session and session['username'].lower() != 'admin':
        # Use a dedicated name: storing this path in ``filename`` made the
        # page lookup below work on the config path instead of the page name.
        user_config = join(Config().get('config_path', CONFIG_PATH), "%s.cfg" % session['username'].lower())
        if isfile(user_config):
            Config().load(user_config)

    if filename == 'gallery':
        record_path = Config().get('record_path', RECORD_PATH)
        files = [f for f in listdir(record_path) if isfile(join(record_path, f))]
        pics = [f for f in files if splitext(f)[1].lower() in ['.jpg', '.jpeg', '.png']]
        vids = [f for f in files if splitext(f)[1].lower() in ['.h264', '.avi']]
        entries = sorted([(f, 'picture') for f in pics] + [(f, 'video') for f in vids])
        return render_template("gallery.html", pictures=entries, config=Config())

    return render_template("%s.html" % filename, config=Config())
# Path for configuration API
@app.route("/config/<command>", methods=['GET', 'POST'])
def handle_config(command):
    """Read ('get', GET) or store ('save', POST) configuration values.

    get  -- returns as JSON the values of the keys named in the query string,
            or ``Config()._config`` when no key is given.
    save -- expects a form field 'json' holding a JSON object; each value is
            evaluated (see the note below), stored with ``Config().set`` and
            the configuration is saved. Returns ``{"result": true}``.

    Aborts with 404 for an unknown command, 400 when the 'json' field is
    missing or is not a JSON object, and 405 when the command is used with
    the wrong HTTP method.
    """
    if command not in ['get', 'save']:
        abort(404)  # Not found

    if command == 'get' and request.method == 'GET':
        if len(request.args):
            d = dict((k, Config().get(k)) for k in request.args.keys())
        else:
            d = Config()._config
        return jsonify(**d)

    if command == 'save' and request.method == 'POST':
        try:
            settings = json.loads(request.form.get('json', ''))
        except ValueError:
            abort(400)  # Bad request: missing or malformed JSON
        if not isinstance(settings, dict):
            abort(400)  # Bad request: a JSON object is required

        # NOTE(review): eval() is applied to data coming from the request.
        # Emptying __builtins__ is not a sandbox; consider a literal parser
        # plus an explicit yes/no/true/false mapping.
        glbs = {'__builtins__': None, 'yes': True, 'true': True, 'no': False, 'false': False}
        for k, v in settings.items():
            if k == 'camera_resolutions':
                for i, r in v.items():
                    v[i] = eval(r.lower(), glbs)
            else:
                try:
                    v = eval(v.lower(), glbs)
                except Exception:
                    # Not an evaluable expression (or not a string at all):
                    # the value is stored exactly as received.
                    pass
            Config().set(k, v)
        # TODO: call the commands to set new Coderbot configuration
        Config().save()
        return jsonify(result=True)

    # TODO: if command == 'reset': erase config file content
    abort(405)  # Not allowed
# Path for user's session API
def _user_config_file(username):
    """Return the path of the '<username>.cfg' file in the config directory."""
    from os.path import join
    return join(Config().get('config_path', CONFIG_PATH), "%s.cfg" % username.lower())


def _is_plain_username(username):
    """Tell whether *username* is non-empty and has no directory component."""
    from os.path import basename
    return bool(username) and basename(username) == username


@app.route('/user/<command>', methods=['GET', 'POST'])
def handle_user(command):
    """Manage user sessions and per-user configuration files.

    login  -- 'username' from the form or the query string; 'admin' or any
              user owning a config file is accepted, recorded in the session
              and (for non-admin users) has the config file loaded.
              403 when more than two configurations are already loaded,
              401 for an unknown user.
    logout -- (GET) forgets the session user and closes the user's config.
    list   -- (GET) JSON list of the users found in the config directory,
              plus 'admin'.
    add    -- (GET) creates an empty config file for 'username'.
    del    -- (GET) removes the config file of 'username'.

    login and logout answer with a redirect to the referring page, or to '/'
    when the request carries no referrer. add and del abort with 400 for an
    empty username or one containing a directory component, and with 500 when
    the file operation fails. Anything else aborts with 405.
    """
    # Imported once for every branch: with per-branch imports, 'add' and
    # 'del' used ``join`` before it was bound and always ended in a 500.
    from os import listdir, unlink
    from os.path import isfile, join, splitext

    if command == 'login':
        if len(Config()._configs) > 2:
            abort(403)  # Forbidden (too many users like FTP)
        username = request.form.get('username', request.args.get('username', ''))
        is_admin = username.lower() == 'admin'
        filename = _user_config_file(username)
        if is_admin or (_is_plain_username(username) and isfile(filename)):
            session['username'] = username
            if not is_admin:
                Config().load(filename)
                # TODO: call the commands to set new Coderbot configuration
            return redirect(request.referrer or '/')
        abort(401)  # Unauthorized without authentication

    if command == 'logout' and request.method == 'GET':
        username = session.pop('username', None)
        # Close only the last config file, the user file: skip it for admin
        # (compared case-insensitively, as login does), when nobody was
        # logged in, and when no user file is loaded (same
        # ``len(Config()._configs) > 2`` test as the login branch).
        if username is not None and username.lower() != 'admin' and len(Config()._configs) > 2:
            Config().close()
        return redirect(request.referrer or '/')

    if command == 'list' and request.method == 'GET':
        config_path = Config().get('config_path', CONFIG_PATH)
        files = [f for f in listdir(config_path) if isfile(join(config_path, f))]
        users = [f for f in files if splitext(f)[1].lower() == '.cfg']
        if config_path == CONFIG_PATH:
            # The default config file lives in the same directory: hide it
            users = [f for f in users if f.lower() != DEFAULT_CONFIG_PATH]
        users = [splitext(f)[0] for f in users]
        users.append('admin')
        users.sort()
        return jsonify(users=users)

    if command == 'add' and request.method == 'GET':
        username = request.args.get('username', '')
        if not _is_plain_username(username):
            abort(400)  # Bad request
        try:
            open(_user_config_file(username), 'a').close()
        except (IOError, OSError):
            abort(500)  # Internal error
        return jsonify(result=True)

    if command == 'del' and request.method == 'GET':
        username = request.args.get('username', '')
        if not _is_plain_username(username):
            abort(400)  # Bad request
        try:
            unlink(_user_config_file(username))
        except OSError:
            abort(500)  # Internal error
        return jsonify(result=True)

    abort(405)  # Not allowed
# Paths for video streaming
@app.route('/video')
def handle_video():
    """Return a minimal HTML page whose background image is /video/stream/SD."""
    page = """<html><head><style type='text/css'> body { background-image: url(/video/stream/SD); background-repeat:no-repeat; background-position:center top; background-attachment:fixed; height:100% } </style></head><body> </body></html>"""
    return page
@app.route('/video/snapshot/<definition>')
def video_snapshot(definition):
    """Return the current frame of a streamer as a downloadable JPEG.

    definition -- key of the streamer in ``app.bot.streamers``.

    Aborts with 404 for an unknown streamer and with 503 while the current
    frame carries no lock yet.
    """
    if definition not in app.bot.streamers.keys():
        abort(404)  # Not found
    # Read the frame attribute once, so the object whose lock is held is the
    # very object handed to the encoder.
    frame = app.bot.streamers[definition].frame
    if not hasattr(frame, 'lock'):
        abort(503)  # Service unavailable
    with frame.lock:
        jpeg = utils.cv.JPEGencode(frame)
    response = make_response(jpeg)
    response.headers['Content-Type'] = 'image/jpeg'
    response.headers['Content-Disposition'] = 'attachment; filename=snapshot.jpg'
    return response
@app.route("/video/stream/<definition>")
def handle_video_stream(definition):
    """Stream the frames of a streamer as a multipart JPEG response.

    definition -- key of the streamer in ``app.bot.streamers``; 404 when
                  unknown.

    About every 0.1 s the current frame is JPEG-encoded and sent as one part,
    until ``app.shutdown_requested`` becomes true.
    """
    if definition not in app.bot.streamers.keys():
        abort(404)  # Not found

    def streamer():
        while not app.shutdown_requested:
            # Read the frame attribute once, so the lock that is held
            # belongs to the object being encoded.
            frame = app.bot.streamers[definition].frame
            if hasattr(frame, 'lock'):
                with frame.lock:
                    jpeg = utils.cv.JPEGencode(frame)
                yield ("--BOUNDARYSTRING\r\n" +
                       "Content-type: image/jpeg\r\n" +
                       "Content-Length: " + str(len(jpeg)) + "\r\n\r\n" +
                       jpeg + "\r\n")
            time.sleep(0.1)

    # NOTE(review): the declared boundary already starts with '--' while the
    # parts are delimited by the same text; kept unchanged for the existing
    # clients -- confirm before aligning it with the multipart syntax.
    # No try/except here: swallowing an error would make the view return None.
    return Response(streamer(), mimetype="multipart/x-mixed-replace; boundary=--BOUNDARYSTRING")
# Paths for CoderBot API (Motors, Sound and Camera control)
# Path to get the status of CoderBot
@app.route("/bot/status")
def handle_bot_status():
    """Return the bot's ``_init_complete`` attribute as the JSON 'status' field."""
    ready = app.bot._init_complete
    return jsonify(status=ready)
# Path for Motors or GPIO API
@app.route("/bot/motors/<movement>", methods=['GET'])
def handle_bot_motors(movement):
    """Drive the motors according to *movement* and the query string.

    stop                     -- calls ``motors.stop()``.
    set                      -- ``motors.set(speed_left, speed_right, elapse)``.
    move, forward, backward  -- ``motors.<movement>(speed, elapse)``, the speed
                                defaulting to 'default_speed_move'.
    turn, right, left        -- same, defaulting to 'default_speed_turn'.

    Query values are handed over exactly as received. Returns
    ``{"result": true}``; any other movement aborts with 404.
    """
    params = request.args

    if movement == 'stop':
        app.bot.motors.stop()
        return jsonify(result=True)

    if movement == 'set':
        fallback = Config().get('default_speed_move', 100)
        left = params.get('speed_left', fallback)
        right = params.get('speed_right', fallback)
        app.bot.motors.set(left, right, params.get('elapse'))
        return jsonify(result=True)

    # The remaining movements share one call shape and differ only in the
    # configuration key providing the default speed.
    if movement in ('move', 'forward', 'backward'):
        speed_option = 'default_speed_move'
    elif movement in ('turn', 'right', 'left'):
        speed_option = 'default_speed_turn'
    else:
        abort(404)  # Not found

    speed = params.get('speed', Config().get(speed_option, 100))
    duration = params.get('elapse')
    getattr(app.bot.motors, movement)(speed, duration)
    return jsonify(result=True)
# Path for Sound API
@app.route("/bot/sound/<command>", methods=['GET'])
def handle_bot_sound(command):
    """Call ``app.bot.sound.play`` or ``app.bot.sound.say``.

    'play' takes its argument from the 'filename' query parameter, 'say'
    from 'what'. Returns ``{"result": true}``; aborts with 404 for another
    command and with 400 when the parameter is missing.
    """
    # Query parameter carrying the argument of each command
    argument_names = {'play': 'filename', 'say': 'what'}
    if command not in argument_names:
        abort(404)  # Not found
    snd = request.args.get(argument_names[command])
    if snd is None:
        abort(400)  # Bad Request
    getattr(app.bot.sound, command)(snd)
    return jsonify(result=True)
# Path for Camera API
@app.route("/bot/camera/<command>")
def handle_bot_camera(command):
    """Run a camera command: 'capture', 'start_recording' or 'stop_recording'.

    'stop_recording' takes no argument; the other two receive the 'filename'
    query parameter (``None`` when absent). Returns ``{"result": true}``;
    any other command aborts with 404.
    """
    if command not in ('capture', 'start_recording', 'stop_recording'):
        abort(404)  # Not found
    if command == 'stop_recording':
        app.bot.camera.stop_recording()
        return jsonify(result=True)
    # start_recording or capture
    # (a size / width / height parameter is not read from the request)
    target = request.args.get('filename')
    getattr(app.bot.camera, command)(target)
    return jsonify(result=True)
# Path for system API control
@app.route("/system/<command>", methods=['GET'])
def handle_system(command):
    """Halt or reboot the machine, or restart the coderbot service.

    command -- 'halt' or 'reboot' (run as ``sudo <command>``), or 'restart'
               (``sudo service coderbot restart``); anything else aborts
               with 404.

    Returns JSON ``{"result": <bool>}``, true when the command exited with
    status 0.
    """
    # Imported here because the module does not import it at file level; the
    # former code called ``os.system`` without ``os`` being in scope.
    import subprocess

    if command not in ['halt', 'reboot', 'restart']:
        abort(404)  # Not found
    if command == 'restart':
        argv = ['sudo', 'service', 'coderbot', 'restart']
    else:
        argv = ['sudo', command]
    try:
        status = subprocess.call(argv)
    except OSError:
        # The executable could not be started at all
        return jsonify(result=False)
    return jsonify(result=(status == 0))
# Path for DCIM API control
@app.route("/camera/DCIM/<filename>")
def handle_photos(filename):
    """Serve, thumbnail or delete a file of the record directory.

    filename -- name of the file, taken from the URL.

    ?thumb=...  -- returns a JPEG thumbnail fitting in 80x60 pixels;
                   404 when the file cannot be opened as an image.
    ?delete=... -- removes the file and returns ``{"result": true}``;
                   500 when the removal fails.
    otherwise   -- sends the file itself.
    """
    from os import unlink
    from os.path import join

    path = join(Config().get('record_path', RECORD_PATH), filename)

    if request.args.get('thumb'):
        try:
            im = Image.open(path)
        except IOError:
            abort(404)  # Missing file or not an image
        im.thumbnail((80, 60), Image.ANTIALIAS)
        # The gallery also lists PNG files; modes such as RGBA or P have to
        # be converted before they can be written as JPEG.
        if im.mode not in ('L', 'RGB', 'CMYK'):
            im = im.convert('RGB')
        buf = StringIO.StringIO()
        im.save(buf, format='JPEG')
        return Response(buf.getvalue(), mimetype='image/jpeg')

    if request.args.get('delete'):
        try:
            unlink(path)
        except OSError:
            abort(500)  # Internal error
        return jsonify(result=True)

    return send_file(path)
# Path for program API control
@app.route("/programs")
def handle_programs():
    """Return the result of ``Program.listdir()`` as JSON.

    Only the plain listing is implemented: a request carrying an 'action'
    query parameter aborts with 501.
    """
    if request.args.get('action') is not None:
        abort(501)  # Not implemented
    return jsonify(result=True, files=Program.listdir())
@app.route("/program/<filename>", methods=['GET', 'POST'])
def handle_program(filename=""):
action = request.form.get('action', request.args.get('action'))
#print action, filename, app.program
if action is None and request.method == 'POST': abort(405) # Not allowed
if not action in [None, 'save', 'save as', 'delete', 'exec', 'abort', 'status']: abort(400) # Bad request
if action == 'save as':
overwrite = not (request.form.get('overwrite', '').lower() in ['false', '0', ''])
try:
app.program = Program.new(filename, overwrite, request.form.get('dom_code'), request.form.get('py_code'))
app.program.save()
except OSError as e:
if e.errno == errno.EEXIST: return jsonify(result=False, error='File exists', filename=filename)
else: raise
return jsonify(result=True, filename=filename)
if action == 'save':
if app.program is None: abort(410) # Gone
app.program.update(request.form.get('dom_code'), request.form.get('py_code'))
app.program.save()
return jsonify(result=True)
if action == 'exec':
if app.program is None:
app.program = Program(filename, request.form.get('dom_code'), request.form.get('py_code'))
#abort(410) # Gone
else:
app.program.update(request.form.get('dom_code'), request.form.get('py_code'))
if Config().get('program_autosave_on_run', False): app.program.save()
return jsonify(result=app.program.start())
if action == 'abort':
if app.program is None:
print 'filename:', filename, ', program:', app.program
abort(410) # Gone
app.program.stop()
return jsonify(result=True)
if action == 'status':
if app.program is None: abort(410) # Gone
return jsonify(status=app.program.isRunning())
if action is None and request.method == 'GET':
try: app.program = Program.load(filename)
except OSError as e:
if e.errno == errno.ENOENT: abort(404) # File not found
else: raise
return jsonify(app.program.get())
abort(501) # Not implemented
if __name__ == '__main__':
    def demo():
        """Register the face-detection demo processes on the LD and SD streamers."""
        def retrieveDetections(frame):
            """Copy the faces stored on the current LD frame onto *frame*.

            Does nothing while the LD frame has no lock or no 'faces'
            attribute.
            """
            source = app.bot.streamers['LD'].frame
            if not hasattr(source, 'lock'):
                return
            with source.lock:
                if not hasattr(source, 'faces'):
                    return
                detected = source.faces
            # Apply zoom on rectangles
            frame.faces = detected * 2

        # (streamer key, process) pairs, registered in this order
        pipeline = [
            ('LD', utils.cv.faceDetect),
            ('SD', retrieveDetections),
            ('LD', utils.cv.drawFaces),
            ('SD', utils.cv.drawFaces),
            ('SD', utils.cv.drawFPS),
        ]
        for key, process in pipeline:
            app.bot.streamers[key].addProcess(process)

    # NOTE(review): demo is defined but not passed to run_server, so the demo
    # processes are never installed -- confirm whether that is intended.
    run_server()