diff --git a/CHANGELOG.md b/CHANGELOG.md index 7413cd1a..cfd7ce35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,13 +16,21 @@ Types of changes # Latch SDK Changelog -## v2.53.2 - 2024-10-02 +## v2.53.3 - 2024-10-02 ### Added * Nextflow - Add support for uploading command logs after task execution +## v2.53.2 - 2024-10-02 + +### Changed + +* All requests done by `latch cp` now + - use connection pooling / reuse + - have automatic retries with backoff + ## v2.53.1 - 2024-10-02 ### Added diff --git a/latch/ldata/_transfer/download.py b/latch/ldata/_transfer/download.py index 33ee130e..1906ef21 100644 --- a/latch/ldata/_transfer/download.py +++ b/latch/ldata/_transfer/download.py @@ -10,7 +10,6 @@ from latch_sdk_config.latch import config as latch_config from latch.ldata.type import LDataNodeType -from latch_cli import tinyrequests from latch_cli.constants import Units from latch_cli.utils import get_auth_header, human_readable_time, with_si_suffix from latch_cli.utils.path import normalize_path @@ -18,7 +17,7 @@ from .manager import TransferStateManager from .node import get_node_data from .progress import Progress, ProgressBars, get_free_index -from .utils import get_max_workers +from .utils import get_max_workers, http_session class GetSignedUrlData(TypedDict): @@ -39,7 +38,7 @@ class DownloadJob: class DownloadResult: num_files: int total_bytes: int - total_time: int + total_time: float def download( @@ -73,7 +72,7 @@ def download( else: endpoint = latch_config.api.data.get_signed_url - res = tinyrequests.post( + res = http_session.post( endpoint, headers={"Authorization": get_auth_header()}, json={"path": normalized}, @@ -212,7 +211,7 @@ def download_file( ) -> int: # todo(ayush): benchmark parallelized downloads using the range header with open(job.dest, "wb") as f: - res = tinyrequests.get(job.signed_url, stream=True) + res = http_session.get(job.signed_url, stream=True) if res.status_code != 200: raise RuntimeError( f"failed to download {job.dest.name}: 
{res.status_code}:" diff --git a/latch/ldata/_transfer/upload.py b/latch/ldata/_transfer/upload.py index bc87f335..93b798c9 100644 --- a/latch/ldata/_transfer/upload.py +++ b/latch/ldata/_transfer/upload.py @@ -16,7 +16,6 @@ from typing_extensions import TypeAlias from latch.ldata.type import LatchPathError, LDataNodeType -from latch_cli import tinyrequests from latch_cli.constants import Units, latch_constants from latch_cli.utils import get_auth_header, urljoins, with_si_suffix from latch_cli.utils.path import normalize_path @@ -25,7 +24,7 @@ from .node import get_node_data from .progress import Progress, ProgressBars from .throttle import Throttle -from .utils import get_max_workers +from .utils import get_max_workers, http_session if TYPE_CHECKING: PathQueueType: TypeAlias = "Queue[Optional[Path]]" @@ -299,9 +298,6 @@ class StartUploadReturnType: dest: str -MAX_RETRIES = 5 - - def start_upload( src: Path, dest: str, @@ -353,7 +349,7 @@ def start_upload( time.sleep(throttle.get_delay()) start = time.monotonic() - res = tinyrequests.post( + res = http_session.post( latch_config.api.data.start_upload, headers={"Authorization": get_auth_header()}, json={ @@ -361,7 +357,6 @@ def start_upload( "content_type": content_type, "part_count": part_count, }, - num_retries=MAX_RETRIES, ) end = time.monotonic() @@ -412,7 +407,7 @@ def upload_file_chunk( f.seek(part_size * part_index) data = f.read(part_size) - res = tinyrequests.put(url, data=data) + res = http_session.put(url, data=data) if res.status_code != 200: raise HTTPException( f"failed to upload part {part_index} of {src}: {res.status_code}" @@ -467,7 +462,7 @@ def end_upload( parts: List[CompletedPart], progress_bars: Optional[ProgressBars] = None, ): - res = tinyrequests.post( + res = http_session.post( latch_config.api.data.end_upload, headers={"Authorization": get_auth_header()}, json={ diff --git a/latch/ldata/_transfer/utils.py b/latch/ldata/_transfer/utils.py index 8ee438d5..5d8a20f8 100644 --- 
a/latch/ldata/_transfer/utils.py +++ b/latch/ldata/_transfer/utils.py @@ -2,12 +2,24 @@ import time from typing import Any, Dict, Optional -from gql.gql import DocumentNode +import requests +import requests.adapters from gql.transport.exceptions import TransportClosed, TransportServerError +from graphql.language import DocumentNode from latch_sdk_gql import JsonValue from latch_sdk_gql.execute import execute -from latch_cli import tinyrequests +http_session = requests.Session() + +_adapter = requests.adapters.HTTPAdapter( + max_retries=requests.adapters.Retry( + status_forcelist=[429, 500, 502, 503, 504], + backoff_factor=1, + allowed_methods=["GET", "PUT", "POST"], + ) +) +http_session.mount("https://", _adapter) +http_session.mount("http://", _adapter) # todo(rahul): move this function into latch_sdk_gql.execute @@ -21,6 +33,7 @@ def query_with_retry( Send GraphQL query request. Retry on Server or Connection failures. Implements exponential backoff between retries """ + err = None attempt = 0 while attempt < num_retries: attempt += 1 @@ -34,7 +47,10 @@ def query_with_retry( # todo(rahul): tune the sleep interval based on the startup time of the vacuole time.sleep(2**attempt * 5) - raise err + if err is not None: + raise err + + raise RuntimeError("gql retries exceeded") def get_max_workers() -> int: diff --git a/latch_cli/tinyrequests.py b/latch_cli/tinyrequests.py index 42a8a257..3b875a13 100644 --- a/latch_cli/tinyrequests.py +++ b/latch_cli/tinyrequests.py @@ -90,11 +90,6 @@ def _req( port = parts.port if parts.port is not None else 443 - # ayush: this is not threadsafe (as in the connection could be created - # multiple times) but its probably fine - - # todo(rteqs): removed caching single connections, implement a connection pool instead. 
- retries = 3 while True: conn = HTTPSConnection(parts.hostname, port, timeout=90) diff --git a/setup.py b/setup.py index 3a510c96..99eb6e9e 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name="latch", - version="v2.53.2", + version="v2.53.3", author_email="kenny@latch.bio", description="The Latch SDK", packages=find_packages(),