Skip to content

Commit

Permalink
Add and remove interfaces while the device is connected
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Orru <simone.orru@secomind.com>
  • Loading branch information
sorru94 committed Sep 26, 2023
1 parent a0a8de8 commit ae648c9
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 22 deletions.
2 changes: 2 additions & 0 deletions astarte/device/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@
InterfaceFileDecodeError,
InterfaceNotFoundError,
JWTGenerationError,
DeviceConnectingError,
DeviceDisconnectedError,
)
27 changes: 27 additions & 0 deletions astarte/device/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ def delete_prop(self, interface: str, path: str) -> None:
The path to the property endpoint.
"""

@abstractmethod
def delete_props_from_interface(self, interface: str) -> None:
"""
Delete all the properties from the database belonging to an interface.
Parameters
----------
interface : str
The interface name.
"""

@abstractmethod
def clear(self) -> None:
"""
Expand Down Expand Up @@ -206,6 +217,22 @@ def delete_prop(self, interface: str, path: str) -> None:
)
connection.commit()

def delete_props_from_interface(self, interface: str) -> None:
"""
Delete all the properties from the database belonging to an interface.
Parameters
----------
interface : str
See documentation in AstarteDatabase.
"""
connection = sqlite3.connect(self.__database_path)
connection.cursor().execute(
"DELETE FROM properties WHERE interface=?",
(interface,),
)
connection.commit()

def clear(self) -> None:
"""
Fully clear the database of all the properties.
Expand Down
6 changes: 0 additions & 6 deletions astarte/device/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ def add_interface_from_json(self, interface_json: dict):
"""
Adds an interface to the device.
It has to be called before :py:func:`connect`, as it will be used for building the device
introspection.
Parameters
----------
interface_json : dict
Expand Down Expand Up @@ -157,9 +154,6 @@ def remove_interface(self, interface_name: str) -> None:
"""
Removes an Interface from the device.
Removes an Interface definition from the device. It has to be called before
:py:func:`connect`, as it will be used for building the device introspection.
Parameters
----------
interface_name : str
Expand Down
74 changes: 72 additions & 2 deletions astarte/device/device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@
from astarte.device import crypto, pairing_handler
from astarte.device.interface import Interface
from astarte.device.database import AstarteDatabaseSQLite, AstarteDatabase
from astarte.device.exceptions import ValidationError, PersistencyDirectoryNotFoundError, APIError
from astarte.device.exceptions import (
ValidationError,
PersistencyDirectoryNotFoundError,
APIError,
InterfaceNotFoundError,
DeviceConnectingError,
DeviceDisconnectedError,
)

from astarte.device.device import Device

Expand Down Expand Up @@ -147,6 +154,7 @@ def __init__(
# TODO: Implement device registration using token on connect
# self.__jwt_token: str | None = None
self.__is_crypto_setup = False
self.__is_connecting = False
self.__is_connected = False
self.__ignore_ssl_errors = ignore_ssl_errors

Expand All @@ -172,8 +180,18 @@ def add_interface_from_json(self, interface_json: dict):
----------
interface_json : dict
See parent class.
Raises
------
DeviceConnectingError
When attempting to add an interface while the device if performing a connection.
"""
if self.__is_connecting:
raise DeviceConnectingError("Interfaces cannot be added while device is connecting.")
self._introspection.add_interface(interface_json)
if self.__is_connected:
self.__subscribe_interface(Interface(interface_json))
self.__send_introspection()

def remove_interface(self, interface_name: str) -> None:
"""
Expand All @@ -183,8 +201,25 @@ def remove_interface(self, interface_name: str) -> None:
----------
interface_name : str
See parent class.
"""
Raises
------
DeviceConnectingError
When attempting to add an interface while the device if performing a connection.
InterfaceNotFoundError
When the provided interface can't be found in the device introspection.
"""
if self.__is_connecting:
raise DeviceConnectingError("Interfaces cannot be removed while device is connecting.")
interface = self._introspection.get_interface(interface_name)
if interface is None:
raise InterfaceNotFoundError(f"Interface {interface_name} not found in introspection.")
self._introspection.remove_interface(interface_name)
if self.__is_connected:
if interface.is_type_properties():
self.__prop_database.delete_props_from_interface(interface_name)
self.__send_introspection()
self.__unsubscribe_interface(interface)

def get_device_id(self) -> str:
"""
Expand Down Expand Up @@ -272,6 +307,7 @@ def connect(self) -> None:
if (parsed_url.hostname is None) or (parsed_url.port is None):
raise APIError("Received invalid broker URL.")

self.__is_connecting = True
self.__mqtt_client.connect_async(parsed_url.hostname, parsed_url.port)
self.__mqtt_client.loop_start()

Expand Down Expand Up @@ -326,12 +362,17 @@ def _send_generic(
Raises
------
DeviceDisconnectedError
When this function is called while the device is not connected to Astarte.
ValidationError
When:
- Attempting to send to a server owned interface.
- Sending to an endpoint that is not present in the interface.
- The payload validation fails.
"""
if not self.__is_connected:
raise DeviceDisconnectedError("Send operation failed due to missing connection.")

bson_payload = b""
if payload is not None:
object_payload = {"v": payload}
Expand Down Expand Up @@ -388,6 +429,7 @@ def __on_connect(self, _client, _userdata, flags: dict, rc):
return

self.__is_connected = True
self.__is_connecting = False

if not flags["session present"]:
logging.debug("Session flag is not present, performing a clean session procedure")
Expand Down Expand Up @@ -517,8 +559,36 @@ def __setup_subscriptions(self) -> None:
f"{self.__get_base_topic()}/control/consumer/properties", qos=2
)
for interface in self._introspection.get_all_server_owned_interfaces():
self.__subscribe_interface(interface)

def __subscribe_interface(self, interface: Interface) -> None:
"""
Subscribe an interface to a topic from the MQTT broker.
This interface only performs the subscription for server owned interfaces.
Parameters
----------
interface: Interface
The server owned interface to use for the subscription.
"""
if interface.is_server_owned():
self.__mqtt_client.subscribe(f"{self.__get_base_topic()}/{interface.name}/#", qos=2)

def __unsubscribe_interface(self, interface: Interface) -> None:
"""
Unsubscribe an interface to a topic from the MQTT broker.
This interface only performs the unsubscription for server owned interfaces.
Parameters
----------
interface: Interface
The server owned interface to use for the unsubscription.
"""
if interface.is_server_owned():
self.__mqtt_client.unsubscribe(f"{self.__get_base_topic()}/{interface.name}/#")

def __send_introspection(self) -> None:
"""
Utility function used to send the introspection to Astarte
Expand Down
23 changes: 23 additions & 0 deletions astarte/device/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,26 @@ class JWTGenerationError(AstarteError):

def __init__(self, msg):
self.msg = msg


class DeviceConnectingError(AstarteError):
"""Exception raised when an operation is attempted while the device MQTT client has been
started but is not yet connected.
Attributes:
msg -- A message error carrying further details
"""

def __init__(self, msg):
self.msg = msg


class DeviceDisconnectedError(AstarteError):
"""Exception raised if attempting a send while the device is disconnected from Astarte.
Attributes:
msg -- A message error carrying further details
"""

def __init__(self, msg):
self.msg = msg
20 changes: 20 additions & 0 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,26 @@ def test_delete_property(self, mock_sqlite3_connect):
)
mock_connection.commit.assert_called_once_with()

@mock.patch("astarte.device.database.sqlite3.connect")
def test_delete_props_from_interface(self, mock_sqlite3_connect):
mock_database_name = mock.MagicMock()

mock_connection = mock_sqlite3_connect.return_value
mock_cursor = mock_sqlite3_connect.return_value.cursor.return_value

db = database.AstarteDatabaseSQLite(mock_database_name)
mock_sqlite3_connect.reset_mock()

mock_interface = mock.MagicMock()
mock_path = mock.MagicMock()
db.delete_props_from_interface(mock_interface)

mock_sqlite3_connect.assert_called_once_with(mock_database_name)
mock_sqlite3_connect.return_value.cursor.assert_called_once_with()
execute_expected_arg = "DELETE FROM properties WHERE interface=?"
mock_cursor.execute.assert_called_once_with(execute_expected_arg, (mock_interface))
mock_connection.commit.assert_called_once_with()

@mock.patch("astarte.device.database.sqlite3.connect")
def test_clear(self, mock_sqlite3_connect):
mock_database_name = mock.MagicMock()
Expand Down
Loading

0 comments on commit ae648c9

Please sign in to comment.