Skip to content

Commit

Permalink
uds: handle function addrs and fw version query example
Browse files Browse the repository at this point in the history
  • Loading branch information
gregjhogan committed Nov 17, 2019
1 parent 6626a54 commit 8138fc1
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 68 deletions.
60 changes: 0 additions & 60 deletions examples/eps_read_software_ids.py

This file was deleted.

71 changes: 71 additions & 0 deletions examples/query_fw_versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
from tqdm import tqdm
from panda import Panda
from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseError, DATA_IDENTIFIER_TYPE

if __name__ == "__main__":
addrs = [0x700 + i for i in range(256)]
addrs += [0x18da0000 + (i<<8) + 0xf1 for i in range(256)]
results = {}

panda = Panda()
panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
print("querying addresses ...")
for addr in tqdm(addrs):
# skip functional broadcast addrs
if addr == 0x7df or addr == 0x18db33f1:
continue

uds_client = UdsClient(panda, addr, bus=1 if panda.has_obd() else 0, timeout=0.1, debug=False)
try:
uds_client.tester_present()
except NegativeResponseError:
pass
except MessageTimeoutError:
continue

resp = {}

try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_IDENTIFICATION] = data
except NegativeResponseError:
pass

try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION] = data
except NegativeResponseError:
pass

try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_IDENTIFICATION] = data
except NegativeResponseError:
pass

try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.BOOT_SOFTWARE_FINGERPRINT] = data
except NegativeResponseError:
pass

try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_FINGERPRINT] = data
except NegativeResponseError:
pass

try:
data = uds_client.read_data_by_identifier(DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT)
if data: resp[DATA_IDENTIFIER_TYPE.APPLICATION_DATA_FINGERPRINT] = data
except NegativeResponseError:
pass

if resp.keys():
results[addr] = resp

print("results:")
for addr, resp in results.items():
for id, dat in resp.items():
print(hex(addr), hex(id), dat.decode())
34 changes: 26 additions & 8 deletions python/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import struct
from typing import Callable, NamedTuple, Tuple, List
from enum import IntEnum

class SERVICE_TYPE(IntEnum):
DIAGNOSTIC_SESSION_CONTROL = 0x10
ECU_RESET = 0x11
Expand Down Expand Up @@ -276,6 +277,23 @@ def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv:
self.bus = bus
self.debug = debug

def _recv_filter(self, bus, addr):
# handle functionl addresses (switch to first addr to respond)
if self.tx_addr == 0x7DF:
is_response = addr >= 0x7E8 and addr <= 0x7EF
if is_response:
if self.debug: print(f"switch to physical addr {hex(addr)}")
self.tx_addr = addr-8
self.rx_addr = addr
return is_response
if self.tx_addr == 0x18DB33F1:
is_response = addr >= 0x18DAF100 and addr <= 0x18DAF1FF
if is_response:
if self.debug: print(f"switch to physical addr {hex(addr)}")
self.tx_addr = 0x18DA00F1 + (addr<<8 & 0xFF00)
self.rx_addr = addr
return bus == self.bus and addr == self.rx_addr

def recv(self, drain=False) -> List[bytes]:
msg_array = []
while True:
Expand All @@ -284,7 +302,7 @@ def recv(self, drain=False) -> List[bytes]:
if self.debug: print("CAN-RX: drain - {}".format(len(msgs)))
else:
for rx_addr, rx_ts, rx_data, rx_bus in msgs or []:
if rx_bus == self.bus and rx_addr == self.rx_addr and len(rx_data) > 0:
if self._recv_filter(rx_bus, rx_addr) and len(rx_data) > 0:
rx_data = bytes(rx_data) # convert bytearray to bytes
if self.debug: print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
msg_array.append(rx_data)
Expand All @@ -295,7 +313,7 @@ def recv(self, drain=False) -> List[bytes]:
def send(self, msgs: List[bytes], delay: float=0) -> None:
first = True
for msg in msgs:
if not first and delay:
if delay and not first:
if self.debug: print(f"CAN-TX: delay - {delay}")
time.sleep(delay)
if self.debug: print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}")
Expand All @@ -317,6 +335,11 @@ def send(self, dat: bytes) -> None:
self.tx_idx = 0
self.tx_done = False

self.rx_dat = b""
self.rx_len = 0
self.rx_idx = 0
self.rx_done = False

if self.debug: print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}")
self._tx_first_frame()

Expand All @@ -333,11 +356,6 @@ def _tx_first_frame(self) -> None:
self._can_client.send([msg])

def recv(self) -> bytes:
self.rx_dat = b""
self.rx_len = 0
self.rx_idx = 0
self.rx_done = False

start_time = time.time()
try:
while True:
Expand All @@ -351,7 +369,7 @@ def recv(self) -> bytes:
if time.time() - start_time > self.timeout:
raise MessageTimeoutError("timeout waiting for response")
finally:
if self.debug: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}")
if self.debug and self.rx_dat: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}")

def _isotp_rx_next(self, rx_data: bytes) -> None:
# single rx_frame
Expand Down

0 comments on commit 8138fc1

Please sign in to comment.