Skip to content

Commit

Permalink
feat: send data from device and insert into database (#53)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
flxdot and github-actions[bot] authored May 1, 2024
1 parent ee5ab1d commit 1b50ea4
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 22 deletions.
27 changes: 26 additions & 1 deletion lib/py_edge_device/carlos/edge/device/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
EdgeVersionPayload,
MessageType,
)
from carlos.edge.interface.messages import DeviceConfigResponsePayload
from carlos.edge.interface.messages import (
DeviceConfigResponsePayload,
DriverDataAckPayload,
)
from loguru import logger
from semver import Version

from .constants import VERSION
from .storage.connection import get_async_storage_engine
from .storage.exceptions import NotFoundError
from .storage.timeseries_data import confirm_staged_data
from .storage.timeseries_index import find_timeseries_index, update_timeseries_index
from .update import update_device

Expand All @@ -38,6 +42,7 @@ def __init__(self, protocol: EdgeProtocol, device_id: DeviceId):
{
MessageType.EDGE_VERSION: handle_edge_version,
MessageType.DEVICE_CONFIG_RESPONSE: handle_device_config_response,
MessageType.DRIVER_DATA_ACK: handle_driver_data_ack,
}
)

Expand Down Expand Up @@ -123,3 +128,23 @@ async def handle_device_config_response(
f"Error updating timeseries index for {driver_identifier=} and"
f" {signal_identifier=}."
)


async def handle_driver_data_ack(
protocol: EdgeProtocol, message: CarlosMessage
): # pragma: no cover
"""Handles the incoming driver data ack message.
This message is sent by the server as an acknowledgment of the driver data
received.
:param protocol: The protocol to use for communication.
:param message: The incoming message.
"""

ackn_message = DriverDataAckPayload.model_validate(message.payload)

async with get_async_storage_engine().connect() as connection:
await confirm_staged_data(
connection=connection, staging_id=ackn_message.staging_id
)
22 changes: 20 additions & 2 deletions lib/py_edge_device/carlos/edge/device/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from .communication import ClientEdgeCommunicationHandler
from .constants import LOCAL_DEVICE_STORAGE_PATH
from .driver_manager import DriverManager
from .storage.connection import get_async_storage_engine
from .storage.migration import alembic_upgrade
from .storage.timeseries_data import stage_timeseries_data


# We don't cover this in the unit tests. This needs to be tested in an integration test.
Expand Down Expand Up @@ -145,8 +147,24 @@ async def _run_task_scheduler(self):

async def _send_pending_data(self):
"""Sends the pending data to the server."""
logger.debug("Sending pending data to the server.")
pass

if not self.communication_handler.protocol.is_connected:
logger.warning("Cannot send pending data, as the device is not connected.")
return

async with get_async_storage_engine().connect() as connection:
staged_data = await stage_timeseries_data(connection=connection)

if staged_data is None:
logger.debug("No data available to be staged.")
return

await self.communication_handler.send(
CarlosMessage(
message_type=MessageType.DRIVER_DATA,
payload=staged_data,
)
)


async def send_ping(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ async def stage_timeseries_data(
)
sample_ids = (await connection.execute(sample_ids_query)).scalars().all()

if not sample_ids:
return None

stage_stmt = (
update(TimeseriesDataOrm)
.values(
Expand All @@ -122,9 +125,6 @@ async def stage_timeseries_data(
)
staged_rows = (await connection.execute(staged_query)).all()

if not staged_rows:
return None

for row in staged_rows:
if row.timeseries_id not in payload.data:
payload.data[row.timeseries_id] = DriverTimeseries(
Expand Down
4 changes: 2 additions & 2 deletions lib/py_edge_device/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.device"
version = "0.1.12"
version = "0.1.13"
description = "The library for the edge device of the carlos project."
authors = ["Felix Fanghanel"]
license = "MIT"
Expand Down Expand Up @@ -35,7 +35,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.bumpversion]
current_version = "0.1.12"
current_version = "0.1.13"
commit = true
tag = false
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\-(?P<prerelease>[a-z0-9\\.]+))?"
Expand Down
1 change: 1 addition & 0 deletions lib/py_edge_interface/carlos/edge/interface/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"CarlosPayload",
"DeviceConfigPayload",
"DeviceConfigResponsePayload",
"DriverDataAckPayload",
"DriverDataPayload",
"DriverTimeseries",
"EdgeVersionPayload",
Expand Down
4 changes: 2 additions & 2 deletions lib/py_edge_interface/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.interface"
version = "0.1.8"
version = "0.1.9"
description = "Shared library to handle the edge communication."
authors = ["Felix Fanghanel"]
license = "MIT"
Expand All @@ -23,7 +23,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.bumpversion]
current_version = "0.1.8"
current_version = "0.1.9"
commit = true
tag = false
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\-(?P<prerelease>[a-z0-9\\.]+))?"
Expand Down
53 changes: 52 additions & 1 deletion lib/py_edge_server/carlos/edge/server/device_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
__all__ = ["ServerEdgeCommunicationHandler"]

from datetime import UTC, datetime

from carlos.database.connection import get_async_carlos_db_connection
from carlos.database.context import RequestContext
from carlos.database.data.timeseries import add_timeseries
from carlos.database.device import (
CarlosDeviceDriverCreate,
CarlosDeviceSignalCreate,
Expand All @@ -20,7 +22,11 @@
EdgeProtocol,
MessageType,
)
from carlos.edge.interface.messages import DeviceConfigResponsePayload
from carlos.edge.interface.messages import (
DeviceConfigResponsePayload,
DriverDataAckPayload,
DriverDataPayload,
)

from carlos.edge.server.constants import CLIENT_NAME

Expand All @@ -34,6 +40,7 @@ def __init__(self, device_id: DeviceId, protocol: EdgeProtocol):
self.register_handlers(
{
MessageType.DEVICE_CONFIG: self.handle_device_config,
MessageType.DRIVER_DATA: self.handle_driver_data,
}
)

Expand Down Expand Up @@ -152,3 +159,47 @@ async def _build_device_config_response(
message_type=MessageType.DEVICE_CONFIG_RESPONSE,
payload=DeviceConfigResponsePayload(timeseries_index=timeseries_index),
)

async def handle_driver_data(
self,
protocol: EdgeProtocol,
message: CarlosMessage,
): # pragma: no cover
"""Handles the DRIVER_DATA message.
:param protocol: The protocol to use for communication.
:param message: The incoming message.
"""

driver_data = DriverDataPayload.model_validate(message.payload)

async with get_async_carlos_db_connection(
client_name=CLIENT_NAME
) as connection:
context = RequestContext(connection=connection)

for timeseries_id, driver_timeseries in driver_data.data.items():
await add_timeseries(
context=context,
timeseries_id=timeseries_id,
timestamps=convert_timestamps_to_datetime(
driver_timeseries.timestamps_utc
),
values=driver_timeseries.values,
)

await self.send(
CarlosMessage(
message_type=MessageType.DRIVER_DATA_ACK,
payload=DriverDataAckPayload(staging_id=driver_data.staging_id),
)
)


def convert_timestamps_to_datetime(utc_timestamps: list[int]) -> list[datetime]:
"""Converts a list of timestamps to a list of datetime objects.
:param utc_timestamps: The list of timestamps. That are assumed to be in UTC.
:return: The list of datetime objects."""

return [datetime.fromtimestamp(ts, tz=UTC) for ts in utc_timestamps]
25 changes: 23 additions & 2 deletions lib/py_edge_server/carlos/edge/server/device_handler_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import UTC, datetime

import pytest
from carlos.database.context import RequestContext
from carlos.database.device import (
Expand All @@ -14,9 +16,12 @@
DriverSignal,
)
from carlos.edge.interface.plugin_pytest import EdgeProtocolTestingConnection
from carlos.edge.interface.units import UnitOfMeasurement

from ..interface.units import UnitOfMeasurement
from .device_handler import ServerEdgeCommunicationHandler
from .device_handler import (
ServerEdgeCommunicationHandler,
convert_timestamps_to_datetime,
)


@pytest.fixture()
Expand Down Expand Up @@ -146,3 +151,19 @@ async def test_handle_device_config(
device_id=driver.device_id,
driver_identifier=driver.driver_identifier,
)


def test_convert_timestamps_to_datetime():
"""Ensures that the convert_timestamps_to_datetime method works correctly."""

datetimes = [
datetime(2021, 1, 1, 0, 0, 0, tzinfo=UTC),
datetime(2021, 1, 2, 0, 0, 0, tzinfo=UTC),
datetime(2021, 1, 3, 0, 0, 0, tzinfo=UTC),
]

utc_timestamps = [int(dt.timestamp()) for dt in datetimes]

converted_datetimes = convert_timestamps_to_datetime(utc_timestamps)

assert converted_datetimes == datetimes, "The conversion did not work as expected."
14 changes: 13 additions & 1 deletion lib/py_edge_server/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions lib/py_edge_server/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.server"
version = "0.1.4"
version = "0.1.5"
description = "The library for the edge server of the carlos project."
authors = ["Felix Fanghanel"]
license = "MIT"
Expand All @@ -27,7 +27,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.bumpversion]
current_version = "0.1.4"
current_version = "0.1.5"
commit = true
tag = false
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\-(?P<prerelease>[a-z0-9\\.]+))?"
Expand Down
8 changes: 4 additions & 4 deletions services/api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions services/device/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1b50ea4

Please sign in to comment.