Skip to content

Commit

Permalink
Choose pi controller based on pi model
Browse files Browse the repository at this point in the history
Pi5 has a new SOC routing, so we need another GPIO library, RPi.GPIO has not support. Reads out pi config and chooses the according controller for the board. In case the device is a RPI5, we choose the new controller for the device.
  • Loading branch information
AndreWohnsland committed Sep 14, 2024
1 parent c3a0b55 commit eb678e3
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/machine/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from src.machine.generic_board import GenericController
from src.machine.interface import PinController
from src.machine.leds import LedController
from src.machine.raspberry import RpiController
from src.machine.raspberry import choose_pi_controller
from src.machine.reverter import Reverter
from src.models import Ingredient
from src.utils import time_print
Expand Down Expand Up @@ -45,7 +45,7 @@ def __init__(self):
def _chose_controller(self) -> PinController:
"""Select the controller class for the Pin."""
if cfg.MAKER_BOARD == "RPI":
return RpiController(cfg.MAKER_PINS_INVERTED)
return choose_pi_controller(cfg.MAKER_PINS_INVERTED)
# In case none is found, fall back to generic using python-periphery
return GenericController(cfg.MAKER_PINS_INVERTED)

Expand Down
102 changes: 102 additions & 0 deletions src/machine/raspberry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import contextlib
from typing import Literal, Optional

from src.logger_handler import LoggerHandler
from src.machine.interface import GPIOController, PinController
from src.utils import time_print

logger = LoggerHandler("RpiController")

Expand All @@ -14,6 +16,28 @@
except ModuleNotFoundError:
DEV = True

try:
# pylint: disable=import-error
from gpiozero import InputDevice, OutputDevice # type: ignore

ZERO_DEV = False
except ModuleNotFoundError:
ZERO_DEV = True


def is_rpi5() -> bool:
model = "No Model"
with contextlib.suppress(FileNotFoundError), open("/proc/device-tree/model") as f:
model = f.read()
return "Raspberry Pi 5" in model


def choose_pi_controller(inverted: bool) -> PinController:
"""Need to choose another controller for Raspberry Pi 5."""
if is_rpi5():
return Rpi5Controller(inverted)
return RpiController(inverted)


class RpiController(PinController):
"""Controller class to control Raspberry Pi pins."""
Expand Down Expand Up @@ -71,6 +95,84 @@ def read_pin(self, pin: int) -> bool:
return False


class Rpi5Controller(PinController):
"""Controller class to control Raspberry Pi pins using gpiozero."""

def __init__(self, inverted: bool) -> None:
super().__init__()
self.inverted = inverted
self.devenvironment = ZERO_DEV
self.low: bool = False
self.high: bool = True
if inverted:
self.low, self.high = self.high, self.low
self.active_high = not inverted # Control active_high based on the inverted parameter
self.gpios = {} # Dictionary to store GPIO pin objects (InputDevice/OutputDevice)
self.dev_displayed = False

def initialize_pin_list(self, pin_list: list[int], is_input: bool = False, pull_down: bool = True):
"""Set up the given pin list using gpiozero with error handling."""
if not self.dev_displayed:
time_print(f"Devenvironment on the RPi5 module is {'on' if self.devenvironment else 'off'}")
self.dev_displayed = True

if self.devenvironment:
logger.log_event("WARNING", f"Could not import gpiozero. Will not be able to control pins: {pin_list}")
return
for pin in pin_list:
try:
if is_input:
# Set pull-up/down configuration via InputDevice
pull_up = not pull_down
self.gpios[pin] = InputDevice(pin, pull_up=pull_up)
else:
# Initialize OutputDevice with active_high based on the inverted flag
self.gpios[pin] = OutputDevice(pin, initial_value=self.low, active_high=self.active_high)
except Exception as e:
# Catch any error and continue, printing the pin and error message
logger.log_event("WARNING", f"Error: Could not initialize GPIO pin {pin}. Reason: {e!s}")

def activate_pin_list(self, pin_list: list[int]):
"""Activates the given pin list (sets to high)."""
if self.devenvironment:
return
for pin in pin_list:
if pin in self.gpios and isinstance(self.gpios[pin], OutputDevice):
self.gpios[pin].on()
else:
logger.log_event(
"WARNING", f"Could not activate GPIO pin {pin}. Reason: Pin not activated or not an OutputDevice."
)

def close_pin_list(self, pin_list: list[int]):
"""Close (deactivate) the given pin_list (sets to low)."""
if self.devenvironment:
return
for pin in pin_list:
if pin in self.gpios and isinstance(self.gpios[pin], OutputDevice):
self.gpios[pin].off()

def cleanup_pin_list(self, pin_list: Optional[list[int]] = None):
"""Clean up the given pin list, or all pins if none is given."""
if pin_list is None:
# Clean up all pins
for pin, device in self.gpios.items():
device.close()
self.gpios.clear()
else:
for pin in pin_list:
if pin in self.gpios:
self.gpios[pin].close()
del self.gpios[pin]

def read_pin(self, pin: int) -> bool:
"""Return the state of the given pin (True if high, False if low)."""
if pin in self.gpios and isinstance(self.gpios[pin], InputDevice):
return self.gpios[pin].is_active # True if high, False if low
logger.log_event("WARNING", f"Could not read GPIO pin {pin}. Reason: Pin not found or not an InputDevice.")
return False


class RaspberryGPIO(GPIOController):
def __init__(self, inverted: bool, pin: int):
low = GPIO.LOW if not DEV else 0
Expand Down

0 comments on commit eb678e3

Please sign in to comment.