From a36ca220fa0d5e12b7d65a6c31c97a8982b6f25a Mon Sep 17 00:00:00 2001 From: MarinkoMagla <159032106+MarinkoMagla@users.noreply.github.com> Date: Sat, 31 Aug 2024 22:01:28 +0200 Subject: [PATCH] Defining types in return dictionaries (#1923) * Defining types in return dictionaries * Correcting indent * Returning the class instead of dict * Fixing failing linter * Fixing whitespace error --- python/ccp.py | 67 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/python/ccp.py b/python/ccp.py index 3ba1ef3588..b6c6186ed7 100644 --- a/python/ccp.py +++ b/python/ccp.py @@ -2,6 +2,35 @@ import time import struct from enum import IntEnum, Enum +from dataclasses import dataclass +from typing import Optional + +@dataclass +class ExchangeStationIdsReturn: + id_length: int + data_type: int + available: int + protected: int + +@dataclass +class GetDaqListSizeReturn: + list_size: int + first_pid: int + +@dataclass +class GetSessionStatusReturn: + status: int + info: Optional[int] + +@dataclass +class DiagnosticServiceReturn: + length: int + type: int + +@dataclass +class ActionServiceReturn: + length: int + type: int class COMMAND_CODE(IntEnum): CONNECT = 0x01 @@ -140,15 +169,10 @@ def connect(self, station_addr: int) -> None: self._send_cro(COMMAND_CODE.CONNECT, struct.pack(" dict: + def exchange_station_ids(self, device_id_info: bytes = b"") -> ExchangeStationIdsReturn: self._send_cro(COMMAND_CODE.EXCHANGE_ID, device_id_info) resp = self._recv_dto(0.025) - return { # TODO: define a type - "id_length": resp[0], - "data_type": resp[1], - "available": resp[2], - "protected": resp[3], - } + return ExchangeStationIdsReturn(id_length=resp[0], data_type=resp[1], available=resp[2], protected=resp[3]) def get_seed(self, resource_mask: int) -> bytes: if resource_mask > 255: @@ -211,15 +235,12 @@ def select_calibration_page(self) -> None: self._send_cro(COMMAND_CODE.SELECT_CAL_PAGE) self._recv_dto(0.025) - def get_daq_list_size(self, list_num: int, can_id: int = 0) -> dict: + def get_daq_list_size(self, list_num: int, can_id: int = 0) -> GetDaqListSizeReturn: if list_num > 255: raise ValueError("list number must be less than 256") self._send_cro(COMMAND_CODE.GET_DAQ_SIZE, bytes([list_num, 0]) + struct.pack(f"{self.byte_order.value}I", can_id)) resp = self._recv_dto(0.025) - return { # TODO: define a type - "list_size": resp[0], - "first_pid": resp[1], - } + return GetDaqListSizeReturn(list_size=resp[0], first_pid=resp[1]) def set_daq_list_pointer(self, list_num: int, odt_num: int, element_num: int) -> None: if list_num > 255: @@ -266,13 +287,11 @@ def set_session_status(self, status: int) -> None: self._send_cro(COMMAND_CODE.SET_S_STATUS, bytes([status])) self._recv_dto(0.025) - def get_session_status(self) -> dict: + def get_session_status(self) -> GetSessionStatusReturn: self._send_cro(COMMAND_CODE.GET_S_STATUS) resp = self._recv_dto(0.025) - return { # TODO: define a type - "status": resp[0], - "info": resp[2] if resp[1] else None, - } + info = resp[2] if resp[1] else None + return GetSessionStatusReturn(status=resp[0], info=info) def build_checksum(self, size: int) -> bytes: self._send_cro(COMMAND_CODE.BUILD_CHKSUM, struct.pack(f"{self.byte_order.value}I", size)) @@ -310,29 +329,23 @@ def move_memory_block(self, size: int) -> None: self._send_cro(COMMAND_CODE.MOVE, struct.pack(f"{self.byte_order.value}I", size)) self._recv_dto(0.025) - def diagnostic_service(self, service_num: int, data: bytes = b"") -> dict: + def diagnostic_service(self, service_num: int, data: bytes = b"") -> DiagnosticServiceReturn: if service_num > 65535: raise ValueError("service number must be less than 65536") if len(data) > 4: raise ValueError("max data size is 4 bytes") self._send_cro(COMMAND_CODE.DIAG_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data) resp = self._recv_dto(0.025) - return { # TODO: define a type - "length": resp[0], - "type": resp[1], - } + return DiagnosticServiceReturn(length=resp[0], type=resp[1]) - def action_service(self, service_num: int, data: bytes = b"") -> dict: + def action_service(self, service_num: int, data: bytes = b"") -> ActionServiceReturn: if service_num > 65535: raise ValueError("service number must be less than 65536") if len(data) > 4: raise ValueError("max data size is 4 bytes") self._send_cro(COMMAND_CODE.ACTION_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data) resp = self._recv_dto(0.025) - return { # TODO: define a type - "length": resp[0], - "type": resp[1], - } + return ActionServiceReturn(length=resp[0], type=resp[1]) def test_availability(self, station_addr: int) -> None: if station_addr > 65535: