Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execution results page #476

Merged
merged 5 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion latch/executions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import os
import re
from dataclasses import dataclass
from typing import Optional
from typing import List, Optional, Union

import click
import gql
from latch_sdk_gql.execute import execute

from latch.types.directory import LatchDir
from latch.types.file import LatchFile

pod_name_regex = re.compile(
r"""
^(
Expand Down Expand Up @@ -103,3 +106,31 @@ def rename_current_execution(name: str):
"""),
{"argName": name, "argToken": token},
)


def add_execution_results(results: List[str]):
token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None)
if token is None:
# noop during local execution / testing
click.secho(
"Running in local execution context - skip adding output results.",
dim=True,
italic=True,
)
return

execute(
gql.gql("""
mutation addExecutionResults(
$argToken: String!,
$argPaths: [String]!
) {
executionInfoMetadataPublishResults(
input: {argToken: $argToken, argPaths: $argPaths}
) {
clientMutationId
}
}
"""),
{"argToken": token, "argPaths": results},
)
19 changes: 18 additions & 1 deletion latch/types/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from latch_cli.snakemake.config.utils import validate_snakemake_type
from latch_cli.utils import identifier_suffix_from_str

from .directory import LatchDir
from .directory import LatchDir, LatchOutputDir
from .file import LatchFile


Expand Down Expand Up @@ -501,8 +501,25 @@ class NextflowParameter(Generic[T], LatchParameter):
Should return the path of the constructed samplesheet. If samplesheet_type is also specified, this takes precedence.
Only used if the provided parameter is a samplesheet (samplesheet=True)
"""
results_paths: Optional[List[Path]] = None
"""
Output sub-paths that will be exposed in the UI under the "Results" tab on the workflow execution page.

Only valid where the `type` attribute is a LatchDir
"""

def __post_init__(self):
if self.results_paths is not None and self.type not in {
LatchDir,
LatchOutputDir,
}:
click.secho(
"`results_paths` attribute can only be defined for parameters"
" of type `LatchDir`.",
fg="red",
)
raise click.exceptions.Exit(1)

if not self.samplesheet or self.samplesheet_constructor is not None:
return

Expand Down
125 changes: 78 additions & 47 deletions latch_cli/nextflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from enum import Enum
from pathlib import Path
from textwrap import dedent
from typing import Any, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

import click

import latch.types.metadata as metadata
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.file import LatchFile
from latch.types.metadata import NextflowParameter
from latch_cli.snakemake.config.utils import get_preamble, type_repr
from latch_cli.snakemake.utils import reindent
from latch_cli.utils import identifier_from_str, urljoins
Expand Down Expand Up @@ -67,59 +68,60 @@ def initialize() -> str:

@nextflow_runtime_task(cpu={cpu}, memory={memory}, storage_gib={storage_gib})
def nextflow_runtime(pvc_name: str, {param_signature}) -> None:
try:
shared_dir = Path("/nf-workdir")
shared_dir = Path("/nf-workdir")

{output_shortcuts}
{samplesheet_constructors}

ignore_list = [
"latch",
".latch",
".git",
"nextflow",
".nextflow",
"work",
"results",
"miniconda",
"anaconda3",
"mambaforge",
]

shutil.copytree(
Path("/root"),
shared_dir,
ignore=lambda src, names: ignore_list,
ignore_dangling_symlinks=True,
dirs_exist_ok=True,
)
ignore_list = [
"latch",
".latch",
".git",
"nextflow",
".nextflow",
"work",
"results",
"miniconda",
"anaconda3",
"mambaforge",
]

shutil.copytree(
Path("/root"),
shared_dir,
ignore=lambda src, names: ignore_list,
ignore_dangling_symlinks=True,
dirs_exist_ok=True,
)

profile_list = {execution_profile}
if {configurable_profiles}:
profile_list.extend([p.value for p in execution_profiles])

if len(profile_list) == 0:
profile_list.append("standard")

profiles = ','.join(profile_list)

cmd = [
"/root/nextflow",
"run",
str(shared_dir / "{nf_script}"),
"-work-dir",
str(shared_dir),
"-profile",
profiles,
"-c",
"latch.config",
"-resume",
profile_list = {execution_profile}
if {configurable_profiles}:
profile_list.extend([p.value for p in execution_profiles])

if len(profile_list) == 0:
profile_list.append("standard")

profiles = ','.join(profile_list)

cmd = [
"/root/nextflow",
"run",
str(shared_dir / "{nf_script}"),
"-work-dir",
str(shared_dir),
"-profile",
profiles,
"-c",
"latch.config",
"-resume",
{params_to_flags}
]
]

print("Launching Nextflow Runtime")
print(' '.join(cmd))
print(flush=True)
print("Launching Nextflow Runtime")
print(' '.join(cmd))
print(flush=True)

try:
env = {{
**os.environ,
"NXF_HOME": "/root/.nextflow",
Expand Down Expand Up @@ -207,6 +209,34 @@ def generate_nextflow_config(pkg_root: Path):
click.secho(f"Nextflow Latch config written to {config_path}", fg="green")


def get_results_code_block(parameters: Dict[str, NextflowParameter]) -> str:
output_shortcuts = [
(var_name, sub_path)
for var_name, param in parameters.items()
if param.results_paths is not None
for sub_path in param.results_paths
]

if len(output_shortcuts) == 0:
return ""

code_block = dedent("""
from latch.executions import add_execution_results

results = []
""")

for var_name, sub_path in output_shortcuts:
code_block += dedent(
f"results.append(os.path.join({var_name}.remote_path,"
f" '{str(sub_path).lstrip('/')}'))\n"
)

code_block += dedent(f"add_execution_results(results)\n")

return code_block


def generate_nextflow_workflow(
pkg_root: Path,
metadata_root: Path,
Expand Down Expand Up @@ -342,6 +372,7 @@ def generate_nextflow_workflow(
execution_profile=(
execution_profile.split(",") if execution_profile is not None else []
),
output_shortcuts=reindent(get_results_code_block(parameters), 1),
configurable_profiles=len(profile_options) > 0,
preambles="\n\n".join(list(preambles)),
samplesheet_funs="\n".join(samplesheet_funs),
Expand Down
2 changes: 2 additions & 0 deletions latch_cli/snakemake/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def load_snakemake_metadata(pkg_root: Path, metadata_root: Path) -> Optional[Pat

# todo(maximsmol): use a stateful writer that keeps track of indent level
def reindent(x: str, level: int) -> str:
if len(x) == 0:
return x
if x[0] == "\n":
x = x[1:]
return textwrap.indent(textwrap.dedent(x), " " * level)
Loading