Skip to content

Commit

Permalink
feat: send data from device and insert into database
Browse files Browse the repository at this point in the history
  • Loading branch information
flxdot committed May 1, 2024
1 parent ee5ab1d commit b95ee04
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 4 deletions.
27 changes: 26 additions & 1 deletion lib/py_edge_device/carlos/edge/device/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
EdgeVersionPayload,
MessageType,
)
from carlos.edge.interface.messages import DeviceConfigResponsePayload
from carlos.edge.interface.messages import (
DeviceConfigResponsePayload,
DriverDataAckPayload,
)
from loguru import logger
from semver import Version

from .constants import VERSION
from .storage.connection import get_async_storage_engine
from .storage.exceptions import NotFoundError
from .storage.timeseries_data import confirm_staged_data
from .storage.timeseries_index import find_timeseries_index, update_timeseries_index
from .update import update_device

Expand All @@ -38,6 +42,7 @@ def __init__(self, protocol: EdgeProtocol, device_id: DeviceId):
{
MessageType.EDGE_VERSION: handle_edge_version,
MessageType.DEVICE_CONFIG_RESPONSE: handle_device_config_response,
MessageType.DRIVER_DATA_ACK: handle_driver_data_ack,
}
)

Expand Down Expand Up @@ -123,3 +128,23 @@ async def handle_device_config_response(
f"Error updating timeseries index for {driver_identifier=} and"
f" {signal_identifier=}."
)


async def handle_driver_data_ack(
protocol: EdgeProtocol, message: CarlosMessage
): # pragma: no cover
"""Handles the incoming driver data ack message.
This message is sent by the server as an acknowledgment of the driver data
received.
:param protocol: The protocol to use for communication.
:param message: The incoming message.
"""

ackn_message = DriverDataAckPayload.model_validate(message.payload)

async with get_async_storage_engine().connect() as connection:
await confirm_staged_data(
connection=connection, staging_id=ackn_message.staging_id
)
21 changes: 19 additions & 2 deletions lib/py_edge_device/carlos/edge/device/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from .communication import ClientEdgeCommunicationHandler
from .constants import LOCAL_DEVICE_STORAGE_PATH
from .driver_manager import DriverManager
from .storage.connection import get_async_storage_engine
from .storage.migration import alembic_upgrade
from .storage.timeseries_data import stage_timeseries_data


# We don't cover this in the unit tests. This needs to be tested in an integration test.
Expand Down Expand Up @@ -145,8 +147,23 @@ async def _run_task_scheduler(self):

async def _send_pending_data(self):
"""Sends the pending data to the server."""
logger.debug("Sending pending data to the server.")
pass

if not self.communication_handler.protocol.is_connected:
logger.warning("Cannot send pending data, as the device is not connected.")
return

async with get_async_storage_engine().connect() as connection:
staged_data = await stage_timeseries_data(connection=connection)

if staged_data is not None:
return

await self.communication_handler.send(
CarlosMessage(
message_type=MessageType.DRIVER_DATA,
payload=staged_data,
)
)


async def send_ping(
Expand Down
1 change: 1 addition & 0 deletions lib/py_edge_interface/carlos/edge/interface/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"CarlosPayload",
"DeviceConfigPayload",
"DeviceConfigResponsePayload",
"DriverDataAckPayload",
"DriverDataPayload",
"DriverTimeseries",
"EdgeVersionPayload",
Expand Down
40 changes: 39 additions & 1 deletion lib/py_edge_server/carlos/edge/server/device_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
__all__ = ["ServerEdgeCommunicationHandler"]

from datetime import datetime

from carlos.database.connection import get_async_carlos_db_connection
from carlos.database.context import RequestContext
from carlos.database.data.timeseries import add_timeseries
from carlos.database.device import (
CarlosDeviceDriverCreate,
CarlosDeviceSignalCreate,
Expand All @@ -20,7 +22,10 @@
EdgeProtocol,
MessageType,
)
from carlos.edge.interface.messages import DeviceConfigResponsePayload
from carlos.edge.interface.messages import (
DeviceConfigResponsePayload,
DriverDataPayload, DriverDataAckPayload,
)

from carlos.edge.server.constants import CLIENT_NAME

Expand All @@ -34,6 +39,7 @@ def __init__(self, device_id: DeviceId, protocol: EdgeProtocol):
self.register_handlers(
{
MessageType.DEVICE_CONFIG: self.handle_device_config,
MessageType.DRIVER_DATA: self.handle_driver_data,
}
)

Expand Down Expand Up @@ -152,3 +158,35 @@ async def _build_device_config_response(
message_type=MessageType.DEVICE_CONFIG_RESPONSE,
payload=DeviceConfigResponsePayload(timeseries_index=timeseries_index),
)

async def handle_driver_data(self, protocol: EdgeProtocol, message: CarlosMessage):
"""Handles the DRIVER_DATA message.
:param protocol: The protocol to use for communication.
:param message: The incoming message.
"""

driver_data = DriverDataPayload.model_validate(message.payload)

async with get_async_carlos_db_connection(
client_name=CLIENT_NAME
) as connection:
context = RequestContext(connection=connection)

for timeseries_id, driver_timeseries in driver_data.data.items():
await add_timeseries(
context=context,
timeseries_id=timeseries_id,
timestamps=[
datetime.fromtimestamp(ts)
for ts in driver_timeseries.timestamps_utc
],
values=driver_timeseries.values,
)

await self.send(
CarlosMessage(
message_type=MessageType.DRIVER_DATA_ACK,
payload=DriverDataAckPayload(staging_id=driver_data.staging_id),
)
)

0 comments on commit b95ee04

Please sign in to comment.