Skip to content

Commit

Permalink
Set an execution timeout for actions, and show an error in activity logs
Browse files Browse the repository at this point in the history
  • Loading branch information
marianobrc committed Dec 26, 2024
1 parent 6504c41 commit eeb3769
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
29 changes: 28 additions & 1 deletion app/services/action_runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import logging
import time

import httpx
import pydantic
Expand Down Expand Up @@ -81,7 +83,11 @@ async def execute_action(integration_id: str, action_id: str, config_overrides:
if config_overrides:
config_data.update(config_overrides)
parsed_config = config_model.parse_obj(config_data)
result = await handler(integration=integration, action_config=parsed_config)
start_time = time.monotonic()
result = await asyncio.wait_for(
handler(integration=integration, action_config=parsed_config),
timeout=settings.MAX_ACTION_EXECUTION_TIME
)
except pydantic.ValidationError as e:
message = f"Invalid configuration for action '{action_id}' and integration '{integration_id}': {e.errors()}"
logger.error(message)
Expand Down Expand Up @@ -118,6 +124,24 @@ async def execute_action(integration_id: str, action_id: str, config_overrides:
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({"detail": message}),
)
except asyncio.TimeoutError:
message = f"Action '{action_id}' timed out after {settings.MAX_ACTION_EXECUTION_TIME} seconds. Please consider splitting the workload in sub-actions."
logger.exception(message)
await publish_event(
event=IntegrationActionFailed(
payload=ActionExecutionFailed(
integration_id=integration_id,
action_id=action_id,
config_data={"configurations": [c.dict() for c in integration.configurations]},
error=message
)
),
topic_name=settings.INTEGRATION_EVENTS_TOPIC,
)
return JSONResponse(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
content=jsonable_encoder({"detail": message}),
)
except Exception as e:
message = f"Internal error executing action '{action_id}': {e}"
logger.exception(message)
Expand All @@ -137,6 +161,9 @@ async def execute_action(integration_id: str, action_id: str, config_overrides:
content=jsonable_encoder({"detail": message}),
)
else:
end_time = time.monotonic()
execution_time = end_time - start_time
logger.debug(f"Action '{action_id}' executed successfully in {execution_time:.2f} seconds.")
return result


Expand Down
3 changes: 2 additions & 1 deletion app/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@
INTEGRATION_SERVICE_URL = env.str("INTEGRATION_SERVICE_URL", None) # Define a string id here e.g. "my_tracker"
PROCESS_PUBSUB_MESSAGES_IN_BACKGROUND = env.bool("PROCESS_PUBSUB_MESSAGES_IN_BACKGROUND", False)
PROCESS_WEBHOOKS_IN_BACKGROUND = env.bool("PROCESS_WEBHOOKS_IN_BACKGROUND", True)

MAX_ACTION_EXECUTION_TIME = env.int("MAX_ACTION_EXECUTION_TIME", 60 * 9) # 10 minutes is the maximum ack timeout

# Settings for system events & commands (EDA)
INTEGRATION_EVENTS_TOPIC = env.str("INTEGRATION_EVENTS_TOPIC", "integration-events")
default_commands_topic = f"{INTEGRATION_TYPE_SLUG}-actions-topic" if INTEGRATION_TYPE_SLUG else None
INTEGRATION_COMMANDS_TOPIC = env.str("INTEGRATION_COMMANDS_TOPIC", default_commands_topic)
TRIGGER_ACTIONS_ALWAYS_SYNC = env.bool("TRIGGER_ACTIONS_ALWAYS_SYNC", False)

0 comments on commit eeb3769

Please sign in to comment.