From dca176e7170901f054aef4fb93c8bf73d63a1026 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Sun, 25 Nov 2018 12:53:56 -0800 Subject: [PATCH] syntax errors --- python/uds.py | 183 ++++++++++++++++++++++++++------------------------ 1 file changed, 97 insertions(+), 86 deletions(-) diff --git a/python/uds.py b/python/uds.py index 185f4257ae190c..1244f2c50a78e1 100644 --- a/python/uds.py +++ b/python/uds.py @@ -1,10 +1,10 @@ #!/usr/bin/env python import time import struct -from enum import Enum +from enum import IntEnum import threading -class SERVICE_TYPE(Enum): +class SERVICE_TYPE(IntEnum): DIAGNOSTIC_SESSION_CONTROL = 0x10 ECU_RESET = 0x11 SECURITY_ACCESS = 0x27 @@ -83,41 +83,52 @@ class InvalidSubFunctioneError(Exception): pass # generic uds request -def _request(address, service_type, subfunction, data=None): +def _request(address, service_type, subfunction=None, data=None): # TODO: send request # TODO: wait for response - - # raise exception on error - if resp[0] == '\x7f' - error_code = resp[2] - error_desc = _negative_response_codes[error_code] - raise NegativeResponseError('[{}] {}'.format(hex(ord(error_code)), error_desc)) + #resp = '\x7f'+chr(service_type)+'\x10' + resp = '\x62\xf1\x90' + '\x57\x30\x4c\x30\x30\x30\x30\x34\x33\x4d\x42\x35\x34\x31\x33\x32\x36' resp_sid = ord(resp[0]) if len(resp) > 0 else None - if service != resp_sid + 0x40: + + # negative response + if resp_sid == 0x7F: + try: + error_service = SERVICE_TYPE(ord(resp[1])).name + except: + error_service = 'NON_STANDARD_SERVICE' + error_code = hex(ord(resp[2])) if len(resp) > 2 else '0x??' + try: + error_desc = _negative_response_codes[resp[2]] + except: + error_desc = 'unknown error' + raise NegativeResponseError('{} - {} - {}'.format(error_service, error_code, error_desc)) + + # positive response + if service_type+0x40 != resp_sid: resp_sid_hex = hex(resp_sid) if resp_sid is not None else None raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex)) - if subfunction is None: - resp_subf = ord(resp[1]) if len(resp) > 1 else None - if subfunction != resp_subf: - resp_subf_hex = hex(resp_subf) if resp_subf is not None else None - raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_subf))) + if subfunction is not None: + resp_sfn = ord(resp[1]) if len(resp) > 1 else None + if subfunction != resp_sfn: + resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None + raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn))) # return data (exclude service id and sub-function id) return resp[(1 if subfunction is None else 2):] # services -class SESSION_TYPE(Enum): +class SESSION_TYPE(IntEnum): DEFAULT = 1 PROGRAMMING = 2 EXTENDED_DIAGNOSTIC = 3 SAFETY_SYSTEM_DIAGNOSTIC = 4 def diagnostic_session_control(address, session_type): - _request(address, service=SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type) + _request(address, SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type) -class RESET_TYPE(Enum): +class RESET_TYPE(IntEnum): HARD = 1 KEY_OFF_ON = 2 SOFT = 3 @@ -127,11 +138,11 @@ class RESET_TYPE(Enum): def ecu_reset(address, reset_type): resp = _request(address, SERVICE_TYPE.ECU_RESET, subfunction=reset_type) power_down_time = None - if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN + if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN: power_down_time = ord(resp[0]) return power_down_time -class ACCESS_TYPE(Enum): +class ACCESS_TYPE(IntEnum): REQUEST_SEED = 1 SEND_KEY = 2 @@ -141,30 +152,30 @@ def security_access(address, access_type, security_key=None): raise ValueError('security_key not allowed') if not request_seed and security_key is None: raise ValueError('security_key is missing') - resp = _request(address, service=SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key) + resp = _request(address, SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key) if request_seed: security_seed = resp return security_seed -class CONTROL_TYPE(Enum): +class CONTROL_TYPE(IntEnum): ENABLE_RX_ENABLE_TX = 0 ENABLE_RX_DISABLE_TX = 1 DISABLE_RX_ENABLE_TX = 2 DISABLE_RX_DISABLE_TX = 3 -class MESSAGE_TYPE(Enum): +class MESSAGE_TYPE(IntEnum): NORMAL = 1 NETWORK_MANAGEMENT = 2 NORMAL_AND_NETWORK_MANAGEMENT = 3 def communication_control(address, control_type, message_type): data = chr(message_type) - _request(address, service=SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data) + _request(address, SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data) def tester_present(address): - _request(address, service=SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00) + _request(address, SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00) -class TIMING_PARAMETER_TYPE(Enum): +class TIMING_PARAMETER_TYPE(IntEnum): READ_EXTENDED_SET = 1 SET_TO_DEFAULT_VALUES = 2 READ_CURRENTLY_ACTIVE = 3 @@ -180,7 +191,7 @@ def access_timing_parameter(address, timing_parameter_type, parameter_values): raise ValueError('parameter_values not allowed') if write_custom_values and parameter_values is None: raise ValueError('parameter_values is missing') - resp = _request(address, service=SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values) + resp = _request(address, SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values) if read_values: # TODO: parse response into values? parameter_values = resp @@ -188,18 +199,18 @@ def access_timing_parameter(address, timing_parameter_type, parameter_values): def secured_data_transmission(address, data): # TODO: split data into multiple input parameters? - resp = _request(address, service=SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data) # TODO: parse response into multiple output values? return resp -class DTC_SETTING_TYPE(Enum): +class DTC_SETTING_TYPE(IntEnum): ON = 1 OFF = 2 def control_dtc_setting(address, dtc_setting_type): - _request(address, service=SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type) + _request(address, SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type) -class RESPONSE_EVENT_TYPE(Enum): +class RESPONSE_EVENT_TYPE(IntEnum): STOP_RESPONSE_ON_EVENT = 0 ON_DTC_STATUS_CHANGE = 1 ON_TIMER_INTERRUPT = 2 @@ -214,7 +225,7 @@ def response_on_event(address, response_event_type, store_event, window_time, ev response_event_type |= 0x20 # TODO: split record parameters into arrays data = char(window_time) + event_type_record + service_response_record - resp = _request(address, service=SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data) + resp = _request(address, SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data) if response_event_type == REPORT_ACTIVATED_EVENTS: return { @@ -228,12 +239,12 @@ def response_on_event(address, response_event_type, store_event, window_time, ev "data": resp[2:], # TODO: parse the reset of response } -class LINK_CONTROL_TYPE(Enum): +class LINK_CONTROL_TYPE(IntEnum): VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1 VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2 TRANSITION_BAUDRATE = 3 -class BAUD_RATE_TYPE(Enum): +class BAUD_RATE_TYPE(IntEnum): PC9600 = 1 PC19200 = 2 PC38400 = 3 @@ -253,9 +264,9 @@ def link_control(address, link_control_type, baud_rate_type=None): data = struct.pack('!I', baud_rate_type)[1:] else: data = None - _request(address, service=SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data) + _request(address, SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data) -class DATA_IDENTIFIER_TYPE(Enum): +class DATA_IDENTIFIER_TYPE(IntEnum): BOOT_SOFTWARE_IDENTIFICATION = 0XF180 APPLICATION_SOFTWARE_IDENTIFICATION = 0XF181 APPLICATION_DATA_IDENTIFICATION = 0XF182 @@ -291,7 +302,7 @@ class DATA_IDENTIFIER_TYPE(Enum): def read_data_by_identifier(address, data_identifier_type): # TODO: support list of identifiers data = struct.pack('!H', data_identifier_type) - resp = _request(address, service=SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) @@ -304,25 +315,25 @@ def read_memory_by_address(address, memory_address, memory_size, memory_address_ raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) data = struct.pack('!BB', memory_size_bytes, memory_address_bytes) - if memory_address >= 1<<(memory_address_bytes*8) + if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if memory_size >= 1<<(memory_size_bytes*8) + if memory_size >= 1<<(memory_size_bytes*8): raise ValueError('invalid memory_size: {}'.format(memory_size)) data += struct.pack('!I', memory_size)[4-memory_size_bytes:] - resp = _request(address, service=SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data) return resp def read_scaling_data_by_identifier(address, data_identifier_type): data = struct.pack('!H', data_identifier_type) - resp = _request(address, service=SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) return resp[2:] # TODO: parse the response -class TRANSMISSION_MODE_TYPE(Enum): +class TRANSMISSION_MODE_TYPE(IntEnum): SEND_AT_SLOW_RATE = 1 SEND_AT_MEDIUM_RATE = 2 SEND_AT_FAST_RATE = 3 @@ -331,9 +342,9 @@ class TRANSMISSION_MODE_TYPE(Enum): def read_data_by_periodic_identifier(address, transmission_mode_type, periodic_data_identifier): # TODO: support list of identifiers data = chr(transmission_mode_type) + chr(periodic_data_identifier) - _request(address, service=SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) + _request(address, SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) -class DYNAMIC_DEFINITION_TYPE(Enum): +class DYNAMIC_DEFINITION_TYPE(IntEnum): DEFINE_BY_IDENTIFIER = 1 DEFINE_BY_MEMORY_ADDRESS = 2 CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3 @@ -352,21 +363,21 @@ def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS: data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) for s in source_definitions: - if s["memory_address"] >= 1<<(memory_address_bytes*8) + if s["memory_address"] >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(s["memory_address"])) data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if s["memory_size"] >= 1<<(memory_size_bytes*8) + if s["memory_size"] >= 1<<(memory_size_bytes*8): raise ValueError('invalid memory_size: {}'.format(s["memory_size"])) data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:] elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER: pass else: raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type))) - _request(address, service=SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data) + _request(address, SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data) def write_data_by_identifier(address, data_identifier_type, data_record): data = struct.pack('!H', data_identifier_type) + data_record - resp = _request(address, service=SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) @@ -378,25 +389,25 @@ def write_memory_by_address(address, memory_address, memory_size, data_record, m raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) data = struct.pack('!BB', memory_size_bytes, memory_address_bytes) - if memory_address >= 1<<(memory_address_bytes*8) + if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if memory_size >= 1<<(memory_size_bytes*8) + if memory_size >= 1<<(memory_size_bytes*8): raise ValueError('invalid memory_size: {}'.format(memory_size)) data += struct.pack('!I', memory_size)[4-memory_size_bytes:] data += data_record - _request(address, service=SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data) + _request(address, SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data) -class DTC_GROUP_TYPE(Enum): +class DTC_GROUP_TYPE(IntEnum): EMISSIONS = 0x000000 ALL = 0xFFFFFF def clear_diagnostic_information(address, dtc_group_type): data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes - _request(address, service=SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data) + _request(address, SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data) -class DTC_REPORT_TYPE(Enum): +class DTC_REPORT_TYPE(IntEnum): NUMBER_OF_DTC_BY_STATUS_MASK = 0x01 DTC_BY_STATUS_MASK = 0x02 DTC_SNAPSHOT_IDENTIFICATION = 0x03 @@ -419,7 +430,7 @@ class DTC_REPORT_TYPE(Enum): DTC_FAULT_DETECTION_COUNTER = 0x14 DTC_WITH_PERMANENT_STATUS = 0x15 -class DTC_STATUS_MASK_TYPE(Enum): +class DTC_STATUS_MASK_TYPE(IntEnum): TEST_FAILED = 0x01 TEST_FAILED_THIS_OPERATION_CYCLE = 0x02 PENDING_DTC = 0x04 @@ -430,7 +441,7 @@ class DTC_STATUS_MASK_TYPE(Enum): WARNING_INDICATOR_REQUESTED = 0x80 ALL = 0xFF -class DTC_SEVERITY_MASK_TYPE(Enum): +class DTC_SEVERITY_MASK_TYPE(IntEnum): MAINTENANCE_ONLY = 0x20 CHECK_AT_NEXT_HALT = 0x40 CHECK_IMMEDIATELY = 0x80 @@ -439,40 +450,40 @@ class DTC_SEVERITY_MASK_TYPE(Enum): def read_dtc_information(address, dtc_report_type, dtc_status_mask_type=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record=0xFFFFFF, dtc_snapshot_record_num=0xFF, dtc_extended_record_num=0xFF): data = '' # dtc_status_mask_type - if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or - dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or - dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or - dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or - dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or + if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \ + dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or \ + dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or \ + dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \ + dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \ dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK: data += chr(dtc_status_mask_type) # dtc_mask_record - if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or - dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or - dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or - dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or + if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \ + dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ + dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ + dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ dtc_report_type == DTC_REPORT_TYPE.SEVERITY_INFORMATION_OF_DTC: data += struct.pack('!I', dtc_mask_record)[1:] # 3 bytes # dtc_snapshot_record_num - if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or - dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or + if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \ + dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \ dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER: data += ord(dtc_snapshot_record_num) # dtc_extended_record_num - if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or + if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \ dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER: data += chr(dtc_extended_record_num) # dtc_severity_mask_type - if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or + if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \ dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD: data += chr(dtc_severity_mask_type) + chr(dtc_status_mask_type) - resp = _request(address, service=SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data) + resp = _request(address, SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data) # TODO: parse response return resp -class CONTROL_OPTION_TYPE(Enum): +class CONTROL_OPTION_TYPE(IntEnum): RETURN_CONTROL_TO_ECU = 0 RESET_TO_DEFAULT = 1 FREEZE_CURRENT_STATE = 2 @@ -480,25 +491,25 @@ class CONTROL_OPTION_TYPE(Enum): def input_output_control_by_identifier(address, data_identifier_type, control_option_record, control_enable_mask_record=''): data = struct.pack('!H', data_identifier_type) + control_option_record + control_enable_mask_record - resp = _request(address, service=SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) return resp[2:] -class ROUTINE_CONTROL_TYPE(Enum): +class ROUTINE_CONTROL_TYPE(IntEnum): START = 1 STOP = 2 REQUEST_RESULTS = 3 -class ROUTINE_IDENTIFIER_TYPE(Enum): +class ROUTINE_IDENTIFIER_TYPE(IntEnum): ERASE_MEMORY = 0xFF00 CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01 ERASE_MIRROR_MEMORY_DTCS = 0xFF02 def routine_control(address, routine_control_type, routine_identifier_type, routine_option_record=''): data = struct.pack('!H', routine_identifier_type) + routine_option_record - _request(address, service=SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data) + _request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data) resp = resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != routine_identifier_type: raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id))) @@ -513,14 +524,14 @@ def request_download(address, memory_address, memory_size, memory_address_bytes= raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) - if memory_address >= 1<<(memory_address_bytes*8) + if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if memory_size >= 1<<(memory_size_bytes*8) + if memory_size >= 1<<(memory_size_bytes*8): raise ValueError('invalid memory_size: {}'.format(memory_size)) data += struct.pack('!I', memory_size)[4-memory_size_bytes:] - resp = _request(address, service=SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data) max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] @@ -538,14 +549,14 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4, raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) data += struct.pack('!BB', memory_size_bytes, memory_address_bytes) - if memory_address >= 1<<(memory_address_bytes*8) + if memory_address >= 1<<(memory_address_bytes*8): raise ValueError('invalid memory_address: {}'.format(memory_address)) data += struct.pack('!I', memory_address)[4-memory_address_bytes:] - if memory_size >= 1<<(memory_size_bytes*8) + if memory_size >= 1<<(memory_size_bytes*8): raise ValueError('invalid memory_size: {}'.format(memory_size)) data += struct.pack('!I', memory_size)[4-memory_size_bytes:] - resp = _request(address, service=SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data) max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0] @@ -555,16 +566,16 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4, return max_num_bytes # max number of bytes per transfer data request def transfer_data(address, block_sequence_count, data=''): - resp = _request(address, service=SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) + resp = _request(address, SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) resp_id = ord(resp[0]) if len(resp) > 0 else None if resp_id != block_sequence_count: raise ValueError('invalid block_sequence_count: {}'.format(resp_id)) return resp[1:] -def request_transfer_exit(address) - _request(address, service=SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None) +def request_transfer_exit(address): + _request(address, SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None) if __name__ == "__main__": - from . import uds # examples - vin = uds.read_data_by_identifier(0x18da10f1, uds.DATA_IDENTIFIER.VIN) + vin = read_data_by_identifier(0x18da10f1, DATA_IDENTIFIER_TYPE.VIN) + print(vin)