Skip to content

Commit

Permalink
🔄 synced local 'skyvern/' with remote 'skyvern/'
Browse files Browse the repository at this point in the history
<!-- ELLIPSIS_HIDDEN -->

> [!IMPORTANT]
> Adds a WebSocket endpoint for streaming workflow run data with authentication, streaming logic, and error handling in `streaming.py`.
>
>   - **New Feature**:
>     - Adds `workflow_run_streaming` WebSocket endpoint in `streaming.py` for streaming workflow run data.
>   - **Authentication**:
>     - Validates `apikey` or `token` for organization access.
>     - Sends error message if credentials are invalid.
>   - **Streaming Logic**:
>     - Streams workflow run data if status is `running`.
>     - Sends base64-encoded screenshot if available.
>     - Closes connection if no activity for 5 minutes or if workflow run is in a final state.
>   - **Error Handling**:
>     - Handles `ValidationError`, `WebSocketDisconnect`, and `ConnectionClosedOK` exceptions with appropriate logging and messaging.
>
> <sup>This description was created by </sup>[<img alt="Ellipsis" src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=Skyvern-AI%2Fskyvern-cloud&utm_source=github&utm_medium=referral)<sup> for 5998bf39c47d0e092a3245e6ef732209c44a2636. It will automatically update as commits are pushed.</sup>

<!-- ELLIPSIS_HIDDEN -->
  • Loading branch information
ykeremy committed Sep 30, 2024
1 parent 0b8f6ac commit d7b6cb4
Showing 1 changed file with 132 additions and 0 deletions.
132 changes: 132 additions & 0 deletions skyvern/forge/sdk/routes/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from skyvern.forge import app
from skyvern.forge.sdk.schemas.tasks import TaskStatus
from skyvern.forge.sdk.services.org_auth_service import get_current_org
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun, WorkflowRunStatus

LOG = structlog.get_logger()
websocket_router = APIRouter()
Expand Down Expand Up @@ -116,3 +117,134 @@ async def task_stream(
return
LOG.info("WebSocket connection closed successfully", task_id=task_id, organization_id=organization_id)
return


@websocket_router.websocket("/workflow_runs/{workflow_run_id}")
async def workflow_run_streaming(
websocket: WebSocket,
workflow_run_id: str,
apikey: str | None = None,
token: str | None = None,
) -> None:
try:
await websocket.accept()
if not token and not apikey:
await websocket.send_text("No valid credential provided")
return
except ConnectionClosedOK:
LOG.info("WofklowRun Streaming: ConnectionClosedOK error. Streaming won't start")
return

try:
organization = await get_current_org(x_api_key=apikey, authorization=token)
organization_id = organization.organization_id
except Exception:
LOG.exception("WofklowRun Streaming: Error while getting organization", workflow_run_id=workflow_run_id)
try:
await websocket.send_text("Invalid credential provided")
except ConnectionClosedOK:
LOG.info("WofklowRun Streaming: ConnectionClosedOK error while sending invalid credential message")
return

LOG.info(
"WofklowRun Streaming: Started workflow run streaming",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
# timestamp last time when streaming activity happens
last_activity_timestamp = datetime.utcnow()

try:
while True:
# if no activity for 5 minutes, close the connection
if (datetime.utcnow() - last_activity_timestamp).total_seconds() > STREAMING_TIMEOUT:
LOG.info(
"WofklowRun Streaming: No activity for 5 minutes. Closing connection",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
await websocket.send_json(
{
"workflow_run_id": workflow_run_id,
"status": "timeout",
}
)
return

workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id=workflow_run_id)
if not workflow_run or workflow_run.organization_id != organization_id:
LOG.info(
"WofklowRun Streaming: Workflow not found",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
await websocket.send_json(
{
"workflow_run_id": workflow_run_id,
"status": "not_found",
}
)
return
if workflow_run.status in [
WorkflowRunStatus.completed,
WorkflowRunStatus.failed,
WorkflowRunStatus.terminated,
]:
LOG.info(
"Workflow run is in a final state. Closing connection",
workflow_run_status=workflow_run.status,
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
await websocket.send_json(
{
"workflow_run_id": workflow_run_id,
"status": workflow_run.status,
}
)
return

if WorkflowRun.status == WorkflowRunStatus.running:
file_name = f"{workflow_run_id}.png"
screenshot = await app.STORAGE.get_streaming_file(organization_id, file_name)
if screenshot:
encoded_screenshot = base64.b64encode(screenshot).decode("utf-8")
await websocket.send_json(
{
"workflow_run_id": workflow_run_id,
"status": workflow_run.status,
"screenshot": encoded_screenshot,
}
)
last_activity_timestamp = datetime.utcnow()
await asyncio.sleep(2)

except ValidationError as e:
await websocket.send_text(f"Invalid data: {e}")
except WebSocketDisconnect:
LOG.info(
"WofklowRun Streaming: WebSocket connection closed",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
except ConnectionClosedOK:
LOG.info(
"WofklowRun Streaming: ConnectionClosedOK error while streaming",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
return
except Exception:
LOG.warning(
"WofklowRun Streaming: Error while streaming",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
exc_info=True,
)
return
LOG.info(
"WofklowRun Streaming: WebSocket connection closed successfully",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
return

0 comments on commit d7b6cb4

Please sign in to comment.