From 9a48c8ef666c9a5eb87a808df7feb70145633f61 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Tue, 21 Nov 2023 17:23:37 -0800 Subject: [PATCH] safety: check interceptor msg counter (#1738) * toyota: check interceptor counter * add counters and check rx * honda: check counter * clean up --- board/safety/safety_honda.h | 15 +++++++++++++-- board/safety/safety_toyota.h | 12 ++++++++++++ tests/safety/common.py | 20 ++++++++++++++++++-- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/board/safety/safety_honda.h b/board/safety/safety_honda.h index 6bdc40058f..62440167da 100644 --- a/board/safety/safety_honda.h +++ b/board/safety/safety_honda.h @@ -44,6 +44,7 @@ const LongitudinalLimits HONDA_NIDEC_LONG_LIMITS = { // Nidec and bosch radarless has the powertrain bus on bus 0 RxCheck honda_common_rx_checks[] = { HONDA_COMMON_RX_CHECKS(0) + {.msg = {{0x201, 0, 6, .check_checksum = false, .max_counter = 15U, .expected_timestep = 0U}, { 0 }, { 0 }}}, }; RxCheck honda_common_alt_brake_rx_checks[] = { @@ -54,6 +55,7 @@ RxCheck honda_common_alt_brake_rx_checks[] = { // For Nidecs with main on signal on an alternate msg (missing 0x326) RxCheck honda_nidec_alt_rx_checks[] = { HONDA_COMMON_NO_SCM_FEEDBACK_RX_CHECKS(0) + {.msg = {{0x201, 0, 6, .check_checksum = false, .max_counter = 15U, .expected_timestep = 0U}, { 0 }, { 0 }}}, }; // Bosch has pt on bus 1, verified 0x1A6 does not exist @@ -115,8 +117,17 @@ static uint32_t honda_compute_checksum(CANPacket_t *to_push) { } static uint8_t honda_get_counter(CANPacket_t *to_push) { - int counter_byte = GET_LEN(to_push) - 1U; - return ((uint8_t)(GET_BYTE(to_push, counter_byte)) >> 4U) & 0x3U; + int addr = GET_ADDR(to_push); + + uint8_t cnt = 0U; + if (addr == 0x201) { + // Signal: COUNTER_PEDAL + cnt = GET_BYTE(to_push, 4) & 0x0FU; + } else { + int counter_byte = GET_LEN(to_push) - 1U; + cnt = (GET_BYTE(to_push, counter_byte) >> 4U) & 0x3U; + } + return cnt; } static void honda_rx_hook(CANPacket_t *to_push) { diff --git a/board/safety/safety_toyota.h b/board/safety/safety_toyota.h index c1144e655c..4d91874aa2 100644 --- a/board/safety/safety_toyota.h +++ b/board/safety/safety_toyota.h @@ -92,6 +92,17 @@ static uint32_t toyota_get_checksum(CANPacket_t *to_push) { return (uint8_t)(GET_BYTE(to_push, checksum_byte)); } +static uint8_t toyota_get_counter(CANPacket_t *to_push) { + int addr = GET_ADDR(to_push); + + uint8_t cnt = 0U; + if (addr == 0x201) { + // Signal: COUNTER_PEDAL + cnt = GET_BYTE(to_push, 4) & 0x0FU; + } + return cnt; +} + static bool toyota_get_quality_flag_valid(CANPacket_t *to_push) { int addr = GET_ADDR(to_push); @@ -345,5 +356,6 @@ const safety_hooks toyota_hooks = { .fwd = toyota_fwd_hook, .get_checksum = toyota_get_checksum, .compute_checksum = toyota_compute_checksum, + .get_counter = toyota_get_counter, .get_quality_flag_valid = toyota_get_quality_flag_valid, }; diff --git a/tests/safety/common.py b/tests/safety/common.py index 210558ac9c..0d4f2c36fc 100644 --- a/tests/safety/common.py +++ b/tests/safety/common.py @@ -122,6 +122,9 @@ class GasInterceptorSafetyTest(PandaSafetyTestBase): INTERCEPTOR_THRESHOLD = 0 + cnt_gas_cmd = 0 + cnt_user_gas = 0 + @classmethod def setUpClass(cls): if cls.__name__ == "GasInterceptorSafetyTest": @@ -129,14 +132,17 @@ def setUpClass(cls): raise unittest.SkipTest def _interceptor_gas_cmd(self, gas): - values = {} + values = {"COUNTER_PEDAL": self.__class__.cnt_gas_cmd & 0xF} if gas > 0: values["GAS_COMMAND"] = gas * 255. values["GAS_COMMAND2"] = gas * 255. + self.__class__.cnt_gas_cmd += 1 return self.packer.make_can_msg_panda("GAS_COMMAND", 0, values) def _interceptor_user_gas(self, gas): - values = {"INTERCEPTOR_GAS": gas, "INTERCEPTOR_GAS2": gas} + values = {"INTERCEPTOR_GAS": gas, "INTERCEPTOR_GAS2": gas, + "COUNTER_PEDAL": self.__class__.cnt_user_gas} + self.__class__.cnt_user_gas += 1 return self.packer.make_can_msg_panda("GAS_SENSOR", 0, values) def test_prev_gas_interceptor(self): @@ -146,6 +152,16 @@ def test_prev_gas_interceptor(self): self.assertTrue(self.safety.get_gas_interceptor_prev()) self._rx(self._interceptor_user_gas(0x0)) + def test_rx_hook_interceptor(self): + # Ensure pedal counter is checked + for i in range(MAX_WRONG_COUNTERS * 2): + self.__class__.cnt_user_gas = 0 + self.assertEqual(i + 1 < MAX_WRONG_COUNTERS, self._rx(self._interceptor_user_gas(0))) + + # Only one is needed to recover + for _ in range(MAX_WRONG_COUNTERS): + self.assertTrue(self._rx(self._interceptor_user_gas(0))) + def test_disengage_on_gas_interceptor(self): for g in range(0x1000): self._rx(self._interceptor_user_gas(0))