diff --git a/.github/workflows/e2e-test.yaml b/.github/workflows/e2e-test.yaml index 1b65f021..174c0d26 100644 --- a/.github/workflows/e2e-test.yaml +++ b/.github/workflows/e2e-test.yaml @@ -61,6 +61,12 @@ jobs: echo "E2E_DEVICE_2_ID=$DEVICE_2_ID" >> $GITHUB_ENV CREDENTIALS_SECRET_2=$(astartectl pairing agent register --compact-output -- "$DEVICE_2_ID") echo "E2E_CREDENTIALS_SECRET_2=$CREDENTIALS_SECRET_2" >> $GITHUB_ENV + - name: Setup env variables for device 3 + run: | + DEVICE_3_ID=$(astartectl utils device-id generate-random) + echo "E2E_DEVICE_3_ID=$DEVICE_3_ID" >> $GITHUB_ENV + CREDENTIALS_SECRET_3=$(astartectl pairing agent register --compact-output -- "$DEVICE_3_ID") + echo "E2E_CREDENTIALS_SECRET_3=$CREDENTIALS_SECRET_3" >> $GITHUB_ENV - name: Install the astarte device Python module run: | python3 -m pip install -e .[e2e] @@ -81,3 +87,11 @@ jobs: command: | export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt python3 e2etest/persistency/main.py + - name: Run test for reconnection + uses: nick-fields/retry@v2 + with: + timeout_seconds: 30 + max_attempts: 2 + command: | + export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt + python3 e2etest/reconnection/main.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c7d14995..092a96d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [0.12.1] - Unreleased + +### Added +- Adding or removing interfaces from a device while the device is connected. + If an interface is added or removed the new device introspection is immediately sent to Astarte. + ### Fixed - False values on boolean endpoints for server owned interfaces are correctly processed. diff --git a/astarte/device/__init__.py b/astarte/device/__init__.py index 1836af4e..2be8a3f6 100644 --- a/astarte/device/__init__.py +++ b/astarte/device/__init__.py @@ -41,4 +41,6 @@ InterfaceFileDecodeError, InterfaceNotFoundError, JWTGenerationError, + DeviceConnectingError, + DeviceDisconnectedError, ) diff --git a/astarte/device/database.py b/astarte/device/database.py index 4ce9a59f..78ebed46 100644 --- a/astarte/device/database.py +++ b/astarte/device/database.py @@ -85,6 +85,17 @@ def delete_prop(self, interface: str, path: str) -> None: The path to the property endpoint. """ + @abstractmethod + def delete_props_from_interface(self, interface: str) -> None: + """ + Delete all the properties from the database belonging to an interface. + + Parameters + ---------- + interface : str + The interface name. + """ + @abstractmethod def clear(self) -> None: """ @@ -206,6 +217,22 @@ def delete_prop(self, interface: str, path: str) -> None: ) connection.commit() + def delete_props_from_interface(self, interface: str) -> None: + """ + Delete all the properties from the database belonging to an interface. + + Parameters + ---------- + interface : str + See documentation in AstarteDatabase. + """ + connection = sqlite3.connect(self.__database_path) + connection.cursor().execute( + "DELETE FROM properties WHERE interface=?", + (interface,), + ) + connection.commit() + def clear(self) -> None: """ Fully clear the database of all the properties. diff --git a/astarte/device/device.py b/astarte/device/device.py index 19b9f181..0ba1946c 100644 --- a/astarte/device/device.py +++ b/astarte/device/device.py @@ -90,9 +90,6 @@ def add_interface_from_json(self, interface_json: dict): """ Adds an interface to the device. - It has to be called before :py:func:`connect`, as it will be used for building the device - introspection. - Parameters ---------- interface_json : dict @@ -157,9 +154,6 @@ def remove_interface(self, interface_name: str) -> None: """ Removes an Interface from the device. - Removes an Interface definition from the device. It has to be called before - :py:func:`connect`, as it will be used for building the device introspection. - Parameters ---------- interface_name : str diff --git a/astarte/device/device_mqtt.py b/astarte/device/device_mqtt.py index d1e8aca0..bc95b875 100644 --- a/astarte/device/device_mqtt.py +++ b/astarte/device/device_mqtt.py @@ -28,17 +28,35 @@ from collections.abc import Callable from datetime import datetime from urllib.parse import urlparse +from enum import Enum import bson import paho.mqtt.client as mqtt from astarte.device import crypto, pairing_handler from astarte.device.interface import Interface from astarte.device.database import AstarteDatabaseSQLite, AstarteDatabase -from astarte.device.exceptions import ValidationError, PersistencyDirectoryNotFoundError, APIError +from astarte.device.exceptions import ( + ValidationError, + PersistencyDirectoryNotFoundError, + APIError, + InterfaceNotFoundError, + DeviceConnectingError, + DeviceDisconnectedError, +) from astarte.device.device import Device +class ConnectionState(Enum): + """ + Possible connection states for a device. + """ + + CONNECTING = 1 + CONNECTED = 2 + DISCONNECTED = 3 + + class DeviceMqtt(Device): # pylint: disable=too-many-instance-attributes """ Astarte device implementation using the MQTT transport protocol. @@ -147,7 +165,7 @@ def __init__( # TODO: Implement device registration using token on connect # self.__jwt_token: str | None = None self.__is_crypto_setup = False - self.__is_connected = False + self.__connection_state = ConnectionState.DISCONNECTED self.__ignore_ssl_errors = ignore_ssl_errors self.on_connected: Callable[[DeviceMqtt], None] | None = None @@ -172,8 +190,20 @@ def add_interface_from_json(self, interface_json: dict): ---------- interface_json : dict See parent class. + + Raises + ------ + DeviceConnectingError + When attempting to add an interface while the device if performing a connection. """ + if self.__connection_state is ConnectionState.CONNECTING: + raise DeviceConnectingError("Interfaces cannot be added while device is connecting.") self._introspection.add_interface(interface_json) + if self.__connection_state is ConnectionState.CONNECTED: + interface = Interface(interface_json) + if interface.is_server_owned(): + self.__mqtt_client.subscribe(f"{self.__get_base_topic()}/{interface.name}/#", qos=2) + self.__send_introspection() def remove_interface(self, interface_name: str) -> None: """ @@ -183,8 +213,26 @@ def remove_interface(self, interface_name: str) -> None: ---------- interface_name : str See parent class. - """ + + Raises + ------ + DeviceConnectingError + When attempting to add an interface while the device if performing a connection. + InterfaceNotFoundError + When the provided interface can't be found in the device introspection. + """ + if self.__connection_state is ConnectionState.CONNECTING: + raise DeviceConnectingError("Interfaces cannot be removed while device is connecting.") + interface = self._introspection.get_interface(interface_name) + if interface is None: + raise InterfaceNotFoundError(f"Interface {interface_name} not found in introspection.") self._introspection.remove_interface(interface_name) + if self.__connection_state is ConnectionState.CONNECTED: + if interface.is_type_properties(): + self.__prop_database.delete_props_from_interface(interface_name) + self.__send_introspection() + if interface.is_server_owned(): + self.__mqtt_client.unsubscribe(f"{self.__get_base_topic()}/{interface.name}/#") def get_device_id(self) -> str: """ @@ -247,7 +295,7 @@ def connect(self) -> None: APIError When the obtained broker URL is invalid. """ - if self.__is_connected: + if self.__connection_state is ConnectionState.CONNECTED: return self.__setup_crypto() @@ -272,6 +320,7 @@ def connect(self) -> None: if (parsed_url.hostname is None) or (parsed_url.port is None): raise APIError("Received invalid broker URL.") + self.__connection_state = ConnectionState.CONNECTING self.__mqtt_client.connect_async(parsed_url.hostname, parsed_url.port) self.__mqtt_client.loop_start() @@ -285,7 +334,7 @@ def disconnect(self) -> None: should result in the callback on_disconnected being called with return code parameter 0, meaning the disconnection happened following an explicit disconnection request. """ - if not self.__is_connected: + if self.__connection_state is not ConnectionState.CONNECTED: return self.__mqtt_client.disconnect() @@ -300,7 +349,7 @@ def is_connected(self) -> bool: bool The device connection status. """ - return self.__is_connected + return self.__connection_state is ConnectionState.CONNECTED def _send_generic( self, @@ -326,12 +375,17 @@ def _send_generic( Raises ------ + DeviceDisconnectedError + When this function is called while the device is not connected to Astarte. ValidationError When: - Attempting to send to a server owned interface. - Sending to an endpoint that is not present in the interface. - The payload validation fails. """ + if self.__connection_state is not ConnectionState.CONNECTED: + raise DeviceDisconnectedError("Send operation failed due to missing connection.") + bson_payload = b"" if payload is not None: object_payload = {"v": payload} @@ -387,7 +441,7 @@ def __on_connect(self, _client, _userdata, flags: dict, rc): logging.error("Connection failed! %s", rc) return - self.__is_connected = True + self.__connection_state = ConnectionState.CONNECTED if not flags["session present"]: logging.debug("Session flag is not present, performing a clean session procedure") @@ -420,7 +474,7 @@ def __on_disconnect(self, _client, _userdata, rc): ------- """ - self.__is_connected = False + self.__connection_state = ConnectionState.DISCONNECTED if self.on_disconnected: if self._loop: diff --git a/astarte/device/exceptions.py b/astarte/device/exceptions.py index cdce0626..b47c3c1b 100644 --- a/astarte/device/exceptions.py +++ b/astarte/device/exceptions.py @@ -111,3 +111,26 @@ class JWTGenerationError(AstarteError): def __init__(self, msg): self.msg = msg + + +class DeviceConnectingError(AstarteError): + """Exception raised when an operation is attempted while the device MQTT client has been + started but is not yet connected. + + Attributes: + msg -- A message error carrying further details + """ + + def __init__(self, msg): + self.msg = msg + + +class DeviceDisconnectedError(AstarteError): + """Exception raised if attempting a send while the device is disconnected from Astarte. + + Attributes: + msg -- A message error carrying further details + """ + + def __init__(self, msg): + self.msg = msg diff --git a/e2etest/base/main.py b/e2etest/base/main.py index 2ec6e2ac..2b1db06b 100644 --- a/e2etest/base/main.py +++ b/e2etest/base/main.py @@ -109,6 +109,10 @@ def main(cb_loop: asyncio.AbstractEventLoop, test_cfg: TestCfg): time.sleep(1) + if not device.is_connected(): + print("Connection failed.", flush=True) + sys.exit(1) + test_datastream_from_device_to_server(device, test_cfg) time.sleep(1) diff --git a/e2etest/persistency/main.py b/e2etest/persistency/main.py index 1e18ebe6..8448e264 100644 --- a/e2etest/persistency/main.py +++ b/e2etest/persistency/main.py @@ -197,8 +197,13 @@ def main(cb_loop: asyncio.AbstractEventLoop, test_cfg: TestCfg): device.on_data_received = on_data_received_cbk device.on_disconnected = on_disconnected_cbk device.connect() + time.sleep(1) + if not device.is_connected(): + print("Connection failed.", flush=True) + sys.exit(1) + assert peek_database(persistency_dir, test_cfg.device_id) == list() assert peek_astarte(test_cfg) == { test_cfg.interface_device_prop: {}, diff --git a/e2etest/reconnection/main.py b/e2etest/reconnection/main.py new file mode 100644 index 00000000..13ad8403 --- /dev/null +++ b/e2etest/reconnection/main.py @@ -0,0 +1,429 @@ +# This file is part of Astarte. +# +# Copyright 2023 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +""" +End to end testing framework. +Specifically designed to test persistency. +""" +import os +import asyncio +import time +import sqlite3 +import pickle +import requests +from datetime import datetime, timezone +from pathlib import Path +from threading import Thread, Lock +from termcolor import cprint +import importlib.util +import sys + +# Assuming this script is called from the root folder of this project. +prj_path = Path(os.getcwd()) +if str(prj_path) not in sys.path: + sys.path.insert(0, str(prj_path)) + +from astarte.device import DeviceMqtt +from astarte.device import InterfaceNotFoundError, DeviceDisconnectedError + +config_path = Path.joinpath(Path.cwd(), "e2etest", "common", "config.py") +spec = importlib.util.spec_from_file_location("config", config_path) +config = importlib.util.module_from_spec(spec) +sys.modules["config"] = config +spec.loader.exec_module(config) + +http_requests_path = Path.joinpath(Path.cwd(), "e2etest", "common", "http_requests.py") +spec = importlib.util.spec_from_file_location("http_requests", http_requests_path) +http_requests = importlib.util.module_from_spec(spec) +sys.modules["http_requests"] = http_requests +spec.loader.exec_module(http_requests) + +from config import TestCfg +from http_requests import ( + get_server_interface, +) + + +def on_connected_cbk(_): + """ + Callback for a connection event. + """ + cprint("Device connected.", color="green", flush=True) + + +def on_data_received_cbk(_, name: str, path: str, payload: dict): + """ + Callback for a data reception event. + """ + cprint(f"Data received: {name}, {path}, {payload}.", color="red", flush=True) + + +def on_disconnected_cbk(_, reason: int): + """ + Callback for a disconnection event. + """ + if reason == 0: + cprint(f"Device gracefully disconnected.", color="green", flush=True) + else: + cprint(f"Device disconnected because: {reason}.", color="red", flush=True) + + +def device_connect(device: DeviceMqtt): + """ + Helper function to perform device connection. + """ + time.sleep(0.5) + + device.connect() + + time.sleep(0.5) + + if not device.is_connected(): + cprint("\nConnection failed.", color="red", flush=True) + sys.exit(1) + + +def device_disconnect(device: DeviceMqtt): + """ + Helper function to perform device disconnection. + """ + time.sleep(0.5) + + device.disconnect() + + time.sleep(0.5) + + if device.is_connected(): + cprint("\nDisconnection failed.", color="red", flush=True) + sys.exit(1) + + +def test_add_and_remove_interface_while_disconnected(device: DeviceMqtt, test_cfg: TestCfg): + """ + Test add and remove interface functionality while the device is disconnected. + + The device should be disconnected when calling whis function. + """ + cprint("\nTesting add/remove interface while disconnected.", color="cyan", flush=True) + + device_connect(device) + + device.send( + test_cfg.interface_device_data, + "/booleanarray_endpoint", + [False, True], + datetime.now(tz=timezone.utc), + ) + + json_res = get_server_interface(test_cfg, test_cfg.interface_device_data) + assert json_res["data"]["booleanarray_endpoint"]["value"] == [False, True] + + device_disconnect(device) + + device.remove_interface(test_cfg.interface_device_data) + + device_connect(device) + + try: + device.send( + test_cfg.interface_device_data, + "/booleanarray_endpoint", + [False, True], + datetime.now(tz=timezone.utc), + ) + except InterfaceNotFoundError: + # Correct behaviour + pass + else: + cprint("Exception not raised for send on removed interface.", color="red", flush=True) + sys.exit(1) + + try: + get_server_interface(test_cfg, test_cfg.interface_device_data) + except requests.exceptions.HTTPError: + # Correct behaviour + pass + else: + cprint("Exception not raised for http get on removed interface.", color="red", flush=True) + sys.exit(1) + + device_disconnect(device) + + device.add_interface_from_file( + test_cfg.interfaces_fld.joinpath( + "org.astarte-platform.python.e2etest.DeviceDatastream.json" + ) + ) + + device_connect(device) + + device.send( + test_cfg.interface_device_data, + "/booleanarray_endpoint", + [True, True], + datetime.now(tz=timezone.utc), + ) + + json_res = get_server_interface(test_cfg, test_cfg.interface_device_data) + assert json_res["data"]["booleanarray_endpoint"]["value"] == [True, True] + + +def test_add_and_remove_interface_while_connected(device: DeviceMqtt, test_cfg: TestCfg): + """ + Test add and remove interface functionality while the device is connected. + + The device should be connected when calling whis function. + """ + cprint("\nTesting add/remove interface while connected.", color="cyan", flush=True) + + device.remove_interface(test_cfg.interface_device_data) + + time.sleep(0.5) + + try: + device.send( + test_cfg.interface_device_data, + "/booleanarray_endpoint", + [False, True], + datetime.now(tz=timezone.utc), + ) + except InterfaceNotFoundError: + # Correct behaviour + pass + else: + cprint("Exception not raised for send on removed interface.", color="red", flush=True) + sys.exit(1) + + try: + get_server_interface(test_cfg, test_cfg.interface_device_data) + except requests.exceptions.HTTPError: + # Correct behaviour + pass + else: + cprint("Exception not raised for http get on removed interface.", color="red", flush=True) + sys.exit(1) + + time.sleep(0.5) + + device.add_interface_from_file( + test_cfg.interfaces_fld.joinpath( + "org.astarte-platform.python.e2etest.DeviceDatastream.json" + ) + ) + + time.sleep(0.5) + + device.send( + test_cfg.interface_device_data, + "/booleanarray_endpoint", + [False, False], + datetime.now(tz=timezone.utc), + ) + + json_res = get_server_interface(test_cfg, test_cfg.interface_device_data) + assert json_res["data"]["booleanarray_endpoint"]["value"] == [False, False] + + +def peek_database(persistency_dir: Path, device_id: str, interface_name: str): + """ + Take a peek in the device database. + """ + database_path = persistency_dir.joinpath(device_id, "caching", "astarte.db") + properties = ( + sqlite3.connect(database_path) + .cursor() + .execute("SELECT * FROM properties WHERE interface=?", (interface_name,)) + .fetchall() + ) + parsed_properties = [] + for interface, major, path, value in properties: + parsed_properties += [(interface, major, path, pickle.loads(value))] + return parsed_properties + + +def test_add_and_remove_property_interface_while_connected( + persistency_dir: Path, device: DeviceMqtt, test_cfg: TestCfg +): + """ + Test add and remove interface functionality while the device is connected specifically for a + property. + + The device should be connected when calling whis function. + """ + cprint("\nTesting add/remove property interface while connected.", color="cyan", flush=True) + + device.send( + test_cfg.interface_device_prop, + "/s12/booleanarray_endpoint", + [True, False], + None, + ) + + json_res = get_server_interface(test_cfg, test_cfg.interface_device_prop) + assert json_res["data"]["s12"]["booleanarray_endpoint"] == [True, False] + + prop_in_database = peek_database( + persistency_dir, test_cfg.device_id, test_cfg.interface_device_prop + ) + expected_prop_in_database = [ + ( + "org.astarte-platform.python.e2etest.DeviceProperty", + 0, + "/s12/booleanarray_endpoint", + [True, False], + ) + ] + assert prop_in_database == expected_prop_in_database + + time.sleep(0.5) + + device.remove_interface(test_cfg.interface_device_prop) + + time.sleep(0.5) + + try: + device.send( + test_cfg.interface_device_prop, + "/s12/booleanarray_endpoint", + [False, True], + None, + ) + except InterfaceNotFoundError: + # Correct behaviour + pass + else: + cprint("Exception not raised for send on removed interface.", color="red", flush=True) + sys.exit(1) + + try: + get_server_interface(test_cfg, test_cfg.interface_device_prop) + except requests.exceptions.HTTPError: + # Correct behaviour + pass + else: + cprint("Exception not raised for http get on removed interface.", color="red", flush=True) + sys.exit(1) + + prop_in_database = peek_database( + persistency_dir, test_cfg.device_id, test_cfg.interface_device_prop + ) + assert prop_in_database == list() + + time.sleep(0.5) + + device.add_interface_from_file( + test_cfg.interfaces_fld.joinpath("org.astarte-platform.python.e2etest.DeviceProperty.json") + ) + + time.sleep(0.5) + + device.send( + test_cfg.interface_device_prop, + "/s12/booleanarray_endpoint", + [False, False], + None, + ) + + json_res = get_server_interface(test_cfg, test_cfg.interface_device_prop) + assert json_res["data"]["s12"]["booleanarray_endpoint"] == [False, False] + + prop_in_database = peek_database( + persistency_dir, test_cfg.device_id, test_cfg.interface_device_prop + ) + expected_prop_in_database = [ + ( + "org.astarte-platform.python.e2etest.DeviceProperty", + 0, + "/s12/booleanarray_endpoint", + [False, False], + ) + ] + assert prop_in_database == expected_prop_in_database + + +def main(cb_loop: asyncio.AbstractEventLoop, test_cfg: TestCfg): + """ + Generate the device and run the end to end tests. + """ + persistency_dir = Path.joinpath(Path.cwd(), "e2etest", "reconnection", "build") + if not Path.is_dir(persistency_dir): + os.makedirs(persistency_dir) + device = DeviceMqtt( + device_id=test_cfg.device_id, + realm=test_cfg.realm, + credentials_secret=test_cfg.credentials_secret, + pairing_base_url=test_cfg.pairing_url, + persistency_dir=persistency_dir, + loop=cb_loop, + ignore_ssl_errors=False, + ) + interface_files = [ + test_cfg.interfaces_fld.joinpath( + "org.astarte-platform.python.e2etest.DeviceAggregate.json" + ), + test_cfg.interfaces_fld.joinpath( + "org.astarte-platform.python.e2etest.DeviceDatastream.json" + ), + test_cfg.interfaces_fld.joinpath("org.astarte-platform.python.e2etest.DeviceProperty.json"), + test_cfg.interfaces_fld.joinpath( + "org.astarte-platform.python.e2etest.ServerAggregate.json" + ), + test_cfg.interfaces_fld.joinpath( + "org.astarte-platform.python.e2etest.ServerDatastream.json" + ), + test_cfg.interfaces_fld.joinpath("org.astarte-platform.python.e2etest.ServerProperty.json"), + ] + for f in interface_files: + device.add_interface_from_file(f) + + device.on_connected = on_connected_cbk + device.on_data_received = on_data_received_cbk + device.on_disconnected = on_disconnected_cbk + + test_add_and_remove_interface_while_disconnected(device, test_cfg) + + time.sleep(0.5) + + test_add_and_remove_interface_while_connected(device, test_cfg) + + time.sleep(0.5) + + test_add_and_remove_property_interface_while_connected(persistency_dir, device, test_cfg) + + time.sleep(0.5) + + +def start_call_back_loop(loop: asyncio.AbstractEventLoop) -> None: + """ + Start an asyncio event loop, used for the device call back. + """ + asyncio.set_event_loop(loop) + loop.run_forever() + + +if __name__ == "__main__": + # Generate an async loop and thread + call_back_loop = asyncio.new_event_loop() + call_back_thread = Thread(target=start_call_back_loop, args=[call_back_loop], daemon=True) + call_back_thread.start() + + try: + main(call_back_loop, TestCfg(number=3)) + except Exception as e: + call_back_loop.stop() + call_back_thread.join(timeout=1) + raise e diff --git a/tests/test_database.py b/tests/test_database.py index e2e00f54..81213b36 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -200,6 +200,26 @@ def test_delete_property(self, mock_sqlite3_connect): ) mock_connection.commit.assert_called_once_with() + @mock.patch("astarte.device.database.sqlite3.connect") + def test_delete_props_from_interface(self, mock_sqlite3_connect): + mock_database_name = mock.MagicMock() + + mock_connection = mock_sqlite3_connect.return_value + mock_cursor = mock_sqlite3_connect.return_value.cursor.return_value + + db = database.AstarteDatabaseSQLite(mock_database_name) + mock_sqlite3_connect.reset_mock() + + mock_interface = mock.MagicMock() + mock_path = mock.MagicMock() + db.delete_props_from_interface(mock_interface) + + mock_sqlite3_connect.assert_called_once_with(mock_database_name) + mock_sqlite3_connect.return_value.cursor.assert_called_once_with() + execute_expected_arg = "DELETE FROM properties WHERE interface=?" + mock_cursor.execute.assert_called_once_with(execute_expected_arg, (mock_interface,)) + mock_connection.commit.assert_called_once_with() + @mock.patch("astarte.device.database.sqlite3.connect") def test_clear(self, mock_sqlite3_connect): mock_database_name = mock.MagicMock() diff --git a/tests/test_device_mqtt.py b/tests/test_device_mqtt.py index 6c1aed76..62393a41 100644 --- a/tests/test_device_mqtt.py +++ b/tests/test_device_mqtt.py @@ -31,8 +31,10 @@ import paho from paho.mqtt.client import Client +from astarte.device.interface import Interface from astarte.device.database import AstarteDatabaseSQLite from astarte.device import DeviceMqtt +from astarte.device.device_mqtt import ConnectionState from astarte.device.introspection import Introspection from astarte.device.exceptions import ( PersistencyDirectoryNotFoundError, @@ -41,6 +43,8 @@ ValidationError, InterfaceNotFoundError, APIError, + DeviceConnectingError, + DeviceDisconnectedError, ) @@ -111,20 +115,232 @@ def helper_initialize_device(self, mock_isdir, mock_db, loop): mock_db.assert_called_once_with(Path("./tests/device_id/caching/astarte.db")) return device + @mock.patch.object(Introspection, "add_interface") + def test_add_interface_from_json_while_not_connected(self, mock_add_interface): + device = self.helper_initialize_device(loop=None) + + interface_json = {"json content": 42} + device.add_interface_from_json(interface_json) + + mock_add_interface.assert_called_once_with(interface_json) + + # __send_introspection is tested together with the connect method + @mock.patch.object(DeviceMqtt, "_DeviceMqtt__send_introspection") + @mock.patch.object(Client, "subscribe") + @mock.patch("astarte.device.device_mqtt.Interface") + @mock.patch.object(Introspection, "add_interface") + def test_add_interface_from_json_while_connected( + self, mock_add_interface, mock_interface, mock_subscribe, mock__send_introspection + ): + device = self.helper_initialize_device(loop=None) + + mock_interface.return_value.name = "" + mock_interface.return_value.is_server_owned.return_value = True + + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + + interface_json = {"json content": 42} + device.add_interface_from_json(interface_json) + + mock_add_interface.assert_called_once_with(interface_json) + mock_interface.assert_called_once_with(interface_json) + mock_subscribe.assert_called_once_with("realm_name/device_id//#", qos=2) + mock__send_introspection.assert_called_once() + + # __send_introspection is tested together with the connect method + @mock.patch.object(DeviceMqtt, "_DeviceMqtt__send_introspection") + @mock.patch.object(Client, "subscribe") + @mock.patch("astarte.device.device_mqtt.Interface") + @mock.patch.object(Introspection, "add_interface") + def test_add_interface_from_json_while_connected_client_owned_interface( + self, mock_add_interface, mock_interface, mock_subscribe, mock__send_introspection + ): + device = self.helper_initialize_device(loop=None) + + mock_interface.return_value.is_server_owned.return_value = False + + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + + interface_json = {"json content": 42} + device.add_interface_from_json(interface_json) + + mock_add_interface.assert_called_once_with(interface_json) + mock_interface.assert_called_once_with(interface_json) + mock_subscribe.assert_not_called() + mock__send_introspection.assert_called_once() + + # __send_introspection is tested together with the connect method + @mock.patch.object(DeviceMqtt, "_DeviceMqtt__send_introspection") + @mock.patch.object(Client, "subscribe") + @mock.patch("astarte.device.device_mqtt.Interface") + @mock.patch.object(Introspection, "add_interface") + def test_add_interface_from_json_while_connecting_raises( + self, mock_add_interface, mock_interface, mock_subscribe, mock__send_introspection + ): + device = self.helper_initialize_device(loop=None) + + device._DeviceMqtt__connection_state = ConnectionState.CONNECTING + + interface_json = {"json content": 42} + self.assertRaises( + DeviceConnectingError, lambda: device.add_interface_from_json(interface_json) + ) + + mock_add_interface.assert_not_called() + mock_interface.assert_not_called() + mock_subscribe.assert_not_called() + mock__send_introspection.assert_not_called() + + @mock.patch.object(Introspection, "remove_interface") + @mock.patch.object(Introspection, "get_interface") + def test_remove_interface_while_not_connected(self, mock_get_interface, mock_remove_interface): + device = self.helper_initialize_device(loop=None) + + interface_name = "interface name" + device.remove_interface(interface_name) + mock_get_interface.assert_called_once_with(interface_name) + mock_remove_interface.assert_called_once_with(interface_name) + + # __send_introspection is tested together with the connect method + @mock.patch.object(AstarteDatabaseSQLite, "delete_props_from_interface") + @mock.patch.object(Client, "unsubscribe") + @mock.patch.object(DeviceMqtt, "_DeviceMqtt__send_introspection") + @mock.patch.object(Introspection, "remove_interface") + @mock.patch.object(Introspection, "get_interface") + def test_remove_interface_while_connected( + self, + mock_get_interface, + mock_remove_interface, + mock__send_introspection, + mock_unsubscribe, + mock_delete_props_from_interface, + ): + device = self.helper_initialize_device(loop=None) + + mock_interface = mock.MagicMock() + mock_interface.name = "interface name" + mock_interface.is_server_owned.return_value = True + mock_interface.is_type_properties.return_value = True + mock_get_interface.return_value = mock_interface + + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + + interface_name = "interface name" + device.remove_interface(interface_name) + + mock_get_interface.assert_called_once_with(interface_name) + mock_remove_interface.assert_called_once_with(interface_name) + mock_delete_props_from_interface.assert_called_once_with(interface_name) + mock__send_introspection.assert_called_once() + mock_interface.is_server_owned.assert_called_once() + mock_unsubscribe.assert_called_once_with(f"realm_name/device_id/{interface_name}/#") + + # __send_introspection is tested together with the connect method + @mock.patch.object(AstarteDatabaseSQLite, "delete_props_from_interface") + @mock.patch.object(Client, "unsubscribe") + @mock.patch.object(DeviceMqtt, "_DeviceMqtt__send_introspection") + @mock.patch.object(Introspection, "remove_interface") + @mock.patch.object(Introspection, "get_interface") + def test_remove_interface_while_connected_device_owned_datastream( + self, + mock_get_interface, + mock_remove_interface, + mock__send_introspection, + mock_unsubscribe, + mock_delete_props_from_interface, + ): + device = self.helper_initialize_device(loop=None) + + mock_interface = mock.MagicMock() + mock_interface.name = "interface name" + mock_interface.is_server_owned.return_value = False + mock_interface.is_type_properties.return_value = False + mock_get_interface.return_value = mock_interface + + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + + interface_name = "interface name" + device.remove_interface(interface_name) + + mock_get_interface.assert_called_once_with(interface_name) + mock_remove_interface.assert_called_once_with(interface_name) + mock_delete_props_from_interface.assert_not_called() + mock__send_introspection.assert_called_once() + mock_interface.is_server_owned.assert_called_once() + mock_unsubscribe.assert_not_called() + + # __send_introspection is tested together with the connect method + @mock.patch.object(AstarteDatabaseSQLite, "delete_props_from_interface") + @mock.patch.object(Client, "unsubscribe") + @mock.patch.object(DeviceMqtt, "_DeviceMqtt__send_introspection") + @mock.patch.object(Introspection, "remove_interface") + @mock.patch.object(Introspection, "get_interface") + def test_remove_interface_while_connecting_raises( + self, + mock_get_interface, + mock_remove_interface, + mock__send_introspection, + mock_unsubscribe, + mock_delete_props_from_interface, + ): + device = self.helper_initialize_device(loop=None) + + device._DeviceMqtt__connection_state = ConnectionState.CONNECTING + + interface_name = "interface name" + + self.assertRaises(DeviceConnectingError, lambda: device.remove_interface(interface_name)) + + mock_get_interface.assert_not_called() + mock_remove_interface.assert_not_called() + mock_delete_props_from_interface.assert_not_called() + mock__send_introspection.assert_not_called() + mock_unsubscribe.assert_not_called() + + # __send_introspection is tested together with the connect method + @mock.patch.object(AstarteDatabaseSQLite, "delete_props_from_interface") + @mock.patch.object(Client, "unsubscribe") + @mock.patch.object(DeviceMqtt, "_DeviceMqtt__send_introspection") + @mock.patch.object(Introspection, "remove_interface") + @mock.patch.object(Introspection, "get_interface") + def test_remove_interface_interface_not_in_introspection_raises( + self, + mock_get_interface, + mock_remove_interface, + mock__send_introspection, + mock_unsubscribe, + mock_delete_props_from_interface, + ): + device = self.helper_initialize_device(loop=None) + + mock_get_interface.return_value = None + + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + + interface_name = "interface name" + self.assertRaises(InterfaceNotFoundError, lambda: device.remove_interface(interface_name)) + + mock_get_interface.assert_called_once_with(interface_name) + mock_remove_interface.assert_not_called() + mock_delete_props_from_interface.assert_not_called() + mock__send_introspection.assert_not_called() + mock_unsubscribe.assert_not_called() + + @mock.patch.object(DeviceMqtt, "add_interface_from_json") @mock.patch("astarte.device.device.open", new_callable=mock.mock_open) @mock.patch("astarte.device.device.json.load", return_value="Fake json content") @mock.patch.object(Path, "is_file", return_value=True) - @mock.patch.object(Introspection, "add_interface") def test_add_interface_from_file( - self, mock_add_interface, mock_isfile, mock_json_load, mock_open + self, mock_isfile, mock_json_load, mock_open, mock_add_interface_from_json ): device = self.helper_initialize_device(loop=None) device.add_interface_from_file(Path.cwd()) - assert mock_isfile.call_count == 1 + + mock_isfile.assert_called_once() mock_open.assert_called_once_with(Path.cwd(), "r", encoding="utf-8") mock_json_load.assert_called_once() - mock_add_interface.assert_called_once_with("Fake json content") + mock_add_interface_from_json.assert_called_once_with("Fake json content") @mock.patch.object(Path, "is_file", return_value=False) def test_add_interface_from_file_missing_file_err(self, mock_isfile): @@ -133,7 +349,7 @@ def test_add_interface_from_file_missing_file_err(self, mock_isfile): self.assertRaises( InterfaceFileNotFoundError, lambda: device.add_interface_from_file(Path.cwd()) ) - assert mock_isfile.call_count == 1 + mock_isfile.assert_called_once() @mock.patch.object(Path, "is_file", return_value=True) @mock.patch("astarte.device.device.open", new_callable=mock.mock_open) @@ -148,9 +364,9 @@ def test_add_interface_from_file_incorrect_json_err( self.assertRaises( InterfaceFileDecodeError, lambda: device.add_interface_from_file(Path.cwd()) ) + mock_isfile.assert_called_once() mock_json_err.assert_called_once() mock_open.assert_called_once() - mock_isfile.assert_called_once() mock_json_load.assert_called_once() @mock.patch.object( @@ -192,14 +408,6 @@ def test_add_interface_from_dir_not_a_dir_err(self, mock_exists, mock_is_dir): mock_exists.assert_called_once() mock_is_dir.assert_called_once() - @mock.patch.object(Introspection, "remove_interface") - def test_remove_interface(self, mock_remove_interface): - device = self.helper_initialize_device(loop=None) - - interface_name = "interface name" - device.remove_interface(interface_name) - mock_remove_interface.assert_called_once_with(interface_name) - def test_get_device_id(self): device = self.helper_initialize_device(loop=None) self.assertEqual(device.get_device_id(), "device_id") @@ -286,7 +494,7 @@ def test_connect_already_connected( ): device = self.helper_initialize_device(loop=None) - device._DeviceMqtt__is_connected = True + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED mock_urlparse.return_value.hostname = "mocked hostname" mock_urlparse.return_value.port = "mocked port" @@ -515,7 +723,7 @@ def test_disconnect(self, mock_disconnect): device.disconnect() mock_disconnect.assert_not_called() - device._DeviceMqtt__is_connected = True + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED device.disconnect() mock_disconnect.assert_called_once() @@ -525,7 +733,7 @@ def test_is_connected(self): self.assertFalse(device.is_connected()) - device._DeviceMqtt__is_connected = True + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED self.assertTrue(device.is_connected()) @@ -545,6 +753,8 @@ def test_send(self, mock_get_interface, mock_bson_dumps, mock_db_store, mock_mqt mock_bson_dumps.return_value = bytes("bson content", "utf-8") + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + interface_name = "interface name" interface_path = "interface path" payload = 12 @@ -566,6 +776,46 @@ def test_send(self, mock_get_interface, mock_bson_dumps, mock_db_store, mock_mqt qos=mock_interface.get_reliability.return_value, ) + @mock.patch.object(Client, "publish") + @mock.patch.object(AstarteDatabaseSQLite, "store_prop") + @mock.patch("astarte.device.device_mqtt.bson.dumps") + @mock.patch.object(Introspection, "get_interface") + def test_send_device_not_connected_raises_device_disconnected_err( + self, mock_get_interface, mock_bson_dumps, mock_db_store, mock_mqtt_publish + ): + device = self.helper_initialize_device(loop=None) + + mock_interface = mock.MagicMock() + mock_interface.name = "interface name" + mock_interface.is_aggregation_object.return_value = False + mock_interface.is_server_owned.return_value = False + mock_interface.is_type_properties.return_value = False + mock_get_interface.return_value = mock_interface + + mock_bson_dumps.return_value = bytes("bson content", "utf-8") + + device._DeviceMqtt__connection_state = ConnectionState.DISCONNECTED + + interface_name = "interface name" + interface_path = "interface path" + payload = 12 + timestamp = datetime.now() + self.assertRaises( + DeviceDisconnectedError, + lambda: device.send(interface_name, interface_path, payload, timestamp), + ) + + mock_get_interface.assert_called_once_with(interface_name) + mock_interface.is_aggregation_object.assert_called_once() + mock_interface.validate_payload_and_timestamp.assert_called_once_with( + interface_path, payload, timestamp + ) + mock_bson_dumps.assert_not_called() + mock_interface.is_type_properties.assert_not_called() + mock_db_store.assert_not_called() + mock_interface.get_reliability.assert_not_called() + mock_mqtt_publish.assert_not_called() + @mock.patch.object(Client, "publish") @mock.patch.object(AstarteDatabaseSQLite, "store_prop") @mock.patch("astarte.device.device_mqtt.bson.dumps") @@ -584,6 +834,8 @@ def test_send_zero_is_ok( mock_bson_dumps.return_value = bytes("bson content", "utf-8") + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + interface_name = "interface name" interface_path = "interface path" payload = 0 @@ -624,6 +876,8 @@ def test_send_a_property_is_ok( mock_bson_dumps.return_value = bytes("bson content", "utf-8") + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + interface_name = "interface name" interface_path = "interface path" payload = 12 @@ -841,6 +1095,8 @@ def test_send_aggregate( mock_bson_dumps.return_value = bytes("bson content", "utf-8") + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + interface_name = "interface name" interface_path = "interface path" payload = {"something": 12} @@ -1067,6 +1323,8 @@ def test_unset_property( mock_interface.is_type_properties.return_value = True mock_get_interface.return_value = mock_interface + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + interface_name = "interface name" interface_path = "interface path" device.unset_property(interface_name, interface_path) @@ -1185,6 +1443,8 @@ def test_unset_property_non_existing_mapping_raises( mock_interface.get_mapping.return_value = None mock_get_interface.return_value = mock_interface + device._DeviceMqtt__connection_state = ConnectionState.CONNECTED + interface_name = "interface name" interface_path = "interface path" self.assertRaises(