Skip to content

Commit

Permalink
Workflow: YAML interface (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
ykeremy authored Mar 25, 2024
1 parent cf4749c commit 0b5456a
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 16 deletions.
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ repos:
- types-requests
- types-cachetools
- alembic
- "sqlalchemy[mypy]"
- types-PyYAML
exclude: |
(?x)(
^tests.*|
Expand Down
40 changes: 25 additions & 15 deletions skyvern/forge/sdk/db/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,21 +689,31 @@ async def update_workflow(
title: str | None = None,
description: str | None = None,
workflow_definition: dict[str, Any] | None = None,
) -> Workflow | None:
async with self.Session() as session:
workflow = (await session.scalars(select(WorkflowModel).filter_by(workflow_id=workflow_id))).first()
if workflow:
if title:
workflow.title = title
if description:
workflow.description = description
if workflow_definition:
workflow.workflow_definition = workflow_definition
await session.commit()
await session.refresh(workflow)
return convert_to_workflow(workflow, self.debug_enabled)
LOG.error("Workflow not found, nothing to update", workflow_id=workflow_id)
return None
) -> Workflow:
try:
async with self.Session() as session:
if workflow := await session.scalars(select(WorkflowModel).filter_by(workflow_id=workflow_id).first()):
if title:
workflow.title = title
if description:
workflow.description = description
if workflow_definition:
workflow.workflow_definition = workflow_definition
await session.commit()
await session.refresh(workflow)
return convert_to_workflow(workflow, self.debug_enabled)
else:
raise NotFoundError("Workflow not found")
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
except NotFoundError:
LOG.error("No workflow found to update", workflow_id=workflow_id)
LOG.error("NotFoundError", exc_info=True)
raise
except Exception:
LOG.error("UnexpectedError", exc_info=True)
raise

async def create_workflow_run(
self, workflow_id: str, proxy_location: ProxyLocation | None = None, webhook_callback_url: str | None = None
Expand Down
30 changes: 30 additions & 0 deletions skyvern/forge/sdk/routes/agent_protocol.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Annotated, Any

import structlog
import yaml
from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, Query, Request, Response, status
from fastapi.responses import ORJSONResponse
from pydantic import BaseModel
Expand All @@ -25,9 +26,11 @@
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.forge.sdk.workflow.models.workflow import (
RunWorkflowResponse,
Workflow,
WorkflowRequestBody,
WorkflowRunStatusResponse,
)
from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest

base_router = APIRouter()

Expand Down Expand Up @@ -446,3 +449,30 @@ async def get_workflow_run(
return await app.WORKFLOW_SERVICE.build_workflow_run_status_response(
workflow_id=workflow_id, workflow_run_id=workflow_run_id, organization_id=current_org.organization_id
)


@base_router.post(
"/workflows",
openapi_extra={
"requestBody": {
"content": {"application/x-yaml": {"schema": WorkflowCreateYAMLRequest.model_json_schema()}},
"required": True,
},
},
response_model=Workflow,
)
async def create_workflow(
request: Request,
current_org: Organization = Depends(org_auth_service.get_current_org),
) -> Workflow:
analytics.capture("skyvern-oss-agent-workflow-create")
raw_yaml = await request.body()
try:
workflow_yaml = yaml.safe_load(raw_yaml)
except yaml.YAMLError:
raise HTTPException(status_code=422, detail="Invalid YAML")

workflow_create_request = WorkflowCreateYAMLRequest.model_validate(workflow_yaml)
return await app.WORKFLOW_SERVICE.create_workflow_from_request(
organization_id=current_org.organization_id, request=workflow_create_request
)
6 changes: 6 additions & 0 deletions skyvern/forge/sdk/workflow/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ def get_value(self, key: str) -> Any:
"""
return self.values[key]

def has_parameter(self, key: str) -> bool:
return key in self.parameters

def has_value(self, key: str) -> bool:
return key in self.values

def set_value(self, key: str, value: Any) -> None:
self.values[key] = value

Expand Down
8 changes: 8 additions & 0 deletions skyvern/forge/sdk/workflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,11 @@ def __init__(self, key: str, retry_count: int | None = None) -> None:
elif retry_count == 0:
message += " Max duplicate retries reached, aborting."
super().__init__(message)


class WorkflowDefinitionHasDuplicateParameterKeys(BaseWorkflowException):
def __init__(self, duplicate_keys: set[str]) -> None:
super().__init__(
f"WorkflowDefinition has parameters with duplicate keys. Each parameter needs to have a unique "
f"key. Duplicate key(s): {','.join(duplicate_keys)}"
)
11 changes: 11 additions & 0 deletions skyvern/forge/sdk/workflow/models/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> OutputParameter
will_retry = True
workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id)
workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id)
# if the task url is parameterized, we need to get the value from the workflow run context
if self.url and workflow_run_context.has_parameter(self.url) and workflow_run_context.has_value(self.url):
task_url_parameter_value = workflow_run_context.get_value(self.url)
if task_url_parameter_value:
LOG.info(
"Task URL is parameterized, using parameter value",
task_url_parameter_value=task_url_parameter_value,
task_url_parameter_key=self.url,
)
self.url = task_url_parameter_value

# TODO (kerem) we should always retry on terminated. We should make a distinction between retriable and
# non-retryable terminations
while will_retry:
Expand Down
112 changes: 112 additions & 0 deletions skyvern/forge/sdk/workflow/models/yaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import abc
from typing import Annotated, Any, Literal

from pydantic import BaseModel, Field

from skyvern.forge.sdk.workflow.models.block import BlockType
from skyvern.forge.sdk.workflow.models.parameter import ParameterType, WorkflowParameterType


class ParameterYAML(BaseModel, abc.ABC):
parameter_type: ParameterType
key: str
description: str | None = None


class AWSSecretParameterYAML(ParameterYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
parameter_type: Literal[ParameterType.AWS_SECRET] = ParameterType.AWS_SECRET # type: ignore
aws_key: str


class WorkflowParameterYAML(ParameterYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
parameter_type: Literal[ParameterType.WORKFLOW] = ParameterType.WORKFLOW # type: ignore
workflow_parameter_type: WorkflowParameterType
default_value: str | int | float | bool | dict | list | None = None


class ContextParameterYAML(ParameterYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
parameter_type: Literal[ParameterType.CONTEXT] = ParameterType.CONTEXT # type: ignore
source_workflow_parameter_key: str


class OutputParameterYAML(ParameterYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
parameter_type: Literal[ParameterType.OUTPUT] = ParameterType.OUTPUT # type: ignore


class BlockYAML(BaseModel, abc.ABC):
block_type: BlockType
label: str
output_parameter_key: str | None = None


class TaskBlockYAML(BlockYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
block_type: Literal[BlockType.TASK] = BlockType.TASK # type: ignore

url: str | None = None
title: str = "Untitled Task"
navigation_goal: str | None = None
data_extraction_goal: str | None = None
data_schema: dict[str, Any] | None = None
error_code_mapping: dict[str, str] | None = None
max_retries: int = 0
parameter_keys: list[str] | None = None


class ForLoopBlockYAML(BlockYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
block_type: Literal[BlockType.FOR_LOOP] = BlockType.FOR_LOOP # type: ignore

loop_over_parameter_key: str
loop_block: BlockYAML


class CodeBlockYAML(BlockYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
block_type: Literal[BlockType.CODE] = BlockType.CODE # type: ignore

code: str
parameter_keys: list[str] | None = None


PARAMETER_YAML_SUBCLASSES = AWSSecretParameterYAML | WorkflowParameterYAML | ContextParameterYAML | OutputParameterYAML
PARAMETER_YAML_TYPES = Annotated[PARAMETER_YAML_SUBCLASSES, Field(discriminator="parameter_type")]

BLOCK_YAML_SUBCLASSES = TaskBlockYAML | ForLoopBlockYAML | CodeBlockYAML
BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")]


class WorkflowDefinitionYAML(BaseModel):
parameters: list[PARAMETER_YAML_TYPES]
blocks: list[BLOCK_YAML_TYPES]


class WorkflowCreateYAMLRequest(BaseModel):
title: str
description: str | None = None
workflow_definition: WorkflowDefinitionYAML
Loading

0 comments on commit 0b5456a

Please sign in to comment.