Skip to content

Commit

Permalink
Merge pull request #122 from sorru94/add-rmv-interfaces-while-connected
Browse files Browse the repository at this point in the history
Add and remove interfaces while connected
  • Loading branch information
harlem88 authored Oct 5, 2023
2 parents 15bbe45 + b71be12 commit c356ebb
Show file tree
Hide file tree
Showing 12 changed files with 868 additions and 31 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/e2e-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ jobs:
echo "E2E_DEVICE_2_ID=$DEVICE_2_ID" >> $GITHUB_ENV
CREDENTIALS_SECRET_2=$(astartectl pairing agent register --compact-output -- "$DEVICE_2_ID")
echo "E2E_CREDENTIALS_SECRET_2=$CREDENTIALS_SECRET_2" >> $GITHUB_ENV
- name: Setup env variables for device 3
run: |
DEVICE_3_ID=$(astartectl utils device-id generate-random)
echo "E2E_DEVICE_3_ID=$DEVICE_3_ID" >> $GITHUB_ENV
CREDENTIALS_SECRET_3=$(astartectl pairing agent register --compact-output -- "$DEVICE_3_ID")
echo "E2E_CREDENTIALS_SECRET_3=$CREDENTIALS_SECRET_3" >> $GITHUB_ENV
- name: Install the astarte device Python module
run: |
python3 -m pip install -e .[e2e]
Expand All @@ -81,3 +87,11 @@ jobs:
command: |
export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
python3 e2etest/persistency/main.py
- name: Run test for reconnection
uses: nick-fields/retry@v2
with:
timeout_seconds: 30
max_attempts: 2
command: |
export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
python3 e2etest/reconnection/main.py
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.12.1] - Unreleased

### Added
- Adding or removing interfaces from a device while the device is connected.
If an interface is added or removed the new device introspection is immediately sent to Astarte.

### Fixed
- False values on boolean endpoints for server owned interfaces are correctly processed.

Expand Down
2 changes: 2 additions & 0 deletions astarte/device/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@
InterfaceFileDecodeError,
InterfaceNotFoundError,
JWTGenerationError,
DeviceConnectingError,
DeviceDisconnectedError,
)
27 changes: 27 additions & 0 deletions astarte/device/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ def delete_prop(self, interface: str, path: str) -> None:
The path to the property endpoint.
"""

@abstractmethod
def delete_props_from_interface(self, interface: str) -> None:
"""
Delete all the properties from the database belonging to an interface.
Parameters
----------
interface : str
The interface name.
"""

@abstractmethod
def clear(self) -> None:
"""
Expand Down Expand Up @@ -206,6 +217,22 @@ def delete_prop(self, interface: str, path: str) -> None:
)
connection.commit()

def delete_props_from_interface(self, interface: str) -> None:
"""
Delete all the properties from the database belonging to an interface.
Parameters
----------
interface : str
See documentation in AstarteDatabase.
"""
connection = sqlite3.connect(self.__database_path)
connection.cursor().execute(
"DELETE FROM properties WHERE interface=?",
(interface,),
)
connection.commit()

def clear(self) -> None:
"""
Fully clear the database of all the properties.
Expand Down
6 changes: 0 additions & 6 deletions astarte/device/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ def add_interface_from_json(self, interface_json: dict):
"""
Adds an interface to the device.
It has to be called before :py:func:`connect`, as it will be used for building the device
introspection.
Parameters
----------
interface_json : dict
Expand Down Expand Up @@ -157,9 +154,6 @@ def remove_interface(self, interface_name: str) -> None:
"""
Removes an Interface from the device.
Removes an Interface definition from the device. It has to be called before
:py:func:`connect`, as it will be used for building the device introspection.
Parameters
----------
interface_name : str
Expand Down
70 changes: 62 additions & 8 deletions astarte/device/device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,35 @@
from collections.abc import Callable
from datetime import datetime
from urllib.parse import urlparse
from enum import Enum

import bson
import paho.mqtt.client as mqtt
from astarte.device import crypto, pairing_handler
from astarte.device.interface import Interface
from astarte.device.database import AstarteDatabaseSQLite, AstarteDatabase
from astarte.device.exceptions import ValidationError, PersistencyDirectoryNotFoundError, APIError
from astarte.device.exceptions import (
ValidationError,
PersistencyDirectoryNotFoundError,
APIError,
InterfaceNotFoundError,
DeviceConnectingError,
DeviceDisconnectedError,
)

from astarte.device.device import Device


class ConnectionState(Enum):
"""
Possible connection states for a device.
"""

CONNECTING = 1
CONNECTED = 2
DISCONNECTED = 3


class DeviceMqtt(Device): # pylint: disable=too-many-instance-attributes
"""
Astarte device implementation using the MQTT transport protocol.
Expand Down Expand Up @@ -147,7 +165,7 @@ def __init__(
# TODO: Implement device registration using token on connect
# self.__jwt_token: str | None = None
self.__is_crypto_setup = False
self.__is_connected = False
self.__connection_state = ConnectionState.DISCONNECTED
self.__ignore_ssl_errors = ignore_ssl_errors

self.on_connected: Callable[[DeviceMqtt], None] | None = None
Expand All @@ -172,8 +190,20 @@ def add_interface_from_json(self, interface_json: dict):
----------
interface_json : dict
See parent class.
Raises
------
DeviceConnectingError
When attempting to add an interface while the device if performing a connection.
"""
if self.__connection_state is ConnectionState.CONNECTING:
raise DeviceConnectingError("Interfaces cannot be added while device is connecting.")
self._introspection.add_interface(interface_json)
if self.__connection_state is ConnectionState.CONNECTED:
interface = Interface(interface_json)
if interface.is_server_owned():
self.__mqtt_client.subscribe(f"{self.__get_base_topic()}/{interface.name}/#", qos=2)
self.__send_introspection()

def remove_interface(self, interface_name: str) -> None:
"""
Expand All @@ -183,8 +213,26 @@ def remove_interface(self, interface_name: str) -> None:
----------
interface_name : str
See parent class.
"""
Raises
------
DeviceConnectingError
When attempting to add an interface while the device if performing a connection.
InterfaceNotFoundError
When the provided interface can't be found in the device introspection.
"""
if self.__connection_state is ConnectionState.CONNECTING:
raise DeviceConnectingError("Interfaces cannot be removed while device is connecting.")
interface = self._introspection.get_interface(interface_name)
if interface is None:
raise InterfaceNotFoundError(f"Interface {interface_name} not found in introspection.")
self._introspection.remove_interface(interface_name)
if self.__connection_state is ConnectionState.CONNECTED:
if interface.is_type_properties():
self.__prop_database.delete_props_from_interface(interface_name)
self.__send_introspection()
if interface.is_server_owned():
self.__mqtt_client.unsubscribe(f"{self.__get_base_topic()}/{interface.name}/#")

def get_device_id(self) -> str:
"""
Expand Down Expand Up @@ -247,7 +295,7 @@ def connect(self) -> None:
APIError
When the obtained broker URL is invalid.
"""
if self.__is_connected:
if self.__connection_state is ConnectionState.CONNECTED:
return

self.__setup_crypto()
Expand All @@ -272,6 +320,7 @@ def connect(self) -> None:
if (parsed_url.hostname is None) or (parsed_url.port is None):
raise APIError("Received invalid broker URL.")

self.__connection_state = ConnectionState.CONNECTING
self.__mqtt_client.connect_async(parsed_url.hostname, parsed_url.port)
self.__mqtt_client.loop_start()

Expand All @@ -285,7 +334,7 @@ def disconnect(self) -> None:
should result in the callback on_disconnected being called with return code parameter 0,
meaning the disconnection happened following an explicit disconnection request.
"""
if not self.__is_connected:
if self.__connection_state is not ConnectionState.CONNECTED:
return

self.__mqtt_client.disconnect()
Expand All @@ -300,7 +349,7 @@ def is_connected(self) -> bool:
bool
The device connection status.
"""
return self.__is_connected
return self.__connection_state is ConnectionState.CONNECTED

def _send_generic(
self,
Expand All @@ -326,12 +375,17 @@ def _send_generic(
Raises
------
DeviceDisconnectedError
When this function is called while the device is not connected to Astarte.
ValidationError
When:
- Attempting to send to a server owned interface.
- Sending to an endpoint that is not present in the interface.
- The payload validation fails.
"""
if self.__connection_state is not ConnectionState.CONNECTED:
raise DeviceDisconnectedError("Send operation failed due to missing connection.")

bson_payload = b""
if payload is not None:
object_payload = {"v": payload}
Expand Down Expand Up @@ -387,7 +441,7 @@ def __on_connect(self, _client, _userdata, flags: dict, rc):
logging.error("Connection failed! %s", rc)
return

self.__is_connected = True
self.__connection_state = ConnectionState.CONNECTED

if not flags["session present"]:
logging.debug("Session flag is not present, performing a clean session procedure")
Expand Down Expand Up @@ -420,7 +474,7 @@ def __on_disconnect(self, _client, _userdata, rc):
-------
"""
self.__is_connected = False
self.__connection_state = ConnectionState.DISCONNECTED

if self.on_disconnected:
if self._loop:
Expand Down
23 changes: 23 additions & 0 deletions astarte/device/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,26 @@ class JWTGenerationError(AstarteError):

def __init__(self, msg):
self.msg = msg


class DeviceConnectingError(AstarteError):
"""Exception raised when an operation is attempted while the device MQTT client has been
started but is not yet connected.
Attributes:
msg -- A message error carrying further details
"""

def __init__(self, msg):
self.msg = msg


class DeviceDisconnectedError(AstarteError):
"""Exception raised if attempting a send while the device is disconnected from Astarte.
Attributes:
msg -- A message error carrying further details
"""

def __init__(self, msg):
self.msg = msg
4 changes: 4 additions & 0 deletions e2etest/base/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ def main(cb_loop: asyncio.AbstractEventLoop, test_cfg: TestCfg):

time.sleep(1)

if not device.is_connected():
print("Connection failed.", flush=True)
sys.exit(1)

test_datastream_from_device_to_server(device, test_cfg)

time.sleep(1)
Expand Down
5 changes: 5 additions & 0 deletions e2etest/persistency/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,13 @@ def main(cb_loop: asyncio.AbstractEventLoop, test_cfg: TestCfg):
device.on_data_received = on_data_received_cbk
device.on_disconnected = on_disconnected_cbk
device.connect()

time.sleep(1)

if not device.is_connected():
print("Connection failed.", flush=True)
sys.exit(1)

assert peek_database(persistency_dir, test_cfg.device_id) == list()
assert peek_astarte(test_cfg) == {
test_cfg.interface_device_prop: {},
Expand Down
Loading

0 comments on commit c356ebb

Please sign in to comment.