-
Notifications
You must be signed in to change notification settings - Fork 9.1k
/
drive_helpers.py
179 lines (144 loc) · 6.45 KB
/
drive_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import math
from cereal import car, log
from openpilot.common.conversions import Conversions as CV
from openpilot.common.numpy_fast import clip
from openpilot.common.realtime import DT_CTRL
from openpilot.system.version import get_build_metadata
EventName = car.CarEvent.EventName
# WARNING: this value was determined based on the model's training distribution,
# model predictions above this speed can be unpredictable
# V_CRUISE's are in kph
V_CRUISE_MIN = 8
V_CRUISE_MAX = 145
V_CRUISE_UNSET = 255
V_CRUISE_INITIAL = 40
V_CRUISE_INITIAL_EXPERIMENTAL_MODE = 105
IMPERIAL_INCREMENT = round(CV.MPH_TO_KPH, 1) # round here to avoid rounding errors incrementing set speed
MIN_SPEED = 1.0
CONTROL_N = 17
CAR_ROTATION_RADIUS = 0.0
# EU guidelines
MAX_LATERAL_JERK = 5.0
MAX_VEL_ERR = 5.0
ButtonEvent = car.CarState.ButtonEvent
ButtonType = car.CarState.ButtonEvent.Type
CRUISE_LONG_PRESS = 50
CRUISE_NEAREST_FUNC = {
ButtonType.accelCruise: math.ceil,
ButtonType.decelCruise: math.floor,
}
CRUISE_INTERVAL_SIGN = {
ButtonType.accelCruise: +1,
ButtonType.decelCruise: -1,
}
class VCruiseHelper:
def __init__(self, CP):
self.CP = CP
self.v_cruise_kph = V_CRUISE_UNSET
self.v_cruise_cluster_kph = V_CRUISE_UNSET
self.v_cruise_kph_last = 0
self.button_timers = {ButtonType.decelCruise: 0, ButtonType.accelCruise: 0}
self.button_change_states = {btn: {"standstill": False, "enabled": False} for btn in self.button_timers}
@property
def v_cruise_initialized(self):
return self.v_cruise_kph != V_CRUISE_UNSET
def update_v_cruise(self, CS, enabled, is_metric):
self.v_cruise_kph_last = self.v_cruise_kph
if CS.cruiseState.available:
if not self.CP.pcmCruise:
# if stock cruise is completely disabled, then we can use our own set speed logic
self._update_v_cruise_non_pcm(CS, enabled, is_metric)
self.v_cruise_cluster_kph = self.v_cruise_kph
self.update_button_timers(CS, enabled)
else:
self.v_cruise_kph = CS.cruiseState.speed * CV.MS_TO_KPH
self.v_cruise_cluster_kph = CS.cruiseState.speedCluster * CV.MS_TO_KPH
else:
self.v_cruise_kph = V_CRUISE_UNSET
self.v_cruise_cluster_kph = V_CRUISE_UNSET
def _update_v_cruise_non_pcm(self, CS, enabled, is_metric):
# handle button presses. TODO: this should be in state_control, but a decelCruise press
# would have the effect of both enabling and changing speed is checked after the state transition
if not enabled:
return
long_press = False
button_type = None
v_cruise_delta = 1. if is_metric else IMPERIAL_INCREMENT
for b in CS.buttonEvents:
if b.type.raw in self.button_timers and not b.pressed:
if self.button_timers[b.type.raw] > CRUISE_LONG_PRESS:
return # end long press
button_type = b.type.raw
break
else:
for k, timer in self.button_timers.items():
if timer and timer % CRUISE_LONG_PRESS == 0:
button_type = k
long_press = True
break
if button_type is None:
return
# Don't adjust speed when pressing resume to exit standstill
cruise_standstill = self.button_change_states[button_type]["standstill"] or CS.cruiseState.standstill
if button_type == ButtonType.accelCruise and cruise_standstill:
return
# Don't adjust speed if we've enabled since the button was depressed (some ports enable on rising edge)
if not self.button_change_states[button_type]["enabled"]:
return
v_cruise_delta = v_cruise_delta * (5 if long_press else 1)
if long_press and self.v_cruise_kph % v_cruise_delta != 0: # partial interval
self.v_cruise_kph = CRUISE_NEAREST_FUNC[button_type](self.v_cruise_kph / v_cruise_delta) * v_cruise_delta
else:
self.v_cruise_kph += v_cruise_delta * CRUISE_INTERVAL_SIGN[button_type]
# If set is pressed while overriding, clip cruise speed to minimum of vEgo
if CS.gasPressed and button_type in (ButtonType.decelCruise, ButtonType.setCruise):
self.v_cruise_kph = max(self.v_cruise_kph, CS.vEgo * CV.MS_TO_KPH)
self.v_cruise_kph = clip(round(self.v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX)
def update_button_timers(self, CS, enabled):
# increment timer for buttons still pressed
for k in self.button_timers:
if self.button_timers[k] > 0:
self.button_timers[k] += 1
for b in CS.buttonEvents:
if b.type.raw in self.button_timers:
# Start/end timer and store current state on change of button pressed
self.button_timers[b.type.raw] = 1 if b.pressed else 0
self.button_change_states[b.type.raw] = {"standstill": CS.cruiseState.standstill, "enabled": enabled}
def initialize_v_cruise(self, CS, experimental_mode: bool) -> None:
# initializing is handled by the PCM
if self.CP.pcmCruise:
return
initial = V_CRUISE_INITIAL_EXPERIMENTAL_MODE if experimental_mode else V_CRUISE_INITIAL
# 250kph or above probably means we never had a set speed
if any(b.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for b in CS.buttonEvents) and self.v_cruise_kph_last < 250:
self.v_cruise_kph = self.v_cruise_kph_last
else:
self.v_cruise_kph = int(round(clip(CS.vEgo * CV.MS_TO_KPH, initial, V_CRUISE_MAX)))
self.v_cruise_cluster_kph = self.v_cruise_kph
def clip_curvature(v_ego, prev_curvature, new_curvature):
v_ego = max(MIN_SPEED, v_ego)
max_curvature_rate = MAX_LATERAL_JERK / (v_ego**2) # inexact calculation, check https://github.com/commaai/openpilot/pull/24755
safe_desired_curvature = clip(new_curvature,
prev_curvature - max_curvature_rate * DT_CTRL,
prev_curvature + max_curvature_rate * DT_CTRL)
return safe_desired_curvature
def get_speed_error(modelV2: log.ModelDataV2, v_ego: float) -> float:
# ToDo: Try relative error, and absolute speed
if len(modelV2.temporalPose.trans):
vel_err = clip(modelV2.temporalPose.trans[0] - v_ego, -MAX_VEL_ERR, MAX_VEL_ERR)
return float(vel_err)
return 0.0
def get_startup_event(car_recognized, controller_available, fw_seen):
build_metadata = get_build_metadata()
if build_metadata.openpilot.comma_remote and build_metadata.tested_channel:
event = EventName.startup
else:
event = EventName.startupMaster
if not car_recognized:
if fw_seen:
event = EventName.startupNoCar
else:
event = EventName.startupNoFw
elif car_recognized and not controller_available:
event = EventName.startupNoControl
return event