From 7c7b7050239447a3cc37266aa898835485788edf Mon Sep 17 00:00:00 2001 From: flxdot Date: Sun, 28 Apr 2024 18:16:49 +0200 Subject: [PATCH 1/4] create blackbox to record data --- .../carlos/edge/device/runtime.py | 30 ++++-- .../carlos/edge/device/storage/blackbox.py | 98 +++++++++++++++++++ .../edge/device/storage/blackbox_test.py | 78 +++++++++++++++ .../edge/device/storage/timeseries_data.py | 2 +- 4 files changed, 200 insertions(+), 8 deletions(-) create mode 100644 lib/py_edge_device/carlos/edge/device/storage/blackbox.py create mode 100644 lib/py_edge_device/carlos/edge/device/storage/blackbox_test.py diff --git a/lib/py_edge_device/carlos/edge/device/runtime.py b/lib/py_edge_device/carlos/edge/device/runtime.py index 7ab7940f..2e977e0c 100644 --- a/lib/py_edge_device/carlos/edge/device/runtime.py +++ b/lib/py_edge_device/carlos/edge/device/runtime.py @@ -3,7 +3,7 @@ import asyncio import signal -from datetime import timedelta +from datetime import UTC, datetime, timedelta from typing import Self from apscheduler import AsyncScheduler @@ -19,6 +19,8 @@ from .communication import ClientEdgeCommunicationHandler from .config import load_drivers from .constants import LOCAL_DEVICE_STORAGE_PATH +from .storage.blackbox import Blackbox +from .storage.connection import get_async_storage_engine from .storage.migration import alembic_upgrade @@ -142,6 +144,8 @@ def __init__(self): self.drivers = {driver.identifier: driver for driver in load_drivers()} validate_device_address_space(self.drivers.values()) + self.blackbox = Blackbox(engine=get_async_storage_engine()) + def setup(self) -> Self: """Sets up the I/O peripherals.""" for driver in self.drivers.values(): @@ -157,19 +161,31 @@ async def register_tasks(self, scheduler: AsyncScheduler) -> Self: if isinstance(driver, InputDriver): await scheduler.add_schedule( func_or_task_id=self.read_input, - kwargs={"driver": driver.identifier}, + kwargs={"driver_identifier": driver.identifier}, trigger=IntervalTrigger(seconds=INPUT_SAMPLE_INTERVAL), ) return self - async def read_input(self, driver: str): + async def read_input(self, driver_identifier: str): """Reads the value of the input driver.""" - logger.debug(f"Reading data from driver {driver}.") - data = await self.drivers[driver].read_async() - # todo: write to database - logger.debug(f"Received data from driver {driver}: {data}") + logger.debug(f"Reading data from driver {driver_identifier}.") + + read_start = datetime.now(tz=UTC) + data = await self.drivers[driver_identifier].read_async() + read_end = datetime.now(tz=UTC) + # We assume that the actual read time is in the middle of the + # start and end time. + read_act = read_start + (read_end - read_start) / 2 + + logger.debug(f"Received data from driver {driver_identifier}: {data}") + + await self.blackbox.record( + driver_identifier=driver_identifier, + read_timestamp=read_act, + data=data, + ) async def send_ping( diff --git a/lib/py_edge_device/carlos/edge/device/storage/blackbox.py b/lib/py_edge_device/carlos/edge/device/storage/blackbox.py new file mode 100644 index 00000000..1dfaa418 --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/storage/blackbox.py @@ -0,0 +1,98 @@ +from datetime import datetime + +from loguru import logger +from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine + +from .timeseries_data import TimeseriesInput, add_timeseries_data +from .timeseries_index import ( + TimeseriesIndexMutation, + create_timeseries_index, + find_timeseries_index, +) + + +class Blackbox: + """The black box is used to store measurement data locally on the device. This data + is stored to be send to the server at a later time. This is useful in case the device + is not able to send the data to the server immediately. The black box stores the data + in a SQLite database. The data is stored in the timeseries_data table. + """ + + def __init__(self, engine: AsyncEngine): + self._engine = engine + self._timeseries_id_index: dict[str, dict[str, int]] = {} + + async def record( + self, + driver_identifier: str, + read_timestamp: datetime, + data: dict[str, float], + ) -> None: + """Inserts the reading into the database.""" + + async with self._engine.connect() as connection: + + timeseries_id_to_value: dict[int, float] = {} + + await self._ensure_index_hydrated(connection, driver_identifier) + + for driver_signal, value in data.items(): + timeseries_id = await self._get_timeseries_id( + connection=connection, + driver_identifier=driver_identifier, + driver_signal=driver_signal, + ) + timeseries_id_to_value[timeseries_id] = value + + ts_input = TimeseriesInput( + timestamp_utc=read_timestamp, values=timeseries_id_to_value + ) + + await add_timeseries_data(connection=connection, timeseries_input=ts_input) + + logger.debug(f"Recorded data from driver {driver_identifier}.") + + async def _ensure_index_hydrated( + self, connection: AsyncConnection, driver_identifier: str + ): + """Hydrates the timeseries_id index for the given driver_identifier.""" + + if driver_identifier in self._timeseries_id_index: + return + + self._timeseries_id_index[driver_identifier] = {} + + matching = await find_timeseries_index( + connection=connection, driver_identifier=driver_identifier + ) + for match in matching: + self._timeseries_id_index[driver_identifier][ + match.driver_signal + ] = match.timeseries_id + + async def _get_timeseries_id( + self, connection: AsyncConnection, driver_identifier: str, driver_signal: str + ) -> int: + """Fetches the timeseries_id for the given driver_identifier and driver_signal. + If the timeseries_id does not exist, it is created and returned. + + :param connection: The connection to the database. + :param driver_identifier: The driver identifier. + :param driver_signal: The driver signal. + :return: The timeseries_id. + """ + + try: + timeseries_id = self._timeseries_id_index[driver_identifier][driver_signal] + except KeyError: + created = await create_timeseries_index( + connection=connection, + timeseries_index=TimeseriesIndexMutation( + driver_identifier=driver_identifier, + driver_signal=driver_signal, + ), + ) + timeseries_id = created.timeseries_id + self._timeseries_id_index[driver_identifier][driver_signal] = timeseries_id + + return timeseries_id diff --git a/lib/py_edge_device/carlos/edge/device/storage/blackbox_test.py b/lib/py_edge_device/carlos/edge/device/storage/blackbox_test.py new file mode 100644 index 00000000..944ca421 --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/storage/blackbox_test.py @@ -0,0 +1,78 @@ +from datetime import UTC, datetime +from random import randint + +from sqlalchemy import delete, func, select +from sqlalchemy.ext.asyncio import AsyncEngine + +from .blackbox import Blackbox +from .orm import TimeseriesDataOrm, TimeseriesIndexOrm +from .timeseries_index import find_timeseries_index + + +async def test_blackbox(async_engine: AsyncEngine): + """This function ensures that the blackbox works as expected.""" + + blackbox = Blackbox(engine=async_engine) + + fake_data = { + "driver_signal_int": 1, + "driver_signal_float": 2.0, + "driver_signal_bool": True, + } + driver_identifier = "driver_identifier" + + await blackbox.record( + driver_identifier=driver_identifier, + read_timestamp=datetime.now(tz=UTC), + data=fake_data, + ) + + # check if all index entries are made + async with async_engine.connect() as connection: + index_entries = await find_timeseries_index( + connection, driver_identifier=driver_identifier + ) + assert len(index_entries) == len(fake_data) + + for index_entry in index_entries: + assert index_entry.driver_identifier == driver_identifier + assert index_entry.driver_signal in fake_data + assert index_entry.server_timeseries_id is None + + sample_cnt = randint(3, 10) + + # Record with multiple blackboxes to hit different paths of the code + blackbox2 = Blackbox(engine=async_engine) + + # check if the data is recorded correctly + for _ in range(sample_cnt): + fake_data = { + "driver_signal_int": randint(-100, 100), + "driver_signal_float": randint(-100, 100) * 1.0, + "driver_signal_bool": bool(randint(0, 1)), + } + + await blackbox2.record( + driver_identifier=driver_identifier, + read_timestamp=datetime.now(tz=UTC), + data=fake_data, + ) + + # count the number of entries per timeseries_id + async with async_engine.connect() as connection: + query = select( + TimeseriesDataOrm.timeseries_id, + func.count(TimeseriesDataOrm.timeseries_id).label("sample_cnt"), + ).group_by(TimeseriesDataOrm.timeseries_id) + + result = (await connection.execute(query)).all() + + for timeseries_id, cnt in result: + assert cnt == sample_cnt + 1 # +1 because of the first record + + # final cleanup + async with async_engine.connect() as connection: + # clean up + await connection.execute(delete(TimeseriesDataOrm)) + await connection.execute(delete(TimeseriesIndexOrm)) + await connection.commit() diff --git a/lib/py_edge_device/carlos/edge/device/storage/timeseries_data.py b/lib/py_edge_device/carlos/edge/device/storage/timeseries_data.py index 4e99ce12..e1e8c19a 100644 --- a/lib/py_edge_device/carlos/edge/device/storage/timeseries_data.py +++ b/lib/py_edge_device/carlos/edge/device/storage/timeseries_data.py @@ -45,7 +45,7 @@ async def add_timeseries_data( { "timeseries_id": timeseries_id, "timestamp_utc": int(timeseries_input.timestamp_utc.timestamp()), - "value": value, + "value": float(value), } for timeseries_id, value in timeseries_input.values.items() ] From 4a071c95bc3d53649ede523a11ca6b892deb5307 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sun, 28 Apr 2024 16:24:04 +0000 Subject: [PATCH 2/4] =?UTF-8?q?Bump=20lib/py=5Fedge=5Fdevice=20version:=20?= =?UTF-8?q?0.1.8=20=E2=86=92=200.1.9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/py_edge_device/pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/py_edge_device/pyproject.toml b/lib/py_edge_device/pyproject.toml index 116103cd..cb28956c 100644 --- a/lib/py_edge_device/pyproject.toml +++ b/lib/py_edge_device/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "carlos.edge.device" -version = "0.1.8" +version = "0.1.9" description = "The library for the edge device of the carlos project." authors = ["Felix Fanghanel"] license = "MIT" @@ -35,7 +35,7 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.bumpversion] -current_version = "0.1.8" +current_version = "0.1.9" commit = true tag = false parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)(\\-(?P[a-z0-9\\.]+))?" From 0697200e662337f7598c0d2b3ec6afbec6c57c2b Mon Sep 17 00:00:00 2001 From: flxdot Date: Sun, 28 Apr 2024 18:28:09 +0200 Subject: [PATCH 3/4] small refactoring --- .../carlos/edge/device/runtime.py | 2 +- .../carlos/edge/device/storage/blackbox.py | 30 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/py_edge_device/carlos/edge/device/runtime.py b/lib/py_edge_device/carlos/edge/device/runtime.py index 2e977e0c..3c4a844c 100644 --- a/lib/py_edge_device/carlos/edge/device/runtime.py +++ b/lib/py_edge_device/carlos/edge/device/runtime.py @@ -133,7 +133,7 @@ async def _send_pending_data(self): pass -INPUT_SAMPLE_INTERVAL = 60 +INPUT_SAMPLE_INTERVAL = 2.5 * 60 # 2.5 minutes """The time between two consecutive samples of the input devices in seconds.""" diff --git a/lib/py_edge_device/carlos/edge/device/storage/blackbox.py b/lib/py_edge_device/carlos/edge/device/storage/blackbox.py index 1dfaa418..548e05a5 100644 --- a/lib/py_edge_device/carlos/edge/device/storage/blackbox.py +++ b/lib/py_edge_device/carlos/edge/device/storage/blackbox.py @@ -32,10 +32,9 @@ async def record( async with self._engine.connect() as connection: - timeseries_id_to_value: dict[int, float] = {} - await self._ensure_index_hydrated(connection, driver_identifier) + timeseries_id_to_value: dict[int, float] = {} for driver_signal, value in data.items(): timeseries_id = await self._get_timeseries_id( connection=connection, @@ -44,12 +43,13 @@ async def record( ) timeseries_id_to_value[timeseries_id] = value - ts_input = TimeseriesInput( - timestamp_utc=read_timestamp, values=timeseries_id_to_value + await add_timeseries_data( + connection=connection, + timeseries_input=TimeseriesInput( + timestamp_utc=read_timestamp, values=timeseries_id_to_value + ), ) - await add_timeseries_data(connection=connection, timeseries_input=ts_input) - logger.debug(f"Recorded data from driver {driver_identifier}.") async def _ensure_index_hydrated( @@ -60,15 +60,14 @@ async def _ensure_index_hydrated( if driver_identifier in self._timeseries_id_index: return - self._timeseries_id_index[driver_identifier] = {} - matching = await find_timeseries_index( connection=connection, driver_identifier=driver_identifier ) + + driver_index = {} for match in matching: - self._timeseries_id_index[driver_identifier][ - match.driver_signal - ] = match.timeseries_id + driver_index[match.driver_signal] = match.timeseries_id + self._timeseries_id_index[driver_identifier] = driver_index async def _get_timeseries_id( self, connection: AsyncConnection, driver_identifier: str, driver_signal: str @@ -83,7 +82,7 @@ async def _get_timeseries_id( """ try: - timeseries_id = self._timeseries_id_index[driver_identifier][driver_signal] + return self._timeseries_id_index[driver_identifier][driver_signal] except KeyError: created = await create_timeseries_index( connection=connection, @@ -92,7 +91,8 @@ async def _get_timeseries_id( driver_signal=driver_signal, ), ) - timeseries_id = created.timeseries_id - self._timeseries_id_index[driver_identifier][driver_signal] = timeseries_id + self._timeseries_id_index[driver_identifier][ + driver_signal + ] = created.timeseries_id - return timeseries_id + return created.timeseries_id From 6a0ef57cc20fa09b654c1e8072171776cff33d7a Mon Sep 17 00:00:00 2001 From: flxdot Date: Sun, 28 Apr 2024 18:29:33 +0200 Subject: [PATCH 4/4] update lock files --- services/api/poetry.lock | 12 ++++++------ services/device/poetry.lock | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/services/api/poetry.lock b/services/api/poetry.lock index e3e6f7d6..dc36da2f 100644 --- a/services/api/poetry.lock +++ b/services/api/poetry.lock @@ -275,7 +275,7 @@ url = "../../lib/py_carlos_database" [[package]] name = "carlos-edge-device" -version = "0.1.8" +version = "0.1.9" description = "The library for the edge device of the carlos project." optional = false python-versions = ">=3.11,<3.12" @@ -1498,23 +1498,23 @@ tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"] [[package]] name = "pytest" -version = "8.1.2" +version = "8.2.0" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" files = [ - {file = "pytest-8.1.2-py3-none-any.whl", hash = "sha256:6c06dc309ff46a05721e6fd48e492a775ed8165d2ecdf57f156a80c7e95bb142"}, - {file = "pytest-8.1.2.tar.gz", hash = "sha256:f3c45d1d5eed96b01a2aea70dee6a4a366d51d38f9957768083e4fecfc77f3ef"}, + {file = "pytest-8.2.0-py3-none-any.whl", hash = "sha256:1733f0620f6cda4095bbf0d9ff8022486e91892245bb9e7d5542c018f612f233"}, + {file = "pytest-8.2.0.tar.gz", hash = "sha256:d507d4482197eac0ba2bae2e9babf0672eb333017bcedaa5fb1a3d42c1174b3f"}, ] [package.dependencies] colorama = {version = "*", markers = "sys_platform == \"win32\""} iniconfig = "*" packaging = "*" -pluggy = ">=1.4,<2.0" +pluggy = ">=1.5,<2.0" [package.extras] -testing = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] [[package]] name = "pytest-asyncio" diff --git a/services/device/poetry.lock b/services/device/poetry.lock index 9f374fcf..add9898f 100644 --- a/services/device/poetry.lock +++ b/services/device/poetry.lock @@ -185,7 +185,7 @@ test = ["coverage", "freezegun", "pre-commit", "pytest", "pytest-cov", "pytest-m [[package]] name = "carlos-edge-device" -version = "0.1.8" +version = "0.1.9" description = "The library for the edge device of the carlos project." optional = false python-versions = ">=3.11,<3.12" @@ -1095,23 +1095,23 @@ windows-terminal = ["colorama (>=0.4.6)"] [[package]] name = "pytest" -version = "8.1.2" +version = "8.2.0" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" files = [ - {file = "pytest-8.1.2-py3-none-any.whl", hash = "sha256:6c06dc309ff46a05721e6fd48e492a775ed8165d2ecdf57f156a80c7e95bb142"}, - {file = "pytest-8.1.2.tar.gz", hash = "sha256:f3c45d1d5eed96b01a2aea70dee6a4a366d51d38f9957768083e4fecfc77f3ef"}, + {file = "pytest-8.2.0-py3-none-any.whl", hash = "sha256:1733f0620f6cda4095bbf0d9ff8022486e91892245bb9e7d5542c018f612f233"}, + {file = "pytest-8.2.0.tar.gz", hash = "sha256:d507d4482197eac0ba2bae2e9babf0672eb333017bcedaa5fb1a3d42c1174b3f"}, ] [package.dependencies] colorama = {version = "*", markers = "sys_platform == \"win32\""} iniconfig = "*" packaging = "*" -pluggy = ">=1.4,<2.0" +pluggy = ">=1.5,<2.0" [package.extras] -testing = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] [[package]] name = "pytest-asyncio"