feat: Create blackbox to record data #44

Merged
merged 5 commits on Apr 28, 2024
32 changes: 24 additions & 8 deletions lib/py_edge_device/carlos/edge/device/runtime.py
@@ -3,7 +3,7 @@

import asyncio
import signal
from datetime import timedelta
from datetime import UTC, datetime, timedelta
from typing import Self

from apscheduler import AsyncScheduler
@@ -19,6 +19,8 @@
from .communication import ClientEdgeCommunicationHandler
from .config import load_drivers
from .constants import LOCAL_DEVICE_STORAGE_PATH
from .storage.blackbox import Blackbox
from .storage.connection import get_async_storage_engine
from .storage.migration import alembic_upgrade


@@ -131,7 +133,7 @@ async def _send_pending_data(self):
pass


INPUT_SAMPLE_INTERVAL = 60
INPUT_SAMPLE_INTERVAL = 2.5 * 60 # 2.5 minutes
"""The time between two consecutive samples of the input devices in seconds."""


@@ -142,6 +144,8 @@ def __init__(self):
self.drivers = {driver.identifier: driver for driver in load_drivers()}
validate_device_address_space(self.drivers.values())

self.blackbox = Blackbox(engine=get_async_storage_engine())

def setup(self) -> Self:
"""Sets up the I/O peripherals."""
for driver in self.drivers.values():
@@ -157,19 +161,31 @@ async def register_tasks(self, scheduler: AsyncScheduler) -> Self:
if isinstance(driver, InputDriver):
await scheduler.add_schedule(
func_or_task_id=self.read_input,
kwargs={"driver": driver.identifier},
kwargs={"driver_identifier": driver.identifier},
trigger=IntervalTrigger(seconds=INPUT_SAMPLE_INTERVAL),
)

return self

async def read_input(self, driver: str):
async def read_input(self, driver_identifier: str):
"""Reads the value of the input driver."""
logger.debug(f"Reading data from driver {driver}.")
data = await self.drivers[driver].read_async()

# todo: write to database
logger.debug(f"Received data from driver {driver}: {data}")
logger.debug(f"Reading data from driver {driver_identifier}.")

read_start = datetime.now(tz=UTC)
data = await self.drivers[driver_identifier].read_async()
read_end = datetime.now(tz=UTC)
# We assume that the actual read time is in the middle of the
# start and end time.
read_act = read_start + (read_end - read_start) / 2

logger.debug(f"Received data from driver {driver_identifier}: {data}")

await self.blackbox.record(
driver_identifier=driver_identifier,
read_timestamp=read_act,
data=data,
)


async def send_ping(
98 changes: 98 additions & 0 deletions lib/py_edge_device/carlos/edge/device/storage/blackbox.py
@@ -0,0 +1,98 @@
from datetime import datetime

from loguru import logger
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from .timeseries_data import TimeseriesInput, add_timeseries_data
from .timeseries_index import (
TimeseriesIndexMutation,
create_timeseries_index,
find_timeseries_index,
)


class Blackbox:
"""The black box is used to store measurement data locally on the device. This data
is stored to be send to the server at a later time. This is useful in case the device
is not able to send the data to the server immediately. The black box stores the data
in a SQLite database. The data is stored in the timeseries_data table.
"""

def __init__(self, engine: AsyncEngine):
self._engine = engine
self._timeseries_id_index: dict[str, dict[str, int]] = {}

async def record(
self,
driver_identifier: str,
read_timestamp: datetime,
data: dict[str, float],
) -> None:
"""Inserts the reading into the database."""

async with self._engine.connect() as connection:

await self._ensure_index_hydrated(connection, driver_identifier)

timeseries_id_to_value: dict[int, float] = {}
for driver_signal, value in data.items():
timeseries_id = await self._get_timeseries_id(
connection=connection,
driver_identifier=driver_identifier,
driver_signal=driver_signal,
)
timeseries_id_to_value[timeseries_id] = value

await add_timeseries_data(
connection=connection,
timeseries_input=TimeseriesInput(
timestamp_utc=read_timestamp, values=timeseries_id_to_value
),
)

logger.debug(f"Recorded data from driver {driver_identifier}.")

async def _ensure_index_hydrated(
self, connection: AsyncConnection, driver_identifier: str
):
"""Hydrates the timeseries_id index for the given driver_identifier."""

if driver_identifier in self._timeseries_id_index:
return

matching = await find_timeseries_index(
connection=connection, driver_identifier=driver_identifier
)

driver_index = {}
for match in matching:
driver_index[match.driver_signal] = match.timeseries_id
self._timeseries_id_index[driver_identifier] = driver_index

async def _get_timeseries_id(
self, connection: AsyncConnection, driver_identifier: str, driver_signal: str
) -> int:
"""Fetches the timeseries_id for the given driver_identifier and driver_signal.
If the timeseries_id does not exist, it is created and returned.

:param connection: The connection to the database.
:param driver_identifier: The driver identifier.
:param driver_signal: The driver signal.
:return: The timeseries_id.
"""

try:
return self._timeseries_id_index[driver_identifier][driver_signal]
except KeyError:
created = await create_timeseries_index(
connection=connection,
timeseries_index=TimeseriesIndexMutation(
driver_identifier=driver_identifier,
driver_signal=driver_signal,
),
)
self._timeseries_id_index[driver_identifier][
driver_signal
] = created.timeseries_id

return created.timeseries_id
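
For review context, here is a minimal usage sketch of the new Blackbox API. The engine URL, driver identifier, and signal names are illustrative assumptions; in the runtime the engine comes from get_async_storage_engine(), and the schema is assumed to have been migrated (e.g. via alembic_upgrade) before anything is recorded.

    import asyncio
    from datetime import UTC, datetime

    from sqlalchemy.ext.asyncio import create_async_engine

    from carlos.edge.device.storage.blackbox import Blackbox


    async def main() -> None:
        # Illustrative engine; the runtime obtains it from get_async_storage_engine()
        # and the timeseries tables are assumed to exist already.
        engine = create_async_engine("sqlite+aiosqlite:///device.db")

        blackbox = Blackbox(engine=engine)
        await blackbox.record(
            driver_identifier="sensor-01",  # hypothetical driver identifier
            read_timestamp=datetime.now(tz=UTC),
            data={"temperature": 21.3, "humidity": 0.52},  # hypothetical signals
        )

        await engine.dispose()


    asyncio.run(main())

The first record() call for a driver creates any missing timeseries_index rows; subsequent calls reuse the cached timeseries_ids.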
78 changes: 78 additions & 0 deletions lib/py_edge_device/carlos/edge/device/storage/blackbox_test.py
@@ -0,0 +1,78 @@
from datetime import UTC, datetime
from random import randint

from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncEngine

from .blackbox import Blackbox
from .orm import TimeseriesDataOrm, TimeseriesIndexOrm
from .timeseries_index import find_timeseries_index


async def test_blackbox(async_engine: AsyncEngine):
"""This function ensures that the blackbox works as expected."""

blackbox = Blackbox(engine=async_engine)

fake_data = {
"driver_signal_int": 1,
"driver_signal_float": 2.0,
"driver_signal_bool": True,
}
driver_identifier = "driver_identifier"

await blackbox.record(
driver_identifier=driver_identifier,
read_timestamp=datetime.now(tz=UTC),
data=fake_data,
)

# check if all index entries are made
async with async_engine.connect() as connection:
index_entries = await find_timeseries_index(
connection, driver_identifier=driver_identifier
)
assert len(index_entries) == len(fake_data)

for index_entry in index_entries:
assert index_entry.driver_identifier == driver_identifier
assert index_entry.driver_signal in fake_data
assert index_entry.server_timeseries_id is None

sample_cnt = randint(3, 10)

# Record with multiple blackboxes to hit different paths of the code
blackbox2 = Blackbox(engine=async_engine)

# check if the data is recorded correctly
for _ in range(sample_cnt):
fake_data = {
"driver_signal_int": randint(-100, 100),
"driver_signal_float": randint(-100, 100) * 1.0,
"driver_signal_bool": bool(randint(0, 1)),
}

await blackbox2.record(
driver_identifier=driver_identifier,
read_timestamp=datetime.now(tz=UTC),
data=fake_data,
)

# count the number of entries per timeseries_id
async with async_engine.connect() as connection:
query = select(
TimeseriesDataOrm.timeseries_id,
func.count(TimeseriesDataOrm.timeseries_id).label("sample_cnt"),
).group_by(TimeseriesDataOrm.timeseries_id)

result = (await connection.execute(query)).all()

for timeseries_id, cnt in result:
assert cnt == sample_cnt + 1 # +1 because of the first record

# final cleanup
async with async_engine.connect() as connection:
# clean up
await connection.execute(delete(TimeseriesDataOrm))
await connection.execute(delete(TimeseriesIndexOrm))
await connection.commit()
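
Note for reviewers: the async_engine fixture used above is not part of this diff and presumably lives in the package's test conftest. A hedged sketch of what such a fixture might look like (an in-memory aiosqlite engine, with the schema assumed to be created before it is handed to the test):

    from typing import AsyncGenerator

    import pytest_asyncio
    from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine


    @pytest_asyncio.fixture()
    async def async_engine() -> AsyncGenerator[AsyncEngine, None]:
        # Hypothetical fixture; the real conftest may differ. The schema is assumed
        # to be set up here (e.g. via the package's alembic_upgrade helper) before
        # the engine is yielded to the test.
        engine = create_async_engine("sqlite+aiosqlite:///:memory:")
        yield engine
        await engine.dispose()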
lib/py_edge_device/carlos/edge/device/storage/timeseries_data.py
@@ -45,7 +45,7 @@ async def add_timeseries_data(
{
"timeseries_id": timeseries_id,
"timestamp_utc": int(timeseries_input.timestamp_utc.timestamp()),
"value": value,
"value": float(value),
}
for timeseries_id, value in timeseries_input.values.items()
]
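
The float(value) cast above matters because input drivers may report integers or booleans (as the test's fake_data does); the cast normalizes them to what is presumably the float type of the value column. A trivial illustration of the coercion:

    # Booleans and ints are coerced to floats before insertion:
    # a reading of True is stored as 1.0 and an integer 21 as 21.0.
    assert float(True) == 1.0
    assert float(21) == 21.0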
4 changes: 2 additions & 2 deletions lib/py_edge_device/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.device"
version = "0.1.8"
version = "0.1.9"
description = "The library for the edge device of the carlos project."
authors = ["Felix Fanghanel"]
license = "MIT"
@@ -35,7 +35,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.bumpversion]
current_version = "0.1.8"
current_version = "0.1.9"
commit = true
tag = false
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\-(?P<prerelease>[a-z0-9\\.]+))?"
12 changes: 6 additions & 6 deletions services/api/poetry.lock


12 changes: 6 additions & 6 deletions services/device/poetry.lock

