-
Notifications
You must be signed in to change notification settings - Fork 8
/
drive.py
79 lines (68 loc) · 2.09 KB
/
drive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#parsing command line arguments
import argparse
#decoding camera images
import base64
#for saving frame timestamps
from datetime import datetime
#reading and writing files
import os
#high level file operations
import shutil
#matrix math
import numpy as np
#real-time server
import socketio
#concurrent networking
import eventlet
#web server gateway interface
import eventlet.wsgi
#image manipulation
from PIL import Image
#web framework
from flask import Flask
#input output
from io import BytesIO
# Load model
from model import Model
# Socket.IO server instance; the 'telemetry' and 'connect' handlers below are
# registered on it and send_control() emits through it.
sio = socketio.Server()
# Flask (web) application; it is wrapped together with `sio` in the
# `__main__` block before being served.
app = Flask(__name__)
# Project-local Model, created and loaded once at import time so the telemetry
# handler can call model.predict() on every incoming frame.
# NOTE(review): presumably loadModel() restores trained weights - confirm in model.py.
model = Model()
model.loadModel()
# Register the handler for the 'telemetry' event on the Socket.IO server.
@sio.on('telemetry')
def telemetry(sid, data):
    """Handle one telemetry message and answer with a control command.

    The steering angle is predicted by the model from the camera image in
    the payload; the throttle is then derived from that angle and from the
    speed reported in the payload. The result is printed and passed to
    send_control().

    The "steering_angle" and "throttle" entries of the payload are not read:
    both values are recomputed here, so the reported ones were never used.

    Args:
        sid: Session id supplied by the Socket.IO server; unused.
        data: Telemetry payload. When truthy it is indexed with "speed"
            (converted with float()) and "image" (base64-decoded and opened
            as an image). A falsy payload is ignored.

    Raises:
        KeyError: If a truthy payload lacks "speed" or "image"
            (assuming the payload is a dict).
    """
    if not data:
        return

    # The current speed of the car, as reported in the payload.
    speed = float(data["speed"])

    # Decode the base64 camera frame and let the model predict the steering.
    # NOTE(review): the prediction is presumably a plain scalar - it is passed
    # to abs() and formatted with "{:.3f}" below; confirm in model.py.
    image = Image.open(BytesIO(base64.b64decode(data["image"])))
    steering_angle = model.predict(image, preloaded=True)

    # Target speed starts at 25 and drops by 10 for every 0.4 of absolute
    # steering angle, so the car is asked to slow down in sharper turns.
    speed_target = 25 - abs(steering_angle) / 0.4 * 10
    # Throttle is proportional (gain 0.1) to the gap between target and
    # current speed; it goes negative when the car is above the target.
    throttle = (speed_target - speed) * 0.1

    print("network prediction -> (steering angle: {:.3f}, throttle: {:.3f})".format(steering_angle, throttle))
    send_control(steering_angle, throttle)
@sio.on('connect')
def connect(sid, environ):
    """Log a new client connection and send an initial all-zero command.

    Args:
        sid: Session id supplied with the 'connect' event; printed.
        environ: Second argument supplied with the 'connect' event; unused.
            NOTE(review): presumably the WSGI environ of the request.
    """
    print("connect ", sid)
    # Begin with zero steering and zero throttle.
    send_control(steering_angle=0, throttle=0)
def send_control(steering_angle, throttle):
    """Emit a "steer" event carrying the given steering angle and throttle.

    Both values are sent as strings, produced with the builtin str().

    Args:
        steering_angle: Steering value to send.
        throttle: Throttle value to send.
    """
    sio.emit(
        "steer",
        data={
            'steering_angle': str(steering_angle),
            'throttle': str(throttle),
        },
        # NOTE(review): skip_sid receives True rather than a session id; kept
        # exactly as before - confirm against the python-socketio emit() docs.
        skip_sid=True)
if __name__ == '__main__':
    # Wrap the Flask application and the Socket.IO server into a single WSGI
    # application; `app` is rebound to the wrapped object.
    app = socketio.Middleware(sio, app)
    # Listen on port 4567 on all interfaces ('' host) and serve the wrapped
    # application with eventlet's WSGI server.
    listener = eventlet.listen(('', 4567))
    eventlet.wsgi.server(listener, app)