Skip to content

Commit

Permalink
Migrate generic alerts to remove use of Book Keeper
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Chong <aaronchongth@gmail.com>
  • Loading branch information
aaronchongth committed Jul 22, 2024
1 parent eecd101 commit 379a554
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 27 deletions.
18 changes: 16 additions & 2 deletions packages/api-server/api_server/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from api_server.fast_io.singleton_dep import singleton_dep
from api_server.models.user import User
from api_server.repositories.alerts import AlertRepository
from api_server.repositories.cached_files import get_cached_file_repo
from api_server.repositories.rmf import RmfRepository
from api_server.rmf_io.events import (
Expand Down Expand Up @@ -70,6 +71,7 @@ def __init__(
cached_files: CachedFilesRepository,
ros_node: rclpy.node.Node,
alert_events: AlertEvents,
alert_repo: AlertRepository,
rmf_events: RmfEvents,
rmf_repo: RmfRepository,
loop: asyncio.AbstractEventLoop,
Expand All @@ -79,6 +81,7 @@ def __init__(
self._cached_files = cached_files
self._ros_node = ros_node
self._alert_events = alert_events
self._alert_repo = alert_repo
self._rmf_events = rmf_events
self._rmf_repo = rmf_repo
self._loop = loop
Expand Down Expand Up @@ -354,8 +357,13 @@ def convert_alert(msg):
)

def handle_alert(alert: AlertRequest):
async def save(alert: AlertRequest):
await self._alert_repo.save_alert_request(alert)
self._alert_events.alert_requests.on_next(alert)
logging.debug("%s", alert)

logging.info(f"Received alert: {alert}")
self._alert_events.alert_requests.on_next(alert)
self._loop.create_task(save(alert))

alert_sub = self._ros_node.create_subscription(
RmfAlert,
Expand All @@ -379,8 +387,13 @@ def convert_alert_response(msg):
)

def handle_alert_response(alert_response: AlertResponse):
async def save(alert_response: AlertResponse):
await self._alert_repo.save_alert_response(alert_response)
self._alert_events.alert_responses.on_next(alert_response)
logging.debug("%s", alert_response)

logging.info(f"Received alert response: {alert_response}")
self._alert_events.alert_responses.on_next(alert_response)
self._loop.create_task(save(alert_response))

alert_response_sub = self._ros_node.create_subscription(
RmfAlertResponse,
Expand Down Expand Up @@ -505,6 +518,7 @@ def get_rmf_gateway():
get_cached_file_repo(),
get_ros_node(),
get_alert_events(),
AlertRepository(),
get_rmf_events(),
RmfRepository(User.get_system_user()),
asyncio.get_event_loop(),
Expand Down
25 changes: 0 additions & 25 deletions packages/api-server/api_server/models/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,6 @@ class AlertResponse(BaseModel):
def from_tortoise(tortoise: ttm.AlertResponse) -> "AlertResponse":
return AlertResponse(**dict(tortoise.data))

async def save(self) -> None:
await ttm.AlertResponse.update_or_create(
{
"response_time": datetime.fromtimestamp(
self.unix_millis_response_time / 1000
),
"response": self.response,
"data": self.json(),
},
id=self.id,
)


class AlertRequest(BaseModel):
class Tier(str, Enum):
Expand All @@ -53,16 +41,3 @@ class Tier(str, Enum):
@staticmethod
def from_tortoise(tortoise: ttm.AlertRequest) -> "AlertRequest":
return AlertRequest(**dict(tortoise.data))

async def save(self) -> None:
await ttm.AlertRequest.update_or_create(
{
"request_time": datetime.fromtimestamp(
self.unix_millis_alert_time / 1000
),
"response_expected": (len(self.responses_available) > 0),
"task_id": self.task_id,
"data": self.json(),
},
id=self.id,
)
25 changes: 25 additions & 0 deletions packages/api-server/api_server/repositories/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,28 @@ async def get_unresponded_alerts(
query = query.limit(pagination.limit).offset(pagination.offset)
unresponded_alerts = await query.all()
return [AlertRequest.from_tortoise(alert) for alert in unresponded_alerts]

async def save_alert_request(self, alert_request: AlertRequest) -> None:
await ttm.AlertRequest.update_or_create(
{
"request_time": datetime.fromtimestamp(
alert_request.unix_millis_alert_time / 1000
),
"response_expected": (len(alert_request.responses_available) > 0),
"task_id": alert_request.task_id,
"data": alert_request.json(),
},
id=alert_request.id,
)

async def save_alert_response(self, alert_response: AlertResponse) -> None:
await ttm.AlertResponse.update_or_create(
{
"response_time": datetime.fromtimestamp(
alert_response.unix_millis_response_time / 1000
),
"response": alert_response.response,
"data": alert_response.json(),
},
id=alert_response.id,
)

0 comments on commit 379a554

Please sign in to comment.