diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 471e58437..c5433eb8d 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -888,17 +888,11 @@ async def execute_task_webhook( artifact_types=[ArtifactType.SCREENSHOT_ACTION], n=SettingsManager.get_settings().TASK_RESPONSE_ACTION_SCREENSHOT_COUNT, ) - latest_action_screenshot_urls = [] + if latest_action_screenshot_artifacts: - for artifact in latest_action_screenshot_artifacts: - screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(artifact) - if screenshot_url: - latest_action_screenshot_urls.append(screenshot_url) - else: - LOG.error( - "Failed to get share link for action screenshot", - artifact_id=artifact.artifact_id, - ) + latest_action_screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links( + latest_action_screenshot_artifacts + ) else: LOG.error("Failed to get latest action screenshots") diff --git a/skyvern/forge/sdk/api/aws.py b/skyvern/forge/sdk/api/aws.py index 440112ddd..dae777c9a 100644 --- a/skyvern/forge/sdk/api/aws.py +++ b/skyvern/forge/sdk/api/aws.py @@ -75,17 +75,21 @@ async def download_file(self, uri: str, client: AioBaseClient = None) -> bytes | return None @execute_with_async_client(client_type=AWSClientType.S3) - async def create_presigned_url(self, uri: str, client: AioBaseClient = None) -> str | None: + async def create_presigned_urls(self, uris: list[str], client: AioBaseClient = None) -> list[str] | None: + presigned_urls = [] try: - parsed_uri = S3Uri(uri) - url = await client.generate_presigned_url( - "get_object", - Params={"Bucket": parsed_uri.bucket, "Key": parsed_uri.key}, - ExpiresIn=SettingsManager.get_settings().PRESIGNED_URL_EXPIRATION, - ) - return url + for uri in uris: + parsed_uri = S3Uri(uri) + url = await client.generate_presigned_url( + "get_object", + Params={"Bucket": parsed_uri.bucket, "Key": parsed_uri.key}, + ExpiresIn=SettingsManager.get_settings().PRESIGNED_URL_EXPIRATION, + ) + presigned_urls.append(url) + + return presigned_urls except Exception: - LOG.exception("Failed to create presigned url.", uri=uri) + LOG.exception("Failed to create presigned url for S3 objects.", uris=uris) return None diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py index 6a22ad9fb..8cd38144b 100644 --- a/skyvern/forge/sdk/artifact/manager.py +++ b/skyvern/forge/sdk/artifact/manager.py @@ -71,6 +71,9 @@ async def retrieve_artifact(self, artifact: Artifact) -> bytes | None: async def get_share_link(self, artifact: Artifact) -> str | None: return await app.STORAGE.get_share_link(artifact) + async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None: + return await app.STORAGE.get_share_links(artifacts) + async def wait_for_upload_aiotasks_for_task(self, task_id: str) -> None: try: st = time.time() diff --git a/skyvern/forge/sdk/artifact/storage/base.py b/skyvern/forge/sdk/artifact/storage/base.py index c145e1f75..cfd78c558 100644 --- a/skyvern/forge/sdk/artifact/storage/base.py +++ b/skyvern/forge/sdk/artifact/storage/base.py @@ -40,6 +40,10 @@ async def retrieve_artifact(self, artifact: Artifact) -> bytes | None: async def get_share_link(self, artifact: Artifact) -> str | None: pass + @abstractmethod + async def get_share_links(self, artifacts: list[Artifact]) -> list[str] | None: + pass + @abstractmethod async def store_artifact_from_path(self, artifact: Artifact, path: str) -> None: pass diff --git a/skyvern/forge/sdk/artifact/storage/local.py b/skyvern/forge/sdk/artifact/storage/local.py index c50eec7ef..5fcb28000 100644 --- a/skyvern/forge/sdk/artifact/storage/local.py +++ b/skyvern/forge/sdk/artifact/storage/local.py @@ -52,6 +52,9 @@ async def retrieve_artifact(self, artifact: Artifact) -> bytes | None: async def get_share_link(self, artifact: Artifact) -> str: return artifact.uri + async def get_share_links(self, artifacts: list[Artifact]) -> list[str]: + return [artifact.uri for artifact in artifacts] + @staticmethod def _parse_uri_to_path(uri: str) -> str: parsed_uri = urlparse(uri) diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 07b47054e..f5ab34bf6 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1,4 +1,3 @@ -import asyncio from typing import Annotated, Any import structlog @@ -238,17 +237,9 @@ async def get_task( artifact_types=[ArtifactType.SCREENSHOT_ACTION], n=SettingsManager.get_settings().TASK_RESPONSE_ACTION_SCREENSHOT_COUNT, ) - latest_action_screenshot_urls = [] + latest_action_screenshot_urls: list[str] | None = None if latest_action_screenshot_artifacts: - for artifact in latest_action_screenshot_artifacts: - screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(artifact) - if screenshot_url: - latest_action_screenshot_urls.append(screenshot_url) - else: - LOG.error( - "Failed to get share link for action screenshot", - artifact_id=artifact.artifact_id, - ) + latest_action_screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(latest_action_screenshot_artifacts) elif task_obj.status in [TaskStatus.terminated, TaskStatus.completed]: LOG.error( "Failed to get latest action screenshots in task response", @@ -416,9 +407,12 @@ async def get_agent_task_step_artifacts( organization_id=current_org.organization_id, ) if SettingsManager.get_settings().ENV != "local": - signed_urls = await asyncio.gather(*[app.ARTIFACT_MANAGER.get_share_link(artifact) for artifact in artifacts]) - for i, artifact in enumerate(artifacts): - artifact.signed_url = signed_urls[i] + signed_urls = await app.ARTIFACT_MANAGER.get_share_links(artifacts) + if signed_urls: + for i, artifact in enumerate(artifacts): + artifact.signed_url = signed_urls[i] + else: + LOG.error("Failed to get signed urls for artifacts", task_id=task_id, step_id=step_id) return ORJSONResponse([artifact.model_dump() for artifact in artifacts]) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 0fb08e31c..7f4f3586e 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -451,7 +451,8 @@ async def build_workflow_run_status_response( workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id) workflow_run_tasks = await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id) - screenshot_urls = [] + screenshot_artifacts = [] + screenshot_urls: list[str] | None = None # get the last screenshot for the last 3 tasks of the workflow run for task in workflow_run_tasks[::-1]: screenshot_artifact = await app.DATABASE.get_latest_artifact( @@ -460,11 +461,11 @@ async def build_workflow_run_status_response( organization_id=organization_id, ) if screenshot_artifact: - screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact) - if screenshot_url: - screenshot_urls.append(screenshot_url) - if len(screenshot_urls) >= 3: + screenshot_artifacts.append(screenshot_artifact) + if len(screenshot_artifacts) >= 3: break + if screenshot_artifacts: + screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(screenshot_artifacts) recording_url = None recording_artifact = await app.DATABASE.get_artifact_for_workflow_run(