diff --git a/.gitignore b/.gitignore index 70d010fabf3d7f..397996a0bcca35 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.o *.so *.d +*.dump a.out *~ .#* @@ -12,4 +13,5 @@ pandacan.egg-info/ board/obj/ examples/output.csv .DS_Store +.vscode nosetests.xml diff --git a/VERSION b/VERSION index 5beebea89c0857..ef6abda3f40a28 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.4.3 \ No newline at end of file +v1.4.6 \ No newline at end of file diff --git a/board/board_declarations.h b/board/board_declarations.h index 7a44113f613bf2..cc93ec7c97e354 100644 --- a/board/board_declarations.h +++ b/board/board_declarations.h @@ -24,7 +24,7 @@ struct board { }; // ******************* Definitions ******************** -// These should match the enum in cereal/log.capnp +// These should match the enums in cereal/log.capnp and __init__.py #define HW_TYPE_UNKNOWN 0U #define HW_TYPE_WHITE_PANDA 1U #define HW_TYPE_GREY_PANDA 2U @@ -54,4 +54,4 @@ struct board { #define CAN_MODE_OBD_CAN2 3U // ********************* Globals ********************** -uint8_t usb_power_mode = USB_POWER_NONE; \ No newline at end of file +uint8_t usb_power_mode = USB_POWER_NONE; diff --git a/board/boards/black.h b/board/boards/black.h index 0372aa3d079ae2..62b3b674036177 100644 --- a/board/boards/black.h +++ b/board/boards/black.h @@ -23,8 +23,9 @@ void black_enable_can_transciever(uint8_t transciever, bool enabled) { } void black_enable_can_transcievers(bool enabled) { - for(uint8_t i=1; i<=4U; i++) + for(uint8_t i=1U; i<=4U; i++){ black_enable_can_transciever(i, enabled); + } } void black_set_led(uint8_t color, bool enabled) { @@ -43,26 +44,41 @@ void black_set_led(uint8_t color, bool enabled) { } } -void black_set_usb_power_mode(uint8_t mode){ +void black_set_gps_load_switch(bool enabled) { + set_gpio_output(GPIOC, 12, enabled); +} + +void black_set_usb_load_switch(bool enabled) { + set_gpio_output(GPIOB, 1, !enabled); +} + +void black_set_usb_power_mode(uint8_t mode) { usb_power_mode = mode; - puts("Trying to set USB power mode on black panda. This is not supported.\n"); + if (mode == USB_POWER_NONE) { + black_set_usb_load_switch(false); + } else { + black_set_usb_load_switch(true); + } } void black_set_esp_gps_mode(uint8_t mode) { switch (mode) { case ESP_GPS_DISABLED: - // ESP OFF + // GPS OFF set_gpio_output(GPIOC, 14, 0); set_gpio_output(GPIOC, 5, 0); + black_set_gps_load_switch(false); break; case ESP_GPS_ENABLED: - // ESP ON + // GPS ON set_gpio_output(GPIOC, 14, 1); set_gpio_output(GPIOC, 5, 1); + black_set_gps_load_switch(true); break; case ESP_GPS_BOOTMODE: set_gpio_output(GPIOC, 14, 1); set_gpio_output(GPIOC, 5, 0); + black_set_gps_load_switch(true); break; default: puts("Invalid ESP/GPS mode\n"); @@ -132,9 +148,11 @@ void black_init(void) { // C8: FAN aka TIM3_CH3 set_gpio_alternate(GPIOC, 8, GPIO_AF2_TIM3); - // C12: GPS load switch. Turn on permanently for now - set_gpio_output(GPIOC, 12, true); - //set_gpio_output(GPIOC, 12, false); //TODO: stupid inverted switch on prototype + // Turn on GPS load switch. + black_set_gps_load_switch(true); + + // Turn on USB load switch. + black_set_usb_load_switch(true); // Initialize harness harness_init(); diff --git a/board/boards/white.h b/board/boards/white.h index a4e65d67bacde5..46668c3a81f555 100644 --- a/board/boards/white.h +++ b/board/boards/white.h @@ -285,6 +285,13 @@ void white_init(void) { // Set normal CAN mode white_set_can_mode(CAN_MODE_NORMAL); + + // Setup ignition interrupts + SYSCFG->EXTICR[1] = SYSCFG_EXTICR1_EXTI1_PA; + EXTI->IMR |= (1U << 1); + EXTI->RTSR |= (1U << 1); + EXTI->FTSR |= (1U << 1); + NVIC_EnableIRQ(EXTI1_IRQn); } const harness_configuration white_harness_config = { diff --git a/board/main.c b/board/main.c index b5c8cea42a7111..567795d6259e00 100644 --- a/board/main.c +++ b/board/main.c @@ -78,9 +78,11 @@ void started_interrupt_handler(uint8_t interrupt_line) { // jenky debounce delay(100000); - // set power savings mode here - int power_save_state = current_board->check_ignition() ? POWER_SAVE_STATUS_DISABLED : POWER_SAVE_STATUS_ENABLED; - set_power_save_state(power_save_state); + // set power savings mode here if on EON build + #ifdef EON + int power_save_state = current_board->check_ignition() ? POWER_SAVE_STATUS_DISABLED : POWER_SAVE_STATUS_ENABLED; + set_power_save_state(power_save_state); + #endif } EXTI->PR = (1U << interrupt_line); } @@ -100,14 +102,6 @@ void EXTI3_IRQHandler(void) { started_interrupt_handler(3); } -void started_interrupt_init(void) { - SYSCFG->EXTICR[1] = SYSCFG_EXTICR1_EXTI1_PA; - EXTI->IMR |= (1U << 1); - EXTI->RTSR |= (1U << 1); - EXTI->FTSR |= (1U << 1); - NVIC_EnableIRQ(EXTI1_IRQn); -} - // ****************************** safety mode ****************************** // this is the only way to leave silent mode @@ -116,30 +110,29 @@ void set_safety_mode(uint16_t mode, int16_t param) { if (err == -1) { puts("Error: safety set mode failed\n"); } else { - if (mode == SAFETY_NOOUTPUT) { - can_silent = ALL_CAN_SILENT; - } else { - can_silent = ALL_CAN_LIVE; - } - switch (mode) { case SAFETY_NOOUTPUT: set_intercept_relay(false); if(hw_type == HW_TYPE_BLACK_PANDA){ current_board->set_can_mode(CAN_MODE_NORMAL); } + can_silent = ALL_CAN_SILENT; break; case SAFETY_ELM327: set_intercept_relay(false); + heartbeat_counter = 0U; if(hw_type == HW_TYPE_BLACK_PANDA){ current_board->set_can_mode(CAN_MODE_OBD_CAN2); } + can_silent = ALL_CAN_LIVE; break; default: set_intercept_relay(true); + heartbeat_counter = 0U; if(hw_type == HW_TYPE_BLACK_PANDA){ current_board->set_can_mode(CAN_MODE_NORMAL); } + can_silent = ALL_CAN_LIVE; break; } if (safety_ignition_hook() != -1) { @@ -464,7 +457,10 @@ int usb_cb_control_msg(USB_Setup_TypeDef *setup, uint8_t *resp, bool hardwired) break; // **** 0xe6: set USB power case 0xe6: - if (setup->b.wValue.w == 1U) { + if (setup->b.wValue.w == 0U) { + puts("user setting NONE mode\n"); + current_board->set_usb_power_mode(USB_POWER_NONE); + } else if (setup->b.wValue.w == 1U) { puts("user setting CDP mode\n"); current_board->set_usb_power_mode(USB_POWER_CDP); } else if (setup->b.wValue.w == 2U) { @@ -610,11 +606,10 @@ void TIM3_IRQHandler(void) { pending_can_live = 0; } #ifdef DEBUG - //TODO: re-enable - //puts("** blink "); - //puth(can_rx_q.r_ptr); puts(" "); puth(can_rx_q.w_ptr); puts(" "); - //puth(can_tx1_q.r_ptr); puts(" "); puth(can_tx1_q.w_ptr); puts(" "); - //puth(can_tx2_q.r_ptr); puts(" "); puth(can_tx2_q.w_ptr); puts("\n"); + puts("** blink "); + puth(can_rx_q.r_ptr); puts(" "); puth(can_rx_q.w_ptr); puts(" "); + puth(can_tx1_q.r_ptr); puts(" "); puth(can_tx1_q.w_ptr); puts(" "); + puth(can_tx2_q.r_ptr); puts(" "); puth(can_tx2_q.w_ptr); puts("\n"); #endif // set green LED to be controls allowed @@ -730,11 +725,6 @@ int main(void) { /*if (current_board->check_ignition()) { set_power_save_state(POWER_SAVE_STATUS_ENABLED); }*/ - - if (hw_type != HW_TYPE_BLACK_PANDA) { - // interrupt on started line - started_interrupt_init(); - } #endif // 48mhz / 65536 ~= 732 / 732 = 1 diff --git a/board/power_saving.h b/board/power_saving.h index 94ebbb53cf9a0a..0a926c119d3e22 100644 --- a/board/power_saving.h +++ b/board/power_saving.h @@ -28,6 +28,13 @@ void set_power_save_state(int state) { // Switch CAN transcievers current_board->enable_can_transcievers(enable); + // Switch EPS/GPS + if (enable) { + current_board->set_esp_gps_mode(ESP_GPS_ENABLED); + } else { + current_board->set_esp_gps_mode(ESP_GPS_DISABLED); + } + if(hw_type != HW_TYPE_BLACK_PANDA){ // turn on GMLAN set_gpio_output(GPIOB, 14, enable); diff --git a/python/__init__.py b/python/__init__.py index e83a4a16947366..573d6f159a3c9d 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -135,6 +135,12 @@ class Panda(object): REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE + HW_TYPE_UNKNOWN = '\x00' + HW_TYPE_WHITE_PANDA = '\x01' + HW_TYPE_GREY_PANDA = '\x02' + HW_TYPE_BLACK_PANDA = '\x03' + HW_TYPE_PEDAL = '\x04' + def __init__(self, serial=None, claim=True): self._serial = serial self._handle = None @@ -363,11 +369,14 @@ def get_version(self): def get_type(self): return self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40) + def is_white(self): + return self.get_type() == Panda.HW_TYPE_WHITE_PANDA + def is_grey(self): - return self.get_type() == "\x02" + return self.get_type() == Panda.HW_TYPE_GREY_PANDA def is_black(self): - return self.get_type() == "\x03" + return self.get_type() == Panda.HW_TYPE_BLACK_PANDA def get_serial(self): dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20) @@ -470,6 +479,7 @@ def can_recv(self): break except (usb1.USBErrorIO, usb1.USBErrorOverflow): print("CAN: BAD RECV, RETRYING") + time.sleep(0.1) return parse_can_buffer(dat) def can_clear(self, bus): diff --git a/tests/automated/1_program.py b/tests/automated/1_program.py index 1e0beb8ae52425..6b8a3ad4875657 100644 --- a/tests/automated/1_program.py +++ b/tests/automated/1_program.py @@ -1,15 +1,13 @@ import os from panda import Panda -from helpers import panda_color_to_serial, test_white_and_grey +from helpers import panda_type_to_serial, test_white_and_grey, test_all_pandas, panda_connect_and_init -@test_white_and_grey -@panda_color_to_serial -def test_recover(serial=None): - p = Panda(serial=serial) +@test_all_pandas +@panda_connect_and_init +def test_recover(p): assert p.recover(timeout=30) -@test_white_and_grey -@panda_color_to_serial -def test_flash(serial=None): - p = Panda(serial=serial) +@test_all_pandas +@panda_connect_and_init +def test_flash(p): p.flash() diff --git a/tests/automated/2_usb_to_can.py b/tests/automated/2_usb_to_can.py index 9e3e07aa490daa..f0411b32c66aae 100644 --- a/tests/automated/2_usb_to_can.py +++ b/tests/automated/2_usb_to_can.py @@ -4,16 +4,11 @@ import time from panda import Panda from nose.tools import assert_equal, assert_less, assert_greater -from helpers import time_many_sends, connect_wo_esp, test_white_and_grey, panda_color_to_serial - -SPEED_NORMAL = 500 -SPEED_GMLAN = 33.3 - -@test_white_and_grey -@panda_color_to_serial -def test_can_loopback(serial=None): - p = connect_wo_esp(serial) +from helpers import SPEED_NORMAL, SPEED_GMLAN, time_many_sends, test_white_and_grey, panda_type_to_serial, test_all_pandas, panda_connect_and_init +@test_all_pandas +@panda_connect_and_init +def test_can_loopback(p): # enable output mode p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) @@ -26,9 +21,6 @@ def test_can_loopback(serial=None): busses = [0,1,2] for bus in busses: - # send heartbeat - p.send_heartbeat() - # set bus 0 speed to 250 p.set_can_speed_kbps(bus, 250) @@ -47,17 +39,12 @@ def test_can_loopback(serial=None): assert 0x1aa == sr[0][0] == lb[0][0] assert "message" == sr[0][2] == lb[0][2] -@test_white_and_grey -@panda_color_to_serial -def test_safety_nooutput(serial=None): - p = connect_wo_esp(serial) - +@test_all_pandas +@panda_connect_and_init +def test_safety_nooutput(p): # enable output mode p.set_safety_mode(Panda.SAFETY_NOOUTPUT) - # send heartbeat - p.send_heartbeat() - # enable CAN loopback mode p.set_can_loopback(True) @@ -69,11 +56,9 @@ def test_safety_nooutput(serial=None): r = p.can_recv() assert len(r) == 0 -@test_white_and_grey -@panda_color_to_serial -def test_reliability(serial=None): - p = connect_wo_esp(serial) - +@test_all_pandas +@panda_connect_and_init +def test_reliability(p): LOOP_COUNT = 100 MSG_COUNT = 100 @@ -82,17 +67,11 @@ def test_reliability(serial=None): p.set_can_loopback(True) p.set_can_speed_kbps(0, 1000) - # send heartbeat - p.send_heartbeat() - addrs = range(100, 100+MSG_COUNT) ts = [(j, 0, "\xaa"*8, 0) for j in addrs] # 100 loops for i in range(LOOP_COUNT): - # send heartbeat - p.send_heartbeat() - st = time.time() p.can_send_many(ts) @@ -115,17 +94,12 @@ def test_reliability(serial=None): sys.stdout.write("P") sys.stdout.flush() -@test_white_and_grey -@panda_color_to_serial -def test_throughput(serial=None): - p = connect_wo_esp(serial) - +@test_all_pandas +@panda_connect_and_init +def test_throughput(p): # enable output mode p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) - # send heartbeat - p.send_heartbeat() - # enable CAN loopback mode p.set_can_loopback(True) @@ -134,9 +108,6 @@ def test_throughput(serial=None): p.set_can_speed_kbps(0, speed) time.sleep(0.05) - # send heartbeat - p.send_heartbeat() - comp_kbps = time_many_sends(p, 0) # bit count from https://en.wikipedia.org/wiki/CAN_bus @@ -147,19 +118,15 @@ def test_throughput(serial=None): print("loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct)) @test_white_and_grey -@panda_color_to_serial -def test_gmlan(serial=None): - p = connect_wo_esp(serial) - +@panda_type_to_serial +@panda_connect_and_init +def test_gmlan(p): if p.legacy: return # enable output mode p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) - # send heartbeat - p.send_heartbeat() - # enable CAN loopback mode p.set_can_loopback(True) @@ -169,9 +136,6 @@ def test_gmlan(serial=None): # set gmlan on CAN2 for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3, Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]: - # send heartbeat - p.send_heartbeat() - p.set_gmlan(bus) comp_kbps_gmlan = time_many_sends(p, 3) assert_greater(comp_kbps_gmlan, 0.8 * SPEED_GMLAN) @@ -185,27 +149,20 @@ def test_gmlan(serial=None): print("%d: %.2f kbps vs %.2f kbps" % (bus, comp_kbps_gmlan, comp_kbps_normal)) @test_white_and_grey -@panda_color_to_serial -def test_gmlan_bad_toggle(serial=None): - p = connect_wo_esp(serial) - +@panda_type_to_serial +@panda_connect_and_init +def test_gmlan_bad_toggle(p): if p.legacy: return # enable output mode p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) - # send heartbeat - p.send_heartbeat() - # enable CAN loopback mode p.set_can_loopback(True) # GMLAN_CAN2 for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]: - # send heartbeat - p.send_heartbeat() - p.set_gmlan(bus) comp_kbps_gmlan = time_many_sends(p, 3) assert_greater(comp_kbps_gmlan, 0.6 * SPEED_GMLAN) @@ -213,9 +170,6 @@ def test_gmlan_bad_toggle(serial=None): # normal for bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]: - # send heartbeat - p.send_heartbeat() - p.set_gmlan(None) comp_kbps_normal = time_many_sends(p, bus) assert_greater(comp_kbps_normal, 0.6 * SPEED_NORMAL) @@ -223,10 +177,9 @@ def test_gmlan_bad_toggle(serial=None): # this will fail if you have hardware serial connected -@test_white_and_grey -@panda_color_to_serial -def test_serial_debug(serial=None): - p = connect_wo_esp(serial) +@test_all_pandas +@panda_connect_and_init +def test_serial_debug(p): junk = p.serial_read(Panda.SERIAL_DEBUG) p.call_control_api(0xc0) assert(p.serial_read(Panda.SERIAL_DEBUG).startswith("can ")) diff --git a/tests/automated/3_wifi.py b/tests/automated/3_wifi.py index 1251663ba5838d..2e9c81f3f4efe4 100644 --- a/tests/automated/3_wifi.py +++ b/tests/automated/3_wifi.py @@ -2,46 +2,44 @@ import os import time from panda import Panda -from helpers import connect_wifi, test_white, test_white_and_grey, panda_color_to_serial +from helpers import connect_wifi, test_white, test_all_pandas, panda_type_to_serial, panda_connect_and_init import requests -@test_white_and_grey -@panda_color_to_serial -def test_get_serial(serial=None): - p = Panda(serial) +@test_all_pandas +@panda_connect_and_init +def test_get_serial(p): print(p.get_serial()) -@test_white_and_grey -@panda_color_to_serial -def test_get_serial_in_flash_mode(serial=None): - p = Panda(serial) +@test_all_pandas +@panda_connect_and_init +def test_get_serial_in_flash_mode(p): p.reset(enter_bootstub=True) assert(p.bootstub) print(p.get_serial()) p.reset() @test_white -@panda_color_to_serial -def test_connect_wifi(serial=None): - connect_wifi(serial) +@panda_type_to_serial +def test_connect_wifi(serials=None): + connect_wifi(serials[0]) @test_white -@panda_color_to_serial -def test_flash_wifi(serial=None): - connect_wifi(serial) +@panda_type_to_serial +def test_flash_wifi(serials=None): + connect_wifi(serials[0]) assert Panda.flash_ota_wifi(release=False), "OTA Wifi Flash Failed" - connect_wifi(serial) + connect_wifi(serials[0]) @test_white -@panda_color_to_serial -def test_wifi_flash_st(serial=None): - connect_wifi(serial) +@panda_type_to_serial +def test_wifi_flash_st(serials=None): + connect_wifi(serials[0]) assert Panda.flash_ota_st(), "OTA ST Flash Failed" connected = False st = time.time() while not connected and (time.time() - st) < 20: try: - p = Panda(serial=serial) + p = Panda(serial=serials[0]) p.get_serial() connected = True except: @@ -51,9 +49,9 @@ def test_wifi_flash_st(serial=None): assert False, "Panda failed to connect on USB after flashing" @test_white -@panda_color_to_serial -def test_webpage_fetch(serial=None): - connect_wifi(serial) +@panda_type_to_serial +def test_webpage_fetch(serials=None): + connect_wifi(serials[0]) r = requests.get("http://192.168.0.10/") print(r.text) diff --git a/tests/automated/4_wifi_functionality.py b/tests/automated/4_wifi_functionality.py index ab9bed70052e41..ee9857d09e1817 100644 --- a/tests/automated/4_wifi_functionality.py +++ b/tests/automated/4_wifi_functionality.py @@ -1,38 +1,32 @@ from __future__ import print_function import time from panda import Panda -from helpers import time_many_sends, connect_wifi, test_white, panda_color_to_serial +from helpers import time_many_sends, connect_wifi, test_white, panda_type_to_serial from nose.tools import timed, assert_equal, assert_less, assert_greater @test_white -@panda_color_to_serial -def test_get_serial_wifi(serial=None): - connect_wifi(serial) +@panda_type_to_serial +def test_get_serial_wifi(serials=None): + connect_wifi(serials[0]) p = Panda("WIFI") print(p.get_serial()) @test_white -@panda_color_to_serial -def test_throughput(serial=None): - connect_wifi(serial) - p = Panda(serial) +@panda_type_to_serial +def test_throughput(serials=None): + connect_wifi(serials[0]) + p = Panda(serials[0]) # enable output mode p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) - # send heartbeat - p.send_heartbeat() - # enable CAN loopback mode p.set_can_loopback(True) p = Panda("WIFI") for speed in [100,250,500,750,1000]: - # send heartbeat - p.send_heartbeat() - # set bus 0 speed to speed p.set_can_speed_kbps(0, speed) time.sleep(0.1) @@ -47,23 +41,17 @@ def test_throughput(serial=None): print("WIFI loopback 100 messages at speed %d, comp speed is %.2f, percent %.2f" % (speed, comp_kbps, saturation_pct)) @test_white -@panda_color_to_serial -def test_recv_only(serial=None): - connect_wifi(serial) - p = Panda(serial) +@panda_type_to_serial +def test_recv_only(serials=None): + connect_wifi(serials[0]) + p = Panda(serials[0]) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) - # send heartbeat - p.send_heartbeat() - p.set_can_loopback(True) pwifi = Panda("WIFI") # TODO: msg_count=1000 drops packets, is this fixable? for msg_count in [10,100,200]: - # send heartbeat - p.send_heartbeat() - speed = 500 p.set_can_speed_kbps(0, speed) comp_kbps = time_many_sends(p, 0, pwifi, msg_count) diff --git a/tests/automated/5_wifi_udp.py b/tests/automated/5_wifi_udp.py index d55baa659aa162..8b62cf082ee705 100644 --- a/tests/automated/5_wifi_udp.py +++ b/tests/automated/5_wifi_udp.py @@ -1,16 +1,16 @@ from __future__ import print_function import sys import time -from helpers import time_many_sends, connect_wifi, test_white, panda_color_to_serial +from helpers import time_many_sends, connect_wifi, test_white, panda_type_to_serial from panda import Panda, PandaWifiStreaming from nose.tools import timed, assert_equal, assert_less, assert_greater @test_white -@panda_color_to_serial -def test_udp_doesnt_drop(serial=None): - connect_wifi(serial) +@panda_type_to_serial +def test_udp_doesnt_drop(serials=None): + connect_wifi(serials[0]) - p = Panda(serial) + p = Panda(serials[0]) p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_can_loopback(True) diff --git a/tests/automated/6_two_panda.py b/tests/automated/6_two_panda.py index 09cf1861f34d95..8b308ce500ff8c 100644 --- a/tests/automated/6_two_panda.py +++ b/tests/automated/6_two_panda.py @@ -1,21 +1,18 @@ from __future__ import print_function +import os import time +import random from panda import Panda from nose.tools import assert_equal, assert_less, assert_greater -from helpers import time_many_sends, test_two_panda, panda_color_to_serial +from helpers import time_many_sends, test_two_panda, test_two_black_panda, panda_type_to_serial, clear_can_buffers, panda_connect_and_init @test_two_panda -@panda_color_to_serial -def test_send_recv(serial_sender=None, serial_reciever=None): - p_send = Panda(serial_sender) - p_recv = Panda(serial_reciever) - +@panda_type_to_serial +@panda_connect_and_init +def test_send_recv(p_send, p_recv): p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p_send.set_can_loopback(False) - - # send heartbeat - p_send.send_heartbeat() - p_recv.set_can_loopback(False) assert not p_send.legacy @@ -30,9 +27,6 @@ def test_send_recv(serial_sender=None, serial_reciever=None): for bus in busses: for speed in [100, 250, 500, 750, 1000]: - # send heartbeat - p_send.send_heartbeat() - p_send.set_can_speed_kbps(bus, speed) p_recv.set_can_speed_kbps(bus, speed) time.sleep(0.05) @@ -46,18 +40,12 @@ def test_send_recv(serial_sender=None, serial_reciever=None): print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, percent {:6.2f}".format(bus, speed, comp_kbps, saturation_pct)) @test_two_panda -@panda_color_to_serial -def test_latency(serial_sender=None, serial_reciever=None): - p_send = Panda(serial_sender) - p_recv = Panda(serial_reciever) - - # send heartbeat - p_send.send_heartbeat() - p_recv.send_heartbeat() - +@panda_type_to_serial +@panda_connect_and_init +def test_latency(p_send, p_recv): p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p_send.set_can_loopback(False) - p_recv.set_can_loopback(False) assert not p_send.legacy @@ -72,29 +60,17 @@ def test_latency(serial_sender=None, serial_reciever=None): p_recv.can_recv() p_send.can_recv() - # send heartbeat - p_send.send_heartbeat() - p_recv.send_heartbeat() - busses = [0,1,2] for bus in busses: for speed in [100, 250, 500, 750, 1000]: - # send heartbeat - p_send.send_heartbeat() - p_recv.send_heartbeat() - p_send.set_can_speed_kbps(bus, speed) p_recv.set_can_speed_kbps(bus, speed) time.sleep(0.1) + #clear can buffers - r = [1] - while len(r) > 0: - r = p_send.can_recv() - r = [1] - while len(r) > 0: - r = p_recv.can_recv() - time.sleep(0.05) + clear_can_buffers(p_send) + clear_can_buffers(p_recv) latencies = [] comp_kbps_list = [] @@ -137,3 +113,83 @@ def test_latency(serial_sender=None, serial_reciever=None): print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"\ .format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct)) + +@test_two_black_panda +@panda_type_to_serial +@panda_connect_and_init +def test_black_loopback(panda0, panda1): + # disable safety modes + panda0.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + panda1.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + + # disable loopback + panda0.set_can_loopback(False) + panda1.set_can_loopback(False) + + # clear stuff + panda0.can_send_many([(0x1ba, 0, "testmsg", 0)]*10) + time.sleep(0.05) + panda0.can_recv() + panda1.can_recv() + + # test array (send bus, sender obd, reciever obd, expected busses) + test_array = [ + (0, False, False, [0]), + (1, False, False, [1]), + (2, False, False, [2]), + (0, False, True, [0, 1]), + (1, False, True, []), + (2, False, True, [2]), + (0, True, False, [0]), + (1, True, False, [0]), + (2, True, False, [2]), + (0, True, True, [0, 1]), + (1, True, True, [0, 1]), + (2, True, True, [2]) + ] + + # test functions + def get_test_string(): + return b"test"+os.urandom(10) + + def _test_buses(send_panda, recv_panda, _test_array): + for send_bus, send_obd, recv_obd, recv_buses in _test_array: + print("\nSend bus:", send_bus, " Send OBD:", send_obd, " Recv OBD:", recv_obd) + + # set OBD on pandas + send_panda.set_gmlan(True if send_obd else None) + recv_panda.set_gmlan(True if recv_obd else None) + + # clear buffers + clear_can_buffers(send_panda) + clear_can_buffers(recv_panda) + + # send the characters + at = random.randint(1, 2000) + st = get_test_string()[0:8] + send_panda.can_send(at, st, send_bus) + time.sleep(0.1) + + # check for receive + cans_echo = send_panda.can_recv() + cans_loop = recv_panda.can_recv() + + loop_buses = [] + for loop in cans_loop: + print(" Loop on bus", str(loop[3])) + loop_buses.append(loop[3]) + if len(cans_loop) == 0: + print(" No loop") + + # test loop buses + recv_buses.sort() + loop_buses.sort() + assert recv_buses == loop_buses + print(" TEST PASSED") + print("\n") + + # test both orientations + print("***************** TESTING (0 --> 1) *****************") + _test_buses(panda0, panda1, test_array) + print("***************** TESTING (1 --> 0) *****************") + _test_buses(panda1, panda0, test_array) \ No newline at end of file diff --git a/tests/automated/helpers.py b/tests/automated/helpers.py index 9e92f56bfe4bcd..c9e0c676267583 100644 --- a/tests/automated/helpers.py +++ b/tests/automated/helpers.py @@ -4,43 +4,41 @@ import random import subprocess import requests +import thread from functools import wraps from panda import Panda from nose.tools import timed, assert_equal, assert_less, assert_greater from parameterized import parameterized, param -test_white_and_grey = parameterized([param(panda_color="White"), - param(panda_color="Grey")]) -test_white = parameterized([param(panda_color="White")]) -test_grey = parameterized([param(panda_color="Grey")]) -test_two_panda = parameterized([param(panda_color=["Grey", "White"]), - param(panda_color=["White", "Grey"])]) - -_serials = {} -def get_panda_serial(is_grey=None): - global _serials - if is_grey not in _serials: - for serial in Panda.list(): - p = Panda(serial=serial) - if is_grey is None or p.is_grey() == is_grey: - _serials[is_grey] = serial - return serial - raise IOError("Panda not found. is_grey: {}".format(is_grey)) - else: - return _serials[is_grey] - -def connect_wo_esp(serial=None): - # connect to the panda - p = Panda(serial=serial) - - # power down the ESP - p.set_esp_power(False) +SPEED_NORMAL = 500 +SPEED_GMLAN = 33.3 - # clear old junk - while len(p.can_recv()) > 0: - pass - - return p +test_all_types = parameterized([ + param(panda_type=Panda.HW_TYPE_WHITE_PANDA), + param(panda_type=Panda.HW_TYPE_GREY_PANDA), + param(panda_type=Panda.HW_TYPE_BLACK_PANDA) + ]) +test_all_pandas = parameterized( + Panda.list() + ) +test_white_and_grey = parameterized([ + param(panda_type=Panda.HW_TYPE_WHITE_PANDA), + param(panda_type=Panda.HW_TYPE_GREY_PANDA) + ]) +test_white = parameterized([ + param(panda_type=Panda.HW_TYPE_WHITE_PANDA) + ]) +test_grey = parameterized([ + param(panda_type=Panda.HW_TYPE_GREY_PANDA) + ]) +test_two_panda = parameterized([ + param(panda_type=[Panda.HW_TYPE_GREY_PANDA, Panda.HW_TYPE_WHITE_PANDA]), + param(panda_type=[Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_GREY_PANDA]), + param(panda_type=[Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_BLACK_PANDA]) + ]) +test_two_black_panda = parameterized([ + param(panda_type=[Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_BLACK_PANDA]) + ]) def connect_wifi(serial=None): p = Panda(serial=serial) @@ -170,23 +168,93 @@ def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=F return comp_kbps +_panda_serials = None +def panda_type_to_serial(fn): + @wraps(fn) + def wrapper(panda_type=None, **kwargs): + # Change panda_types to a list + if panda_type is not None: + if not isinstance(panda_type, list): + panda_type = [panda_type] + + # If not done already, get panda serials and their type + global _panda_serials + if _panda_serials == None: + _panda_serials = [] + for serial in Panda.list(): + p = Panda(serial=serial) + _panda_serials.append((serial, p.get_type())) + p.close() + + # Find a panda with the correct types and add the corresponding serial + serials = [] + for p_type in panda_type: + found = False + for serial, pt in _panda_serials: + # Never take the same panda twice + if (pt == p_type) and (serial not in serials): + serials.append(serial) + found = True + break + if not found: + raise IOError("No unused panda found for type: {}".format(p_type)) + return fn(serials, **kwargs) + return wrapper + +def heartbeat_thread(p): + while True: + try: + p.send_heartbeat() + time.sleep(1) + except: + break -def panda_color_to_serial(fn): +def panda_connect_and_init(fn): @wraps(fn) - def wrapper(panda_color=None, **kwargs): - pandas_is_grey = [] - if panda_color is not None: - if not isinstance(panda_color, list): - panda_color = [panda_color] - panda_color = [s.lower() for s in panda_color] - for p in panda_color: - if p is None: - pandas_is_grey.append(None) - elif p in ["grey", "gray"]: - pandas_is_grey.append(True) - elif p in ["white"]: - pandas_is_grey.append(False) - else: - raise ValueError("Invalid Panda Color {}".format(p)) - return fn(*[get_panda_serial(is_grey) for is_grey in pandas_is_grey], **kwargs) + def wrapper(panda_serials=None, **kwargs): + # Change panda_serials to a list + if panda_serials is not None: + if not isinstance(panda_serials, list): + panda_serials = [panda_serials] + + # Connect to pandas + pandas = [] + for panda_serial in panda_serials: + pandas.append(Panda(serial=panda_serial)) + + # Initialize pandas + for panda in pandas: + panda.set_can_loopback(False) + panda.set_gmlan(None) + panda.set_esp_power(False) + for bus, speed in [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)]: + panda.set_can_speed_kbps(bus, speed) + clear_can_buffers(panda) + thread.start_new_thread(heartbeat_thread, (panda,)) + + # Run test function + ret = fn(*pandas, **kwargs) + + # Close all connections + for panda in pandas: + panda.close() + + # Return test function result + return ret return wrapper + +def clear_can_buffers(panda): + # clear tx buffers + for i in range(4): + panda.can_clear(i) + + # clear rx buffers + panda.can_clear(0xFFFF) + r = [1] + st = time.time() + while len(r) > 0: + r = panda.can_recv() + time.sleep(0.05) + if (time.time() - st) > 10: + print("Unable to clear can buffers for panda ", panda.get_serial()) + assert False \ No newline at end of file diff --git a/tests/black_loopback_test.py b/tests/black_loopback_test.py index 8683561a4da839..d16ac21af14776 100755 --- a/tests/black_loopback_test.py +++ b/tests/black_loopback_test.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Loopback test between black panda (+ harness and power) and white/grey panda +# Loopback test between two black pandas (+ harness and power) # Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test. # To be sure, the test should be run with both harness orientations @@ -33,81 +33,70 @@ def run_test(sleep_duration): pandas[0] = Panda(pandas[0]) pandas[1] = Panda(pandas[1]) - # find out which one is black + # find out the hardware types type0 = pandas[0].get_type() type1 = pandas[1].get_type() - - black_panda = None - other_panda = None - if type0 == "\x03" and type1 != "\x03": - black_panda = pandas[0] - other_panda = pandas[1] - elif type0 != "\x03" and type1 == "\x03": - black_panda = pandas[1] - other_panda = pandas[0] - else: - print("Connect white/grey and black panda to run this test!") + if type0 != "\x03" or type1 != "\x03": + print("Connect two black pandas to run this test!") assert False - # disable safety modes - black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) - other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) - - # test health packet - print("black panda health", black_panda.health()) - print("other panda health", other_panda.health()) - - # test black -> other - test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration) - test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration) + for panda in pandas: + # disable safety modes + panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + + # test health packet + print("panda health", panda.health()) + + # setup test array (send bus, sender obd, reciever obd, expected busses) + test_array = [ + (0, False, False, [0]), + (1, False, False, [1]), + (2, False, False, [2]), + (0, False, True, [0, 1]), + (1, False, True, []), + (2, False, True, [2]), + (0, True, False, [0]), + (1, True, False, [0]), + (2, True, False, [2]), + (0, True, True, [0, 1]), + (1, True, True, [0, 1]), + (2, True, True, [2]) + ] + + # test both orientations + print("***************** TESTING (0 --> 1) *****************") + test_buses(pandas[0], pandas[1], test_array, sleep_duration) + print("***************** TESTING (1 --> 0) *****************") + test_buses(pandas[1], pandas[0], test_array, sleep_duration) -def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): - if direction: - print("***************** TESTING (BLACK --> OTHER) *****************") - else: - print("***************** TESTING (OTHER --> BLACK) *****************") - - for send_bus, obd, recv_buses in test_array: - black_panda.send_heartbeat() - other_panda.send_heartbeat() - print("\ntest can: ", send_bus, " OBD: ", obd) +def test_buses(send_panda, recv_panda, test_array, sleep_duration): + for send_bus, send_obd, recv_obd, recv_buses in test_array: + send_panda.send_heartbeat() + recv_panda.send_heartbeat() + print("\nSend bus:", send_bus, " Send OBD:", send_obd, " Recv OBD:", recv_obd) - # set OBD on black panda - black_panda.set_gmlan(True if obd else None) + # set OBD on pandas + send_panda.set_gmlan(True if send_obd else None) + recv_panda.set_gmlan(True if recv_obd else None) # clear and flush - if direction: - black_panda.can_clear(send_bus) - else: - other_panda.can_clear(send_bus) - + send_panda.can_clear(send_bus) for recv_bus in recv_buses: - if direction: - other_panda.can_clear(recv_bus) - else: - black_panda.can_clear(recv_bus) - - black_panda.can_recv() - other_panda.can_recv() + recv_panda.can_clear(recv_bus) + send_panda.can_recv() + recv_panda.can_recv() # send the characters at = random.randint(1, 2000) st = get_test_string()[0:8] - if direction: - black_panda.can_send(at, st, send_bus) - else: - other_panda.can_send(at, st, send_bus) + send_panda.can_send(at, st, send_bus) time.sleep(0.1) # check for receive - if direction: - cans_echo = black_panda.can_recv() - cans_loop = other_panda.can_recv() - else: - cans_echo = other_panda.can_recv() - cans_loop = black_panda.can_recv() + cans_echo = send_panda.can_recv() + cans_loop = recv_panda.can_recv() loop_buses = [] for loop in cans_loop: diff --git a/tests/black_white_loopback_test.py b/tests/black_white_loopback_test.py new file mode 100755 index 00000000000000..7e121343930d64 --- /dev/null +++ b/tests/black_white_loopback_test.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python + +# Loopback test between black panda (+ harness and power) and white/grey panda +# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test. +# To be sure, the test should be run with both harness orientations + +from __future__ import print_function +import os +import sys +import time +import random +import argparse + +from hexdump import hexdump +from itertools import permutations + +sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) +from panda import Panda + +def get_test_string(): + return b"test"+os.urandom(10) + +counter = 0 +nonzero_bus_errors = 0 +zero_bus_errors = 0 +content_errors = 0 + +def run_test(sleep_duration): + global counter, nonzero_bus_errors, zero_bus_errors, content_errors + + pandas = Panda.list() + print(pandas) + + # make sure two pandas are connected + if len(pandas) != 2: + print("Connect white/grey and black panda to run this test!") + assert False + + # connect + pandas[0] = Panda(pandas[0]) + pandas[1] = Panda(pandas[1]) + + # find out which one is black + type0 = pandas[0].get_type() + type1 = pandas[1].get_type() + + black_panda = None + other_panda = None + + if type0 == "\x03" and type1 != "\x03": + black_panda = pandas[0] + other_panda = pandas[1] + elif type0 != "\x03" and type1 == "\x03": + black_panda = pandas[1] + other_panda = pandas[0] + else: + print("Connect white/grey and black panda to run this test!") + assert False + + # disable safety modes + black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + + # test health packet + print("black panda health", black_panda.health()) + print("other panda health", other_panda.health()) + + # test black -> other + while True: + test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration) + test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration) + counter += 1 + print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors) + + # Toggle relay + black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) + time.sleep(1) + black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + time.sleep(1) + + +def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): + global nonzero_bus_errors, zero_bus_errors, content_errors + + if direction: + print("***************** TESTING (BLACK --> OTHER) *****************") + else: + print("***************** TESTING (OTHER --> BLACK) *****************") + + for send_bus, obd, recv_buses in test_array: + black_panda.send_heartbeat() + other_panda.send_heartbeat() + print("\ntest can: ", send_bus, " OBD: ", obd) + + # set OBD on black panda + black_panda.set_gmlan(True if obd else None) + + # clear and flush + if direction: + black_panda.can_clear(send_bus) + else: + other_panda.can_clear(send_bus) + + for recv_bus in recv_buses: + if direction: + other_panda.can_clear(recv_bus) + else: + black_panda.can_clear(recv_bus) + + black_panda.can_recv() + other_panda.can_recv() + + # send the characters + at = random.randint(1, 2000) + st = get_test_string()[0:8] + if direction: + black_panda.can_send(at, st, send_bus) + else: + other_panda.can_send(at, st, send_bus) + time.sleep(0.1) + + # check for receive + if direction: + cans_echo = black_panda.can_recv() + cans_loop = other_panda.can_recv() + else: + cans_echo = other_panda.can_recv() + cans_loop = black_panda.can_recv() + + loop_buses = [] + for loop in cans_loop: + if (loop[0] != at) or (loop[2] != st): + content_errors += 1 + + print(" Loop on bus", str(loop[3])) + loop_buses.append(loop[3]) + if len(cans_loop) == 0: + print(" No loop") + if not os.getenv("NOASSERT"): + assert False + + # test loop buses + recv_buses.sort() + loop_buses.sort() + if(recv_buses != loop_buses): + if len(loop_buses) == 0: + zero_bus_errors += 1 + else: + nonzero_bus_errors += 1 + if not os.getenv("NOASSERT"): + assert False + else: + print(" TEST PASSED") + + time.sleep(sleep_duration) + print("\n") + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-n", type=int, help="Number of test iterations to run") + parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) + args = parser.parse_args() + + if args.n is None: + while True: + run_test(sleep_duration=args.sleep) + else: + for i in range(args.n): + run_test(sleep_duration=args.sleep) diff --git a/tests/black_white_relay_endurance.py b/tests/black_white_relay_endurance.py new file mode 100755 index 00000000000000..8868b9848f4e10 --- /dev/null +++ b/tests/black_white_relay_endurance.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python + +# Loopback test between black panda (+ harness and power) and white/grey panda +# Tests all buses, including OBD CAN, which is on the same bus as CAN0 in this test. +# To be sure, the test should be run with both harness orientations + +from __future__ import print_function +import os +import sys +import time +import random +import argparse + +from hexdump import hexdump +from itertools import permutations + +sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) +from panda import Panda + +def get_test_string(): + return b"test"+os.urandom(10) + +counter = 0 +nonzero_bus_errors = 0 +zero_bus_errors = 0 +content_errors = 0 + +def run_test(sleep_duration): + global counter, nonzero_bus_errors, zero_bus_errors, content_errors + + pandas = Panda.list() + print(pandas) + + # make sure two pandas are connected + if len(pandas) != 2: + print("Connect white/grey and black panda to run this test!") + assert False + + # connect + pandas[0] = Panda(pandas[0]) + pandas[1] = Panda(pandas[1]) + + # find out which one is black + type0 = pandas[0].get_type() + type1 = pandas[1].get_type() + + black_panda = None + other_panda = None + + if type0 == "\x03" and type1 != "\x03": + black_panda = pandas[0] + other_panda = pandas[1] + elif type0 != "\x03" and type1 == "\x03": + black_panda = pandas[1] + other_panda = pandas[0] + else: + print("Connect white/grey and black panda to run this test!") + assert False + + # disable safety modes + black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + + # test health packet + print("black panda health", black_panda.health()) + print("other panda health", other_panda.health()) + + # test black -> other + start_time = time.time() + temp_start_time = start_time + while True: + test_buses(black_panda, other_panda, True, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (1, True, [0])], sleep_duration) + test_buses(black_panda, other_panda, False, [(0, False, [0]), (1, False, [1]), (2, False, [2]), (0, True, [0, 1])], sleep_duration) + counter += 1 + + runtime = time.time() - start_time + print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors, "Runtime: ", runtime) + + if (time.time() - temp_start_time) > 3600*6: + # Toggle relay + black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) + time.sleep(1) + black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + time.sleep(1) + temp_start_time = time.time() + + +def test_buses(black_panda, other_panda, direction, test_array, sleep_duration): + global nonzero_bus_errors, zero_bus_errors, content_errors + + if direction: + print("***************** TESTING (BLACK --> OTHER) *****************") + else: + print("***************** TESTING (OTHER --> BLACK) *****************") + + for send_bus, obd, recv_buses in test_array: + black_panda.send_heartbeat() + other_panda.send_heartbeat() + print("\ntest can: ", send_bus, " OBD: ", obd) + + # set OBD on black panda + black_panda.set_gmlan(True if obd else None) + + # clear and flush + if direction: + black_panda.can_clear(send_bus) + else: + other_panda.can_clear(send_bus) + + for recv_bus in recv_buses: + if direction: + other_panda.can_clear(recv_bus) + else: + black_panda.can_clear(recv_bus) + + black_panda.can_recv() + other_panda.can_recv() + + # send the characters + at = random.randint(1, 2000) + st = get_test_string()[0:8] + if direction: + black_panda.can_send(at, st, send_bus) + else: + other_panda.can_send(at, st, send_bus) + time.sleep(0.1) + + # check for receive + if direction: + cans_echo = black_panda.can_recv() + cans_loop = other_panda.can_recv() + else: + cans_echo = other_panda.can_recv() + cans_loop = black_panda.can_recv() + + loop_buses = [] + for loop in cans_loop: + if (loop[0] != at) or (loop[2] != st): + content_errors += 1 + + print(" Loop on bus", str(loop[3])) + loop_buses.append(loop[3]) + if len(cans_loop) == 0: + print(" No loop") + if not os.getenv("NOASSERT"): + assert False + + # test loop buses + recv_buses.sort() + loop_buses.sort() + if(recv_buses != loop_buses): + if len(loop_buses) == 0: + zero_bus_errors += 1 + else: + nonzero_bus_errors += 1 + if not os.getenv("NOASSERT"): + assert False + else: + print(" TEST PASSED") + + time.sleep(sleep_duration) + print("\n") + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-n", type=int, help="Number of test iterations to run") + parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) + args = parser.parse_args() + + if args.n is None: + while True: + run_test(sleep_duration=args.sleep) + else: + for i in range(args.n): + run_test(sleep_duration=args.sleep) diff --git a/tests/black_white_relay_test.py b/tests/black_white_relay_test.py new file mode 100755 index 00000000000000..21a2ef6d7f0bbd --- /dev/null +++ b/tests/black_white_relay_test.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python + +# Relay test with loopback between black panda (+ harness and power) and white/grey panda +# Tests the relay switching multiple times / second by looking at the buses on which loop occurs. + +from __future__ import print_function +import os +import sys +import time +import random +import argparse + +from hexdump import hexdump +from itertools import permutations + +sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) +from panda import Panda + +def get_test_string(): + return b"test"+os.urandom(10) + +counter = 0 +open_errors = 0 +closed_errors = 0 +content_errors = 0 + +def run_test(sleep_duration): + global counter, open_errors, closed_errors, content_errors + + pandas = Panda.list() + #pandas = ["540046000c51363338383037", "07801b800f51363038363036"] + print(pandas) + + # make sure two pandas are connected + if len(pandas) != 2: + print("Connect white/grey and black panda to run this test!") + assert False + + # connect + pandas[0] = Panda(pandas[0]) + pandas[1] = Panda(pandas[1]) + + # find out which one is black + type0 = pandas[0].get_type() + type1 = pandas[1].get_type() + + black_panda = None + other_panda = None + + if type0 == "\x03" and type1 != "\x03": + black_panda = pandas[0] + other_panda = pandas[1] + elif type0 != "\x03" and type1 == "\x03": + black_panda = pandas[1] + other_panda = pandas[0] + else: + print("Connect white/grey and black panda to run this test!") + assert False + + # disable safety modes + black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + other_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + + # test health packet + print("black panda health", black_panda.health()) + print("other panda health", other_panda.health()) + + # test black -> other + while True: + # Switch on relay + black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + time.sleep(0.05) + + if not test_buses(black_panda, other_panda, (0, False, [0])): + open_errors += 1 + print("Open error") + assert False + + # Switch off relay + black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT) + time.sleep(0.05) + + if not test_buses(black_panda, other_panda, (0, False, [0, 2])): + closed_errors += 1 + print("Close error") + assert False + + counter += 1 + print("Number of cycles:", counter, "Open errors:", open_errors, "Closed errors:", closed_errors, "Content errors:", content_errors) + +def test_buses(black_panda, other_panda, test_obj): + global content_errors + send_bus, obd, recv_buses = test_obj + + black_panda.send_heartbeat() + other_panda.send_heartbeat() + + # Set OBD on send panda + other_panda.set_gmlan(True if obd else None) + + # clear and flush + other_panda.can_clear(send_bus) + + for recv_bus in recv_buses: + black_panda.can_clear(recv_bus) + + black_panda.can_recv() + other_panda.can_recv() + + # send the characters + at = random.randint(1, 2000) + st = get_test_string()[0:8] + other_panda.can_send(at, st, send_bus) + time.sleep(0.05) + + # check for receive + cans_echo = other_panda.can_recv() + cans_loop = black_panda.can_recv() + + loop_buses = [] + for loop in cans_loop: + if (loop[0] != at) or (loop[2] != st): + content_errors += 1 + loop_buses.append(loop[3]) + + # test loop buses + recv_buses.sort() + loop_buses.sort() + if(recv_buses != loop_buses): + return False + else: + return True + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-n", type=int, help="Number of test iterations to run") + parser.add_argument("-sleep", type=int, help="Sleep time between tests", default=0) + args = parser.parse_args() + + if args.n is None: + while True: + run_test(sleep_duration=args.sleep) + else: + for i in range(args.n): + run_test(sleep_duration=args.sleep) diff --git a/tests/debug_console.py b/tests/debug_console.py index 0238ed789783e4..e341b266c8ddc4 100755 --- a/tests/debug_console.py +++ b/tests/debug_console.py @@ -12,33 +12,38 @@ unsetcolor = "\033[00m" if __name__ == "__main__": - port_number = int(os.getenv("PORT", 0)) - claim = os.getenv("CLAIM") is not None + while True: + try: + port_number = int(os.getenv("PORT", 0)) + claim = os.getenv("CLAIM") is not None - serials = Panda.list() - if os.getenv("SERIAL"): - serials = filter(lambda x: x==os.getenv("SERIAL"), serials) + serials = Panda.list() + if os.getenv("SERIAL"): + serials = filter(lambda x: x==os.getenv("SERIAL"), serials) - pandas = list(map(lambda x: Panda(x, claim=claim), serials)) + pandas = list(map(lambda x: Panda(x, claim=claim), serials)) - if not len(pandas): - sys.exit("no pandas found") + if not len(pandas): + sys.exit("no pandas found") - if os.getenv("BAUD") is not None: - for panda in pandas: - panda.set_uart_baud(port_number, int(os.getenv("BAUD"))) + if os.getenv("BAUD") is not None: + for panda in pandas: + panda.set_uart_baud(port_number, int(os.getenv("BAUD"))) - while True: - for i, panda in enumerate(pandas): while True: - ret = panda.serial_read(port_number) - if len(ret) > 0: - sys.stdout.write(setcolor[i] + str(ret) + unsetcolor) - sys.stdout.flush() - else: - break - if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []): - ln = sys.stdin.readline() - if claim: - panda.serial_write(port_number, ln) - time.sleep(0.01) + for i, panda in enumerate(pandas): + while True: + ret = panda.serial_read(port_number) + if len(ret) > 0: + sys.stdout.write(setcolor[i] + str(ret) + unsetcolor) + sys.stdout.flush() + else: + break + if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []): + ln = sys.stdin.readline() + if claim: + panda.serial_write(port_number, ln) + time.sleep(0.01) + except: + print("panda disconnected!") + time.sleep(0.5); diff --git a/tests/health_test.py b/tests/health_test.py new file mode 100755 index 00000000000000..1042c860d866f1 --- /dev/null +++ b/tests/health_test.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +import time +from panda import Panda + +if __name__ == "__main__": + panda_serials = Panda.list() + pandas = [] + for ps in panda_serials: + pandas.append(Panda(serial=ps)) + if len(pandas) == 0: + print("No pandas connected") + assert False + + while True: + for panda in pandas: + print(panda.health()) + print("\n") + time.sleep(0.5) +