Skip to content

Commit

Permalink
Merge pull request #476 from latchbio/rahuldesai1/execution-results-page
Browse files Browse the repository at this point in the history
execution results page
  • Loading branch information
rahuldesai1 authored Jul 24, 2024
2 parents 59e01e1 + 6877249 commit 6564adf
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 49 deletions.
33 changes: 32 additions & 1 deletion latch/executions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import os
import re
from dataclasses import dataclass
from typing import Optional
from typing import List, Optional, Union

import click
import gql
from latch_sdk_gql.execute import execute

from latch.types.directory import LatchDir
from latch.types.file import LatchFile

pod_name_regex = re.compile(
r"""
^(
Expand Down Expand Up @@ -103,3 +106,31 @@ def rename_current_execution(name: str):
"""),
{"argName": name, "argToken": token},
)


def add_execution_results(results: List[str]):
token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None)
if token is None:
# noop during local execution / testing
click.secho(
"Running in local execution context - skip adding output results.",
dim=True,
italic=True,
)
return

execute(
gql.gql("""
mutation addExecutionResults(
$argToken: String!,
$argPaths: [String]!
) {
executionInfoMetadataPublishResults(
input: {argToken: $argToken, argPaths: $argPaths}
) {
clientMutationId
}
}
"""),
{"argToken": token, "argPaths": results},
)
19 changes: 18 additions & 1 deletion latch/types/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from latch_cli.snakemake.config.utils import validate_snakemake_type
from latch_cli.utils import identifier_suffix_from_str

from .directory import LatchDir
from .directory import LatchDir, LatchOutputDir
from .file import LatchFile


Expand Down Expand Up @@ -510,8 +510,25 @@ class NextflowParameter(Generic[T], LatchParameter):
Should return the path of the constructed samplesheet. If samplesheet_type is also specified, this takes precedence.
Only used if the provided parameter is a samplesheet (samplesheet=True)
"""
results_paths: Optional[List[Path]] = None
"""
Output sub-paths that will be exposed in the UI under the "Results" tab on the workflow execution page.
Only valid where the `type` attribute is a LatchDir
"""

def __post_init__(self):
if self.results_paths is not None and self.type not in {
LatchDir,
LatchOutputDir,
}:
click.secho(
"`results_paths` attribute can only be defined for parameters"
" of type `LatchDir`.",
fg="red",
)
raise click.exceptions.Exit(1)

if not self.samplesheet or self.samplesheet_constructor is not None:
return

Expand Down
125 changes: 78 additions & 47 deletions latch_cli/nextflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from enum import Enum
from pathlib import Path
from textwrap import dedent
from typing import Any, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

import click

import latch.types.metadata as metadata
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.file import LatchFile
from latch.types.metadata import NextflowParameter
from latch_cli.snakemake.config.utils import get_preamble, type_repr
from latch_cli.snakemake.utils import reindent
from latch_cli.utils import identifier_from_str, urljoins
Expand Down Expand Up @@ -69,59 +70,60 @@ def initialize() -> str:
@nextflow_runtime_task(cpu={cpu}, memory={memory}, storage_gib={storage_gib})
def nextflow_runtime(pvc_name: str, {param_signature}) -> None:
try:
shared_dir = Path("/nf-workdir")
shared_dir = Path("/nf-workdir")
{output_shortcuts}
{samplesheet_constructors}
ignore_list = [
"latch",
".latch",
".git",
"nextflow",
".nextflow",
"work",
"results",
"miniconda",
"anaconda3",
"mambaforge",
]
shutil.copytree(
Path("/root"),
shared_dir,
ignore=lambda src, names: ignore_list,
ignore_dangling_symlinks=True,
dirs_exist_ok=True,
)
ignore_list = [
"latch",
".latch",
".git",
"nextflow",
".nextflow",
"work",
"results",
"miniconda",
"anaconda3",
"mambaforge",
]
shutil.copytree(
Path("/root"),
shared_dir,
ignore=lambda src, names: ignore_list,
ignore_dangling_symlinks=True,
dirs_exist_ok=True,
)
profile_list = {execution_profile}
if {configurable_profiles}:
profile_list.extend([p.value for p in execution_profiles])
if len(profile_list) == 0:
profile_list.append("standard")
profiles = ','.join(profile_list)
cmd = [
"/root/nextflow",
"run",
str(shared_dir / "{nf_script}"),
"-work-dir",
str(shared_dir),
"-profile",
profiles,
"-c",
"latch.config",
"-resume",
profile_list = {execution_profile}
if {configurable_profiles}:
profile_list.extend([p.value for p in execution_profiles])
if len(profile_list) == 0:
profile_list.append("standard")
profiles = ','.join(profile_list)
cmd = [
"/root/nextflow",
"run",
str(shared_dir / "{nf_script}"),
"-work-dir",
str(shared_dir),
"-profile",
profiles,
"-c",
"latch.config",
"-resume",
{params_to_flags}
]
]
print("Launching Nextflow Runtime")
print(' '.join(cmd))
print(flush=True)
print("Launching Nextflow Runtime")
print(' '.join(cmd))
print(flush=True)
try:
env = {{
**os.environ,
"NXF_HOME": "/root/.nextflow",
Expand Down Expand Up @@ -209,6 +211,34 @@ def generate_nextflow_config(pkg_root: Path):
click.secho(f"Nextflow Latch config written to {config_path}", fg="green")


def get_results_code_block(parameters: Dict[str, NextflowParameter]) -> str:
output_shortcuts = [
(var_name, sub_path)
for var_name, param in parameters.items()
if param.results_paths is not None
for sub_path in param.results_paths
]

if len(output_shortcuts) == 0:
return ""

code_block = dedent("""
from latch.executions import add_execution_results
results = []
""")

for var_name, sub_path in output_shortcuts:
code_block += dedent(
f"results.append(os.path.join({var_name}.remote_path,"
f" '{str(sub_path).lstrip('/')}'))\n"
)

code_block += dedent(f"add_execution_results(results)\n")

return code_block


def generate_nextflow_workflow(
pkg_root: Path,
metadata_root: Path,
Expand Down Expand Up @@ -344,6 +374,7 @@ def generate_nextflow_workflow(
execution_profile=(
execution_profile.split(",") if execution_profile is not None else []
),
output_shortcuts=reindent(get_results_code_block(parameters), 1),
configurable_profiles=len(profile_options) > 0,
preambles="\n\n".join(list(preambles)),
samplesheet_funs="\n".join(samplesheet_funs),
Expand Down
2 changes: 2 additions & 0 deletions latch_cli/snakemake/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def load_snakemake_metadata(pkg_root: Path, metadata_root: Path) -> Optional[Pat

# todo(maximsmol): use a stateful writer that keeps track of indent level
def reindent(x: str, level: int) -> str:
if len(x) == 0:
return x
if x[0] == "\n":
x = x[1:]
return textwrap.indent(textwrap.dedent(x), " " * level)

0 comments on commit 6564adf

Please sign in to comment.