diff --git a/latch/executions.py b/latch/executions.py index 0202056f..44e958aa 100644 --- a/latch/executions.py +++ b/latch/executions.py @@ -1,12 +1,15 @@ import os import re from dataclasses import dataclass -from typing import Optional +from typing import List, Optional, Union import click import gql from latch_sdk_gql.execute import execute +from latch.types.directory import LatchDir +from latch.types.file import LatchFile + pod_name_regex = re.compile( r""" ^( @@ -103,3 +106,31 @@ def rename_current_execution(name: str): """), {"argName": name, "argToken": token}, ) + + +def add_execution_results(results: List[str]): + token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None) + if token is None: + # noop during local execution / testing + click.secho( + "Running in local execution context - skip adding output results.", + dim=True, + italic=True, + ) + return + + execute( + gql.gql(""" + mutation addExecutionResults( + $argToken: String!, + $argPaths: [String]! + ) { + executionInfoMetadataPublishResults( + input: {argToken: $argToken, argPaths: $argPaths} + ) { + clientMutationId + } + } + """), + {"argToken": token, "argPaths": results}, + ) diff --git a/latch/types/metadata.py b/latch/types/metadata.py index 99b38c96..e9be1df1 100644 --- a/latch/types/metadata.py +++ b/latch/types/metadata.py @@ -31,7 +31,7 @@ from latch_cli.snakemake.config.utils import validate_snakemake_type from latch_cli.utils import identifier_suffix_from_str -from .directory import LatchDir +from .directory import LatchDir, LatchOutputDir from .file import LatchFile @@ -501,8 +501,25 @@ class NextflowParameter(Generic[T], LatchParameter): Should return the path of the constructed samplesheet. If samplesheet_type is also specified, this takes precedence. Only used if the provided parameter is a samplesheet (samplesheet=True) """ + results_paths: Optional[List[Path]] = None + """ + Output sub-paths that will be exposed in the UI under the "Results" tab on the workflow execution page. + + Only valid where the `type` attribute is a LatchDir + """ def __post_init__(self): + if self.results_paths is not None and self.type not in { + LatchDir, + LatchOutputDir, + }: + click.secho( + "`results_paths` attribute can only be defined for parameters" + " of type `LatchDir`.", + fg="red", + ) + raise click.exceptions.Exit(1) + if not self.samplesheet or self.samplesheet_constructor is not None: return diff --git a/latch_cli/nextflow/workflow.py b/latch_cli/nextflow/workflow.py index 52a41a62..5d0a27af 100644 --- a/latch_cli/nextflow/workflow.py +++ b/latch_cli/nextflow/workflow.py @@ -2,13 +2,14 @@ from enum import Enum from pathlib import Path from textwrap import dedent -from typing import Any, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import click import latch.types.metadata as metadata from latch.types.directory import LatchDir, LatchOutputDir from latch.types.file import LatchFile +from latch.types.metadata import NextflowParameter from latch_cli.snakemake.config.utils import get_preamble, type_repr from latch_cli.snakemake.utils import reindent from latch_cli.utils import identifier_from_str, urljoins @@ -67,59 +68,60 @@ def initialize() -> str: @nextflow_runtime_task(cpu={cpu}, memory={memory}, storage_gib={storage_gib}) def nextflow_runtime(pvc_name: str, {param_signature}) -> None: - try: - shared_dir = Path("/nf-workdir") + shared_dir = Path("/nf-workdir") +{output_shortcuts} {samplesheet_constructors} - ignore_list = [ - "latch", - ".latch", - ".git", - "nextflow", - ".nextflow", - "work", - "results", - "miniconda", - "anaconda3", - "mambaforge", - ] - - shutil.copytree( - Path("/root"), - shared_dir, - ignore=lambda src, names: ignore_list, - ignore_dangling_symlinks=True, - dirs_exist_ok=True, - ) + ignore_list = [ + "latch", + ".latch", + ".git", + "nextflow", + ".nextflow", + "work", + "results", + "miniconda", + "anaconda3", + "mambaforge", + ] + + shutil.copytree( + Path("/root"), + shared_dir, + ignore=lambda src, names: ignore_list, + ignore_dangling_symlinks=True, + dirs_exist_ok=True, + ) - profile_list = {execution_profile} - if {configurable_profiles}: - profile_list.extend([p.value for p in execution_profiles]) - - if len(profile_list) == 0: - profile_list.append("standard") - - profiles = ','.join(profile_list) - - cmd = [ - "/root/nextflow", - "run", - str(shared_dir / "{nf_script}"), - "-work-dir", - str(shared_dir), - "-profile", - profiles, - "-c", - "latch.config", - "-resume", + profile_list = {execution_profile} + if {configurable_profiles}: + profile_list.extend([p.value for p in execution_profiles]) + + if len(profile_list) == 0: + profile_list.append("standard") + + profiles = ','.join(profile_list) + + cmd = [ + "/root/nextflow", + "run", + str(shared_dir / "{nf_script}"), + "-work-dir", + str(shared_dir), + "-profile", + profiles, + "-c", + "latch.config", + "-resume", {params_to_flags} - ] + ] - print("Launching Nextflow Runtime") - print(' '.join(cmd)) - print(flush=True) + print("Launching Nextflow Runtime") + print(' '.join(cmd)) + print(flush=True) + try: env = {{ **os.environ, "NXF_HOME": "/root/.nextflow", @@ -207,6 +209,34 @@ def generate_nextflow_config(pkg_root: Path): click.secho(f"Nextflow Latch config written to {config_path}", fg="green") +def get_results_code_block(parameters: Dict[str, NextflowParameter]) -> str: + output_shortcuts = [ + (var_name, sub_path) + for var_name, param in parameters.items() + if param.results_paths is not None + for sub_path in param.results_paths + ] + + if len(output_shortcuts) == 0: + return "" + + code_block = dedent(""" + from latch.executions import add_execution_results + + results = [] + """) + + for var_name, sub_path in output_shortcuts: + code_block += dedent( + f"results.append(os.path.join({var_name}.remote_path," + f" '{str(sub_path).lstrip('/')}'))\n" + ) + + code_block += dedent(f"add_execution_results(results)\n") + + return code_block + + def generate_nextflow_workflow( pkg_root: Path, metadata_root: Path, @@ -342,6 +372,7 @@ def generate_nextflow_workflow( execution_profile=( execution_profile.split(",") if execution_profile is not None else [] ), + output_shortcuts=reindent(get_results_code_block(parameters), 1), configurable_profiles=len(profile_options) > 0, preambles="\n\n".join(list(preambles)), samplesheet_funs="\n".join(samplesheet_funs), diff --git a/latch_cli/snakemake/utils.py b/latch_cli/snakemake/utils.py index 06c2bc91..6b064958 100644 --- a/latch_cli/snakemake/utils.py +++ b/latch_cli/snakemake/utils.py @@ -21,6 +21,8 @@ def load_snakemake_metadata(pkg_root: Path, metadata_root: Path) -> Optional[Pat # todo(maximsmol): use a stateful writer that keeps track of indent level def reindent(x: str, level: int) -> str: + if len(x) == 0: + return x if x[0] == "\n": x = x[1:] return textwrap.indent(textwrap.dedent(x), " " * level)