From 768fdf7e19e94facbbfb7ad7259c1f87f8e86a19 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Sun, 13 Oct 2019 12:13:44 -0700 Subject: [PATCH] bytes() > chr().encode() --- python/uds.py | 82 +++++++++++++++++++++++++++------------------------ 1 file changed, 43 insertions(+), 39 deletions(-) diff --git a/python/uds.py b/python/uds.py index dc09d3fd6e709e..438a2ab719c37c 100644 --- a/python/uds.py +++ b/python/uds.py @@ -270,7 +270,7 @@ def __init__(self, panda, tx_addr, rx_addr=None, bus=0, timeout=10, debug=False) # standard 11 bit response addr (add 8) self.rx_addr = tx_addr+8 elif tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF: - # standard 19 bit response addr (flip last two bytes) + # standard 29 bit response addr (flip last two bytes) self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF) else: raise ValueError("invalid tx_addr: {}".format(tx_addr)) @@ -336,28 +336,32 @@ def _isotp_thread(self, debug): if tx_frame["done"] != False: tx_frame["done"] = True self.rx_queue.put(b"\x7F\xFF\xFFtx: no active frame") - # TODO: support wait - if rx_data[0] == 0x31: - tx_frame["done"] = True - self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - wait not supported") if rx_data[0] == 0x32: + # 0x32 = overflow/abort tx_frame["done"] = True self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - overflow/abort") - delay_ts = rx_data[2] & 0x7F - # scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1 - delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000. - # first frame = 6 bytes, each consecutive frame = 7 bytes - start = 6 + tx_frame["idx"] * 7 - count = rx_data[1] - end = start + count * 7 if count > 0 else tx_frame["size"] - for i in range(start, end, 7): - tx_frame["idx"] += 1 - # consecutive tx frames - msg = (chr(0x20 | (tx_frame["idx"] & 0xF)).encode() + tx_frame["data"][i:i+7]).ljust(8, b"\x00") - if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) - self.panda.can_send(self.tx_addr, msg, self.bus) - if delay_ts > 0: - time.sleep(delay_ts / delay_div) + if rx_data[0] != 0x30 and rx_data[0] != 0x31: + # 0x30 = continue + # 0x31 = wait + tx_frame["done"] = True + self.rx_queue.put(b"\x7F\xFF\xFFtx: flow-control error - invalid transfer state indicator") + if rx_data[0] == 0x30: + delay_ts = rx_data[2] & 0x7F + # scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1 + delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000. + # first frame = 6 bytes, each consecutive frame = 7 bytes + start = 6 + tx_frame["idx"] * 7 + count = rx_data[1] + end = start + count * 7 if count > 0 else tx_frame["size"] + for i in range(start, end, 7): + if delay_ts > 0 and i > start: + if (debug): print("D: {}".format(delay_ts / delay_div)) + time.sleep(delay_ts / delay_div) + tx_frame["idx"] += 1 + # consecutive tx frames + msg = (bytes([0x20 | (tx_frame["idx"] & 0xF)]) + tx_frame["data"][i:i+7]).ljust(8, b"\x00") + if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) + self.panda.can_send(self.tx_addr, msg, self.bus) if end >= tx_frame["size"]: tx_frame["done"] = True @@ -369,7 +373,7 @@ def _isotp_thread(self, debug): if tx_frame["size"] < 8: # single frame tx_frame["done"] = True - msg = (chr(tx_frame["size"]).encode() + tx_frame["data"]).ljust(8, b"\x00") + msg = (bytes([tx_frame["size"]]) + tx_frame["data"]).ljust(8, b"\x00") if (debug): print("S: {} {}".format(hex(self.tx_addr), hexlify(msg))) self.panda.can_send(self.tx_addr, msg, self.bus) else: @@ -386,9 +390,9 @@ def _isotp_thread(self, debug): # generic uds request def _uds_request(self, service_type, subfunction=None, data=None): - req = chr(service_type).encode() + req = bytes([service_type]) if subfunction is not None: - req += chr(subfunction).encode() + req += bytes([subfunction]) if data is not None: req += data self.tx_queue.put(req) @@ -459,7 +463,7 @@ def security_access(self, access_type, security_key=None): return security_seed def communication_control(self, control_type, message_type): - data = chr(message_type).encode() + data = bytes([message_type]) self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data) def tester_present(self, ): @@ -494,7 +498,7 @@ def response_on_event(self, response_event_type, store_event, window_time, event if store_event: response_event_type |= 0x20 # TODO: split record parameters into arrays - data = char(window_time) + event_type_record + service_response_record + data = bytes([window_time, event_type_record, service_response_record]) resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data) if response_event_type == RESPONSE_EVENT_TYPE.REPORT_ACTIVATED_EVENTS: @@ -512,7 +516,7 @@ def response_on_event(self, response_event_type, store_event, window_time, event def link_control(self, link_control_type, baud_rate_type=None): if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE: # baud_rate_type = BAUD_RATE_TYPE - data = chr(baud_rate_type).encode() + data = bytes([baud_rate_type]) elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE: # baud_rate_type = custom value (3 bytes big-endian) data = struct.pack('!I', baud_rate_type)[1:] @@ -534,7 +538,7 @@ def read_memory_by_address(self, memory_address, memory_size, memory_address_byt raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = chr(memory_size_bytes<<4 | memory_address_bytes).encode() + data = bytes([memory_size_bytes<<4 | memory_address_bytes]) if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -556,7 +560,7 @@ def read_scaling_data_by_identifier(self, data_identifier_type): def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier): # TODO: support list of identifiers - data = chr(transmission_mode_type).encode() + chr(periodic_data_identifier).encode() + data = bytes([transmission_mode_type, periodic_data_identifier]) self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1): @@ -568,9 +572,9 @@ def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_da data = struct.pack('!H', dynamic_data_identifier) if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER: for s in source_definitions: - data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]).encode() + chr(s["memory_size"]).encode() + data += struct.pack('!H', s["data_identifier"]) + bytes([s["position"], s["memory_size"]]) elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS: - data += chr(memory_size_bytes<<4 | memory_address_bytes).encode() + data += bytes([memory_size_bytes<<4 | memory_address_bytes]) for s in source_definitions: if s["memory_address"] >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(s["memory_address"])) @@ -596,7 +600,7 @@ def write_memory_by_address(self, memory_address, memory_size, data_record, memo raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data = chr(memory_size_bytes<<4 | memory_address_bytes).encode() + data = bytes([memory_size_bytes<<4 | memory_address_bytes]) if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -621,7 +625,7 @@ def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_ dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \ dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \ dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK: - data += chr(dtc_status_mask_type).encode() + data += bytes([dtc_status_mask_type]) # dtc_mask_record if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \ dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ @@ -637,11 +641,11 @@ def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_ # dtc_extended_record_num if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER: - data += chr(dtc_extended_record_num).encode() + data += bytes([dtc_extended_record_num]) # dtc_severity_mask_type if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \ dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD: - data += chr(dtc_severity_mask_type).encode() + chr(dtc_status_mask_type).encode() + data += bytes([dtc_severity_mask_type, dtc_status_mask_type]) resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data) @@ -665,13 +669,13 @@ def routine_control(self, routine_control_type, routine_identifier_type, routine return resp[2:] def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00): - data = chr(data_format).encode() + data = bytes([data_format]) if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data += chr(memory_size_bytes<<4 | memory_address_bytes).encode() + data += bytes([memory_size_bytes<<4 | memory_address_bytes]) if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -690,13 +694,13 @@ def request_download(self, memory_address, memory_size, memory_address_bytes=4, return max_num_bytes # max number of bytes per transfer data request def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00): - data = chr(data_format).encode() + data = bytes([data_format]) if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) - data += chr(memory_size_bytes<<4 | memory_address_bytes).encode() + data += bytes([memory_size_bytes<<4 | memory_address_bytes]) if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) @@ -715,7 +719,7 @@ def request_upload(self, memory_address, memory_size, memory_address_bytes=4, me return max_num_bytes # max number of bytes per transfer data request def transfer_data(self, block_sequence_count, data=b''): - data = chr(block_sequence_count).encode() + data + data = bytes([block_sequence_count]) + data resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) resp_id = resp[0] if len(resp) > 0 else None if resp_id != block_sequence_count: