Asyncio Interrupts

Ellis Percival edited this page Feb 12, 2021 · 6 revisions

The way that interrupts are exposed and supported by different hardware modules and software libraries can vary greatly.

Sometimes the interrupts are available as software callbacks on the hardware's library. Sometimes the hardware will have registers that store which pin triggered the interrupt and what value it had. Sometimes the hardware will simply change the logic level on a dedicated pin to indicate that one of its inputs changed.

MQTT IO attempts to accommodate all of these various configurations as best it can.

On some hardware, such as the Raspberry Pi, this means that we get an interrupt on a pin but still have to poll the pin for its current value. This only applies when the interrupt is configured to fire on both rising AND falling edges; when it triggers on only a rising OR a falling edge, we can infer the value from the edge itself.
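The edge rule above can be sketched in a few lines of Python. This is only an illustration, not MQTT IO's actual code: the function name `value_implied_by_edge` is hypothetical, and it assumes the raw logic level of the pin (no inversion applied).

```python
from typing import Optional


def value_implied_by_edge(edge: str) -> Optional[bool]:
    """Return the pin value implied by the configured edge.

    Hypothetical helper for illustration only. Returns None when the
    value cannot be inferred and the pin has to be polled instead.
    """
    if edge == "rising":
        # A rising edge means the pin has just gone high.
        return True
    if edge == "falling":
        # A falling edge means the pin has just gone low.
        return False
    if edge == "both":
        # Either transition could have fired, so the value is unknown.
        return None
    raise ValueError(f"unknown edge: {edge!r}")
```

A caller would poll the pin only when this returns None.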

On other hardware, such as the MCP23017, the software library gives no indication that an interrupt has occurred. By connecting the chip's interrupt output pin to a Raspberry Pi input pin, we can use the Raspberry Pi's software callback to trigger a read of the MCP23017's registers, which specify which pin changed and what value it was set to.

This is configured by adding the interrupt_for section to a digital_inputs entry in the config file:

gpio_modules:
  - name: rpi
    module: raspberrypi

  - name: mcp
    module: mcp23017

digital_inputs:
  - name: pi5
    module: rpi
    pin: 5
    interrupt: falling
    interrupt_for:
      - mcp3

  - name: mcp3
    module: mcp
    pin: 3
    interrupt: both

It's up to each GPIO module to declare which interrupt features it supports by setting flags on its INTERRUPT_SUPPORT constant, and to implement the methods that support them:

from . import GenericGPIO, InterruptSupport

class GPIO(GenericGPIO):
    """
    Implementation of GPIO class for the MCP23017 IO expander chip.
    Pin numbers 0 - 15.
    """

    INTERRUPT_SUPPORT = (
        InterruptSupport.FLAG_REGISTER
        | InterruptSupport.CAPTURE_REGISTER
        | InterruptSupport.INTERRUPT_PIN
        | InterruptSupport.SET_TRIGGERS
    )

    def get_int_pins(self):
        """
        Read the register and return a list of pins that triggered the interrupt.
        """
        return self.io.int_flag

    def get_captured_int_pin_values(self, pins=None):
        """
        Read the register that logs the values of the pins at the point of
        the last interrupt, and return a dict.

        If pins is None, then we get the whole register and return it, otherwise
        just return the values for the pins requested.
        """
        values = self.io.int_cap
        if pins is None:
            pins = range(16)
        pin_values = {}
        for pin in pins:
            pin_values[pin] = values[pin]
        return pin_values

    ...

This way, when an interrupt pin (for example, on a Raspberry Pi) is configured with interrupt_for pointing at a pin on this module, the GenericGPIO.get_interrupt_values_remote() method uses the methods available to establish which pins changed and what their values were.

If the hardware doesn't support these features, then GenericGPIO.get_interrupt_values_remote() will just fall back to polling the pin(s) and returning those values instead.
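The register-or-poll decision might look something like the sketch below. It is not the real GenericGPIO code: `InterruptSupport` here is a local stand-in built on `enum.Flag` using the flag names from this page, and `SketchGPIO`, `RegisterChip`, `PollOnlyChip`, the `get_pin` method and the method signatures are all assumptions for illustration.

```python
from enum import Flag, auto
from typing import Dict, Iterable, List, Optional


class InterruptSupport(Flag):
    """Local stand-in for the flags named on this page."""
    NONE = 0
    FLAG_REGISTER = auto()
    CAPTURE_REGISTER = auto()
    INTERRUPT_PIN = auto()
    SET_TRIGGERS = auto()


class SketchGPIO:
    """Hypothetical base class showing the fallback logic only."""
    INTERRUPT_SUPPORT = InterruptSupport.NONE

    def get_pin(self, pin: int) -> bool:
        raise NotImplementedError

    def get_int_pins(self) -> List[int]:
        raise NotImplementedError

    def get_captured_int_pin_values(
        self, pins: Optional[Iterable[int]] = None
    ) -> Dict[int, bool]:
        raise NotImplementedError

    def get_interrupt_values_remote(self, pins: Iterable[int]) -> Dict[int, bool]:
        pins = list(pins)
        support = self.INTERRUPT_SUPPORT
        if InterruptSupport.FLAG_REGISTER in support:
            # Narrow down to the pins that actually triggered the interrupt.
            flagged = set(self.get_int_pins())
            pins = [pin for pin in pins if pin in flagged]
        if InterruptSupport.CAPTURE_REGISTER in support:
            # Use the values latched at the moment of the interrupt.
            return self.get_captured_int_pin_values(pins)
        # No capture register: fall back to polling each pin.
        return {pin: self.get_pin(pin) for pin in pins}


class RegisterChip(SketchGPIO):
    """Fake chip with flag and capture registers (pin 3 fired, value high)."""
    INTERRUPT_SUPPORT = (
        InterruptSupport.FLAG_REGISTER | InterruptSupport.CAPTURE_REGISTER
    )

    def get_int_pins(self) -> List[int]:
        return [3]

    def get_captured_int_pin_values(self, pins=None) -> Dict[int, bool]:
        captured = {3: True, 4: False}
        if pins is None:
            pins = captured.keys()
        return {pin: captured[pin] for pin in pins}


class PollOnlyChip(SketchGPIO):
    """Fake chip with no interrupt registers; odd pins read high."""

    def get_pin(self, pin: int) -> bool:
        return pin % 2 == 1
```

With these fakes, `RegisterChip().get_interrupt_values_remote([3, 4])` reports only pin 3 from the capture register, while `PollOnlyChip()` polls both requested pins.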
