Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mypy #235

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/typing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
name: Typing

on:
push:
branches:
- main
pull_request:
workflow_dispatch:

env:
DEFAULT_PYTHON: "3.11"

jobs:
mypy:
name: mypy
runs-on: ubuntu-latest
steps:
- name: ⤵️ Check out code from GitHub
uses: actions/checkout@v4.1.1
- name: 🏗 Set up Poetry
run: pipx install poetry
- name: 🏗 Set up Python ${{ env.DEFAULT_PYTHON }}
id: python
uses: actions/setup-python@v5.0.0
with:
python-version: ${{ env.DEFAULT_PYTHON }}
cache: "poetry"
- name: 🏗 Install workflow dependencies
run: |
poetry config virtualenvs.create true
poetry config virtualenvs.in-project true
- name: 🏗 Install dependencies
run: poetry install --no-interaction
- name: 🚀 Run mypy
run: poetry run mypy src tests
60 changes: 59 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,43 @@ pydantic = ">=1"
pytest = "^8.0"
pytest-asyncio = "^0.23"
ruff = "^0.2.2"
mypy = "^1.8.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"


[tool.mypy]
# Specify the target platform details in config, so your developers are
# free to run mypy on Windows, Linux, or macOS and get consistent
# results.
platform = "linux"
python_version = "3.10"

# show error messages from unrelated files
follow_imports = "normal"

# suppress errors about unsatisfied imports
ignore_missing_imports = true

# be strict
check_untyped_defs = true
disallow_any_generics = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
strict_optional = true
warn_incomplete_stub = true
warn_no_return = true
warn_redundant_casts = true
warn_return_any = true
warn_unused_configs = true
warn_unused_ignores = true

[tool.ruff]
line-length = 80

Expand All @@ -61,6 +93,7 @@ ignore = [
"DTZ005",
"EM101",
"EM102",
"FBT001",
"FBT002",
"FBT003",
"G002",
Expand All @@ -76,6 +109,7 @@ ignore = [
"D102", # documentation info, should be removed after fixes
"D103", # documentation info, should be removed after fixes
"PERF102",
"PGH003",
"PLE1205",
"PLR2004", # Just annoying, not really useful
"PLR0912",
Expand All @@ -84,6 +118,7 @@ ignore = [
"RET507",
"SIM102",
"SLF001",
"S101",
"T201",
"UP007",
]
Expand Down
24 changes: 13 additions & 11 deletions roombapy/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ class RoombaDiscovery:
udp_port = 5678
roomba_message = "irobotmcs"
amount_of_broadcasted_messages = 5
server_socket = None
log = None
server_socket: socket.socket
log: logging.Logger

def __init__(self):
def __init__(self) -> None:
"""Init discovery."""
self.server_socket = _get_socket()
self.log = logging.getLogger(__name__)

def find(self, ip=None):
def find(
self, ip: str | None = None
) -> Optional[RoombaInfo] | set[RoombaInfo]:
if ip is not None:
return self.get(ip)
return self.get_all()

def get_all(self):
def get_all(self) -> set[RoombaInfo]:
self._start_server()
self._broadcast_message(self.amount_of_broadcasted_messages)
robots = set()
Expand All @@ -40,12 +42,12 @@ def get_all(self):
break
return robots

def get(self, ip):
def get(self, ip: str) -> Optional[RoombaInfo]:
self._start_server()
self._send_message(ip)
return self._get_response(ip)

def _get_response(self, ip=None):
def _get_response(self, ip: str | None = None) -> Optional[RoombaInfo]:
try:
while True:
raw_response, addr = self.server_socket.recvfrom(1024)
Expand All @@ -63,20 +65,20 @@ def _get_response(self, ip=None):
self.log.info("Socket timeout")
return None

def _broadcast_message(self, amount):
def _broadcast_message(self, amount: int) -> None:
for i in range(amount):
self.server_socket.sendto(
self.roomba_message.encode(), (self.udp_address, self.udp_port)
)
self.log.debug("Broadcast message sent: %s", i)

def _send_message(self, udp_address):
def _send_message(self, udp_address: str) -> None:
self.server_socket.sendto(
self.roomba_message.encode(), (udp_address, self.udp_port)
)
self.log.debug("Message sent")

def _start_server(self):
def _start_server(self) -> None:
self.server_socket.bind((self.udp_bind_address, self.udp_port))
self.log.debug("Socket server started, port %s", self.udp_port)

Expand All @@ -99,7 +101,7 @@ def _decode_data(raw_response: bytes) -> Optional[RoombaInfo]:
return None


def _get_socket():
def _get_socket() -> socket.socket:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.settimeout(5)
Expand Down
57 changes: 38 additions & 19 deletions roombapy/entry_points.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,50 @@
from __future__ import annotations

import sys

from roombapy import RoombaFactory
from roombapy import RoombaFactory, RoombaInfo
from roombapy.discovery import RoombaDiscovery
from roombapy.getpassword import RoombaPassword


def discovery():
def discovery() -> None:
roomba_ip = _get_ip_from_arg()

roomba_discovery = RoombaDiscovery()
if roomba_ip is not None:
print(roomba_discovery.find(roomba_ip))
return

robots_info = roomba_discovery.find()
for robot in robots_info:
print(robot)
if isinstance(robots_info, set):
for robot in robots_info:
print(robot)
else:
print(robots_info)


def password():
def password() -> None:
roomba_ip = _get_ip_from_arg()
_validate_ip(roomba_ip)
_wait_for_input()

roomba_discovery = RoombaDiscovery()
roomba_info = roomba_discovery.find(roomba_ip)
assert roomba_info is not None
_validate_roomba_info(roomba_info)

roomba_password = RoombaPassword(roomba_ip)
info = roomba_info
if isinstance(roomba_info, set):
info = next(iter(roomba_info))

assert isinstance(info, RoombaInfo)

roomba_password = RoombaPassword(info.ip)
found_password = roomba_password.get_password()
roomba_info.password = found_password
print(roomba_info)
info.password = found_password
print(info)


def connect():
def connect() -> None:
roomba_ip = _get_ip_from_arg()
_validate_ip(roomba_ip)

Expand All @@ -42,34 +53,42 @@ def connect():

roomba_discovery = RoombaDiscovery()
roomba_info = roomba_discovery.find(roomba_ip)
assert roomba_info is not None
_validate_roomba_info(roomba_info)

roomba = RoombaFactory.create_roomba(
roomba_info.ip, roomba_info.blid, roomba_password
)
info = roomba_info
if isinstance(roomba_info, set):
info = next(iter(roomba_info))

assert isinstance(info, RoombaInfo)
assert roomba_password

roomba = RoombaFactory.create_roomba(info.ip, info.blid, roomba_password)
roomba.register_on_message_callback(lambda msg: print(msg))
roomba.connect()

while True:
pass


def _validate_ip(ip):
def _validate_ip(ip: str | None) -> None:
if ip is None:
raise Exception("ip cannot be null")


def _validate_password(ip):
def _validate_password(ip: str | None) -> None:
if ip is None:
raise Exception("password cannot be null")


def _validate_roomba_info(roomba_info):
def _validate_roomba_info(
roomba_info: RoombaInfo | set[RoombaInfo] | None,
) -> None:
if roomba_info is None:
raise Exception("cannot find roomba")


def _wait_for_input():
def _wait_for_input() -> None:
print(
"Roomba have to be on Home Base powered on.\n"
"Press and hold HOME button until you hear series of tones.\n"
Expand All @@ -78,13 +97,13 @@ def _wait_for_input():
input("Press Enter to continue...")


def _get_ip_from_arg():
def _get_ip_from_arg() -> str | None:
if len(sys.argv) < 2:
return None
return str(sys.argv[1])


def _get_password_from_arg():
def _get_password_from_arg() -> str | None:
if len(sys.argv) < 3:
return None
return str(sys.argv[2])
Loading
Loading