diff --git a/examples/eps_read_software_ids.py b/examples/eps_read_software_ids.py deleted file mode 100755 index 8967a83a34e9af..00000000000000 --- a/examples/eps_read_software_ids.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python3 -from panda import Panda -from panda.python.uds import UdsClient, NegativeResponseError, DATA_IDENTIFIER_TYPE - -if __name__ == "__main__": - address = 0x18da30f1 # Honda EPS - panda = Panda() - panda.set_safety_mode(Panda.SAFETY_ELM327) - uds_client = UdsClient(panda, address, bus=1 if panda.has_obd() else 0, debug=False) - - print("tester present ...") - uds_client.tester_present() - - try: - print("") - print("read data by id: boot software id ...") - data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION) - print(data.decode('utf-8')) - except NegativeResponseError as e: - print(e) - - try: - print("") - print("read data by id: application software id ...") - data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION) - print(data.decode('utf-8')) - except NegativeResponseError as e: - print(e) - - try: - print("") - print("read data by id: application data id ...") - data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION) - print(data.decode('utf-8')) - except NegativeResponseError as e: - print(e) - - try: - print("") - print("read data by id: boot software fingerprint ...") - data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT) - print(data.decode('utf-8')) - except NegativeResponseError as e: - print(e) - - try: - print("") - print("read data by id: application software fingerprint ...") - data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT) - print(data.decode('utf-8')) - except NegativeResponseError as e: - print(e) - - try: - print("") - print("read data by id: application data fingerprint ...") - data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT) - print(data.decode('utf-8')) - except NegativeResponseError as e: - print(e) diff --git a/examples/query_fw_versions.py b/examples/query_fw_versions.py new file mode 100755 index 00000000000000..c6704f0d4b386d --- /dev/null +++ b/examples/query_fw_versions.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +from tqdm import tqdm +from panda import Panda +from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseError, DATA_IDENTIFIER_TYPE + +if __name__ == "__main__": + addrs = [0x700 + i for i in range(256)] + addrs += [0x18da0000 + (i<<8) + 0xf1 for i in range(256)] + results = {} + + panda = Panda() + panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + print("querying addresses ...") + for addr in tqdm(addrs): + # skip functional broadcast addrs + if addr == 0x7df or addr == 0x18db33f1: + continue + + uds_client = UdsClient(panda, addr, bus=1 if panda.has_obd() else 0, timeout=0.1, debug=False) + try: + uds_client.tester_present() + except NegativeResponseError: + pass + except MessageTimeoutError: + continue + + resp = {} + + try: + data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION) + if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION] = data + except NegativeResponseError: + pass + + try: + data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION) + if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION] = data + except NegativeResponseError: + pass + + try: + data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION) + if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION] = data + except NegativeResponseError: + pass + + try: + data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT) + if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT] = data + except NegativeResponseError: + pass + + try: + data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT) + if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT] = data + except NegativeResponseError: + pass + + try: + data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT) + if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT] = data + except NegativeResponseError: + pass + + if resp.keys(): + results[addr] = resp + + print("results:") + for addr, resp in results.items(): + for id, dat in resp.items(): + print(hex(addr), hex(id), dat.decode()) diff --git a/python/uds.py b/python/uds.py index 1967f0bb07bdf9..190796bb6d3d84 100644 --- a/python/uds.py +++ b/python/uds.py @@ -3,6 +3,7 @@ import struct from typing import Callable, NamedTuple, Tuple, List from enum import IntEnum + class SERVICE_TYPE(IntEnum): DIAGNOSTIC_SESSION_CONTROL = 0x10 ECU_RESET = 0x11 @@ -276,6 +277,23 @@ def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: self.bus = bus self.debug = debug + def _recv_filter(self, bus, addr): + # handle functionl addresses (switch to first addr to respond) + if self.tx_addr == 0x7DF: + is_response = addr >= 0x7E8 and addr <= 0x7EF + if is_response: + if self.debug: print(f"switch to physical addr {hex(addr)}") + self.tx_addr = addr-8 + self.rx_addr = addr + return is_response + if self.tx_addr == 0x18DB33F1: + is_response = addr >= 0x18DAF100 and addr <= 0x18DAF1FF + if is_response: + if self.debug: print(f"switch to physical addr {hex(addr)}") + self.tx_addr = 0x18DA00F1 + (addr<<8 & 0xFF00) + self.rx_addr = addr + return bus == self.bus and addr == self.rx_addr + def recv(self, drain=False) -> List[bytes]: msg_array = [] while True: @@ -284,7 +302,7 @@ def recv(self, drain=False) -> List[bytes]: if self.debug: print("CAN-RX: drain - {}".format(len(msgs))) else: for rx_addr, rx_ts, rx_data, rx_bus in msgs or []: - if rx_bus == self.bus and rx_addr == self.rx_addr and len(rx_data) > 0: + if self._recv_filter(rx_bus, rx_addr) and len(rx_data) > 0: rx_data = bytes(rx_data) # convert bytearray to bytes if self.debug: print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}") msg_array.append(rx_data) @@ -295,7 +313,7 @@ def recv(self, drain=False) -> List[bytes]: def send(self, msgs: List[bytes], delay: float=0) -> None: first = True for msg in msgs: - if not first and delay: + if delay and not first: if self.debug: print(f"CAN-TX: delay - {delay}") time.sleep(delay) if self.debug: print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}") @@ -317,6 +335,11 @@ def send(self, dat: bytes) -> None: self.tx_idx = 0 self.tx_done = False + self.rx_dat = b"" + self.rx_len = 0 + self.rx_idx = 0 + self.rx_done = False + if self.debug: print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}") self._tx_first_frame() @@ -333,11 +356,6 @@ def _tx_first_frame(self) -> None: self._can_client.send([msg]) def recv(self) -> bytes: - self.rx_dat = b"" - self.rx_len = 0 - self.rx_idx = 0 - self.rx_done = False - start_time = time.time() try: while True: @@ -351,7 +369,7 @@ def recv(self) -> bytes: if time.time() - start_time > self.timeout: raise MessageTimeoutError("timeout waiting for response") finally: - if self.debug: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}") + if self.debug and self.rx_dat: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}") def _isotp_rx_next(self, rx_data: bytes) -> None: # single rx_frame