Skip to content

Commit

Permalink
Safety Test Refactor: Toyota + support code (commaai#491)
Browse files Browse the repository at this point in the history
* bring over toyota + support code from safety-test-refactor

* old tests still use StdTest

* don't duplicate

* test fwd

* make linter happy

* fix indent

* fix ident

* fix order

* whitespace

* move some common tests

* cleanup

* unused

* comment
  • Loading branch information
adeebshihadeh authored Apr 10, 2020
1 parent 500370a commit abce8f3
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 240 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- run:
name: Run safety test
command: |
docker run panda_safety /bin/bash -c "cd /panda/tests/safety; PYTHONPATH=/ ./test.sh"
docker run panda_safety /bin/bash -c "cd /openpilot/panda/tests/safety; PYTHONPATH=/openpilot ./test.sh"
misra-c2012:
machine:
Expand Down
1 change: 1 addition & 0 deletions board/safety/safety_toyota.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ static int toyota_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) {
static void toyota_init(int16_t param) {
controls_allowed = 0;
relay_malfunction_reset();
gas_interceptor_detected = 0;
toyota_dbc_eps_torque_factor = param;
}

Expand Down
34 changes: 32 additions & 2 deletions tests/safety/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
FROM ubuntu:16.04

RUN apt-get update && apt-get install -y clang make python python-pip git curl locales zlib1g-dev libffi-dev bzip2 libssl-dev libbz2-dev libusb-1.0-0
RUN apt-get update && apt-get install -y \
autoconf \
clang \
curl \
git \
libtool \
libssl-dev \
libusb-1.0-0 \
libzmq3-dev \
locales \
make \
python \
python-pip \
wget \
zlib1g-dev

RUN sed -i -e 's/# en_US.UTF-8 UTF-8/en_US.UTF-8 UTF-8/' /etc/locale.gen && locale-gen
ENV LANG en_US.UTF-8
Expand All @@ -17,4 +31,20 @@ RUN pyenv rehash
COPY requirements.txt /tmp/
RUN pip install -r /tmp/requirements.txt

COPY . /panda
WORKDIR /openpilot
RUN git clone https://github.com/commaai/opendbc.git || true
WORKDIR /openpilot/opendbc
RUN git checkout 36c471e59eaac3760a00125d557ef19af091f289
WORKDIR /openpilot
RUN git clone https://github.com/commaai/cereal.git
WORKDIR /openpilot/cereal
RUN git checkout e370f79522ff7fc0b16f33f4fef420be48061206
RUN /openpilot/cereal/install_capnp.sh

RUN pip install -r /openpilot/opendbc/requirements.txt

WORKDIR /openpilot
RUN cp /openpilot/opendbc/SConstruct /openpilot
COPY . /openpilot/panda

RUN scons -c && scons -j$(nproc)
166 changes: 160 additions & 6 deletions tests/safety/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import abc
import struct
import unittest
from opendbc.can.packer import CANPacker # pylint: disable=import-error
from panda.tests.safety import libpandasafety_py

MAX_WRONG_COUNTERS = 5
Expand All @@ -8,17 +12,167 @@ class UNSAFE_MODE:
DISABLE_STOCK_AEB = 2
RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX = 8

def make_msg(bus, addr, length=8):
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
def twos_comp(val, bits):
if val >= 0:
return val
else:
return (2**bits) + val

def package_can_msg(msg):
addr, _, dat, bus = msg
rdlr, rdhr = struct.unpack('II', dat.ljust(8, b'\x00'))

ret = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
if addr >= 0x800:
to_send[0].RIR = (addr << 3) | 5
ret[0].RIR = (addr << 3) | 5
else:
to_send[0].RIR = (addr << 21) | 1
to_send[0].RDTR = length
to_send[0].RDTR |= bus << 4
ret[0].RIR = (addr << 21) | 1
ret[0].RDTR = len(dat) | ((bus & 0xF) << 4)
ret[0].RDHR = rdhr
ret[0].RDLR = rdlr

return ret

def make_msg(bus, addr, length=8):
return package_can_msg([addr, 0, b'\x00'*length, bus])

def interceptor_msg(gas, addr):
to_send = make_msg(0, addr, 6)
gas2 = gas * 2
to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \
((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8)
return to_send

class CANPackerPanda(CANPacker):
def make_can_msg_panda(self, name_or_addr, bus, values, counter=-1):
msg = self.make_can_msg(name_or_addr, bus, values, counter=-1)
return package_can_msg(msg)

class PandaSafetyTest(unittest.TestCase):
TX_MSGS = None
STANDSTILL_THRESHOLD = None
RELAY_MALFUNCTION_ADDR = None
RELAY_MALFUNCTION_BUS = None
FWD_BLACKLISTED_ADDRS = {} # {bus: [addr]}
FWD_BUS_LOOKUP = {}

@classmethod
def setUpClass(cls):
if cls.__name__ == "PandaSafetyTest":
cls.safety = None
raise unittest.SkipTest

def _rx(self, msg):
return self.safety.safety_rx_hook(msg)

def _tx(self, msg):
return self.safety.safety_tx_hook(msg)

@abc.abstractmethod
def _brake_msg(self, brake):
pass

@abc.abstractmethod
def _speed_msg(self, speed):
pass

@abc.abstractmethod
def _gas_msg(self, speed):
pass

# ***** standard tests for all safety modes *****

def test_relay_malfunction(self):
# each car has an addr that is used to detect relay malfunction
# if that addr is seen on specified bus, triggers the relay malfunction
# protection logic: both tx_hook and fwd_hook are expected to return failure
self.assertFalse(self.safety.get_relay_malfunction())
self._rx(make_msg(self.RELAY_MALFUNCTION_BUS, self.RELAY_MALFUNCTION_ADDR, 8))
self.assertTrue(self.safety.get_relay_malfunction())
for a in range(1, 0x800):
for b in range(0, 3):
self.assertFalse(self._tx(make_msg(b, a, 8)))
self.assertEqual(-1, self.safety.safety_fwd_hook(b, make_msg(b, a, 8)))

def test_fwd_hook(self):
# some safety modes don't forward anything, while others blacklist msgs
for bus in range(0x0, 0x3):
for addr in range(0x1, 0x800):
# assume len 8
msg = make_msg(bus, addr, 8)
fwd_bus = self.FWD_BUS_LOOKUP.get(bus, -1)
if bus in self.FWD_BLACKLISTED_ADDRS and addr in self.FWD_BLACKLISTED_ADDRS[bus]:
fwd_bus = -1
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(bus, msg))

def test_spam_can_buses(self):
for addr in range(1, 0x800):
for bus in range(0, 4):
if all(addr != m[0] or bus != m[1] for m in self.TX_MSGS):
self.assertFalse(self._tx(make_msg(bus, addr, 8)))

def test_default_controls_not_allowed(self):
self.assertFalse(self.safety.get_controls_allowed())

def test_manually_enable_controls_allowed(self):
self.safety.set_controls_allowed(1)
self.assertTrue(self.safety.get_controls_allowed())
self.safety.set_controls_allowed(0)
self.assertFalse(self.safety.get_controls_allowed())

def test_prev_gas(self):
for pressed in [True, False]:
self._rx(self._gas_msg(pressed))
self.assertEqual(pressed, self.safety.get_gas_pressed_prev())

def test_allow_engage_with_gas_pressed(self):
self._rx(self._gas_msg(1))
self.safety.set_controls_allowed(True)
self._rx(self._gas_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
self._rx(self._gas_msg(1))
self.assertTrue(self.safety.get_controls_allowed())

def test_disengage_on_gas(self):
self._rx(self._gas_msg(0))
self.safety.set_controls_allowed(True)
self._rx(self._gas_msg(1))
self.assertFalse(self.safety.get_controls_allowed())

def test_unsafe_mode_no_disengage_on_gas(self):
self._rx(self._gas_msg(0))
self.safety.set_controls_allowed(True)
self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS)
self._rx(self._gas_msg(1))
self.assertTrue(self.safety.get_controls_allowed())

def test_allow_brake_at_zero_speed(self):
# Brake was already pressed
self._rx(self._speed_msg(0))
self._rx(self._brake_msg(1))
self.safety.set_controls_allowed(1)
self._rx(self._brake_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
self._rx(self._brake_msg(0))
self.assertTrue(self.safety.get_controls_allowed())
# rising edge of brake should disengage
self._rx(self._brake_msg(1))
self.assertFalse(self.safety.get_controls_allowed())
self._rx(self._brake_msg(0)) # reset no brakes

def test_not_allow_brake_when_moving(self):
# Brake was already pressed
self._rx(self._brake_msg(1))
self.safety.set_controls_allowed(1)
self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD))
self._rx(self._brake_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD + 1))
self._rx(self._brake_msg(1))
self.assertFalse(self.safety.get_controls_allowed())
self._rx(self._speed_msg(0))

# TODO: use PandaSafetyTest for all tests and delete this
class StdTest:
@staticmethod
def test_relay_malfunction(test, addr, bus=0):
Expand Down
1 change: 1 addition & 0 deletions tests/safety/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ void init_tests(void){
safety_mode_cnt = 2U; // avoid ignoring relay_malfunction logic
gas_pressed_prev = false;
brake_pressed_prev = false;
unsafe_mode = 0;
}

void init_tests_toyota(void){
Expand Down
50 changes: 22 additions & 28 deletions tests/safety/test_honda.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import numpy as np
from panda import Panda
from panda.tests.safety import libpandasafety_py
from panda.tests.safety.common import StdTest, make_msg, MAX_WRONG_COUNTERS, UNSAFE_MODE
from panda.tests.safety.common import StdTest, make_msg, interceptor_msg, \
MAX_WRONG_COUNTERS, UNSAFE_MODE

MAX_BRAKE = 255

Expand Down Expand Up @@ -87,13 +88,6 @@ def _send_brake_msg(self, brake):
to_send[0].RDLR = ((brake & 0x3) << 14) | ((brake & 0x3FF) >> 2)
return to_send

def _send_interceptor_msg(self, gas, addr):
to_send = make_msg(0, addr, 6)
gas2 = gas * 2
to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \
((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8)
return to_send

def _send_steer_msg(self, steer):
bus = 2 if self.safety.get_honda_hw() == HONDA_BG_HW else 0
to_send = make_msg(bus, 0xE4, 6)
Expand Down Expand Up @@ -176,11 +170,11 @@ def test_prev_gas(self):
self.assertTrue(self.safety.get_gas_pressed_prev())

def test_prev_gas_interceptor(self):
self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0x0, 0x201))
self.assertFalse(self.safety.get_gas_interceptor_prev())
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201))
self.assertTrue(self.safety.get_gas_interceptor_prev())
self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0x0, 0x201))
self.safety.set_gas_interceptor_detected(False)

def test_disengage_on_gas(self):
Expand All @@ -205,31 +199,31 @@ def test_allow_engage_with_gas_pressed(self):

def test_disengage_on_gas_interceptor(self):
for g in range(0, 0x1000):
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0, 0x201))
self.safety.set_controls_allowed(True)
self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201))
self.safety.safety_rx_hook(interceptor_msg(g, 0x201))
remain_enabled = g <= INTERCEPTOR_THRESHOLD
self.assertEqual(remain_enabled, self.safety.get_controls_allowed())
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0, 0x201))
self.safety.set_gas_interceptor_detected(False)

def test_unsafe_mode_no_disengage_on_gas_interceptor(self):
self.safety.set_controls_allowed(True)
self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS)
for g in range(0, 0x1000):
self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201))
self.safety.safety_rx_hook(interceptor_msg(g, 0x201))
self.assertTrue(self.safety.get_controls_allowed())
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0, 0x201))
self.safety.set_gas_interceptor_detected(False)
self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)
self.safety.set_controls_allowed(False)

def test_allow_engage_with_gas_interceptor_pressed(self):
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201))
self.safety.set_controls_allowed(1)
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201))
self.assertTrue(self.safety.get_controls_allowed())
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
self.safety.safety_rx_hook(interceptor_msg(0, 0x201))
self.safety.set_gas_interceptor_detected(False)

def test_brake_safety_check(self):
Expand Down Expand Up @@ -258,7 +252,7 @@ def test_gas_interceptor_safety_check(self):
send = True
else:
send = gas == 0
self.assertEqual(send, self.safety.safety_tx_hook(self._send_interceptor_msg(gas, 0x200)))
self.assertEqual(send, self.safety.safety_tx_hook(interceptor_msg(gas, 0x200)))

def test_steer_safety_check(self):
self.safety.set_controls_allowed(0)
Expand Down Expand Up @@ -365,22 +359,22 @@ def test_tx_hook_on_pedal_pressed(self):
self.safety.safety_rx_hook(self._gas_msg(1))
elif pedal == 'interceptor':
# gas_interceptor_prev > INTERCEPTOR_THRESHOLD
self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))

self.safety.set_controls_allowed(1)
hw = self.safety.get_honda_hw()
if hw == HONDA_N_HW:
self.safety.set_honda_fwd_brake(False)
self.assertFalse(self.safety.safety_tx_hook(self._send_brake_msg(MAX_BRAKE)))
self.assertFalse(self.safety.safety_tx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200)))
self.assertFalse(self.safety.safety_tx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200)))
self.assertFalse(self.safety.safety_tx_hook(self._send_steer_msg(0x1000)))

# reset status
self.safety.set_controls_allowed(0)
self.safety.safety_tx_hook(self._send_brake_msg(0))
self.safety.safety_tx_hook(self._send_steer_msg(0))
self.safety.safety_tx_hook(self._send_interceptor_msg(0, 0x200))
self.safety.safety_tx_hook(interceptor_msg(0, 0x200))
if pedal == 'brake':
self.safety.safety_rx_hook(self._speed_msg(0))
self.safety.safety_rx_hook(self._brake_msg(0))
Expand All @@ -403,23 +397,23 @@ def test_tx_hook_on_pedal_pressed_on_unsafe_gas_mode(self):
allow_ctrl = True
elif pedal == 'interceptor':
# gas_interceptor_prev > INTERCEPTOR_THRESHOLD
self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
allow_ctrl = True

self.safety.set_controls_allowed(1)
hw = self.safety.get_honda_hw()
if hw == HONDA_N_HW:
self.safety.set_honda_fwd_brake(False)
self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_brake_msg(MAX_BRAKE)))
self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200)))
self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200)))
self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_steer_msg(0x1000)))
# reset status
self.safety.set_controls_allowed(0)
self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)
self.safety.safety_tx_hook(self._send_brake_msg(0))
self.safety.safety_tx_hook(self._send_steer_msg(0))
self.safety.safety_tx_hook(self._send_interceptor_msg(0, 0x200))
self.safety.safety_tx_hook(interceptor_msg(0, 0x200))
if pedal == 'brake':
self.safety.safety_rx_hook(self._speed_msg(0))
self.safety.safety_rx_hook(self._brake_msg(0))
Expand Down
Loading

0 comments on commit abce8f3

Please sign in to comment.