-
Notifications
You must be signed in to change notification settings - Fork 0
/
model_predictive_control.py
311 lines (252 loc) · 11.2 KB
/
model_predictive_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import random
import numpy as np
from scipy.optimize import minimize
from scipy.interpolate import splprep, splev
import sympy as sym
from sympy.tensor.array import derive_by_array
sym.init_printing()
from abstract_controller import Controller
STEER_BOUND = 1.0
STEER_BOUNDS = (-STEER_BOUND, STEER_BOUND)
THROTTLE_BOUND = 1.0
THROTTLE_BOUNDS = (0.0, THROTTLE_BOUND)
class _EqualityConstraints(object):
"""Class for storing equality constraints in the MPC."""
def __init__(self, N, state_vars):
self.dict = {}
for symbol in state_vars:
self.dict[symbol] = N*[None]
def __getitem__(self, key):
return self.dict[key]
def __setitem__(self, key, value):
self.dict[key] = value
class MPCController(Controller):
def __init__(self, target_speed, steps_ahead=10, dt=0.1):
self.target_speed = target_speed
self.state_vars = ('x', 'y', 'v', 'ψ', 'cte', 'eψ')
self.steps_ahead = steps_ahead
self.dt = dt
# Cost function coefficients
self.cte_coeff = 100 # 100
self.epsi_coeff = 100 # 100
self.speed_coeff = 0.4 # 0.2
self.acc_coeff = 1 # 1
self.steer_coeff = 0.1 # 0.1
self.consec_acc_coeff = 50
self.consec_steer_coeff = 50
# Front wheel L
self.Lf = 2.5 # TODO: check if true
# How the polynomial fitting the desired curve is fitted
self.steps_poly = 30
self.poly_degree = 3
# Bounds for the optimizer
self.bounds = (
6*self.steps_ahead * [(None, None)]
+ self.steps_ahead * [THROTTLE_BOUNDS]
+ self.steps_ahead * [STEER_BOUNDS]
)
# State 0 placeholder
num_vars = (len(self.state_vars) + 2) # State variables and two actuators
self.state0 = np.zeros(self.steps_ahead*num_vars)
# Lambdify and minimize stuff
self.evaluator = 'numpy'
self.tolerance = 1
self.cost_func, self.cost_grad_func, self.constr_funcs = self.get_func_constraints_and_bounds()
# To keep the previous state
self.steer = None
self.throttle = None
def get_func_constraints_and_bounds(self):
"""The most important method of this class, defining the MPC's cost
function and constraints.
"""
# Polynomial coefficients will also be symbolic variables
poly = self.create_array_of_symbols('poly', self.poly_degree+1)
# Initialize the initial state
x_init = sym.symbols('x_init')
y_init = sym.symbols('y_init')
ψ_init = sym.symbols('ψ_init')
v_init = sym.symbols('v_init')
cte_init = sym.symbols('cte_init')
eψ_init = sym.symbols('eψ_init')
init = (x_init, y_init, ψ_init, v_init, cte_init, eψ_init)
# State variables
x = self.create_array_of_symbols('x', self.steps_ahead)
y = self.create_array_of_symbols('y', self.steps_ahead)
ψ = self.create_array_of_symbols('ψ', self.steps_ahead)
v = self.create_array_of_symbols('v', self.steps_ahead)
cte = self.create_array_of_symbols('cte', self.steps_ahead)
eψ = self.create_array_of_symbols('eψ', self.steps_ahead)
# Actuators
a = self.create_array_of_symbols('a', self.steps_ahead)
δ = self.create_array_of_symbols('δ', self.steps_ahead)
vars_ = (
# Symbolic arrays (but NOT actuators)
*x, *y, *ψ, *v, *cte, *eψ,
# Symbolic arrays (actuators)
*a, *δ,
)
cost = 0
for t in range(self.steps_ahead):
cost += (
# Reference state penalties
self.cte_coeff * cte[t]**2
+ self.epsi_coeff * eψ[t]**2 +
+ self.speed_coeff * (v[t] - self.target_speed)**2
# # Actuator penalties
+ self.acc_coeff * a[t]**2
+ self.steer_coeff * δ[t]**2
)
# Penalty for differences in consecutive actuators
for t in range(self.steps_ahead-1):
cost += (
self.consec_acc_coeff * (a[t+1] - a[t])**2
+ self.consec_steer_coeff * (δ[t+1] - δ[t])**2
)
# Initialize constraints
eq_constr = _EqualityConstraints(self.steps_ahead, self.state_vars)
eq_constr['x'][0] = x[0] - x_init
eq_constr['y'][0] = y[0] - y_init
eq_constr['ψ'][0] = ψ[0] - ψ_init
eq_constr['v'][0] = v[0] - v_init
eq_constr['cte'][0] = cte[0] - cte_init
eq_constr['eψ'][0] = eψ[0] - eψ_init
for t in range(1, self.steps_ahead):
curve = sum(poly[-(i+1)] * x[t-1]**i for i in range(len(poly)))
# The desired ψ is equal to the derivative of the polynomial curve at
# point x[t-1]
ψdes = sum(poly[-(i+1)] * i*x[t-1]**(i-1) for i in range(1, len(poly)))
eq_constr['x'][t] = x[t] - (x[t-1] + v[t-1] * sym.cos(ψ[t-1]) * self.dt)
eq_constr['y'][t] = y[t] - (y[t-1] + v[t-1] * sym.sin(ψ[t-1]) * self.dt)
eq_constr['ψ'][t] = ψ[t] - (ψ[t-1] - v[t-1] * δ[t-1] / self.Lf * self.dt)
eq_constr['v'][t] = v[t] - (v[t-1] + a[t-1] * self.dt)
eq_constr['cte'][t] = cte[t] - (curve - y[t-1] + v[t-1] * sym.sin(eψ[t-1]) * self.dt)
eq_constr['eψ'][t] = eψ[t] - (ψ[t-1] - ψdes - v[t-1] * δ[t-1] / self.Lf * self.dt)
# Generate actual functions from
cost_func = self.generate_fun(cost, vars_, init, poly)
cost_grad_func = self.generate_grad(cost, vars_, init, poly)
constr_funcs = []
for symbol in self.state_vars:
for t in range(self.steps_ahead):
func = self.generate_fun(eq_constr[symbol][t], vars_, init, poly)
grad_func = self.generate_grad(eq_constr[symbol][t], vars_, init, poly)
constr_funcs.append(
{'type': 'eq', 'fun': func, 'jac': grad_func, 'args': None},
)
return cost_func, cost_grad_func, constr_funcs
def control(self, pts_2D, car_state, pose, depth_array):
which_closest, _, location = self._calc_closest_dists_and_location(pose, pts_2D)
# Stabilizes polynomial fitting
which_closest_shifted = which_closest - 5
# NOTE: `which_closest_shifted` might become < 0, but the modulo operation below fixes that
indeces = which_closest_shifted + self.steps_poly*np.arange(self.poly_degree+1)
indeces = indeces % pts_2D.shape[0]
pts = pts_2D[indeces]
# orient = measurements.player_measurements.transform.orientation
orient = np.array(
[pose.orientation.x_val, pose.orientation.y_val, pose.orientation.z_val, pose.orientation.w_val])
v = car_state.speed # km / h
# ψ = np.arctan2(orient.y, orient.x)
ψ = np.arctan2(2.0 * (orient[3] * orient[2] + orient[0] * orient[1]),
1.0 - 2.0 * (orient[1] ** 2 + orient[2] ** 2))
cos_ψ = np.cos(ψ)
sin_ψ = np.sin(ψ)
x, y = location[0], location[1]
pts_car = MPCController.transform_into_cars_coordinate_system(pts, x, y, cos_ψ, sin_ψ)
poly = np.polyfit(pts_car[:, 0], pts_car[:, 1], self.poly_degree)
cte = poly[-1]
eψ = -np.arctan(poly[-2])
init = (0, 0, 0, v, cte, eψ, *poly)
self.state0 = self.get_state0(v, cte, eψ, self.steer, self.throttle, poly)
result = self.minimize_cost(self.bounds, self.state0, init)
# Left here for debugging
# self.steer = -0.6 * cte - 5.5 * (cte - prev_cte)
# prev_cte = cte
# self.throttle = clip_throttle(self.throttle, v, target_speed)
if 'success' in result.message:
self.steer = result.x[-self.steps_ahead]
self.throttle = result.x[-2*self.steps_ahead]
else:
# print('Unsuccessful optimization')
print('Optimization failed with message:', result.message)
one_log_dict = {
'x': x,
'y': y,
'orient_x': orient[0],
'orient_y': orient[1],
'steer': self.steer,
'throttle': self.throttle,
'speed': v,
'psi': ψ,
'cte': cte,
'epsi': eψ,
'which_closest': which_closest,
}
for i, coeff in enumerate(poly):
one_log_dict['poly{}'.format(i)] = coeff
for i in range(pts_car.shape[0]):
for j in range(pts_car.shape[1]):
one_log_dict['pts_car_{}_{}'.format(i, j)] = pts_car[i][j]
return one_log_dict
def get_state0(self, v, cte, epsi, a, delta, poly):
a = a or 0
delta = delta or 0
# "Go as the road goes"
# x = np.linspace(0, self.steps_ahead*self.dt*v, self.steps_ahead)
# y = np.polyval(poly, x)
x = np.linspace(0, 1, self.steps_ahead)
y = np.polyval(poly, x)
psi = 0
self.state0[:self.steps_ahead] = x
self.state0[self.steps_ahead:2*self.steps_ahead] = y
self.state0[2*self.steps_ahead:3*self.steps_ahead] = psi
self.state0[3*self.steps_ahead:4*self.steps_ahead] = v
self.state0[4*self.steps_ahead:5*self.steps_ahead] = cte
self.state0[5*self.steps_ahead:6*self.steps_ahead] = epsi
self.state0[6*self.steps_ahead:7*self.steps_ahead] = a
self.state0[7*self.steps_ahead:8*self.steps_ahead] = delta
return self.state0
def generate_fun(self, symb_fun, vars_, init, poly):
args = init + poly
return sym.lambdify((vars_, *args), symb_fun, self.evaluator)
# Equivalent to (but faster than):
# func = sym.lambdify(vars_+init+poly, symb_fun, evaluator)
# return lambda x, *args: func(*np.r_[x, args])
def generate_grad(self, symb_fun, vars_, init, poly):
args = init + poly
return sym.lambdify(
(vars_, *args),
derive_by_array(symb_fun, vars_+args)[:len(vars_)],
self.evaluator
)
# Equivalent to (but faster than):
# cost_grad_funcs = [
# generate_fun(symb_fun.diff(var), vars_, init, poly)
# for var in vars_
# ]
# return lambda x, *args: [
# grad_func(np.r_[x, args]) for grad_func in cost_grad_funcs
# ]
def minimize_cost(self, bounds, x0, init):
# TODO: this is a bit retarded, but hey -- that's scipy API's fault ;)
for constr_func in self.constr_funcs:
constr_func['args'] = init
return minimize(
fun=self.cost_func,
x0=x0,
args=init,
jac=self.cost_grad_func,
bounds=bounds,
constraints=self.constr_funcs,
method='SLSQP',
tol=self.tolerance,
)
@staticmethod
def create_array_of_symbols(str_symbol, N):
return sym.symbols('{symbol}0:{N}'.format(symbol=str_symbol, N=N))
@staticmethod
def transform_into_cars_coordinate_system(pts, x, y, cos_ψ, sin_ψ):
diff = (pts - [x, y])
pts_car = np.zeros_like(diff)
pts_car[:, 0] = cos_ψ * diff[:, 0] + sin_ψ * diff[:, 1]
pts_car[:, 1] = sin_ψ * diff[:, 0] - cos_ψ * diff[:, 1]
return pts_car