From eeb3769af763a3de66299c5741f08a5564f595f0 Mon Sep 17 00:00:00 2001 From: Mariano Martinez Grasso Date: Thu, 26 Dec 2024 17:34:54 -0300 Subject: [PATCH] Set an execution timeout for actions, and show an error in activity logs --- app/services/action_runner.py | 29 ++++++++++++++++++++++++++++- app/settings/base.py | 3 ++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/app/services/action_runner.py b/app/services/action_runner.py index 24555fe..ea68652 100644 --- a/app/services/action_runner.py +++ b/app/services/action_runner.py @@ -1,4 +1,6 @@ +import asyncio import logging +import time import httpx import pydantic @@ -81,7 +83,11 @@ async def execute_action(integration_id: str, action_id: str, config_overrides: if config_overrides: config_data.update(config_overrides) parsed_config = config_model.parse_obj(config_data) - result = await handler(integration=integration, action_config=parsed_config) + start_time = time.monotonic() + result = await asyncio.wait_for( + handler(integration=integration, action_config=parsed_config), + timeout=settings.MAX_ACTION_EXECUTION_TIME + ) except pydantic.ValidationError as e: message = f"Invalid configuration for action '{action_id}' and integration '{integration_id}': {e.errors()}" logger.error(message) @@ -118,6 +124,24 @@ async def execute_action(integration_id: str, action_id: str, config_overrides: status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=jsonable_encoder({"detail": message}), ) + except asyncio.TimeoutError: + message = f"Action '{action_id}' timed out after {settings.MAX_ACTION_EXECUTION_TIME} seconds. Please consider splitting the workload in sub-actions." + logger.exception(message) + await publish_event( + event=IntegrationActionFailed( + payload=ActionExecutionFailed( + integration_id=integration_id, + action_id=action_id, + config_data={"configurations": [c.dict() for c in integration.configurations]}, + error=message + ) + ), + topic_name=settings.INTEGRATION_EVENTS_TOPIC, + ) + return JSONResponse( + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + content=jsonable_encoder({"detail": message}), + ) except Exception as e: message = f"Internal error executing action '{action_id}': {e}" logger.exception(message) @@ -137,6 +161,9 @@ async def execute_action(integration_id: str, action_id: str, config_overrides: content=jsonable_encoder({"detail": message}), ) else: + end_time = time.monotonic() + execution_time = end_time - start_time + logger.debug(f"Action '{action_id}' executed successfully in {execution_time:.2f} seconds.") return result diff --git a/app/settings/base.py b/app/settings/base.py index df68b74..b63b9e3 100644 --- a/app/settings/base.py +++ b/app/settings/base.py @@ -65,9 +65,10 @@ INTEGRATION_SERVICE_URL = env.str("INTEGRATION_SERVICE_URL", None) # Define a string id here e.g. "my_tracker" PROCESS_PUBSUB_MESSAGES_IN_BACKGROUND = env.bool("PROCESS_PUBSUB_MESSAGES_IN_BACKGROUND", False) PROCESS_WEBHOOKS_IN_BACKGROUND = env.bool("PROCESS_WEBHOOKS_IN_BACKGROUND", True) - +MAX_ACTION_EXECUTION_TIME = env.int("MAX_ACTION_EXECUTION_TIME", 60 * 9) # 10 minutes is the maximum ack timeout # Settings for system events & commands (EDA) INTEGRATION_EVENTS_TOPIC = env.str("INTEGRATION_EVENTS_TOPIC", "integration-events") default_commands_topic = f"{INTEGRATION_TYPE_SLUG}-actions-topic" if INTEGRATION_TYPE_SLUG else None INTEGRATION_COMMANDS_TOPIC = env.str("INTEGRATION_COMMANDS_TOPIC", default_commands_topic) +TRIGGER_ACTIONS_ALWAYS_SYNC = env.bool("TRIGGER_ACTIONS_ALWAYS_SYNC", False)