diff --git a/CHANGELOG.md b/CHANGELOG.md index d3cc95bc..23419832 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,13 @@ Types of changes # Latch SDK Changelog +## 2.53.7 - 2024-10-11 + +### Added + +* Nextflow + - add `latch attach` command to attach to a nextflow work directory + ## 2.53.6 - 2024-10-11 ### Added diff --git a/latch_cli/services/k8s/attach.py b/latch_cli/services/k8s/attach.py index c15375ef..29e58430 100644 --- a/latch_cli/services/k8s/attach.py +++ b/latch_cli/services/k8s/attach.py @@ -5,7 +5,9 @@ from typing import Optional from urllib.parse import urljoin, urlparse +import click import websockets.client as websockets +import websockets.exceptions as ws_exceptions from latch_sdk_config.latch import NUCLEUS_URL from latch_cli.services.k8s.utils import get_execution_info @@ -16,20 +18,35 @@ async def connect(execution_id: str, session_id: str): async with websockets.connect( - urlparse(urljoin(NUCLEUS_URL, "/workflows/cli/attach")) + urlparse(urljoin(NUCLEUS_URL, "/workflows/cli/attach-nf-workdir")) ._replace(scheme="wss") .geturl(), close_timeout=0, extra_headers={"Authorization": get_auth_header()}, ) as ws: - request = {"execution_id": execution_id, "session_id": session_id} + request = {"execution_id": int(execution_id), "session_id": session_id} await ws.send(json.dumps(request)) + data = await ws.recv() + + msg = "" + try: + res = json.loads(data) + if "error" in res: + raise RuntimeError(res["error"]) + except json.JSONDecodeError: + msg = "Unable to connect to pod - internal error." + except RuntimeError as e: + msg = str(e) + + if msg != "": + raise RuntimeError(msg) + await forward_stdio(ws) def get_session_id(): - return secrets.token_bytes(18).hex() + return secrets.token_bytes(8).hex() def attach(execution_id: Optional[str] = None): @@ -42,7 +59,15 @@ def attach(execution_id: Optional[str] = None): old_settings_stdin = termios.tcgetattr(sys.stdin.fileno()) tty.setraw(sys.stdin) + msg = "" try: asyncio.run(connect(execution_info["id"], session_id)) + except ws_exceptions.ConnectionClosedError as e: + msg = json.loads(e.reason)["error"] + except RuntimeError as e: + msg = str(e) finally: termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings_stdin) + + if msg != "": + click.secho(msg, fg="red") diff --git a/latch_cli/services/k8s/execute.py b/latch_cli/services/k8s/execute.py index 63d23990..79aba127 100644 --- a/latch_cli/services/k8s/execute.py +++ b/latch_cli/services/k8s/execute.py @@ -36,6 +36,21 @@ async def connect(egn_info: EGNNode, container_info: Optional[ContainerNode]): } await ws.send(json.dumps(request)) + data = await ws.recv() + + msg = "" + try: + res = json.loads(data) + if "error" in res: + raise RuntimeError(res["error"]) + except json.JSONDecodeError: + msg = "Unable to connect to pod - internal error." + except RuntimeError as e: + msg = str(e) + + if msg != "": + raise RuntimeError(msg) + await forward_stdio(ws) diff --git a/setup.py b/setup.py index 79e1f898..a2cee049 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name="latch", - version="v2.53.6", + version="v2.53.7", author_email="kenny@latch.bio", description="The Latch SDK", packages=find_packages(),