Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send data from device and insert into database #53

Merged
merged 11 commits into from
May 1, 2024
27 changes: 26 additions & 1 deletion lib/py_edge_device/carlos/edge/device/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
EdgeVersionPayload,
MessageType,
)
from carlos.edge.interface.messages import DeviceConfigResponsePayload
from carlos.edge.interface.messages import (
DeviceConfigResponsePayload,
DriverDataAckPayload,
)
from loguru import logger
from semver import Version

from .constants import VERSION
from .storage.connection import get_async_storage_engine
from .storage.exceptions import NotFoundError
from .storage.timeseries_data import confirm_staged_data
from .storage.timeseries_index import find_timeseries_index, update_timeseries_index
from .update import update_device

Expand All @@ -38,6 +42,7 @@ def __init__(self, protocol: EdgeProtocol, device_id: DeviceId):
{
MessageType.EDGE_VERSION: handle_edge_version,
MessageType.DEVICE_CONFIG_RESPONSE: handle_device_config_response,
MessageType.DRIVER_DATA_ACK: handle_driver_data_ack,
}
)

Expand Down Expand Up @@ -123,3 +128,23 @@ async def handle_device_config_response(
f"Error updating timeseries index for {driver_identifier=} and"
f" {signal_identifier=}."
)


async def handle_driver_data_ack(
protocol: EdgeProtocol, message: CarlosMessage
): # pragma: no cover
"""Handles the incoming driver data ack message.

This message is sent by the server as an acknowledgment of the driver data
received.

:param protocol: The protocol to use for communication.
:param message: The incoming message.
"""

ackn_message = DriverDataAckPayload.model_validate(message.payload)

async with get_async_storage_engine().connect() as connection:
await confirm_staged_data(
connection=connection, staging_id=ackn_message.staging_id
)
22 changes: 20 additions & 2 deletions lib/py_edge_device/carlos/edge/device/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from .communication import ClientEdgeCommunicationHandler
from .constants import LOCAL_DEVICE_STORAGE_PATH
from .driver_manager import DriverManager
from .storage.connection import get_async_storage_engine
from .storage.migration import alembic_upgrade
from .storage.timeseries_data import stage_timeseries_data


# We don't cover this in the unit tests. This needs to be tested in an integration test.
Expand Down Expand Up @@ -145,8 +147,24 @@ async def _run_task_scheduler(self):

async def _send_pending_data(self):
"""Sends the pending data to the server."""
logger.debug("Sending pending data to the server.")
pass

if not self.communication_handler.protocol.is_connected:
logger.warning("Cannot send pending data, as the device is not connected.")
return

async with get_async_storage_engine().connect() as connection:
staged_data = await stage_timeseries_data(connection=connection)

if staged_data is None:
logger.debug("No data available to be staged.")
return

await self.communication_handler.send(
CarlosMessage(
message_type=MessageType.DRIVER_DATA,
payload=staged_data,
)
)


async def send_ping(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ async def stage_timeseries_data(
)
sample_ids = (await connection.execute(sample_ids_query)).scalars().all()

if not sample_ids:
return None

stage_stmt = (
update(TimeseriesDataOrm)
.values(
Expand All @@ -122,9 +125,6 @@ async def stage_timeseries_data(
)
staged_rows = (await connection.execute(staged_query)).all()

if not staged_rows:
return None

for row in staged_rows:
if row.timeseries_id not in payload.data:
payload.data[row.timeseries_id] = DriverTimeseries(
Expand Down
4 changes: 2 additions & 2 deletions lib/py_edge_device/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.device"
version = "0.1.12"
version = "0.1.13"
description = "The library for the edge device of the carlos project."
authors = ["Felix Fanghanel"]
license = "MIT"
Expand Down Expand Up @@ -35,7 +35,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.bumpversion]
current_version = "0.1.12"
current_version = "0.1.13"
commit = true
tag = false
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\-(?P<prerelease>[a-z0-9\\.]+))?"
Expand Down
1 change: 1 addition & 0 deletions lib/py_edge_interface/carlos/edge/interface/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"CarlosPayload",
"DeviceConfigPayload",
"DeviceConfigResponsePayload",
"DriverDataAckPayload",
"DriverDataPayload",
"DriverTimeseries",
"EdgeVersionPayload",
Expand Down
4 changes: 2 additions & 2 deletions lib/py_edge_interface/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.interface"
version = "0.1.8"
version = "0.1.9"
description = "Shared library to handle the edge communication."
authors = ["Felix Fanghanel"]
license = "MIT"
Expand All @@ -23,7 +23,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.bumpversion]
current_version = "0.1.8"
current_version = "0.1.9"
commit = true
tag = false
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\-(?P<prerelease>[a-z0-9\\.]+))?"
Expand Down
53 changes: 52 additions & 1 deletion lib/py_edge_server/carlos/edge/server/device_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
__all__ = ["ServerEdgeCommunicationHandler"]

from datetime import UTC, datetime

from carlos.database.connection import get_async_carlos_db_connection
from carlos.database.context import RequestContext
from carlos.database.data.timeseries import add_timeseries
from carlos.database.device import (
CarlosDeviceDriverCreate,
CarlosDeviceSignalCreate,
Expand All @@ -20,7 +22,11 @@
EdgeProtocol,
MessageType,
)
from carlos.edge.interface.messages import DeviceConfigResponsePayload
from carlos.edge.interface.messages import (
DeviceConfigResponsePayload,
DriverDataAckPayload,
DriverDataPayload,
)

from carlos.edge.server.constants import CLIENT_NAME

Expand All @@ -34,6 +40,7 @@ def __init__(self, device_id: DeviceId, protocol: EdgeProtocol):
self.register_handlers(
{
MessageType.DEVICE_CONFIG: self.handle_device_config,
MessageType.DRIVER_DATA: self.handle_driver_data,
}
)

Expand Down Expand Up @@ -152,3 +159,47 @@ async def _build_device_config_response(
message_type=MessageType.DEVICE_CONFIG_RESPONSE,
payload=DeviceConfigResponsePayload(timeseries_index=timeseries_index),
)

async def handle_driver_data(
self,
protocol: EdgeProtocol,
message: CarlosMessage,
): # pragma: no cover
"""Handles the DRIVER_DATA message.

:param protocol: The protocol to use for communication.
:param message: The incoming message.
"""

driver_data = DriverDataPayload.model_validate(message.payload)

async with get_async_carlos_db_connection(
client_name=CLIENT_NAME
) as connection:
context = RequestContext(connection=connection)

for timeseries_id, driver_timeseries in driver_data.data.items():
await add_timeseries(
context=context,
timeseries_id=timeseries_id,
timestamps=convert_timestamps_to_datetime(
driver_timeseries.timestamps_utc
),
values=driver_timeseries.values,
)

await self.send(
CarlosMessage(
message_type=MessageType.DRIVER_DATA_ACK,
payload=DriverDataAckPayload(staging_id=driver_data.staging_id),
)
)


def convert_timestamps_to_datetime(utc_timestamps: list[int]) -> list[datetime]:
"""Converts a list of timestamps to a list of datetime objects.

:param utc_timestamps: The list of timestamps. That are assumed to be in UTC.
:return: The list of datetime objects."""

return [datetime.fromtimestamp(ts, tz=UTC) for ts in utc_timestamps]
25 changes: 23 additions & 2 deletions lib/py_edge_server/carlos/edge/server/device_handler_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import UTC, datetime

import pytest
from carlos.database.context import RequestContext
from carlos.database.device import (
Expand All @@ -14,9 +16,12 @@
DriverSignal,
)
from carlos.edge.interface.plugin_pytest import EdgeProtocolTestingConnection
from carlos.edge.interface.units import UnitOfMeasurement

from ..interface.units import UnitOfMeasurement
from .device_handler import ServerEdgeCommunicationHandler
from .device_handler import (
ServerEdgeCommunicationHandler,
convert_timestamps_to_datetime,
)


@pytest.fixture()
Expand Down Expand Up @@ -146,3 +151,19 @@ async def test_handle_device_config(
device_id=driver.device_id,
driver_identifier=driver.driver_identifier,
)


def test_convert_timestamps_to_datetime():
"""Ensures that the convert_timestamps_to_datetime method works correctly."""

datetimes = [
datetime(2021, 1, 1, 0, 0, 0, tzinfo=UTC),
datetime(2021, 1, 2, 0, 0, 0, tzinfo=UTC),
datetime(2021, 1, 3, 0, 0, 0, tzinfo=UTC),
]

utc_timestamps = [int(dt.timestamp()) for dt in datetimes]

converted_datetimes = convert_timestamps_to_datetime(utc_timestamps)

assert converted_datetimes == datetimes, "The conversion did not work as expected."
14 changes: 13 additions & 1 deletion lib/py_edge_server/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions lib/py_edge_server/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.server"
version = "0.1.4"
version = "0.1.5"
description = "The library for the edge server of the carlos project."
authors = ["Felix Fanghanel"]
license = "MIT"
Expand All @@ -27,7 +27,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.bumpversion]
current_version = "0.1.4"
current_version = "0.1.5"
commit = true
tag = false
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\-(?P<prerelease>[a-z0-9\\.]+))?"
Expand Down
8 changes: 4 additions & 4 deletions services/api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions services/device/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading