diff --git a/poetry.lock b/poetry.lock index 163fe69c9..3e20d9901 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1611,6 +1611,17 @@ docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1 testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)"] typing = ["typing-extensions (>=4.8)"] +[[package]] +name = "filetype" +version = "1.2.0" +description = "Infer file type and MIME type of any file/buffer. No external dependencies." +optional = false +python-versions = "*" +files = [ + {file = "filetype-1.2.0-py2.py3-none-any.whl", hash = "sha256:7ce71b6880181241cf7ac8697a2f1eb6a8bd9b429f7ad6d27b8db9ba5f1c2d25"}, + {file = "filetype-1.2.0.tar.gz", hash = "sha256:66b56cd6474bf41d8c54660347d37afcc3f7d1970648de365c102ef77548aadb"}, +] + [[package]] name = "flake8" version = "6.1.0" @@ -6870,4 +6881,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.11,<3.12" -content-hash = "9c2a8d3c2c9b239c6338f53485f9eace6d3eac112fa9246d9bc0a83c92f61a1d" +content-hash = "5f870005a2514272e756ca1c02e22ed94077fe1999ac6f838f94fd0f44f81965" diff --git a/pyproject.toml b/pyproject.toml index cd2cd9326..2aa04c00b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ typer = "^0.9.0" types-toml = "^0.10.8.7" apscheduler = "^3.10.4" httpx = "^0.27.0" +filetype = "^1.2.0" [tool.poetry.group.dev.dependencies] diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index fcb7d7049..494858912 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -179,3 +179,9 @@ def __init__(self, error_message: str) -> None: class WorkflowRunContextNotInitialized(SkyvernException): def __init__(self, workflow_run_id: str) -> None: super().__init__(f"WorkflowRunContext not initialized for workflow run {workflow_run_id}") + + +class DownloadFileMaxSizeExceeded(SkyvernException): + def __init__(self, max_size: int) -> None: + self.max_size = max_size + super().__init__(f"Download file size exceeded the maximum allowed size of {max_size} MB.") diff --git a/skyvern/forge/sdk/api/files.py b/skyvern/forge/sdk/api/files.py index bf172f43e..3328e1fb2 100644 --- a/skyvern/forge/sdk/api/files.py +++ b/skyvern/forge/sdk/api/files.py @@ -3,37 +3,54 @@ import zipfile from urllib.parse import urlparse -import requests +import aiohttp import structlog +from skyvern.exceptions import DownloadFileMaxSizeExceeded + LOG = structlog.get_logger() -def download_file(url: str) -> str | None: - # Send an HTTP request to the URL of the file, stream=True to prevent loading the content at once into memory - r = requests.get(url, stream=True) +async def download_file(url: str, max_size_mb: int | None = None) -> str: + try: + async with aiohttp.ClientSession(raise_for_status=True) as session: + LOG.info("Starting to download file") + async with session.get(url) as response: + # Check the content length if available + if max_size_mb and response.content_length and response.content_length > max_size_mb * 1024 * 1024: + # todo: move to root exception.py + raise DownloadFileMaxSizeExceeded(max_size_mb) + + # Parse the URL + a = urlparse(url) - # Check if the request is successful - if r.status_code == 200: - # Parse the URL - a = urlparse(url) + # Get the file name + temp_dir = tempfile.mkdtemp(prefix="skyvern_downloads_") - # Get the file name - temp_dir = tempfile.mkdtemp(prefix="skyvern_downloads_") + file_name = os.path.basename(a.path) + file_path = os.path.join(temp_dir, file_name) - file_name = os.path.basename(a.path) - file_path = os.path.join(temp_dir, file_name) + LOG.info(f"Downloading file to {file_path}") + with open(file_path, "wb") as f: + # Write the content of the request into the file + total_bytes_downloaded = 0 + async for chunk in response.content.iter_chunked(1024): + f.write(chunk) + total_bytes_downloaded += len(chunk) + if max_size_mb and total_bytes_downloaded > max_size_mb * 1024 * 1024: + raise DownloadFileMaxSizeExceeded(max_size_mb) - LOG.info(f"Downloading file to {file_path}") - with open(file_path, "wb") as f: - # Write the content of the request into the file - for chunk in r.iter_content(1024): - f.write(chunk) - LOG.info(f"File downloaded successfully to {file_path}") - return file_path - else: - LOG.error(f"Failed to download file, status code: {r.status_code}") - return None + LOG.info(f"File downloaded successfully to {file_path}") + return file_path + except aiohttp.ClientResponseError as e: + LOG.error(f"Failed to download file, status code: {e.status}") + raise + except DownloadFileMaxSizeExceeded as e: + LOG.error(f"Failed to download file, max size exceeded: {e.max_size}") + raise + except Exception: + LOG.exception("Failed to download file") + raise def zip_files(files_path: str, zip_file_path: str) -> str: diff --git a/skyvern/forge/sdk/workflow/exceptions.py b/skyvern/forge/sdk/workflow/exceptions.py index 8b3e9bb12..796f5c84f 100644 --- a/skyvern/forge/sdk/workflow/exceptions.py +++ b/skyvern/forge/sdk/workflow/exceptions.py @@ -31,6 +31,6 @@ def __init__(self, duplicate_keys: set[str]) -> None: ) -class DownloadFileMaxSizeExceeded(BaseWorkflowException): - def __init__(self, max_size: int) -> None: - super().__init__(f"Download file size exceeded the maximum allowed size of {max_size} MB.") +class InvalidEmailClientConfiguration(BaseWorkflowException): + def __init__(self, problems: list[str]) -> None: + super().__init__(f"Email client configuration is invalid. These parameters are missing or invalid: {problems}") diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 861b2acce..ea50b0026 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -1,12 +1,16 @@ import abc import json import os +import smtplib import uuid +from email.message import EmailMessage from enum import StrEnum +from pathlib import Path from tempfile import NamedTemporaryFile from typing import Annotated, Any, Literal, Union +from urllib.parse import urlparse -import aiohttp +import filetype import structlog from pydantic import BaseModel, Field @@ -19,13 +23,15 @@ from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.aws import AsyncAWSClient +from skyvern.forge.sdk.api.files import download_file from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory from skyvern.forge.sdk.schemas.tasks import TaskStatus from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext -from skyvern.forge.sdk.workflow.exceptions import DownloadFileMaxSizeExceeded +from skyvern.forge.sdk.workflow.exceptions import InvalidEmailClientConfiguration from skyvern.forge.sdk.workflow.models.parameter import ( PARAMETER_TYPE, + AWSSecretParameter, ContextParameter, OutputParameter, WorkflowParameter, @@ -40,6 +46,7 @@ class BlockType(StrEnum): CODE = "code" TEXT_PROMPT = "text_prompt" DOWNLOAD_TO_S3 = "download_to_s3" + SEND_EMAIL = "send_email" class Block(BaseModel, abc.ABC): @@ -439,29 +446,6 @@ def get_all_parameters( ) -> list[PARAMETER_TYPE]: return [] - async def _download_file(self, max_size_mb: int = 5) -> str: - async with aiohttp.ClientSession() as session: - LOG.info("Downloading file", url=self.url) - async with session.get(self.url) as response: - # Check the content length if available - if response.content_length and response.content_length > max_size_mb * 1024 * 1024: - raise DownloadFileMaxSizeExceeded(max_size_mb) - - # Don't forget to delete the temporary file after we're done with it - temp_file = NamedTemporaryFile(delete=False) - - total_bytes_downloaded = 0 - async for chunk in response.content.iter_chunked(8192): - temp_file.write(chunk) - total_bytes_downloaded += len(chunk) - if total_bytes_downloaded > max_size_mb * 1024 * 1024: - raise DownloadFileMaxSizeExceeded(max_size_mb) - - # Seek back to the start of the file - temp_file.seek(0) - - return temp_file.name - async def _upload_file_to_s3(self, uri: str, file_path: str) -> None: try: client = self.get_async_aws_client() @@ -485,7 +469,7 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> OutputParameter self.url = task_url_parameter_value try: - file_path = await self._download_file() + file_path = await download_file(self.url, max_size_mb=10) except Exception as e: LOG.error("DownloadToS3Block: Failed to download file", url=self.url, error=str(e)) raise e @@ -516,5 +500,200 @@ async def execute(self, workflow_run_id: str, **kwargs: dict) -> OutputParameter return None -BlockSubclasses = Union[ForLoopBlock, TaskBlock, CodeBlock, TextPromptBlock, DownloadToS3Block] +class SendEmailBlock(Block): + block_type: Literal[BlockType.SEND_EMAIL] = BlockType.SEND_EMAIL + + smtp_host: AWSSecretParameter + smtp_port: AWSSecretParameter + smtp_username: AWSSecretParameter + # if you're using a Gmail account, you need to pass in an app password instead of your regular password + smtp_password: AWSSecretParameter + sender: str + recipients: list[str] + subject: str + body: str + file_attachments: list[str] = [] + + def get_all_parameters( + self, + ) -> list[PARAMETER_TYPE]: + return [self.smtp_host, self.smtp_port, self.smtp_username, self.smtp_password] + + def _decrypt_smtp_parameters(self, workflow_run_context: WorkflowRunContext) -> tuple[str, int, str, str]: + obfuscated_smtp_host_value = workflow_run_context.get_value(self.smtp_host.key) + obfuscated_smtp_port_value = workflow_run_context.get_value(self.smtp_port.key) + obfuscated_smtp_username_value = workflow_run_context.get_value(self.smtp_username.key) + obfuscated_smtp_password_value = workflow_run_context.get_value(self.smtp_password.key) + smtp_host_value = workflow_run_context.get_original_secret_value_or_none(obfuscated_smtp_host_value) + smtp_port_value = workflow_run_context.get_original_secret_value_or_none(obfuscated_smtp_port_value) + smtp_username_value = workflow_run_context.get_original_secret_value_or_none(obfuscated_smtp_username_value) + smtp_password_value = workflow_run_context.get_original_secret_value_or_none(obfuscated_smtp_password_value) + + email_config_problems = [] + if smtp_host_value is None: + email_config_problems.append("Missing SMTP server") + if smtp_port_value is None: + email_config_problems.append("Missing SMTP port") + elif not smtp_port_value.isdigit(): + email_config_problems.append("SMTP port should be a number") + if smtp_username_value is None: + email_config_problems.append("Missing SMTP username") + if smtp_password_value is None: + email_config_problems.append("Missing SMTP password") + + if email_config_problems: + raise InvalidEmailClientConfiguration(email_config_problems) + + return smtp_host_value, smtp_port_value, smtp_username_value, smtp_password_value + + def _get_file_paths(self, workflow_run_context: WorkflowRunContext) -> list[str]: + file_paths = [] + for file_path in self.file_attachments: + if not workflow_run_context.has_parameter(file_path): + file_paths.append(file_path) + continue + + file_path_parameter_value = workflow_run_context.get_value(file_path) + file_path_parameter_secret_value = workflow_run_context.get_original_secret_value_or_none( + file_path_parameter_value + ) + if file_path_parameter_secret_value: + file_paths.append(file_path_parameter_secret_value) + else: + file_paths.append(file_path_parameter_value) + + return file_paths + + async def _download_from_s3(self, s3_uri: str) -> str: + client = self.get_async_aws_client() + downloaded_bytes = await client.download_file(uri=s3_uri) + file_path = NamedTemporaryFile(delete=False) + file_path.write(downloaded_bytes) + return file_path.name + + async def _build_email_message( + self, workflow_run_context: WorkflowRunContext, workflow_run_id: str + ) -> EmailMessage: + msg = EmailMessage() + msg["Subject"] = self.subject + f" - Workflow Run ID: {workflow_run_id}" + msg["To"] = ", ".join(self.recipients) + msg["From"] = self.sender + msg.set_content(self.body) + + for filename in self._get_file_paths(workflow_run_context): + path = None + try: + if filename.startswith("s3://"): + path = await self._download_from_s3(filename) + elif filename.startswith("http://") or filename.startswith("https://"): + path = await download_file(filename) + else: + LOG.error("SendEmailBlock: Looking for file locally", filename=filename) + if not os.path.exists(filename): + raise FileNotFoundError(f"File not found: {filename}") + if not os.path.isfile(filename): + raise IsADirectoryError(f"Path is a directory: {filename}") + + LOG.info("SendEmailBlock: Found file locally", path=path) + path = filename + + if not path: + raise FileNotFoundError(f"File not found: {filename}") + + # Guess the content type based on the file's extension. Encoding + # will be ignored, although we should check for simple things like + # gzip'd or compressed files. + kind = filetype.guess(path) + if kind: + ctype = kind.mime + extension = kind.extension + else: + # No guess could be made, or the file is encoded (compressed), so + # use a generic bag-of-bits type. + ctype = "application/octet-stream" + extension = None + + maintype, subtype = ctype.split("/", 1) + attachment_filename = urlparse(filename).path.replace("/", "_") + + # Check if the filename has an extension + if not Path(attachment_filename).suffix: + # If no extension, guess it based on the MIME type + if extension: + attachment_filename += f".{extension}" + + LOG.info( + "SendEmailBlock: Adding attachment", + filename=attachment_filename, + maintype=maintype, + subtype=subtype, + ) + with open(path, "rb") as fp: + msg.add_attachment(fp.read(), maintype=maintype, subtype=subtype, filename=attachment_filename) + finally: + if path: + os.unlink(path) + + return msg + + async def execute(self, workflow_run_id: str, **kwargs: dict) -> OutputParameter | None: + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + smtp_host_value, smtp_port_value, smtp_username_value, smtp_password_value = self._decrypt_smtp_parameters( + workflow_run_context + ) + + smtp_host = None + try: + smtp_host = smtplib.SMTP(smtp_host_value, smtp_port_value) + LOG.info("SendEmailBlock: Connected to SMTP server") + smtp_host.starttls() + smtp_host.login(smtp_username_value, smtp_password_value) + LOG.info("SendEmailBlock: Logged in to SMTP server") + message = await self._build_email_message(workflow_run_context, workflow_run_id) + smtp_host.send_message(message) + LOG.info("SendEmailBlock: Email sent") + except Exception as e: + LOG.error("SendEmailBlock: Failed to send email", error=str(e)) + if self.output_parameter: + await workflow_run_context.register_output_parameter_value_post_execution( + parameter=self.output_parameter, + value={ + "success": False, + "error": str(e), + }, + ) + await app.DATABASE.create_workflow_run_output_parameter( + workflow_run_id=workflow_run_id, + output_parameter_id=self.output_parameter.output_parameter_id, + value={ + "success": False, + "error": str(e), + }, + ) + return self.output_parameter + raise e + finally: + if smtp_host: + smtp_host.quit() + + if self.output_parameter: + await workflow_run_context.register_output_parameter_value_post_execution( + parameter=self.output_parameter, + value={ + "success": True, + }, + ) + await app.DATABASE.create_workflow_run_output_parameter( + workflow_run_id=workflow_run_id, + output_parameter_id=self.output_parameter.output_parameter_id, + value={ + "success": True, + }, + ) + return self.output_parameter + + return None + + +BlockSubclasses = Union[ForLoopBlock, TaskBlock, CodeBlock, TextPromptBlock, DownloadToS3Block, SendEmailBlock] BlockTypeVar = Annotated[BlockSubclasses, Field(discriminator="block_type")] diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py index 03ab4a005..1d8db371e 100644 --- a/skyvern/forge/sdk/workflow/models/yaml.py +++ b/skyvern/forge/sdk/workflow/models/yaml.py @@ -117,10 +117,30 @@ class DownloadToS3BlockYAML(BlockYAML): url: str +class SendEmailBlockYAML(BlockYAML): + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + # This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able + # to infer the type of the parameter_type attribute. + block_type: Literal[BlockType.SEND_EMAIL] = BlockType.SEND_EMAIL # type: ignore + + smtp_host_secret_parameter_key: str + smtp_port_secret_parameter_key: str + smtp_username_secret_parameter_key: str + smtp_password_secret_parameter_key: str + sender: str + recipients: list[str] + subject: str + body: str + file_attachments: list[str] | None = None + + PARAMETER_YAML_SUBCLASSES = AWSSecretParameterYAML | WorkflowParameterYAML | ContextParameterYAML | OutputParameterYAML PARAMETER_YAML_TYPES = Annotated[PARAMETER_YAML_SUBCLASSES, Field(discriminator="parameter_type")] -BLOCK_YAML_SUBCLASSES = TaskBlockYAML | ForLoopBlockYAML | CodeBlockYAML | TextPromptBlockYAML | DownloadToS3BlockYAML +BLOCK_YAML_SUBCLASSES = ( + TaskBlockYAML | ForLoopBlockYAML | CodeBlockYAML | TextPromptBlockYAML | DownloadToS3BlockYAML | SendEmailBlockYAML +) BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")] diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index cecfc1d57..7588972fc 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -27,6 +27,7 @@ CodeBlock, DownloadToS3Block, ForLoopBlock, + SendEmailBlock, TaskBlock, TextPromptBlock, ) @@ -739,4 +740,18 @@ async def block_yaml_to_block(block_yaml: BLOCK_YAML_TYPES, parameters: dict[str output_parameter=output_parameter, url=block_yaml.url, ) + elif block_yaml.block_type == BlockType.SEND_EMAIL: + return SendEmailBlock( + label=block_yaml.label, + output_parameter=output_parameter, + smtp_host=parameters[block_yaml.smtp_host_secret_parameter_key], + smtp_port=parameters[block_yaml.smtp_port_secret_parameter_key], + smtp_username=parameters[block_yaml.smtp_username_secret_parameter_key], + smtp_password=parameters[block_yaml.smtp_password_secret_parameter_key], + sender=block_yaml.sender, + recipients=block_yaml.recipients, + subject=block_yaml.subject, + body=block_yaml.body, + file_attachments=block_yaml.file_attachments or [], + ) raise ValueError(f"Invalid block type {block_yaml.block_type}") diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index b3a40265d..02aab41d3 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -131,7 +131,7 @@ async def handle_upload_file_action( ) return [ActionFailure(ImaginaryFileUrl(action.file_url))] xpath = await validate_actions_in_dom(action, page, scraped_page) - file_path = download_file(file_url) + file_path = await download_file(file_url) locator = page.locator(f"xpath={xpath}") is_file_input = await is_file_input_element(locator) if is_file_input: @@ -413,7 +413,11 @@ async def chain_click( file: list[str] | str = [] if action.file_url: file_url = get_actual_value_of_parameter_if_secret(task, action.file_url) - file = download_file(file_url) or [] + try: + file = await download_file(file_url) + except Exception: + LOG.exception("Failed to download file, continuing without it", action=action, file_url=file_url) + file = [] fc_func = lambda fc: fc.set_files(files=file) page.on("filechooser", fc_func)