Skip to content

Commit

Permalink
IMPROVEMENT: Annotate more return types
Browse files Browse the repository at this point in the history
  • Loading branch information
amilcarlucas committed Nov 18, 2024
1 parent 30d576a commit cc42993
Show file tree
Hide file tree
Showing 14 changed files with 59 additions and 60 deletions.
2 changes: 1 addition & 1 deletion MethodicConfigurator/annotate_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def arg_parser():
logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s")

# Custom validation for --max-line-length
def check_max_line_length(value):
def check_max_line_length(value: int) -> int:
if value < 50 or value > 300:
logging.critical("--max-line-length must be in the interval 50 .. 300, not %d", value)
raise SystemExit("Correct it and try again")
Expand Down
4 changes: 2 additions & 2 deletions MethodicConfigurator/backend_flightcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def __list_network_ports() -> list[str]:
return ["tcp:127.0.0.1:5760", "udp:127.0.0.1:14550"]

# pylint: disable=duplicate-code
def __auto_detect_serial(self):
def __auto_detect_serial(self) -> list[mavutil.SerialPort]:
preferred_ports = [
"*FTDI*",
"*Arduino_Mega_2560*",
Expand Down Expand Up @@ -534,7 +534,7 @@ def get_connection_tuples(self):
"""
return self.__connection_tuples

def upload_file(self, local_filename: str, remote_filename: str, progress_callback=None):
def upload_file(self, local_filename: str, remote_filename: str, progress_callback=None) -> bool:
"""Upload a file to the flight controller."""
if self.master is None:
return False
Expand Down
6 changes: 3 additions & 3 deletions MethodicConfigurator/backend_mavftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ def append(self, v) -> None:
setting = MAVFTPSetting(name, s_type, default)
self._vars[setting.name] = setting

def __getattr__(self, name):
def __getattr__(self, name) -> Union[int, float]:
try:
return self._vars[name].value
return self._vars[name].value # type: ignore
except Exception as exc:
raise AttributeError from exc

Expand Down Expand Up @@ -957,7 +957,7 @@ def cmd_status(self) -> MAVFTPReturn:
)
return MAVFTPReturn("Status", ERR_None)

def __op_parse(self, m):
def __op_parse(self, m) -> FTP_OP:
"""parse a FILE_TRANSFER_PROTOCOL msg"""
hdr = bytearray(m.payload[0:12])
(seq, session, opcode, size, req_opcode, burst_complete, _pad, offset) = struct.unpack("<HBBBBBBI", hdr)
Expand Down
2 changes: 1 addition & 1 deletion MethodicConfigurator/frontend_tkinter_component_editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def __set_gnss_type_and_protocol_from_fc_parameters(self, fc_parameters: dict) -
logging_error("GPS_TYPE %u not in gnss_receiver_connection", gps1_type)
self.data["Components"]["GNSS Receiver"]["FC Connection"]["Type"] = "None"

def __set_serial_type_and_protocol_from_fc_parameters(self, fc_parameters: dict):
def __set_serial_type_and_protocol_from_fc_parameters(self, fc_parameters: dict) -> bool:
if "RC_PROTOCOLS" in fc_parameters:
rc_protocols_nr = int(fc_parameters["RC_PROTOCOLS"])
# check if rc_protocols_nr is a power of two (only one bit set)
Expand Down
16 changes: 8 additions & 8 deletions MethodicConfigurator/frontend_tkinter_parameter_editor_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def __update_table(self, params, fc_parameters) -> None:
else _("No documentation available in apm.pdef.xml for this parameter")
)

column = []
column: list[tk.Widget] = []
column.append(self.__create_delete_button(param_name))
column.append(self.__create_parameter_name(param_name, param_metadata, doc_tooltip))
column.append(self.__create_flightcontroller_value(fc_parameters, param_name, param_default, doc_tooltip))
Expand Down Expand Up @@ -233,15 +233,15 @@ def __update_table(self, params, fc_parameters) -> None:
self.view_port.columnconfigure(5, weight=0) # Upload to FC
self.view_port.columnconfigure(6, weight=1) # Change Reason

def __create_delete_button(self, param_name):
def __create_delete_button(self, param_name) -> ttk.Button:
delete_button = ttk.Button(
self.view_port, text=_("Del"), style="narrow.TButton", command=lambda: self.__on_parameter_delete(param_name)
)
tooltip_msg = _("Delete {param_name} from the {self.current_file} file")
show_tooltip(delete_button, tooltip_msg.format(**locals()))
return delete_button

def __create_parameter_name(self, param_name, param_metadata, doc_tooltip):
def __create_parameter_name(self, param_name, param_metadata, doc_tooltip) -> ttk.Label:
is_calibration = param_metadata.get("Calibration", False) if param_metadata else False
is_readonly = param_metadata.get("ReadOnly", False) if param_metadata else False
parameter_label = ttk.Label(
Expand All @@ -257,7 +257,7 @@ def __create_parameter_name(self, param_name, param_metadata, doc_tooltip):
show_tooltip(parameter_label, doc_tooltip)
return parameter_label

def __create_flightcontroller_value(self, fc_parameters, param_name, param_default, doc_tooltip):
def __create_flightcontroller_value(self, fc_parameters, param_name, param_default, doc_tooltip) -> ttk.Label:
if param_name in fc_parameters:
value_str = format(fc_parameters[param_name], ".6f").rstrip("0").rstrip(".")
if param_default is not None and is_within_tolerance(fc_parameters[param_name], param_default.value):
Expand Down Expand Up @@ -302,7 +302,7 @@ def __create_new_value_entry( # pylint: disable=too-many-arguments
param_metadata,
param_default,
doc_tooltip,
):
) -> Union[PairTupleCombobox, ttk.Entry]:
present_as_forced = False
if (
self.current_file in self.local_filesystem.forced_parameters
Expand Down Expand Up @@ -449,7 +449,7 @@ def update_label() -> None:

window.wait_window() # Wait for the window to be closed

def __create_unit_label(self, param_metadata):
def __create_unit_label(self, param_metadata) -> ttk.Label:
unit_label = ttk.Label(self.view_port, text=param_metadata.get("unit") if param_metadata else "")
unit_tooltip = (
param_metadata.get("unit_tooltip")
Expand All @@ -460,15 +460,15 @@ def __create_unit_label(self, param_metadata):
show_tooltip(unit_label, unit_tooltip)
return unit_label

def __create_upload_checkbutton(self, param_name, fc_connected):
def __create_upload_checkbutton(self, param_name, fc_connected) -> ttk.Checkbutton:
self.upload_checkbutton_var[param_name] = tk.BooleanVar(value=fc_connected)
upload_checkbutton = ttk.Checkbutton(self.view_port, variable=self.upload_checkbutton_var[param_name])
upload_checkbutton.configure(state="normal" if fc_connected else "disabled")
msg = _("When selected upload {param_name} new value to the flight controller")
show_tooltip(upload_checkbutton, msg.format(**locals()))
return upload_checkbutton

def __create_change_reason_entry(self, param_name, param, new_value_entry):
def __create_change_reason_entry(self, param_name, param, new_value_entry) -> ttk.Entry:
present_as_forced = False
if (
self.current_file in self.local_filesystem.forced_parameters
Expand Down
2 changes: 1 addition & 1 deletion MethodicConfigurator/frontend_tkinter_template_overview.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def __sort_by_column(self, col: str, reverse: bool) -> None:
self.tree.heading(col, command=lambda: self.__sort_by_column(col, not reverse))


def argument_parser():
def argument_parser() -> argparse.Namespace:
"""
Parses command-line arguments for the script.
Expand Down
10 changes: 5 additions & 5 deletions MethodicConfigurator/mavftp_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import os
import sys
from argparse import ArgumentParser
from argparse import ArgumentParser, Namespace
from logging import basicConfig as logging_basicConfig
from logging import debug as logging_debug
from logging import error as logging_error
Expand All @@ -28,7 +28,7 @@


# pylint: disable=duplicate-code
def argument_parser():
def argument_parser() -> Namespace:
"""
Parses command-line arguments for the script.
"""
Expand Down Expand Up @@ -58,7 +58,7 @@ def argument_parser():
return parser.parse_args()


def auto_detect_serial():
def auto_detect_serial() -> list:
preferred_ports = [
"*FTDI*",
"*3D*",
Expand All @@ -85,10 +85,10 @@ def auto_detect_serial():
):
serial_list.pop(1)

return serial_list
return serial_list # type: ignore[no-any-return]


def auto_connect(device):
def auto_connect(device) -> mavutil.SerialPort:
comport = None
if device:
comport = mavutil.SerialPort(device=device, description=device)
Expand Down
4 changes: 2 additions & 2 deletions MethodicConfigurator/middleware_template_overview.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ def columns() -> tuple[str, ...]:
_("GNSS\nConnection"),
)

def attributes(self):
return self.__dict__.keys()
def attributes(self) -> list[str]:
return self.__dict__.keys() # type: ignore
10 changes: 5 additions & 5 deletions MethodicConfigurator/param_pid_adjustment_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import os
import re
import subprocess
from typing import Optional
from typing import Callable, Optional, Union

PARAM_NAME_REGEX = r"^[A-Z][A-Z_0-9]*$"
PARAM_NAME_MAX_LEN = 16
VERSION = "1.0"


def parse_arguments():
def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""
Expand Down Expand Up @@ -68,7 +68,7 @@ def parse_arguments():
return args


def ranged_type(value_type, min_value, max_value):
def ranged_type(value_type: type, min_value, max_value) -> Callable:
"""
Return function handle of an argument type function for ArgumentParser checking a range:
min_value <= arg <= max_value
Expand All @@ -78,14 +78,14 @@ def ranged_type(value_type, min_value, max_value):
max_value - maximum acceptable argument value
"""

def range_checker(arg: str):
def range_checker(arg: str) -> Union[int, float]:
try:
f = value_type(arg)
except ValueError as exc:
raise argparse.ArgumentTypeError(f"must be a valid {value_type}") from exc
if f < min_value or f > max_value:
raise argparse.ArgumentTypeError(f"must be within [{min_value}, {max_value}]")
return f
return f # type: ignore

# Return function handle to checking function
return range_checker
Expand Down
51 changes: 26 additions & 25 deletions MethodicConfigurator/tempcal_imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import re
import sys
from argparse import ArgumentParser
from typing import Union

import numpy as np
from matplotlib import pyplot as plt
Expand Down Expand Up @@ -103,7 +104,7 @@ def set_accel_tcal(self, imu, value) -> None:
def set_enable(self, imu, value) -> None:
self.enable[imu] = value

def correction(self, coeff, imu, temperature, axis, cal_temp): # pylint: disable=too-many-arguments
def correction(self, coeff, imu, temperature, axis, cal_temp) -> float: # pylint: disable=too-many-arguments
"""calculate correction from temperature calibration from log data using parameters"""
if self.enable[imu] != 1.0:
return 0.0
Expand All @@ -114,9 +115,9 @@ def correction(self, coeff, imu, temperature, axis, cal_temp): # pylint: disabl
temperature = constrain(temperature, self.tmin[imu], self.tmax[imu])
cal_temp = constrain(cal_temp, self.tmin[imu], self.tmax[imu])
poly = np.poly1d(coeff[axis])
return poly(cal_temp - TEMP_REF) - poly(temperature - TEMP_REF)
return poly(cal_temp - TEMP_REF) - poly(temperature - TEMP_REF) # type: ignore[no-any-return]

def correction_accel(self, imu, temperature):
def correction_accel(self, imu, temperature) -> Vector3:
"""calculate accel correction from temperature calibration from
log data using parameters"""
cal_temp = self.atcal.get(imu, TEMP_REF)
Expand All @@ -126,7 +127,7 @@ def correction_accel(self, imu, temperature):
self.correction(self.acoef[imu], imu, temperature, "Z", cal_temp),
)

def correction_gyro(self, imu, temperature):
def correction_gyro(self, imu, temperature) -> Vector3:
"""calculate gyro correction from temperature calibration from
log data using parameters"""
cal_temp = self.gtcal.get(imu, TEMP_REF)
Expand All @@ -136,7 +137,7 @@ def correction_gyro(self, imu, temperature):
self.correction(self.gcoef[imu], imu, temperature, "Z", cal_temp),
)

def param_string(self, imu):
def param_string(self, imu) -> str:
params = ""
params += f"INS_TCAL{imu+1}_ENABLE 1\n"
params += f"INS_TCAL{imu+1}_TMIN {self.tmin[imu]:.1f}\n"
Expand Down Expand Up @@ -172,15 +173,15 @@ def __update(self, x, y) -> None:
self.vec[i] += y * temp
temp *= x

def __get_polynomial(self):
def __get_polynomial(self) -> np.ndarray:
inv_mat = np.linalg.inv(self.mat)
res = np.zeros(self.porder)
for i in range(self.porder):
for j in range(self.porder):
res[i] += inv_mat[i][j] * self.vec[j]
return res

def polyfit(self, x, y, order):
def polyfit(self, x, y, order) -> np.ndarray:
self.porder = order + 1
self.mat = np.zeros((self.porder, self.porder))
self.vec = np.zeros(self.porder)
Expand All @@ -199,17 +200,17 @@ class IMUData:
"""

def __init__(self) -> None:
self.accel: dict = {}
self.gyro: dict = {}
self.accel: dict[int, dict[str, np.ndarray]] = {}
self.gyro: dict[int, dict[str, np.ndarray]] = {}

def IMUs(self):
def IMUs(self) -> list[int]:
"""return list of IMUs"""
if len(self.accel.keys()) != len(self.gyro.keys()):
print("accel and gyro data doesn't match")
sys.exit(1)
return self.accel.keys()
return self.accel.keys() # type: ignore[return-value]

def add_accel(self, imu, temperature, time, value) -> None:
def add_accel(self, imu: int, temperature, time, value) -> None:
if imu not in self.accel:
self.accel[imu] = {}
for axis in AXEST:
Expand All @@ -220,7 +221,7 @@ def add_accel(self, imu, temperature, time, value) -> None:
self.accel[imu]["Z"] = np.append(self.accel[imu]["Z"], value.z)
self.accel[imu]["time"] = np.append(self.accel[imu]["time"], time)

def add_gyro(self, imu, temperature, time, value) -> None:
def add_gyro(self, imu: int, temperature, time, value) -> None:
if imu not in self.gyro:
self.gyro[imu] = {}
for axis in AXEST:
Expand All @@ -237,7 +238,7 @@ def moving_average(self, data, w):
ret[w:] = ret[w:] - ret[:-w]
return ret[w - 1 :] / w

def FilterArray(self, data, width_s):
def FilterArray(self, data: dict[str, np.ndarray], width_s) -> dict[str, np.ndarray]:
"""apply moving average filter of width width_s seconds"""
nseconds = data["time"][-1] - data["time"][0]
nsamples = len(data["time"])
Expand All @@ -253,36 +254,36 @@ def Filter(self, width_s) -> None:
self.accel[imu] = self.FilterArray(self.accel[imu], width_s)
self.gyro[imu] = self.FilterArray(self.gyro[imu], width_s)

def accel_at_temp(self, imu, axis, temperature):
def accel_at_temp(self, imu: int, axis: str, temperature) -> float:
"""return the accel value closest to the given temperature"""
if temperature < self.accel[imu]["T"][0]:
return self.accel[imu][axis][0]
return self.accel[imu][axis][0] # type: ignore[no-any-return]
for i in range(len(self.accel[imu]["T"]) - 1):
if self.accel[imu]["T"][i] <= temperature <= self.accel[imu]["T"][i + 1]:
v1 = self.accel[imu][axis][i]
v2 = self.accel[imu][axis][i + 1]
p = (temperature - self.accel[imu]["T"][i]) / (self.accel[imu]["T"][i + 1] - self.accel[imu]["T"][i])
return v1 + (v2 - v1) * p
return self.accel[imu][axis][-1]
return v1 + (v2 - v1) * p # type: ignore[no-any-return]
return self.accel[imu][axis][-1] # type: ignore[no-any-return]

def gyro_at_temp(self, imu, axis, temperature):
def gyro_at_temp(self, imu: int, axis: str, temperature) -> float:
"""return the gyro value closest to the given temperature"""
if temperature < self.gyro[imu]["T"][0]:
return self.gyro[imu][axis][0]
return self.gyro[imu][axis][0] # type: ignore[no-any-return]
for i in range(len(self.gyro[imu]["T"]) - 1):
if self.gyro[imu]["T"][i] <= temperature <= self.gyro[imu]["T"][i + 1]:
v1 = self.gyro[imu][axis][i]
v2 = self.gyro[imu][axis][i + 1]
p = (temperature - self.gyro[imu]["T"][i]) / (self.gyro[imu]["T"][i + 1] - self.gyro[imu]["T"][i])
return v1 + (v2 - v1) * p
return self.gyro[imu][axis][-1]
return v1 + (v2 - v1) * p # type: ignore[no-any-return]
return self.gyro[imu][axis][-1] # type: ignore[no-any-return]


def constrain(value, minv, maxv):
def constrain(value, minv, maxv) -> Union[float, int]:
"""Constrain a value to a range."""
value = min(minv, value)
value = max(maxv, value)
return value
return value # type: ignore


def IMUfit( # noqa: PLR0915 pylint: disable=too-many-locals, too-many-branches, too-many-statements, too-many-arguments
Expand Down Expand Up @@ -487,7 +488,7 @@ def IMUfit( # noqa: PLR0915 pylint: disable=too-many-locals, too-many-branches,
plt.show()


def generate_calibration_file(outfile, online, progress_callback, data, c): # pylint: disable=too-many-locals
def generate_calibration_file(outfile, online, progress_callback, data, c) -> tuple[Coefficients, Coefficients]: # pylint: disable=too-many-locals
clog = c
c = Coefficients()

Expand Down
2 changes: 1 addition & 1 deletion copy_param_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@


# Function to get all subdirectories excluding the source (do not copy onto itself)
def get_subdirectories(base_dir, exclude_source=True):
def get_subdirectories(base_dir, exclude_source=True) -> list[str]:
subdirs = []
for root, dirs, _ in os.walk(base_dir):
rel_dir = os.path.relpath(root, base_dir)
Expand Down
Loading

0 comments on commit cc42993

Please sign in to comment.