feat: Create blackbox to record data (#44)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
flxdot and github-actions[bot] authored Apr 28, 2024
1 parent 7bcc5d5 commit 9021aba
Showing 7 changed files with 215 additions and 23 deletions.
32 changes: 24 additions & 8 deletions lib/py_edge_device/carlos/edge/device/runtime.py
@@ -3,7 +3,7 @@

import asyncio
import signal
from datetime import timedelta
from datetime import UTC, datetime, timedelta
from typing import Self

from apscheduler import AsyncScheduler
@@ -19,6 +19,8 @@
from .communication import ClientEdgeCommunicationHandler
from .config import load_drivers
from .constants import LOCAL_DEVICE_STORAGE_PATH
from .storage.blackbox import Blackbox
from .storage.connection import get_async_storage_engine
from .storage.migration import alembic_upgrade


@@ -131,7 +133,7 @@ async def _send_pending_data(self):
pass


INPUT_SAMPLE_INTERVAL = 60
INPUT_SAMPLE_INTERVAL = 2.5 * 60 # 2.5 minutes
"""The time between two consecutive samples of the input devices in seconds."""


@@ -142,6 +144,8 @@ def __init__(self):
self.drivers = {driver.identifier: driver for driver in load_drivers()}
validate_device_address_space(self.drivers.values())

self.blackbox = Blackbox(engine=get_async_storage_engine())

def setup(self) -> Self:
"""Sets up the I/O peripherals."""
for driver in self.drivers.values():
@@ -157,19 +161,31 @@ async def register_tasks(self, scheduler: AsyncScheduler) -> Self:
if isinstance(driver, InputDriver):
await scheduler.add_schedule(
func_or_task_id=self.read_input,
kwargs={"driver": driver.identifier},
kwargs={"driver_identifier": driver.identifier},
trigger=IntervalTrigger(seconds=INPUT_SAMPLE_INTERVAL),
)

return self

async def read_input(self, driver: str):
async def read_input(self, driver_identifier: str):
"""Reads the value of the input driver."""
logger.debug(f"Reading data from driver {driver}.")
data = await self.drivers[driver].read_async()

# todo: write to database
logger.debug(f"Received data from driver {driver}: {data}")
logger.debug(f"Reading data from driver {driver_identifier}.")

read_start = datetime.now(tz=UTC)
data = await self.drivers[driver_identifier].read_async()
read_end = datetime.now(tz=UTC)
# We assume that the actual read time is the midpoint between the
# start and end of the read.
read_act = read_start + (read_end - read_start) / 2

logger.debug(f"Received data from driver {driver_identifier}: {data}")

await self.blackbox.record(
driver_identifier=driver_identifier,
read_timestamp=read_act,
data=data,
)


async def send_ping(
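
For context, the timestamp the runtime now attaches to each sample is the midpoint of the read window rather than the moment the read started. A minimal, self-contained sketch of that pattern (the fake_read coroutine and its values are made up for illustration and are not part of this commit):

import asyncio
from datetime import UTC, datetime


async def fake_read() -> dict[str, float]:
    """Hypothetical stand-in for an InputDriver.read_async() call."""
    await asyncio.sleep(0.1)  # pretend the sensor takes a while to answer
    return {"temperature": 21.5}


async def main() -> None:
    read_start = datetime.now(tz=UTC)
    data = await fake_read()
    read_end = datetime.now(tz=UTC)
    # Attribute the sample to the midpoint of the read window,
    # mirroring the logic added to read_input above.
    read_act = read_start + (read_end - read_start) / 2
    print(read_act.isoformat(), data)


asyncio.run(main())
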
98 changes: 98 additions & 0 deletions lib/py_edge_device/carlos/edge/device/storage/blackbox.py
@@ -0,0 +1,98 @@
from datetime import datetime

from loguru import logger
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from .timeseries_data import TimeseriesInput, add_timeseries_data
from .timeseries_index import (
TimeseriesIndexMutation,
create_timeseries_index,
find_timeseries_index,
)


class Blackbox:
"""The black box is used to store measurement data locally on the device. This data
is stored to be sent to the server at a later time. This is useful in case the device
is not able to send the data to the server immediately. The black box stores the data
in a SQLite database. The data is stored in the timeseries_data table.
"""

def __init__(self, engine: AsyncEngine):
self._engine = engine
self._timeseries_id_index: dict[str, dict[str, int]] = {}

async def record(
self,
driver_identifier: str,
read_timestamp: datetime,
data: dict[str, float],
) -> None:
"""Inserts the reading into the database."""

async with self._engine.connect() as connection:

await self._ensure_index_hydrated(connection, driver_identifier)

timeseries_id_to_value: dict[int, float] = {}
for driver_signal, value in data.items():
timeseries_id = await self._get_timeseries_id(
connection=connection,
driver_identifier=driver_identifier,
driver_signal=driver_signal,
)
timeseries_id_to_value[timeseries_id] = value

await add_timeseries_data(
connection=connection,
timeseries_input=TimeseriesInput(
timestamp_utc=read_timestamp, values=timeseries_id_to_value
),
)

logger.debug(f"Recorded data from driver {driver_identifier}.")

async def _ensure_index_hydrated(
self, connection: AsyncConnection, driver_identifier: str
):
"""Hydrates the timeseries_id index for the given driver_identifier."""

if driver_identifier in self._timeseries_id_index:
return

matching = await find_timeseries_index(
connection=connection, driver_identifier=driver_identifier
)

driver_index = {}
for match in matching:
driver_index[match.driver_signal] = match.timeseries_id
self._timeseries_id_index[driver_identifier] = driver_index

async def _get_timeseries_id(
self, connection: AsyncConnection, driver_identifier: str, driver_signal: str
) -> int:
"""Fetches the timeseries_id for the given driver_identifier and driver_signal.
If the timeseries_id does not exist, it is created and returned.
:param connection: The connection to the database.
:param driver_identifier: The driver identifier.
:param driver_signal: The driver signal.
:return: The timeseries_id.
"""

try:
return self._timeseries_id_index[driver_identifier][driver_signal]
except KeyError:
created = await create_timeseries_index(
connection=connection,
timeseries_index=TimeseriesIndexMutation(
driver_identifier=driver_identifier,
driver_signal=driver_signal,
),
)
self._timeseries_id_index[driver_identifier][
driver_signal
] = created.timeseries_id

return created.timeseries_id
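
As a usage sketch, the new Blackbox can also be driven outside the runtime, assuming the carlos.edge.device package is importable and the local SQLite schema has already been migrated (runtime.py imports an alembic_upgrade helper for exactly that); the driver identifier and signal values below are invented for illustration:

import asyncio
from datetime import UTC, datetime

from carlos.edge.device.storage.blackbox import Blackbox
from carlos.edge.device.storage.connection import get_async_storage_engine


async def main() -> None:
    # Same wiring as the runtime's __init__ in this commit.
    blackbox = Blackbox(engine=get_async_storage_engine())

    # Index rows for unseen driver signals are created lazily inside record().
    await blackbox.record(
        driver_identifier="dht22-outside",  # hypothetical identifier
        read_timestamp=datetime.now(tz=UTC),
        data={"temperature": 21.5, "humidity": 0.48},
    )


asyncio.run(main())
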
78 changes: 78 additions & 0 deletions lib/py_edge_device/carlos/edge/device/storage/blackbox_test.py
@@ -0,0 +1,78 @@
from datetime import UTC, datetime
from random import randint

from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncEngine

from .blackbox import Blackbox
from .orm import TimeseriesDataOrm, TimeseriesIndexOrm
from .timeseries_index import find_timeseries_index


async def test_blackbox(async_engine: AsyncEngine):
"""This function ensures that the blackbox works as expected."""

blackbox = Blackbox(engine=async_engine)

fake_data = {
"driver_signal_int": 1,
"driver_signal_float": 2.0,
"driver_signal_bool": True,
}
driver_identifier = "driver_identifier"

await blackbox.record(
driver_identifier=driver_identifier,
read_timestamp=datetime.now(tz=UTC),
data=fake_data,
)

# check if all index entries are made
async with async_engine.connect() as connection:
index_entries = await find_timeseries_index(
connection, driver_identifier=driver_identifier
)
assert len(index_entries) == len(fake_data)

for index_entry in index_entries:
assert index_entry.driver_identifier == driver_identifier
assert index_entry.driver_signal in fake_data
assert index_entry.server_timeseries_id is None

sample_cnt = randint(3, 10)

# Record with multiple blackboxes to hit different paths of the code
blackbox2 = Blackbox(engine=async_engine)

# check if the data is recorded correctly
for _ in range(sample_cnt):
fake_data = {
"driver_signal_int": randint(-100, 100),
"driver_signal_float": randint(-100, 100) * 1.0,
"driver_signal_bool": bool(randint(0, 1)),
}

await blackbox2.record(
driver_identifier=driver_identifier,
read_timestamp=datetime.now(tz=UTC),
data=fake_data,
)

# count the number of entries per timeseries_id
async with async_engine.connect() as connection:
query = select(
TimeseriesDataOrm.timeseries_id,
func.count(TimeseriesDataOrm.timeseries_id).label("sample_cnt"),
).group_by(TimeseriesDataOrm.timeseries_id)

result = (await connection.execute(query)).all()

for timeseries_id, cnt in result:
assert cnt == sample_cnt + 1 # +1 because of the first record

# final cleanup
async with async_engine.connect() as connection:
# clean up
await connection.execute(delete(TimeseriesDataOrm))
await connection.execute(delete(TimeseriesIndexOrm))
await connection.commit()
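
The test relies on an async_engine fixture that is not part of this diff. A minimal sketch of what such a fixture could look like, assuming pytest-asyncio and the aiosqlite driver, with the schema setup left out (the names and wiring here are illustrative, not the repository's actual conftest):

from collections.abc import AsyncIterator

import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine


@pytest_asyncio.fixture()
async def async_engine(tmp_path) -> AsyncIterator[AsyncEngine]:
    """Yields an async SQLite engine backed by a temporary database file."""
    engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path / 'test.db'}")
    # The real fixture would also create the timeseries tables, e.g. via the
    # package's alembic migration helper; omitted here for brevity.
    yield engine
    await engine.dispose()
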
2 changes: 1 addition & 1 deletion lib/py_edge_device/carlos/edge/device/storage/timeseries_data.py
@@ -45,7 +45,7 @@ async def add_timeseries_data(
{
"timeseries_id": timeseries_id,
"timestamp_utc": int(timeseries_input.timestamp_utc.timestamp()),
"value": value,
"value": float(value),
}
for timeseries_id, value in timeseries_input.values.items()
]
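
The float(value) coercion in add_timeseries_data is what lets boolean and integer readings (as exercised in blackbox_test.py above) be bound as floats; a quick illustration of the builtin behaviour it relies on, for reference only:

# bool is a subclass of int in Python, so without the coercion True/False
# would be bound as the integers 1/0 rather than as floats.
assert float(True) == 1.0
assert float(False) == 0.0
assert float(42) == 42.0
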
4 changes: 2 additions & 2 deletions lib/py_edge_device/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.device"
version = "0.1.8"
version = "0.1.9"
description = "The library for the edge device of the carlos project."
authors = ["Felix Fanghanel"]
license = "MIT"
@@ -35,7 +35,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.bumpversion]
current_version = "0.1.8"
current_version = "0.1.9"
commit = true
tag = false
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\-(?P<prerelease>[a-z0-9\\.]+))?"
12 changes: 6 additions & 6 deletions services/api/poetry.lock

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions services/device/poetry.lock

Some generated files are not rendered by default.
