Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow run streaming API #887

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions skyvern/forge/sdk/routes/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from skyvern.forge import app
from skyvern.forge.sdk.schemas.tasks import TaskStatus
from skyvern.forge.sdk.services.org_auth_service import get_current_org
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun, WorkflowRunStatus

LOG = structlog.get_logger()
websocket_router = APIRouter()
Expand Down Expand Up @@ -116,3 +117,134 @@ async def task_stream(
return
LOG.info("WebSocket connection closed successfully", task_id=task_id, organization_id=organization_id)
return


@websocket_router.websocket("/workflow_runs/{workflow_run_id}")
async def workflow_run_streaming(
websocket: WebSocket,
workflow_run_id: str,
apikey: str | None = None,
token: str | None = None,
) -> None:
try:
await websocket.accept()
if not token and not apikey:
await websocket.send_text("No valid credential provided")
return
except ConnectionClosedOK:
LOG.info("WofklowRun Streaming: ConnectionClosedOK error. Streaming won't start")
return

try:
organization = await get_current_org(x_api_key=apikey, authorization=token)
organization_id = organization.organization_id
except Exception:
LOG.exception("WofklowRun Streaming: Error while getting organization", workflow_run_id=workflow_run_id)
try:
await websocket.send_text("Invalid credential provided")
except ConnectionClosedOK:
LOG.info("WofklowRun Streaming: ConnectionClosedOK error while sending invalid credential message")
return

LOG.info(
"WofklowRun Streaming: Started workflow run streaming",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
# timestamp last time when streaming activity happens
last_activity_timestamp = datetime.utcnow()

try:
while True:
# if no activity for 5 minutes, close the connection
if (datetime.utcnow() - last_activity_timestamp).total_seconds() > STREAMING_TIMEOUT:
LOG.info(
"WofklowRun Streaming: No activity for 5 minutes. Closing connection",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
await websocket.send_json(
{
"workflow_run_id": workflow_run_id,
"status": "timeout",
}
)
return

workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id=workflow_run_id)
if not workflow_run or workflow_run.organization_id != organization_id:
LOG.info(
"WofklowRun Streaming: Workflow not found",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
await websocket.send_json(
{
"workflow_run_id": workflow_run_id,
"status": "not_found",
}
)
return
if workflow_run.status in [
WorkflowRunStatus.completed,
WorkflowRunStatus.failed,
WorkflowRunStatus.terminated,
]:
LOG.info(
"Workflow run is in a final state. Closing connection",
workflow_run_status=workflow_run.status,
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
await websocket.send_json(
{
"workflow_run_id": workflow_run_id,
"status": workflow_run.status,
}
)
return

if WorkflowRun.status == WorkflowRunStatus.running:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect status check. Use workflow_run.status instead of WorkflowRun.status. This issue appears in other parts of the code as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logical error: Use workflow_run.status instead of WorkflowRun.status to check the status of the current workflow run instance.

file_name = f"{workflow_run_id}.png"
screenshot = await app.STORAGE.get_streaming_file(organization_id, file_name)
if screenshot:
encoded_screenshot = base64.b64encode(screenshot).decode("utf-8")
await websocket.send_json(
{
"workflow_run_id": workflow_run_id,
"status": workflow_run.status,
"screenshot": encoded_screenshot,
}
)
last_activity_timestamp = datetime.utcnow()
await asyncio.sleep(2)

except ValidationError as e:
await websocket.send_text(f"Invalid data: {e}")
except WebSocketDisconnect:
LOG.info(
"WofklowRun Streaming: WebSocket connection closed",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
except ConnectionClosedOK:
LOG.info(
"WofklowRun Streaming: ConnectionClosedOK error while streaming",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
return
except Exception:
LOG.warning(
"WofklowRun Streaming: Error while streaming",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
exc_info=True,
)
return
LOG.info(
"WofklowRun Streaming: WebSocket connection closed successfully",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
return
Loading