Skip to content

Commit

Permalink
Defining types in return dictionaries (commaai#1923)
Browse files Browse the repository at this point in the history
* Defining types in return dictionaries

* Correcting indent

* Returning the class instead of dict

* Fixing failing linter

* Fixing whitespace error
  • Loading branch information
MarinkoMagla authored Aug 31, 2024
1 parent bd6cec3 commit a36ca22
Showing 1 changed file with 40 additions and 27 deletions.
67 changes: 40 additions & 27 deletions python/ccp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,35 @@
import time
import struct
from enum import IntEnum, Enum
from dataclasses import dataclass
from typing import Optional

@dataclass
class ExchangeStationIdsReturn:
id_length: int
data_type: int
available: int
protected: int

@dataclass
class GetDaqListSizeReturn:
list_size: int
first_pid: int

@dataclass
class GetSessionStatusReturn:
status: int
info: Optional[int]

@dataclass
class DiagnosticServiceReturn:
length: int
type: int

@dataclass
class ActionServiceReturn:
length: int
type: int

class COMMAND_CODE(IntEnum):
CONNECT = 0x01
Expand Down Expand Up @@ -140,15 +169,10 @@ def connect(self, station_addr: int) -> None:
self._send_cro(COMMAND_CODE.CONNECT, struct.pack("<H", station_addr))
self._recv_dto(0.025)

def exchange_station_ids(self, device_id_info: bytes = b"") -> dict:
def exchange_station_ids(self, device_id_info: bytes = b"") -> ExchangeStationIdsReturn:
self._send_cro(COMMAND_CODE.EXCHANGE_ID, device_id_info)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"id_length": resp[0],
"data_type": resp[1],
"available": resp[2],
"protected": resp[3],
}
return ExchangeStationIdsReturn(id_length=resp[0], data_type=resp[1], available=resp[2], protected=resp[3])

def get_seed(self, resource_mask: int) -> bytes:
if resource_mask > 255:
Expand Down Expand Up @@ -211,15 +235,12 @@ def select_calibration_page(self) -> None:
self._send_cro(COMMAND_CODE.SELECT_CAL_PAGE)
self._recv_dto(0.025)

def get_daq_list_size(self, list_num: int, can_id: int = 0) -> dict:
def get_daq_list_size(self, list_num: int, can_id: int = 0) -> GetDaqListSizeReturn:
if list_num > 255:
raise ValueError("list number must be less than 256")
self._send_cro(COMMAND_CODE.GET_DAQ_SIZE, bytes([list_num, 0]) + struct.pack(f"{self.byte_order.value}I", can_id))
resp = self._recv_dto(0.025)
return { # TODO: define a type
"list_size": resp[0],
"first_pid": resp[1],
}
return GetDaqListSizeReturn(list_size=resp[0], first_pid=resp[1])

def set_daq_list_pointer(self, list_num: int, odt_num: int, element_num: int) -> None:
if list_num > 255:
Expand Down Expand Up @@ -266,13 +287,11 @@ def set_session_status(self, status: int) -> None:
self._send_cro(COMMAND_CODE.SET_S_STATUS, bytes([status]))
self._recv_dto(0.025)

def get_session_status(self) -> dict:
def get_session_status(self) -> GetSessionStatusReturn:
self._send_cro(COMMAND_CODE.GET_S_STATUS)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"status": resp[0],
"info": resp[2] if resp[1] else None,
}
info = resp[2] if resp[1] else None
return GetSessionStatusReturn(status=resp[0], info=info)

def build_checksum(self, size: int) -> bytes:
self._send_cro(COMMAND_CODE.BUILD_CHKSUM, struct.pack(f"{self.byte_order.value}I", size))
Expand Down Expand Up @@ -310,29 +329,23 @@ def move_memory_block(self, size: int) -> None:
self._send_cro(COMMAND_CODE.MOVE, struct.pack(f"{self.byte_order.value}I", size))
self._recv_dto(0.025)

def diagnostic_service(self, service_num: int, data: bytes = b"") -> dict:
def diagnostic_service(self, service_num: int, data: bytes = b"") -> DiagnosticServiceReturn:
if service_num > 65535:
raise ValueError("service number must be less than 65536")
if len(data) > 4:
raise ValueError("max data size is 4 bytes")
self._send_cro(COMMAND_CODE.DIAG_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"length": resp[0],
"type": resp[1],
}
return DiagnosticServiceReturn(length=resp[0], type=resp[1])

def action_service(self, service_num: int, data: bytes = b"") -> dict:
def action_service(self, service_num: int, data: bytes = b"") -> ActionServiceReturn:
if service_num > 65535:
raise ValueError("service number must be less than 65536")
if len(data) > 4:
raise ValueError("max data size is 4 bytes")
self._send_cro(COMMAND_CODE.ACTION_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"length": resp[0],
"type": resp[1],
}
return ActionServiceReturn(length=resp[0], type=resp[1])

def test_availability(self, station_addr: int) -> None:
if station_addr > 65535:
Expand Down

0 comments on commit a36ca22

Please sign in to comment.