From cb343814870d495b29e93ccb73811d28bb6f4ab6 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Wed, 17 Jul 2024 18:33:14 -0700 Subject: [PATCH 1/5] add output_shortcuts to latch metadata for nextflow --- latch/types/metadata.py | 31 ++++++- latch_cli/nextflow/workflow.py | 142 ++++++++++++++++++++++----------- latch_cli/snakemake/utils.py | 2 + 3 files changed, 127 insertions(+), 48 deletions(-) diff --git a/latch/types/metadata.py b/latch/types/metadata.py index 99b38c96..30204384 100644 --- a/latch/types/metadata.py +++ b/latch/types/metadata.py @@ -31,7 +31,7 @@ from latch_cli.snakemake.config.utils import validate_snakemake_type from latch_cli.utils import identifier_suffix_from_str -from .directory import LatchDir +from .directory import LatchDir, LatchOutputDir from .file import LatchFile @@ -477,6 +477,18 @@ class SnakemakeFileMetadata: """ +@dataclass +class ShortcutPath: + display_name: str + """ + Name of shortcut as it appears in the Latch Console + """ + path: Path + """ + Sub-path to expose + """ + + @dataclass class NextflowParameter(Generic[T], LatchParameter): type: Optional[Type[T]] = None @@ -501,8 +513,25 @@ class NextflowParameter(Generic[T], LatchParameter): Should return the path of the constructed samplesheet. If samplesheet_type is also specified, this takes precedence. Only used if the provided parameter is a samplesheet (samplesheet=True) """ + shortcut_paths: Optional[List[ShortcutPath]] = None + """ + Output paths that will be exposed in the UI as shortcuts. Should be used to + expose important workflow outputs to the user. + + Only valid for LatchDir type + """ def __post_init__(self): + if self.shortcut_paths is not None and self.type not in { + LatchDir, + LatchOutputDir, + }: + click.secho( + "Shortcut paths can only be defined for LatchDir parameters.", + fg="red", + ) + raise click.exceptions.Exit(1) + if not self.samplesheet or self.samplesheet_constructor is not None: return diff --git a/latch_cli/nextflow/workflow.py b/latch_cli/nextflow/workflow.py index 52a41a62..c0af9df2 100644 --- a/latch_cli/nextflow/workflow.py +++ b/latch_cli/nextflow/workflow.py @@ -2,13 +2,14 @@ from enum import Enum from pathlib import Path from textwrap import dedent -from typing import Any, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import click import latch.types.metadata as metadata from latch.types.directory import LatchDir, LatchOutputDir from latch.types.file import LatchFile +from latch.types.metadata import NextflowParameter from latch_cli.snakemake.config.utils import get_preamble, type_repr from latch_cli.snakemake.utils import reindent from latch_cli.utils import identifier_from_str, urljoins @@ -67,59 +68,60 @@ def initialize() -> str: @nextflow_runtime_task(cpu={cpu}, memory={memory}, storage_gib={storage_gib}) def nextflow_runtime(pvc_name: str, {param_signature}) -> None: - try: - shared_dir = Path("/nf-workdir") + shared_dir = Path("/nf-workdir") +{output_shortcuts} {samplesheet_constructors} - ignore_list = [ - "latch", - ".latch", - ".git", - "nextflow", - ".nextflow", - "work", - "results", - "miniconda", - "anaconda3", - "mambaforge", - ] - - shutil.copytree( - Path("/root"), - shared_dir, - ignore=lambda src, names: ignore_list, - ignore_dangling_symlinks=True, - dirs_exist_ok=True, - ) + ignore_list = [ + "latch", + ".latch", + ".git", + "nextflow", + ".nextflow", + "work", + "results", + "miniconda", + "anaconda3", + "mambaforge", + ] + + shutil.copytree( + Path("/root"), + shared_dir, + ignore=lambda src, names: ignore_list, + ignore_dangling_symlinks=True, + dirs_exist_ok=True, + ) - profile_list = {execution_profile} - if {configurable_profiles}: - profile_list.extend([p.value for p in execution_profiles]) - - if len(profile_list) == 0: - profile_list.append("standard") - - profiles = ','.join(profile_list) - - cmd = [ - "/root/nextflow", - "run", - str(shared_dir / "{nf_script}"), - "-work-dir", - str(shared_dir), - "-profile", - profiles, - "-c", - "latch.config", - "-resume", + profile_list = {execution_profile} + if {configurable_profiles}: + profile_list.extend([p.value for p in execution_profiles]) + + if len(profile_list) == 0: + profile_list.append("standard") + + profiles = ','.join(profile_list) + + cmd = [ + "/root/nextflow", + "run", + str(shared_dir / "{nf_script}"), + "-work-dir", + str(shared_dir), + "-profile", + profiles, + "-c", + "latch.config", + "-resume", {params_to_flags} - ] + ] - print("Launching Nextflow Runtime") - print(' '.join(cmd)) - print(flush=True) + print("Launching Nextflow Runtime") + print(' '.join(cmd)) + print(flush=True) + try: env = {{ **os.environ, "NXF_HOME": "/root/.nextflow", @@ -207,6 +209,51 @@ def generate_nextflow_config(pkg_root: Path): click.secho(f"Nextflow Latch config written to {config_path}", fg="green") +def get_output_shortcuts(parameters: Dict[str, NextflowParameter]) -> str: + output_shortcuts = [ + (name, shortcut) + for name, param in parameters.items() + if param.shortcut_paths is not None + for shortcut in param.shortcut_paths + ] + + if len(output_shortcuts) == 0: + return "" + + code_block = "output_shortcuts = []\n" + + for name, shortcut in output_shortcuts: + code_block += dedent(f"""\ + output_shortcuts.append({{ + 'display_name': '{shortcut.display_name}', + 'shortcut': os.path.join({name}.remote_path, '{str(shortcut.path).lstrip("/")}'), + }}) + """) + + code_block += dedent(f"""\ + metadata = {{"output_shortcuts": output_shortcuts}} + + execute( + gql.gql(\""" + mutation UpdateNfExecutionMetadata( + $argToken: String!, + $argMetadata: JSON! + + ) {{ + updateNfExecutionMetadata( + input: {{argToken: $argToken, argMetadata: $argMetadata}} + ) {{ + clientMutationId + }} + }} + \"""), + {{"argMetadata": metadata}}, + ) + """) + + return code_block + + def generate_nextflow_workflow( pkg_root: Path, metadata_root: Path, @@ -342,6 +389,7 @@ def generate_nextflow_workflow( execution_profile=( execution_profile.split(",") if execution_profile is not None else [] ), + output_shortcuts=reindent(get_output_shortcuts(parameters), 1), configurable_profiles=len(profile_options) > 0, preambles="\n\n".join(list(preambles)), samplesheet_funs="\n".join(samplesheet_funs), diff --git a/latch_cli/snakemake/utils.py b/latch_cli/snakemake/utils.py index 06c2bc91..6b064958 100644 --- a/latch_cli/snakemake/utils.py +++ b/latch_cli/snakemake/utils.py @@ -21,6 +21,8 @@ def load_snakemake_metadata(pkg_root: Path, metadata_root: Path) -> Optional[Pat # todo(maximsmol): use a stateful writer that keeps track of indent level def reindent(x: str, level: int) -> str: + if len(x) == 0: + return x if x[0] == "\n": x = x[1:] return textwrap.indent(textwrap.dedent(x), " " * level) From 98417872ce2f0541b62cb87401f4c7772d9863d2 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Thu, 18 Jul 2024 14:57:37 -0700 Subject: [PATCH 2/5] make output paths generic --- latch/executions.py | 33 ++++++++++++++++++++- latch/types/metadata.py | 24 ++++------------ latch_cli/nextflow/workflow.py | 52 +++++++++++----------------------- 3 files changed, 54 insertions(+), 55 deletions(-) diff --git a/latch/executions.py b/latch/executions.py index 0202056f..3f6d2a2a 100644 --- a/latch/executions.py +++ b/latch/executions.py @@ -1,12 +1,15 @@ import os import re from dataclasses import dataclass -from typing import Optional +from typing import List, Optional, Union import click import gql from latch_sdk_gql.execute import execute +from latch.types.directory import LatchDir +from latch.types.file import LatchFile + pod_name_regex = re.compile( r""" ^( @@ -103,3 +106,31 @@ def rename_current_execution(name: str): """), {"argName": name, "argToken": token}, ) + + +def add_results(results: List[str]): + token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None) + if token is None: + # noop during local execution / testing + click.secho( + "Running in local execution context - skip adding output results.", + dim=True, + italic=True, + ) + return + + execute( + gql.gql(""" + mutation addExecutionResults( + $argToken: String!, + $argPaths: [String]! + ) { + addExecutionResults( + input: {argToken: $argToken, argPaths: $argPaths}} + ) { + clientMutationId + } + } + """), + {"argToken": token, "argPaths": results}, + ) diff --git a/latch/types/metadata.py b/latch/types/metadata.py index 30204384..e9be1df1 100644 --- a/latch/types/metadata.py +++ b/latch/types/metadata.py @@ -477,18 +477,6 @@ class SnakemakeFileMetadata: """ -@dataclass -class ShortcutPath: - display_name: str - """ - Name of shortcut as it appears in the Latch Console - """ - path: Path - """ - Sub-path to expose - """ - - @dataclass class NextflowParameter(Generic[T], LatchParameter): type: Optional[Type[T]] = None @@ -513,21 +501,21 @@ class NextflowParameter(Generic[T], LatchParameter): Should return the path of the constructed samplesheet. If samplesheet_type is also specified, this takes precedence. Only used if the provided parameter is a samplesheet (samplesheet=True) """ - shortcut_paths: Optional[List[ShortcutPath]] = None + results_paths: Optional[List[Path]] = None """ - Output paths that will be exposed in the UI as shortcuts. Should be used to - expose important workflow outputs to the user. + Output sub-paths that will be exposed in the UI under the "Results" tab on the workflow execution page. - Only valid for LatchDir type + Only valid where the `type` attribute is a LatchDir """ def __post_init__(self): - if self.shortcut_paths is not None and self.type not in { + if self.results_paths is not None and self.type not in { LatchDir, LatchOutputDir, }: click.secho( - "Shortcut paths can only be defined for LatchDir parameters.", + "`results_paths` attribute can only be defined for parameters" + " of type `LatchDir`.", fg="red", ) raise click.exceptions.Exit(1) diff --git a/latch_cli/nextflow/workflow.py b/latch_cli/nextflow/workflow.py index c0af9df2..f99321d4 100644 --- a/latch_cli/nextflow/workflow.py +++ b/latch_cli/nextflow/workflow.py @@ -209,47 +209,27 @@ def generate_nextflow_config(pkg_root: Path): click.secho(f"Nextflow Latch config written to {config_path}", fg="green") -def get_output_shortcuts(parameters: Dict[str, NextflowParameter]) -> str: +def get_results_code_block(parameters: Dict[str, NextflowParameter]) -> str: output_shortcuts = [ - (name, shortcut) - for name, param in parameters.items() - if param.shortcut_paths is not None - for shortcut in param.shortcut_paths + (var_name, sub_path) + for var_name, param in parameters.items() + if param.results_paths is not None + for sub_path in param.results_paths ] if len(output_shortcuts) == 0: return "" - code_block = "output_shortcuts = []\n" - - for name, shortcut in output_shortcuts: - code_block += dedent(f"""\ - output_shortcuts.append({{ - 'display_name': '{shortcut.display_name}', - 'shortcut': os.path.join({name}.remote_path, '{str(shortcut.path).lstrip("/")}'), - }}) - """) - - code_block += dedent(f"""\ - metadata = {{"output_shortcuts": output_shortcuts}} - - execute( - gql.gql(\""" - mutation UpdateNfExecutionMetadata( - $argToken: String!, - $argMetadata: JSON! - - ) {{ - updateNfExecutionMetadata( - input: {{argToken: $argToken, argMetadata: $argMetadata}} - ) {{ - clientMutationId - }} - }} - \"""), - {{"argMetadata": metadata}}, - ) - """) + code_block = dedent(""" + from latch.execution import add_results + + results = [] + """) + + for var_name, sub_path in output_shortcuts: + code_block += dedent(f"results.append(os.path.join({var_name}.remote_path, '{str(sub_path).lstrip("/")}'))\n") + + code_block += dedent(f"add_results(results)\n") return code_block @@ -389,7 +369,7 @@ def generate_nextflow_workflow( execution_profile=( execution_profile.split(",") if execution_profile is not None else [] ), - output_shortcuts=reindent(get_output_shortcuts(parameters), 1), + output_shortcuts=reindent(get_results_code_block(parameters), 1), configurable_profiles=len(profile_options) > 0, preambles="\n\n".join(list(preambles)), samplesheet_funs="\n".join(samplesheet_funs), From a3b20544654ebd2fcbd2182576d09097a9327239 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Thu, 18 Jul 2024 15:25:45 -0700 Subject: [PATCH 3/5] update function name --- latch/executions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/latch/executions.py b/latch/executions.py index 3f6d2a2a..2c885981 100644 --- a/latch/executions.py +++ b/latch/executions.py @@ -125,7 +125,7 @@ def add_results(results: List[str]): $argToken: String!, $argPaths: [String]! ) { - addExecutionResults( + addExecutionResultsByToken( input: {argToken: $argToken, argPaths: $argPaths}} ) { clientMutationId From b856b4c25a0eea7ffb727f2323404bb65740c101 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Fri, 19 Jul 2024 12:29:54 -0700 Subject: [PATCH 4/5] bug fixes --- latch/executions.py | 4 ++-- latch_cli/nextflow/workflow.py | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/latch/executions.py b/latch/executions.py index 2c885981..bd98c2e3 100644 --- a/latch/executions.py +++ b/latch/executions.py @@ -108,7 +108,7 @@ def rename_current_execution(name: str): ) -def add_results(results: List[str]): +def add_execution_results(results: List[str]): token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None) if token is None: # noop during local execution / testing @@ -126,7 +126,7 @@ def add_results(results: List[str]): $argPaths: [String]! ) { addExecutionResultsByToken( - input: {argToken: $argToken, argPaths: $argPaths}} + input: {argToken: $argToken, argPaths: $argPaths} ) { clientMutationId } diff --git a/latch_cli/nextflow/workflow.py b/latch_cli/nextflow/workflow.py index f99321d4..5d0a27af 100644 --- a/latch_cli/nextflow/workflow.py +++ b/latch_cli/nextflow/workflow.py @@ -221,15 +221,18 @@ def get_results_code_block(parameters: Dict[str, NextflowParameter]) -> str: return "" code_block = dedent(""" - from latch.execution import add_results + from latch.executions import add_execution_results results = [] """) for var_name, sub_path in output_shortcuts: - code_block += dedent(f"results.append(os.path.join({var_name}.remote_path, '{str(sub_path).lstrip("/")}'))\n") + code_block += dedent( + f"results.append(os.path.join({var_name}.remote_path," + f" '{str(sub_path).lstrip('/')}'))\n" + ) - code_block += dedent(f"add_results(results)\n") + code_block += dedent(f"add_execution_results(results)\n") return code_block From 68772494935504bb031eb2ea2ade51f712e59084 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Tue, 23 Jul 2024 17:35:08 -0700 Subject: [PATCH 5/5] change func name for publishing results --- latch/executions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/latch/executions.py b/latch/executions.py index bd98c2e3..44e958aa 100644 --- a/latch/executions.py +++ b/latch/executions.py @@ -125,7 +125,7 @@ def add_execution_results(results: List[str]): $argToken: String!, $argPaths: [String]! ) { - addExecutionResultsByToken( + executionInfoMetadataPublishResults( input: {argToken: $argToken, argPaths: $argPaths} ) { clientMutationId