From 0b5456a4c6b1f2f03d6aa3bf0b1b513b3cf8d1bc Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Sun, 24 Mar 2024 22:55:38 -0700 Subject: [PATCH] Workflow: YAML interface (#123) --- .pre-commit-config.yaml | 2 + skyvern/forge/sdk/db/client.py | 40 ++++--- skyvern/forge/sdk/routes/agent_protocol.py | 30 +++++ skyvern/forge/sdk/workflow/context_manager.py | 6 + skyvern/forge/sdk/workflow/exceptions.py | 8 ++ skyvern/forge/sdk/workflow/models/block.py | 11 ++ skyvern/forge/sdk/workflow/models/yaml.py | 112 +++++++++++++++++ skyvern/forge/sdk/workflow/service.py | 113 +++++++++++++++++- 8 files changed, 306 insertions(+), 16 deletions(-) create mode 100644 skyvern/forge/sdk/workflow/models/yaml.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6b464b378..fd4b0fb14 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -44,6 +44,8 @@ repos: - types-requests - types-cachetools - alembic + - "sqlalchemy[mypy]" + - types-PyYAML exclude: | (?x)( ^tests.*| diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index e73c95419..a8128b424 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -689,21 +689,31 @@ async def update_workflow( title: str | None = None, description: str | None = None, workflow_definition: dict[str, Any] | None = None, - ) -> Workflow | None: - async with self.Session() as session: - workflow = (await session.scalars(select(WorkflowModel).filter_by(workflow_id=workflow_id))).first() - if workflow: - if title: - workflow.title = title - if description: - workflow.description = description - if workflow_definition: - workflow.workflow_definition = workflow_definition - await session.commit() - await session.refresh(workflow) - return convert_to_workflow(workflow, self.debug_enabled) - LOG.error("Workflow not found, nothing to update", workflow_id=workflow_id) - return None + ) -> Workflow: + try: + async with self.Session() as session: + if workflow := await 
session.scalar(select(WorkflowModel).filter_by(workflow_id=workflow_id)): + if title: + workflow.title = title + if description: + workflow.description = description + if workflow_definition: + workflow.workflow_definition = workflow_definition + await session.commit() + await session.refresh(workflow) + return convert_to_workflow(workflow, self.debug_enabled) + else: + raise NotFoundError("Workflow not found") + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except NotFoundError: + LOG.error("No workflow found to update", workflow_id=workflow_id) + LOG.error("NotFoundError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise async def create_workflow_run( self, workflow_id: str, proxy_location: ProxyLocation | None = None, webhook_callback_url: str | None = None diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 831d9b6e8..3c34bfc70 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1,6 +1,7 @@ from typing import Annotated, Any import structlog +import yaml from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, Query, Request, Response, status from fastapi.responses import ORJSONResponse from pydantic import BaseModel @@ -25,9 +26,11 @@ from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.workflow.models.workflow import ( RunWorkflowResponse, + Workflow, WorkflowRequestBody, WorkflowRunStatusResponse, ) +from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest base_router = APIRouter() @@ -446,3 +449,30 @@ async def get_workflow_run( return await app.WORKFLOW_SERVICE.build_workflow_run_status_response( workflow_id=workflow_id, workflow_run_id=workflow_run_id, organization_id=current_org.organization_id ) + + +@base_router.post( + "/workflows", + openapi_extra={ + "requestBody": { + 
"content": {"application/x-yaml": {"schema": WorkflowCreateYAMLRequest.model_json_schema()}}, + "required": True, + }, + }, + response_model=Workflow, +) +async def create_workflow( + request: Request, + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> Workflow: + analytics.capture("skyvern-oss-agent-workflow-create") + raw_yaml = await request.body() + try: + workflow_yaml = yaml.safe_load(raw_yaml) + except yaml.YAMLError: + raise HTTPException(status_code=422, detail="Invalid YAML") + + workflow_create_request = WorkflowCreateYAMLRequest.model_validate(workflow_yaml) + return await app.WORKFLOW_SERVICE.create_workflow_from_request( + organization_id=current_org.organization_id, request=workflow_create_request + ) diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py index c781772a0..a08ce4bbe 100644 --- a/skyvern/forge/sdk/workflow/context_manager.py +++ b/skyvern/forge/sdk/workflow/context_manager.py @@ -60,6 +60,12 @@ def get_value(self, key: str) -> Any: """ return self.values[key] + def has_parameter(self, key: str) -> bool: + return key in self.parameters + + def has_value(self, key: str) -> bool: + return key in self.values + def set_value(self, key: str, value: Any) -> None: self.values[key] = value diff --git a/skyvern/forge/sdk/workflow/exceptions.py b/skyvern/forge/sdk/workflow/exceptions.py index 2f9943658..18112ae7a 100644 --- a/skyvern/forge/sdk/workflow/exceptions.py +++ b/skyvern/forge/sdk/workflow/exceptions.py @@ -21,3 +21,11 @@ def __init__(self, key: str, retry_count: int | None = None) -> None: elif retry_count == 0: message += " Max duplicate retries reached, aborting." super().__init__(message) + + +class WorkflowDefinitionHasDuplicateParameterKeys(BaseWorkflowException): + def __init__(self, duplicate_keys: set[str]) -> None: + super().__init__( + f"WorkflowDefinition has parameters with duplicate keys. Each parameter needs to have a unique " + f"key. 
Duplicate key(s): {','.join(duplicate_keys)}" + ) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index d76534390..00ba04ef9 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -110,6 +110,17 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> OutputParameter will_retry = True workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id) workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id) + # if the task url is parameterized, we need to get the value from the workflow run context + if self.url and workflow_run_context.has_parameter(self.url) and workflow_run_context.has_value(self.url): + task_url_parameter_value = workflow_run_context.get_value(self.url) + if task_url_parameter_value: + LOG.info( + "Task URL is parameterized, using parameter value", + task_url_parameter_value=task_url_parameter_value, + task_url_parameter_key=self.url, + ) + self.url = task_url_parameter_value + # TODO (kerem) we should always retry on terminated. We should make a distinction between retriable and # non-retryable terminations while will_retry: diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py new file mode 100644 index 000000000..df5f35e95 --- /dev/null +++ b/skyvern/forge/sdk/workflow/models/yaml.py @@ -0,0 +1,112 @@ +import abc +from typing import Annotated, Any, Literal + +from pydantic import BaseModel, Field + +from skyvern.forge.sdk.workflow.models.block import BlockType +from skyvern.forge.sdk.workflow.models.parameter import ParameterType, WorkflowParameterType + + +class ParameterYAML(BaseModel, abc.ABC): + parameter_type: ParameterType + key: str + description: str | None = None + + +class AWSSecretParameterYAML(ParameterYAML): + # There is a mypy bug with Literal. 
Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + # This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able + # to infer the type of the parameter_type attribute. + parameter_type: Literal[ParameterType.AWS_SECRET] = ParameterType.AWS_SECRET # type: ignore + aws_key: str + + +class WorkflowParameterYAML(ParameterYAML): + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + # This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able + # to infer the type of the parameter_type attribute. + parameter_type: Literal[ParameterType.WORKFLOW] = ParameterType.WORKFLOW # type: ignore + workflow_parameter_type: WorkflowParameterType + default_value: str | int | float | bool | dict | list | None = None + + +class ContextParameterYAML(ParameterYAML): + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + # This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able + # to infer the type of the parameter_type attribute. + parameter_type: Literal[ParameterType.CONTEXT] = ParameterType.CONTEXT # type: ignore + source_workflow_parameter_key: str + + +class OutputParameterYAML(ParameterYAML): + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + # This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able + # to infer the type of the parameter_type attribute. 
+ parameter_type: Literal[ParameterType.OUTPUT] = ParameterType.OUTPUT # type: ignore + + +class BlockYAML(BaseModel, abc.ABC): + block_type: BlockType + label: str + output_parameter_key: str | None = None + + +class TaskBlockYAML(BlockYAML): + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + # This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able + # to infer the type of the parameter_type attribute. + block_type: Literal[BlockType.TASK] = BlockType.TASK # type: ignore + + url: str | None = None + title: str = "Untitled Task" + navigation_goal: str | None = None + data_extraction_goal: str | None = None + data_schema: dict[str, Any] | None = None + error_code_mapping: dict[str, str] | None = None + max_retries: int = 0 + parameter_keys: list[str] | None = None + + +class ForLoopBlockYAML(BlockYAML): + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + # This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able + # to infer the type of the parameter_type attribute. + block_type: Literal[BlockType.FOR_LOOP] = BlockType.FOR_LOOP # type: ignore + + loop_over_parameter_key: str + loop_block: BlockYAML + + +class CodeBlockYAML(BlockYAML): + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + # This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able + # to infer the type of the parameter_type attribute. 
+ block_type: Literal[BlockType.CODE] = BlockType.CODE # type: ignore + + code: str + parameter_keys: list[str] | None = None + + +PARAMETER_YAML_SUBCLASSES = AWSSecretParameterYAML | WorkflowParameterYAML | ContextParameterYAML | OutputParameterYAML +PARAMETER_YAML_TYPES = Annotated[PARAMETER_YAML_SUBCLASSES, Field(discriminator="parameter_type")] + +BLOCK_YAML_SUBCLASSES = TaskBlockYAML | ForLoopBlockYAML | CodeBlockYAML +BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")] + + +class WorkflowDefinitionYAML(BaseModel): + parameters: list[PARAMETER_YAML_TYPES] + blocks: list[BLOCK_YAML_TYPES] + + +class WorkflowCreateYAMLRequest(BaseModel): + title: str + description: str | None = None + workflow_definition: WorkflowDefinitionYAML diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index d0e77078d..352f79b9e 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -20,9 +20,13 @@ from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus +from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateParameterKeys +from skyvern.forge.sdk.workflow.models.block import BlockType, BlockTypeVar, CodeBlock, ForLoopBlock, TaskBlock from skyvern.forge.sdk.workflow.models.parameter import ( AWSSecretParameter, OutputParameter, + Parameter, + ParameterType, WorkflowParameter, WorkflowParameterType, ) @@ -36,6 +40,7 @@ WorkflowRunStatus, WorkflowRunStatusResponse, ) +from skyvern.forge.sdk.workflow.models.yaml import BLOCK_YAML_TYPES, WorkflowCreateYAMLRequest from skyvern.webeye.browser_factory import BrowserState LOG = structlog.get_logger() @@ -252,7 +257,7 @@ async def update_workflow( title: str | None = None, description: str | None = None, workflow_definition: WorkflowDefinition | None = None, - ) -> Workflow | None: + ) -> 
Workflow: if workflow_definition: workflow_definition.validate() return await app.DATABASE.update_workflow( @@ -604,3 +609,109 @@ async def persist_debug_artifacts( await self.persist_har_data(browser_state, last_step, workflow, workflow_run) await self.persist_tracing_data(browser_state, last_step, workflow_run) + + async def create_workflow_from_request(self, organization_id: str, request: WorkflowCreateYAMLRequest) -> Workflow: + LOG.info("Creating workflow from request", organization_id=organization_id, title=request.title) + try: + workflow = await self.create_workflow( + organization_id=organization_id, + title=request.title, + description=request.description, + workflow_definition=WorkflowDefinition(blocks=[]), + ) + # Create parameters from the request + parameters = {} + duplicate_parameter_keys = set() + for parameter in request.workflow_definition.parameters: + if parameter.key in parameters: + LOG.error(f"Duplicate parameter key {parameter.key}") + duplicate_parameter_keys.add(parameter.key) + continue + if parameter.parameter_type == ParameterType.AWS_SECRET: + parameters[parameter.key] = await self.create_aws_secret_parameter( + workflow_id=workflow.workflow_id, + aws_key=parameter.aws_key, + key=parameter.key, + description=parameter.description, + ) + elif parameter.parameter_type == ParameterType.WORKFLOW: + parameters[parameter.key] = await self.create_workflow_parameter( + workflow_id=workflow.workflow_id, + workflow_parameter_type=parameter.workflow_parameter_type, + key=parameter.key, + default_value=parameter.default_value, + description=parameter.description, + ) + elif parameter.parameter_type == ParameterType.OUTPUT: + parameters[parameter.key] = await self.create_output_parameter( + workflow_id=workflow.workflow_id, + key=parameter.key, + description=parameter.description, + ) + if duplicate_parameter_keys: + raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys) + # Create blocks from the request + 
block_label_mapping = {} + blocks = [] + for block_yaml in request.workflow_definition.blocks: + block = await self.block_yaml_to_block(block_yaml, parameters) + blocks.append(block) + block_label_mapping[block.label] = block + + # Set the blocks for the workflow definition + workflow_definition = WorkflowDefinition(blocks=blocks) + workflow = await self.update_workflow( + workflow_id=workflow.workflow_id, + workflow_definition=workflow_definition, + ) + LOG.info( + f"Created workflow from request, title: {request.title}", + parameter_keys=[parameter.key for parameter in parameters.values()], + block_labels=[block.label for block in blocks], + organization_id=organization_id, + title=request.title, + workflow_id=workflow.workflow_id, + ) + return workflow + except Exception as e: + LOG.exception(f"Failed to create workflow from request, title: {request.title}") + raise e + + @staticmethod + async def block_yaml_to_block(block_yaml: BLOCK_YAML_TYPES, parameters: dict[str, Parameter]) -> BlockTypeVar: + output_parameter = parameters.get(block_yaml.output_parameter_key) if block_yaml.output_parameter_key else None + if block_yaml.block_type == BlockType.TASK: + task_block_parameters = ( + [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] + if block_yaml.parameter_keys + else [] + ) + return TaskBlock( + label=block_yaml.label, + url=block_yaml.url, + title=block_yaml.title, + parameters=task_block_parameters, + output_parameter=output_parameter, + navigation_goal=block_yaml.navigation_goal, + data_extraction_goal=block_yaml.data_extraction_goal, + data_schema=block_yaml.data_schema, + error_code_mapping=block_yaml.error_code_mapping, + max_retries=block_yaml.max_retries, + ) + elif block_yaml.block_type == BlockType.FOR_LOOP: + return ForLoopBlock( + label=block_yaml.label, + loop_over_parameter_key=parameters[block_yaml.loop_over_parameter_key], + loop_block=await WorkflowService.block_yaml_to_block(block_yaml.loop_block, parameters), + 
output_parameter=output_parameter, + ) + elif block_yaml.block_type == BlockType.CODE: + return CodeBlock( + label=block_yaml.label, + code=block_yaml.code, + parameters=[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] + if block_yaml.parameter_keys + else [], + output_parameter=output_parameter, + ) + raise ValueError(f"Invalid block type {block_yaml.block_type}")