From 8cba86ae1b368e90b4c19282767c72d42a0ffcf6 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Tue, 12 Nov 2024 09:56:56 -0500 Subject: [PATCH 01/20] Add optional celery and aiofiles dependencies for async support --- pyproject.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index b0fa922..2d7263c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,12 +21,17 @@ dependencies = [ "uvicorn", "ga4gh.vrs[extras]~=2.0.0a11", "sqlalchemy~=1.4.54", + "pyaml", ] dynamic = ["version"] [project.optional-dependencies] postgres = ["psycopg[binary]"] snowflake = ["snowflake-sqlalchemy~=1.5.1"] +queueing = [ + "celery[redis]~=5.4.0", + "aiofiles", +] tests = [ "pytest", "pytest-cov", From 325200dfacf1782496ddcbe57d06d55e2283daa7 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Tue, 12 Nov 2024 13:50:09 -0500 Subject: [PATCH 02/20] Add implementation for async VCF annotation --- src/anyvar/queueing/__init__.py | 1 + src/anyvar/queueing/celery_worker.py | 150 +++++++++++++ src/anyvar/restapi/main.py | 302 ++++++++++++++++++++++++--- src/anyvar/restapi/schema.py | 19 +- 4 files changed, 444 insertions(+), 28 deletions(-) create mode 100644 src/anyvar/queueing/__init__.py create mode 100644 src/anyvar/queueing/celery_worker.py diff --git a/src/anyvar/queueing/__init__.py b/src/anyvar/queueing/__init__.py new file mode 100644 index 0000000..b53ce1f --- /dev/null +++ b/src/anyvar/queueing/__init__.py @@ -0,0 +1 @@ +"""Provides asynchronous tasks via Celery integration""" diff --git a/src/anyvar/queueing/celery_worker.py b/src/anyvar/queueing/celery_worker.py new file mode 100644 index 0000000..79cad4d --- /dev/null +++ b/src/anyvar/queueing/celery_worker.py @@ -0,0 +1,150 @@ +"""Define the Celery app and tasks for asynchronous request-response support""" + +import logging +import os +from pathlib import Path + +import celery.signals +from celery import Celery, Task +from celery.result import AsyncResult + +import anyvar +from anyvar.extras.vcf import VcfRegistrar + +_logger = logging.getLogger(__name__) + +# Configure the Celery app +celery_app = Celery("anyvar") +celery_app.conf.update( + # general settings + task_default_queue=os.environ.get("CELERY_TASK_DEFAULT_QUEUE", "anyvar_q"), + event_queue_prefix=os.environ.get("CELERY_EVENT_QUEUE_PREFIX", "anyvar_ev"), + task_serializer="json", + result_serializer="json", + accept_content=["application/json"], + result_backend=os.environ.get("CELERY_BACKEND_URL", None), + result_backend_transport_options={"global_keyprefix": "anyvar_"}, + broker_url=os.environ.get("CELERY_BROKER_URL", None), + timezone=os.environ.get("CELERY_TIMEZONE", "UTC"), + result_expires=int(os.environ.get("CELERY_RESULT_EXPIRES", "3600")), + # task settings + task_ignore_result=False, + task_acks_late=os.environ.get("CELERY_TASK_ACKS_LATE", "true").lower() + in ["true", "yes", "1"], + task_reject_on_worker_lost=os.environ.get( + "CELERY_TASK_REJECT_ON_WORKER_LOST", "false" + ).lower() + in ["true", "yes", "1"], + # worker settings + worker_prefetch_multiplier=int( + os.environ.get("CELERY_WORKER_PREFETCH_MULTIPLIER", 1) + ), + task_time_limit=int(os.environ.get("CELERY_TASK_TIME_LIMIT", "3900")), + soft_time_limit=int(os.environ.get("CELERY_SOFT_TIME_LIMIT", "3600")), + worker_send_task_events=os.environ.get( + "CELERY_WORKER_SEND_TASK_EVENTS", "false" + ).lower() + in ["true", "yes", "1"], +) + +# if this is a worker, create/destroy the AnyVar app instance +# on startup and shutdown +anyvar_app = None + + +@celery.signals.worker_process_init.connect +def init_anyvar(**kwargs) -> None: # noqa: ARG001 + """On the `worker_process_init` signal, construct the AnyVar app instance""" + _logger.info("processing signal worker process init") + global anyvar_app + # create anyvar instance + if not anyvar_app: + _logger.info("creating anyvar app in worker process init") + storage = anyvar.anyvar.create_storage() + translator = anyvar.anyvar.create_translator() + anyvar_instance = anyvar.AnyVar(object_store=storage, translator=translator) + + # associate anyvar with the app state + anyvar_app = anyvar_instance + + +@celery.signals.worker_process_shutdown.connect +def teardown_anyvar(**kwargs) -> None: # noqa: ARG001 + """On the `worker_process_shutdown` signal, destroy the AnyVar app instance""" + _logger.info("processing signal worker process shutdown") + global anyvar_app + # close storage connector on shutdown + if anyvar_app: + _logger.info("closing anyvar app in worker process init") + anyvar_app.object_store.close() + anyvar_app = None + + +@celery_app.task(bind=True) +def annotate_vcf( + self: Task, + input_file_path: str, + assembly: str, + for_ref: bool, + allow_async_write: bool, +) -> str: + """Annotate the specified VCF file and return the path to the annotated file + :param input_file_path: path to the VCF file to be annotated + :param assembly: the reference assembly for the VCF + :param for_ref: whether to compute VRS IDs for REF alleles + :param allow_async_write: whether to allow async database writes + :return: path to the annotated VCF file + """ + global anyvar_app + try: + # create output file path + output_file_path = f"{input_file_path}_outputvcf" + _logger.info( + "%s - worker annotating vcf %s to %s", + self.request.id, + input_file_path, + output_file_path, + ) + + # annotation vcf with VRS IDs + registrar = VcfRegistrar(anyvar_app) + registrar.annotate( + vcf_in=input_file_path, + vcf_out=output_file_path, + compute_for_ref=for_ref, + assembly=assembly, + ) + + # wait for writes if necessary + if not allow_async_write: + _logger.info( + "%s - waiting for object store writes from API handler method", + self.request.id, + ) + anyvar_app.object_store.wait_for_writes() + + # remove input file + Path(input_file_path).unlink() + + # return output file path + return output_file_path + except Exception: + _logger.exception("%s - vcf annotation failed with exception", self.request.id) + raise + + +# after task is published, set the status to "SENT" +# this allows the web api to determine when a run_id is not found +@celery.signals.after_task_publish.connect +def update_sent_state(sender: str | None, headers: dict | None, **kwargs) -> None: # noqa: ARG001 + """On the `after_task_publish` signal, set the task status to SENT. This enables + the application to differentiate between task ids that are not complete and those + that do not exist. + """ + _logger.info("%s - after publish", headers["id"]) + task = celery_app.tasks.get(sender) + backend = task.backend if task else celery_app.backend + + result = AsyncResult(id=headers["id"]) + if result.status == "PENDING": + backend.store_result(headers["id"], None, "SENT") diff --git a/src/anyvar/restapi/main.py b/src/anyvar/restapi/main.py index c99ba68..99e4b0c 100644 --- a/src/anyvar/restapi/main.py +++ b/src/anyvar/restapi/main.py @@ -1,16 +1,31 @@ """Provide core route definitions for REST service.""" +import asyncio +import datetime import logging import logging.config import os import pathlib import tempfile +import uuid from contextlib import asynccontextmanager from http import HTTPStatus import ga4gh.vrs import yaml -from fastapi import Body, FastAPI, File, HTTPException, Path, Query, Request, UploadFile +from fastapi import ( + BackgroundTasks, + Body, + FastAPI, + File, + HTTPException, + Path, + Query, + Request, + Response, + UploadFile, + status, +) from fastapi.responses import FileResponse from pydantic import StrictStr @@ -20,12 +35,14 @@ from anyvar.restapi.schema import ( AnyVarStatsResponse, EndpointTag, + ErrorResponse, GetSequenceLocationResponse, GetVariationResponse, InfoResponse, RegisterVariationRequest, RegisterVariationResponse, RegisterVrsVariationResponse, + RunStatusResponse, SearchResponse, VariationStatisticType, ) @@ -35,6 +52,7 @@ ) from anyvar.utils.types import VrsVariation, variation_class_map +# Configure logging from file or use default logging_config_file = os.environ.get("ANYVAR_LOGGING_CONFIG", None) if logging_config_file and pathlib.Path(logging_config_file).is_file(): with pathlib.Path(logging_config_file).open() as fd: @@ -46,6 +64,22 @@ _logger = logging.getLogger(__name__) +# Determine whether asynchronous VCF annotation is enabled +# Get the working directory for asynchronous VCF annotation +async_work_dir = os.environ.get("ANYVAR_VCF_ASYNC_WORK_DIR", None) +# Get what response code to use for asynchronous VCF annotation failures +failure_status_code = int(os.environ.get("ANYVAR_VCF_ASYNC_FAILURE_STATUS_CODE", "500")) +try: + import aiofiles # noqa: I001 + import anyvar.queueing.celery_worker + from billiard.exceptions import TimeLimitExceeded + from celery.exceptions import WorkerLostError + from celery.result import AsyncResult +except ImportError: + has_queueing = False +else: + has_queueing = os.environ.get("CELERY_BROKER_URL", None) and async_work_dir + @asynccontextmanager async def app_lifespan(param_app: FastAPI): # noqa: ANN201 @@ -233,6 +267,8 @@ def register_vrs_object( ) async def annotate_vcf( request: Request, + response: Response, + bg_tasks: BackgroundTasks, vcf: UploadFile = File(..., description="VCF to register and annotate"), for_ref: bool = Query( default=True, description="Whether to compute VRS IDs for REF alleles" @@ -246,40 +282,252 @@ async def annotate_vcf( pattern="^(GRCh38|GRCh37)$", description="The reference assembly for the VCF", ), -) -> FileResponse | dict: + run_async: bool = Query( + default=False, + description="If true, immediately return a '202 Accepted' response and run asynchronously", + ), + run_id: str | None = Query( + default=None, + description="When running asynchronously, use the specified value as the run id instead generating a random uuid", + ), +) -> FileResponse | RunStatusResponse | ErrorResponse: """Register alleles from a VCF and return a file annotated with VRS IDs. :param request: FastAPI request object + :param response: FastAPI response object + :param bg_tasks: FastAPI background tasks object :param vcf: incoming VCF file object :param for_ref: whether to compute VRS IDs for REF alleles :param allow_async_write: whether to allow async database writes :param assembly: the reference assembly for the VCF - :return: streamed annotated file + :param run_async: whether to run the VCF annotation synchronously or asynchronously + :param run_id: user provided id for asynchronous VCF annotation + :return: streamed annotated file or a run status response for an asynchronous run """ - with tempfile.NamedTemporaryFile(delete=False) as temp_file: - temp_file.write(await vcf.read()) - temp_file.close() - - av: AnyVar = request.app.state.anyvar - registrar = VcfRegistrar(av) - with tempfile.NamedTemporaryFile(delete=False) as temp_out_file: - try: - registrar.annotate( - temp_file.name, - vcf_out=temp_out_file.name, - compute_for_ref=for_ref, - assembly=assembly, - ) - except (TranslatorConnectionError, OSError) as e: - _logger.error("Encountered error during VCF registration: %s", e) - return {"error": "VCF registration failed."} - except ValueError as e: - _logger.error("Encountered error during VCF registration: %s", e) - return {"error": "Encountered ValueError when registering VCF"} - if not allow_async_write: - _logger.info("Waiting for object store writes from API handler method") - av.object_store.wait_for_writes() - return FileResponse(temp_out_file.name) + # If async requested but not enabled, return an error + if run_async and not has_queueing: + response.status_code = status.HTTP_400_BAD_REQUEST + return ErrorResponse( + error="Required modules and/or configurations for asynchronous VCF annotation are missing" + ) + + # ensure the temporary file is flushed to disk + vcf.file.rollover() + + # Submit asynchronous run + if run_async: + return await _annotate_vcf_async( + response=response, + vcf=vcf, + for_ref=for_ref, + allow_async_write=allow_async_write, + assembly=assembly, + run_id=run_id, + ) + # Run synchronously + else: # noqa: RET505 + return await _annotate_vcf_sync( + request=request, + response=response, + bg_tasks=bg_tasks, + vcf=vcf, + for_ref=for_ref, + allow_async_write=allow_async_write, + assembly=assembly, + ) + + +async def _annotate_vcf_async( + response: Response, + vcf: UploadFile, + for_ref: bool, + allow_async_write: bool, + assembly: str, + run_id: str | None, +) -> RunStatusResponse: + """Annotate with VRS IDs asynchronously. See `annotate_vcf()` for parameter definitions.""" + # write file to shared storage area with a directory for each day and a random file name + utc_now = datetime.datetime.now(tz=datetime.UTC) + file_id = str(uuid.uuid4()) + input_file_path = pathlib.Path( + f"{async_work_dir}/{utc_now.year}{utc_now.month}{utc_now.day}/{file_id}" + ) + if not input_file_path.parent.exists(): + input_file_path.parent.mkdir(parents=True) + _logger.debug("writing working file for async vcf to %s", input_file_path) + + vcf_site_count = 0 + async with aiofiles.open(input_file_path, mode="wb") as fd: + while buffer := await vcf.read(1024 * 1024): + if ord("#") not in buffer and ord("\n") in buffer: + vcf_site_count = vcf_site_count + 1 + await fd.write(buffer) + _logger.debug("wrote working file for async vcf to %s", input_file_path) + _logger.debug("vcf site count of async vcf is %s", vcf_site_count) + + # submit async job + task_result = anyvar.queueing.celery_worker.annotate_vcf.apply_async( + kwargs={ + "input_file_path": str(input_file_path), + "assembly": assembly, + "for_ref": for_ref, + "allow_async_write": allow_async_write, + }, + task_id=run_id, + ) + _logger.info("%s - async vcf annotation run submitted", task_result.id) + + # set response headers + response.status_code = status.HTTP_202_ACCEPTED + response.headers["Location"] = f"/vcf/{task_result.id}" + # estimate for time is 500 variants per second + retry_after = max(1, round((vcf_site_count * (2 if for_ref else 1)) / 500, 0)) + _logger.debug("%s - retry after is %s", task_result.id, str(retry_after)) + response.headers["Retry-After"] = str(retry_after) + return RunStatusResponse( + run_id=task_result.id, + status="PENDING", + message=f"Run submitted. Check status at /vcf/{task_result.id}.", + ) + + +async def _annotate_vcf_sync( + request: Request, + response: Response, + bg_tasks: BackgroundTasks, + vcf: UploadFile, + for_ref: bool, + allow_async_write: bool, + assembly: str, +) -> FileResponse | ErrorResponse: + """Annotate with VRS IDs synchronously. See `annotate_vcf()` for parameter definitions.""" + av: AnyVar = request.app.state.anyvar + registrar = VcfRegistrar(av) + with tempfile.NamedTemporaryFile(delete=False) as temp_out_file: + try: + registrar.annotate( + vcf.file.name, + vcf_out=temp_out_file.name, + compute_for_ref=for_ref, + assembly=assembly, + ) + except (TranslatorConnectionError, OSError) as e: + _logger.error("Encountered error during VCF registration: %s", e) + response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + return ErrorResponse(error="VCF registration failed.") + except ValueError as e: + _logger.error("Encountered error during VCF registration: %s", e) + response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + return ErrorResponse(error="Encountered ValueError when registering VCF") + + if not allow_async_write: + _logger.info("Waiting for object store writes from API handler method") + av.object_store.wait_for_writes() + bg_tasks.add_task(os.unlink, temp_out_file.name) + return FileResponse(temp_out_file.name) + + +@app.get( + "/vcf/{run_id}", + summary="Poll for status and/or result for asynchronous VCF annotation", + description="Provide a valid run id to get the status and/or result of a VCF annotation run", + tags=[EndpointTag.VARIATIONS], + response_model=None, +) +async def get_result( + response: Response, + bg_tasks: BackgroundTasks, + run_id: str = Path(description="The run id to retrieve the result or status for"), +) -> RunStatusResponse | FileResponse | ErrorResponse: + """Return the status or result of an asynchronous registration of alleles from a VCF file. + :param response: FastAPI response object + :param bg_tasks: FastAPI background tasks object + :param run_id: asynchronous run id + :return: streamed annotated file or a run status response + """ + # Asynchronous VCF annotation not enabled, return error + if not has_queueing: + response.status_code = status.HTTP_400_BAD_REQUEST + return ErrorResponse( + error="Required modules and/or configurations for asynchronous VCF annotation are missing" + ) + + # get the async result + async_result = AsyncResult(id=run_id) + _logger.debug("%s - status is %s", run_id, async_result.status) + + # completed successfully + if async_result.status == "SUCCESS": + response.status_code = status.HTTP_200_OK + output_file_path = async_result.get() + async_result.forget() + _logger.debug("%s - output file path is %s", run_id, output_file_path) + bg_tasks.add_task(os.unlink, output_file_path) + return FileResponse(path=output_file_path) + + # failed - return an error response + elif ( # noqa: RET505 + async_result.status == "FAILURE" + and async_result.result + and isinstance(async_result.result, Exception) + ): + # get error message and code + error_msg = str(async_result.result) + error_code = ( + "TIME_LIMIT_EXCEEDED" + if isinstance(async_result.result, TimeLimitExceeded) + else ( + "WORKER_LOST_ERROR" + if isinstance(async_result.result, WorkerLostError) + else "RUN_FAILURE" + ) + ) + _logger.info("%s - failed with error %s", run_id, error_msg) + + # cleanup working files + input_file_path_str = async_result.kwargs.get("input_file_path", None) + if input_file_path_str: + input_file_path = pathlib.Path(input_file_path_str) + if input_file_path.is_file(): + bg_tasks.add_task(input_file_path.unlink, missing_ok=True) + output_file_path = pathlib.Path(f"{input_file_path_str}_outputvcf") + if output_file_path.is_file(): + bg_tasks.add_task(output_file_path.unlink, missing_ok=True) + + # forget the run and return the response + async_result.forget() + response.status_code = failure_status_code + return ErrorResponse(error_code=error_code, error=error_msg) + + # status here is either "SENT" or "PENDING" + else: + # the after_task_publish handler sets the state to "SENT" + # so a status of PENDING is actually unknown task + # but there can be a race condition, so if status is pending + # pause half a second and check again + if async_result.status == "PENDING": + await asyncio.sleep(0.5) + async_result = AsyncResult(id=run_id) + _logger.debug( + "%s - after 0.5 second wait, status is %s", run_id, async_result.status + ) + + # status is "PENDING" - unknown run id + if async_result.status == "PENDING": + response.status_code = status.HTTP_404_NOT_FOUND + return RunStatusResponse( + run_id=run_id, + status="NOT_FOUND", + status_message="Run not found", + ) + # status is "SENT" - return 202 + else: # noqa: RET505 + response.status_code = status.HTTP_202_ACCEPTED + return RunStatusResponse( + run_id=run_id, + status="PENDING", + status_message=f"Run not completed. Check status at /vcf/{run_id}", + ) @app.get( diff --git a/src/anyvar/restapi/schema.py b/src/anyvar/restapi/schema.py index f1db908..fa1f381 100644 --- a/src/anyvar/restapi/schema.py +++ b/src/anyvar/restapi/schema.py @@ -1,7 +1,7 @@ """Provide response definitions to REST API endpoint.""" from enum import Enum -from typing import Any +from typing import Any, Optional from ga4gh.vrs import models from pydantic import BaseModel, StrictInt, StrictStr @@ -184,3 +184,20 @@ class AnyVarStatsResponse(BaseModel): variation_type: VariationStatisticType count: StrictInt + + +class RunStatusResponse(BaseModel): + """Represents the response for triggering or checking the status of a run.""" + + run_id: str # Run ID + status: str # Run status + status_message: Optional[str] = ( # noqa: UP007 + None # Detailed status message for failures + ) + + +class ErrorResponse(BaseModel): + """Represents an error message""" + + error: str # Error message + error_code: Optional[str] = None # error code # noqa: UP007 From e9cd1e6fbf5ec4ee8701cc819b67ef7564c3c9a4 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Tue, 12 Nov 2024 16:00:38 -0500 Subject: [PATCH 03/20] Move logging config to main anyvar module so it is activated for both restapi and worker Defend against null kwargs in task result Change worker AnyVar app init scheme to work with different Celery pool types --- src/anyvar/anyvar.py | 13 ++++++++ src/anyvar/queueing/celery_worker.py | 50 +++++++++++++++------------- src/anyvar/restapi/main.py | 28 +++++----------- 3 files changed, 48 insertions(+), 43 deletions(-) diff --git a/src/anyvar/anyvar.py b/src/anyvar/anyvar.py index 816883e..36501f0 100644 --- a/src/anyvar/anyvar.py +++ b/src/anyvar/anyvar.py @@ -4,10 +4,13 @@ """ import logging +import logging.config import os +import pathlib from collections.abc import MutableMapping from urllib.parse import urlparse +import yaml from ga4gh.vrs import vrs_deref, vrs_enref from anyvar.storage import DEFAULT_STORAGE_URI, _Storage @@ -15,6 +18,16 @@ from anyvar.translate.vrs_python import VrsPythonTranslator from anyvar.utils.types import VrsObject +# Configure logging from file or use default +logging_config_file = os.environ.get("ANYVAR_LOGGING_CONFIG", None) +if logging_config_file and pathlib.Path(logging_config_file).is_file(): + with pathlib.Path(logging_config_file).open() as fd: + try: + config = yaml.safe_load(fd.read()) + logging.config.dictConfig(config) + except Exception: + logging.exception("Error in Logging Configuration. Using default configs") + _logger = logging.getLogger(__name__) diff --git a/src/anyvar/queueing/celery_worker.py b/src/anyvar/queueing/celery_worker.py index 79cad4d..64c434b 100644 --- a/src/anyvar/queueing/celery_worker.py +++ b/src/anyvar/queueing/celery_worker.py @@ -2,6 +2,7 @@ import logging import os +import threading from pathlib import Path import celery.signals @@ -26,7 +27,7 @@ result_backend_transport_options={"global_keyprefix": "anyvar_"}, broker_url=os.environ.get("CELERY_BROKER_URL", None), timezone=os.environ.get("CELERY_TIMEZONE", "UTC"), - result_expires=int(os.environ.get("CELERY_RESULT_EXPIRES", "3600")), + result_expires=int(os.environ.get("CELERY_RESULT_EXPIRES", "7200")), # task settings task_ignore_result=False, task_acks_late=os.environ.get("CELERY_TASK_ACKS_LATE", "true").lower() @@ -49,35 +50,36 @@ # if this is a worker, create/destroy the AnyVar app instance # on startup and shutdown -anyvar_app = None +_anyvar_app = None +worker_init_lock = threading.Lock() -@celery.signals.worker_process_init.connect -def init_anyvar(**kwargs) -> None: # noqa: ARG001 - """On the `worker_process_init` signal, construct the AnyVar app instance""" - _logger.info("processing signal worker process init") - global anyvar_app - # create anyvar instance - if not anyvar_app: - _logger.info("creating anyvar app in worker process init") - storage = anyvar.anyvar.create_storage() - translator = anyvar.anyvar.create_translator() - anyvar_instance = anyvar.AnyVar(object_store=storage, translator=translator) +def get_anyvar_app() -> anyvar.AnyVar: + """Create AnyVar app as necessary and return it""" + with worker_init_lock: + global _anyvar_app + # create anyvar instance if necessary + if not _anyvar_app: + _logger.info("creating global anyvar app for worker") + storage = anyvar.anyvar.create_storage() + translator = anyvar.anyvar.create_translator() + anyvar_instance = anyvar.AnyVar(object_store=storage, translator=translator) + _anyvar_app = anyvar_instance - # associate anyvar with the app state - anyvar_app = anyvar_instance + return _anyvar_app -@celery.signals.worker_process_shutdown.connect +@celery.signals.worker_shutdown.connect def teardown_anyvar(**kwargs) -> None: # noqa: ARG001 """On the `worker_process_shutdown` signal, destroy the AnyVar app instance""" - _logger.info("processing signal worker process shutdown") - global anyvar_app - # close storage connector on shutdown - if anyvar_app: - _logger.info("closing anyvar app in worker process init") - anyvar_app.object_store.close() - anyvar_app = None + with worker_init_lock: + global _anyvar_app + _logger.info("processing signal worker shutdown") + # close storage connector on shutdown + if _anyvar_app: + _logger.info("closing anyvar app in worker process init") + _anyvar_app.object_store.close() + _anyvar_app = None @celery_app.task(bind=True) @@ -95,7 +97,6 @@ def annotate_vcf( :param allow_async_write: whether to allow async database writes :return: path to the annotated VCF file """ - global anyvar_app try: # create output file path output_file_path = f"{input_file_path}_outputvcf" @@ -107,6 +108,7 @@ def annotate_vcf( ) # annotation vcf with VRS IDs + anyvar_app = get_anyvar_app() registrar = VcfRegistrar(anyvar_app) registrar.annotate( vcf_in=input_file_path, diff --git a/src/anyvar/restapi/main.py b/src/anyvar/restapi/main.py index 99e4b0c..6d59daa 100644 --- a/src/anyvar/restapi/main.py +++ b/src/anyvar/restapi/main.py @@ -12,7 +12,6 @@ from http import HTTPStatus import ga4gh.vrs -import yaml from fastapi import ( BackgroundTasks, Body, @@ -52,16 +51,6 @@ ) from anyvar.utils.types import VrsVariation, variation_class_map -# Configure logging from file or use default -logging_config_file = os.environ.get("ANYVAR_LOGGING_CONFIG", None) -if logging_config_file and pathlib.Path(logging_config_file).is_file(): - with pathlib.Path(logging_config_file).open() as fd: - try: - config = yaml.safe_load(fd.read()) - logging.config.dictConfig(config) - except Exception: - logging.exception("Error in Logging Configuration. Using default configs") - _logger = logging.getLogger(__name__) # Determine whether asynchronous VCF annotation is enabled @@ -485,14 +474,15 @@ async def get_result( _logger.info("%s - failed with error %s", run_id, error_msg) # cleanup working files - input_file_path_str = async_result.kwargs.get("input_file_path", None) - if input_file_path_str: - input_file_path = pathlib.Path(input_file_path_str) - if input_file_path.is_file(): - bg_tasks.add_task(input_file_path.unlink, missing_ok=True) - output_file_path = pathlib.Path(f"{input_file_path_str}_outputvcf") - if output_file_path.is_file(): - bg_tasks.add_task(output_file_path.unlink, missing_ok=True) + if async_result.kwargs: + input_file_path_str = async_result.kwargs.get("input_file_path", None) + if input_file_path_str: + input_file_path = pathlib.Path(input_file_path_str) + if input_file_path.is_file(): + bg_tasks.add_task(input_file_path.unlink, missing_ok=True) + output_file_path = pathlib.Path(f"{input_file_path_str}_outputvcf") + if output_file_path.is_file(): + bg_tasks.add_task(output_file_path.unlink, missing_ok=True) # forget the run and return the response async_result.forget() From c0e38e99ddbcc2d9f4b970cfb32acb428fabe91b Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Thu, 14 Nov 2024 09:51:56 -0500 Subject: [PATCH 04/20] Documentation and logging updates --- src/anyvar/queueing/celery_worker.py | 22 +++++++++++++--------- src/anyvar/restapi/main.py | 18 ++++++++++++++++-- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/anyvar/queueing/celery_worker.py b/src/anyvar/queueing/celery_worker.py index 64c434b..424c506 100644 --- a/src/anyvar/queueing/celery_worker.py +++ b/src/anyvar/queueing/celery_worker.py @@ -48,14 +48,13 @@ in ["true", "yes", "1"], ) -# if this is a worker, create/destroy the AnyVar app instance -# on startup and shutdown +# if this is a celery worker, we need an AnyVar app instance _anyvar_app = None worker_init_lock = threading.Lock() def get_anyvar_app() -> anyvar.AnyVar: - """Create AnyVar app as necessary and return it""" + """Create AnyVar app associated with the Celery work as necessary and return it""" with worker_init_lock: global _anyvar_app # create anyvar instance if necessary @@ -71,7 +70,7 @@ def get_anyvar_app() -> anyvar.AnyVar: @celery.signals.worker_shutdown.connect def teardown_anyvar(**kwargs) -> None: # noqa: ARG001 - """On the `worker_process_shutdown` signal, destroy the AnyVar app instance""" + """On the `worker_shutdown` signal, destroy the AnyVar app instance""" with worker_init_lock: global _anyvar_app _logger.info("processing signal worker shutdown") @@ -90,7 +89,8 @@ def annotate_vcf( for_ref: bool, allow_async_write: bool, ) -> str: - """Annotate the specified VCF file and return the path to the annotated file + """Annotate the specified VCF file and return the path to the annotated file. + The input file is deleted when the annotation completes successfully. :param input_file_path: path to the VCF file to be annotated :param assembly: the reference assembly for the VCF :param for_ref: whether to compute VRS IDs for REF alleles @@ -101,13 +101,13 @@ def annotate_vcf( # create output file path output_file_path = f"{input_file_path}_outputvcf" _logger.info( - "%s - worker annotating vcf %s to %s", + "%s - annotating vcf file %s, outputting to %s", self.request.id, input_file_path, output_file_path, ) - # annotation vcf with VRS IDs + # annotate vcf with VRS IDs anyvar_app = get_anyvar_app() registrar = VcfRegistrar(anyvar_app) registrar.annotate( @@ -116,6 +116,10 @@ def annotate_vcf( compute_for_ref=for_ref, assembly=assembly, ) + _logger.info( + "%s - annotation completed", + self.request.id, + ) # wait for writes if necessary if not allow_async_write: @@ -135,13 +139,13 @@ def annotate_vcf( raise -# after task is published, set the status to "SENT" -# this allows the web api to determine when a run_id is not found @celery.signals.after_task_publish.connect def update_sent_state(sender: str | None, headers: dict | None, **kwargs) -> None: # noqa: ARG001 """On the `after_task_publish` signal, set the task status to SENT. This enables the application to differentiate between task ids that are not complete and those that do not exist. + :param sender: the name of the task + :param headers: the task message headers """ _logger.info("%s - after publish", headers["id"]) task = celery_app.tasks.get(sender) diff --git a/src/anyvar/restapi/main.py b/src/anyvar/restapi/main.py index 6d59daa..197fc80 100644 --- a/src/anyvar/restapi/main.py +++ b/src/anyvar/restapi/main.py @@ -364,7 +364,11 @@ async def _annotate_vcf_async( }, task_id=run_id, ) - _logger.info("%s - async vcf annotation run submitted", task_result.id) + _logger.info( + "%s - async annotation run submitted for vcf with %s sites", + task_result.id, + vcf_site_count, + ) # set response headers response.status_code = status.HTTP_202_ACCEPTED @@ -471,7 +475,7 @@ async def get_result( else "RUN_FAILURE" ) ) - _logger.info("%s - failed with error %s", run_id, error_msg) + _logger.debug("%s - failed with error %s", run_id, error_msg) # cleanup working files if async_result.kwargs: @@ -479,9 +483,19 @@ async def get_result( if input_file_path_str: input_file_path = pathlib.Path(input_file_path_str) if input_file_path.is_file(): + _logger.debug( + "%s - adding task to remove input file %s", + run_id, + str(input_file_path), + ) bg_tasks.add_task(input_file_path.unlink, missing_ok=True) output_file_path = pathlib.Path(f"{input_file_path_str}_outputvcf") if output_file_path.is_file(): + _logger.debug( + "%s - adding task to remove output file %s", + run_id, + str(output_file_path), + ) bg_tasks.add_task(output_file_path.unlink, missing_ok=True) # forget the run and return the response From a10540420f08c2ec5873d9c2a3397267a1688560 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Thu, 14 Nov 2024 15:05:09 -0500 Subject: [PATCH 05/20] Changing project file optional deps to "test" to match Makefile --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2d7263c..37b0dc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ queueing = [ "celery[redis]~=5.4.0", "aiofiles", ] -tests = [ +test = [ "pytest", "pytest-cov", "pytest-mock", From 23989ef00cb893b57f8c45e9b3e9214cf85cfecf Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Fri, 15 Nov 2024 16:33:07 -0500 Subject: [PATCH 06/20] Move has_queueing_enabled() function to top level --- src/anyvar/anyvar.py | 11 +++++++++++ src/anyvar/restapi/main.py | 22 +++++++++------------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/anyvar/anyvar.py b/src/anyvar/anyvar.py index 36501f0..604ba07 100644 --- a/src/anyvar/anyvar.py +++ b/src/anyvar/anyvar.py @@ -3,6 +3,7 @@ """ +import importlib.util import logging import logging.config import os @@ -73,6 +74,16 @@ def create_translator() -> _Translator: return VrsPythonTranslator() +def has_queueing_enabled() -> bool: + """Determine whether or not asynchronous task queueing is enabled""" + return ( + importlib.util.find_spec("aiofiles") is not None + and importlib.util.find_spec("celery") is not None + and os.environ.get("CELERY_BROKER_URL", "") != "" + and os.environ.get("ANYVAR_VCF_ASYNC_WORK_DIR", "") != "" + ) + + class AnyVar: """Define core AnyVar class.""" diff --git a/src/anyvar/restapi/main.py b/src/anyvar/restapi/main.py index 197fc80..8fcb401 100644 --- a/src/anyvar/restapi/main.py +++ b/src/anyvar/restapi/main.py @@ -51,13 +51,6 @@ ) from anyvar.utils.types import VrsVariation, variation_class_map -_logger = logging.getLogger(__name__) - -# Determine whether asynchronous VCF annotation is enabled -# Get the working directory for asynchronous VCF annotation -async_work_dir = os.environ.get("ANYVAR_VCF_ASYNC_WORK_DIR", None) -# Get what response code to use for asynchronous VCF annotation failures -failure_status_code = int(os.environ.get("ANYVAR_VCF_ASYNC_FAILURE_STATUS_CODE", "500")) try: import aiofiles # noqa: I001 import anyvar.queueing.celery_worker @@ -65,9 +58,9 @@ from celery.exceptions import WorkerLostError from celery.result import AsyncResult except ImportError: - has_queueing = False -else: - has_queueing = os.environ.get("CELERY_BROKER_URL", None) and async_work_dir + pass + +_logger = logging.getLogger(__name__) @asynccontextmanager @@ -294,7 +287,7 @@ async def annotate_vcf( :return: streamed annotated file or a run status response for an asynchronous run """ # If async requested but not enabled, return an error - if run_async and not has_queueing: + if run_async and not anyvar.anyvar.has_queueing_enabled(): response.status_code = status.HTTP_400_BAD_REQUEST return ErrorResponse( error="Required modules and/or configurations for asynchronous VCF annotation are missing" @@ -336,6 +329,7 @@ async def _annotate_vcf_async( ) -> RunStatusResponse: """Annotate with VRS IDs asynchronously. See `annotate_vcf()` for parameter definitions.""" # write file to shared storage area with a directory for each day and a random file name + async_work_dir = os.environ.get("ANYVAR_VCF_ASYNC_WORK_DIR", None) utc_now = datetime.datetime.now(tz=datetime.UTC) file_id = str(uuid.uuid4()) input_file_path = pathlib.Path( @@ -439,7 +433,7 @@ async def get_result( :return: streamed annotated file or a run status response """ # Asynchronous VCF annotation not enabled, return error - if not has_queueing: + if not anyvar.anyvar.has_queueing_enabled(): response.status_code = status.HTTP_400_BAD_REQUEST return ErrorResponse( error="Required modules and/or configurations for asynchronous VCF annotation are missing" @@ -500,7 +494,9 @@ async def get_result( # forget the run and return the response async_result.forget() - response.status_code = failure_status_code + response.status_code = int( + os.environ.get("ANYVAR_VCF_ASYNC_FAILURE_STATUS_CODE", "500") + ) return ErrorResponse(error_code=error_code, error=error_msg) # status here is either "SENT" or "PENDING" From 2d56bb2798895b8aff4a4b1919c9844157b88a73 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Fri, 15 Nov 2024 16:33:36 -0500 Subject: [PATCH 07/20] Fixing small errors --- src/anyvar/restapi/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/anyvar/restapi/main.py b/src/anyvar/restapi/main.py index 8fcb401..09df063 100644 --- a/src/anyvar/restapi/main.py +++ b/src/anyvar/restapi/main.py @@ -342,7 +342,7 @@ async def _annotate_vcf_async( vcf_site_count = 0 async with aiofiles.open(input_file_path, mode="wb") as fd: while buffer := await vcf.read(1024 * 1024): - if ord("#") not in buffer and ord("\n") in buffer: + if ord("\n") in buffer: vcf_site_count = vcf_site_count + 1 await fd.write(buffer) _logger.debug("wrote working file for async vcf to %s", input_file_path) @@ -374,7 +374,7 @@ async def _annotate_vcf_async( return RunStatusResponse( run_id=task_result.id, status="PENDING", - message=f"Run submitted. Check status at /vcf/{task_result.id}.", + status_message=f"Run submitted. Check status at /vcf/{task_result.id}", ) @@ -446,7 +446,7 @@ async def get_result( # completed successfully if async_result.status == "SUCCESS": response.status_code = status.HTTP_200_OK - output_file_path = async_result.get() + output_file_path = async_result.result async_result.forget() _logger.debug("%s - output file path is %s", run_id, output_file_path) bg_tasks.add_task(os.unlink, output_file_path) From 01a30acaf64a18cf376b3ba82bbe41f1389ef8d3 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Fri, 15 Nov 2024 16:39:10 -0500 Subject: [PATCH 08/20] Add unit tests for async VCF processing --- pyproject.toml | 3 +- tests/conftest.py | 15 ++++ tests/test_vcf.py | 181 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 198 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 37b0dc2..6080fc7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,8 @@ test = [ "pytest", "pytest-cov", "pytest-mock", - "httpx" + "httpx", + "celery[pytest]", ] dev = [ "ruff==0.5.0", diff --git a/tests/conftest.py b/tests/conftest.py index 26b33eb..1e7e99e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,8 @@ from anyvar.anyvar import AnyVar, create_storage, create_translator from anyvar.restapi.main import app as anyvar_restapi +pytest_plugins = ("celery.contrib.pytest",) + def pytest_collection_modifyitems(items): """Modify test items in place to ensure test modules run in a given order.""" @@ -58,3 +60,16 @@ def alleles(test_data_dir) -> dict: """Provide allele fixture object.""" with (test_data_dir / "variations.json").open() as f: return json.load(f)["alleles"] + + +@pytest.fixture(scope="session") +def celery_config(): + return { + "broker_url": os.environ.get("CELERY_BROKER_URL", "redis://"), + "result_backend": os.environ.get("CELERY_BACKEND_URL", "redis://"), + "task_default_queue": "anyvar_q", + "event_queue_prefix": "anyvar_ev", + "task_serializer": "json", + "result_serializer": "json", + "accept_content": ["application/json"], + } diff --git a/tests/test_vcf.py b/tests/test_vcf.py index cb211a1..d5019a0 100644 --- a/tests/test_vcf.py +++ b/tests/test_vcf.py @@ -1,9 +1,19 @@ """Test VCF input/output features.""" import io +import os +import pathlib +import shutil +import time from http import HTTPStatus import pytest +from billiard.exceptions import TimeLimitExceeded +from celery.contrib.testing.worker import start_worker +from celery.exceptions import WorkerLostError + +import anyvar.anyvar +from anyvar.queueing.celery_worker import celery_app @pytest.fixture() @@ -130,3 +140,174 @@ def test_vcf_registration_invalid_assembly(client, sample_vcf_grch37): files={"vcf": ("test.vcf", sample_vcf_grch37)}, ) assert resp.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + + +def test_vcf_registration_async(client, sample_vcf_grch38, mocker): + """Test the async VCF annotation process using a real Celery worker and background task""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "tests/tmp_async_work_dir"} + ) + assert anyvar.anyvar.has_queueing_enabled(), "async vcf queueing is not enabled" + with start_worker( + celery_app, + pool="solo", + loglevel="info", + perform_ping_check=False, + shutdown_timeout=30, + ): + resp = client.put( + "/vcf", + params={"assembly": "GRCh38", "run_id": "12345", "run_async": True}, + files={"vcf": ("test.vcf", sample_vcf_grch38)}, + ) + assert resp.status_code == HTTPStatus.ACCEPTED + assert "status_message" in resp.json() + assert ( + resp.json()["status_message"] == "Run submitted. Check status at /vcf/12345" + ) + assert "status" in resp.json() + assert resp.json()["status"] == "PENDING" + assert "run_id" in resp.json() + assert resp.json()["run_id"] == "12345" + + time.sleep(5) + + resp = client.get("/vcf/12345") + assert resp.status_code == HTTPStatus.OK + assert ( + b"VRS_Allele_IDs=ga4gh:VA.ryPubD68BB0D-D78L_kK4993mXmsNNWe,ga4gh:VA._QhHH18HBAIeLos6npRgR-S_0lAX5KR6" + in resp.content + ) + shutil.rmtree("tests/tmp_async_work_dir") + + +def test_vcf_get_result_no_async(client, mocker): + """Tests that a 400 is returned when async processing is not enabled""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "", "CELERY_BROKER_URL": ""} + ) + resp = client.get("/vcf/12345") + assert resp.status_code == HTTPStatus.BAD_REQUEST + assert "error" in resp.json() + assert ( + resp.json()["error"] + == "Required modules and/or configurations for asynchronous VCF annotation are missing" + ) + + +def test_vcf_get_result_success(client, mocker): + """Tests the get async VCF result endpoint when annotation was successful""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"} + ) + mock_result = mocker.patch("anyvar.restapi.main.AsyncResult") + mock_result.return_value.status = "SUCCESS" + mock_result.return_value.result = __file__ + mock_bg_tasks = mocker.patch("anyvar.restapi.main.BackgroundTasks.add_task") + resp = client.get("/vcf/12345") + assert resp.status_code == HTTPStatus.OK + with pathlib.Path(__file__).open(mode="rb") as fd: + assert resp.content == fd.read() + mock_result.return_value.forget.assert_called_once() + mock_bg_tasks.assert_called_with(os.unlink, __file__) + + +def test_vcf_get_result_failure_timeout(client, mocker): + """Tests the get async VCF result endpoint when annotation fails due to task timeout""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"} + ) + mock_result = mocker.patch("anyvar.restapi.main.AsyncResult") + mock_result.return_value.status = "FAILURE" + mock_result.return_value.result = TimeLimitExceeded("task timed out") + mock_result.return_value.kwargs = {"input_file_path": __file__} + mock_bg_tasks = mocker.patch("anyvar.restapi.main.BackgroundTasks.add_task") + resp = client.get("/vcf/12345") + assert resp.status_code == HTTPStatus.INTERNAL_SERVER_ERROR + assert "error" in resp.json() + assert resp.json()["error"] == "TimeLimitExceeded('task timed out',)" + assert "error_code" in resp.json() + assert resp.json()["error_code"] == "TIME_LIMIT_EXCEEDED" + mock_result.return_value.forget.assert_called_once() + mock_bg_tasks.assert_called_once() + + +def test_vcf_get_result_failure_worker_lost(client, mocker): + """Tests the get async VCF result endpoint when annotation failed due to lost worker""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"} + ) + mock_result = mocker.patch("anyvar.restapi.main.AsyncResult") + mock_result.return_value.status = "FAILURE" + mock_result.return_value.result = WorkerLostError("killed") + mock_result.return_value.kwargs = {"input_file_path": __file__} + mock_bg_tasks = mocker.patch("anyvar.restapi.main.BackgroundTasks.add_task") + resp = client.get("/vcf/12345") + assert resp.status_code == HTTPStatus.INTERNAL_SERVER_ERROR + assert "error" in resp.json() + assert resp.json()["error"] == "killed" + assert "error_code" in resp.json() + assert resp.json()["error_code"] == "WORKER_LOST_ERROR" + mock_result.return_value.forget.assert_called_once() + mock_bg_tasks.assert_called_once() + + +def test_vcf_get_result_failure_other(client, mocker): + """Tests the get async VCF result endpoint when annotation failed due to an error""" + mocker.patch.dict( + os.environ, + { + "ANYVAR_VCF_ASYNC_WORK_DIR": "./", + "CELERY_BROKER_URL": "redis://", + "ANYVAR_VCF_ASYNC_FAILURE_STATUS_CODE": "200", + }, + ) + mock_result = mocker.patch("anyvar.restapi.main.AsyncResult") + mock_result.return_value.status = "FAILURE" + mock_result.return_value.result = KeyError("foo") + mock_result.return_value.kwargs = {"input_file_path": __file__} + mock_bg_tasks = mocker.patch("anyvar.restapi.main.BackgroundTasks.add_task") + resp = client.get("/vcf/12345") + assert resp.status_code == HTTPStatus.OK + assert "error" in resp.json() + assert resp.json()["error"] == "'foo'" + assert "error_code" in resp.json() + assert resp.json()["error_code"] == "RUN_FAILURE" + mock_result.return_value.forget.assert_called_once() + mock_bg_tasks.assert_called_once() + + +def test_vcf_get_result_notfound(client, mocker): + """Tests the get async VCF result endpoint when run id is not found""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"} + ) + mock_result = mocker.patch("anyvar.restapi.main.AsyncResult") + mock_result.return_value.status = "PENDING" + resp = client.get("/vcf/12345") + assert resp.status_code == HTTPStatus.NOT_FOUND + assert "status_message" in resp.json() + assert resp.json()["status_message"] == "Run not found" + assert "status" in resp.json() + assert resp.json()["status"] == "NOT_FOUND" + assert "run_id" in resp.json() + assert resp.json()["run_id"] == "12345" + + +def test_vcf_get_result_notcomplete(client, mocker): + """Tests the get async VCF result endpoint when annotation is not yet complete""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"} + ) + mock_result = mocker.patch("anyvar.restapi.main.AsyncResult") + mock_result.return_value.status = "SENT" + resp = client.get("/vcf/12345") + assert resp.status_code == HTTPStatus.ACCEPTED + assert "status_message" in resp.json() + assert ( + resp.json()["status_message"] == "Run not completed. Check status at /vcf/12345" + ) + assert "status" in resp.json() + assert resp.json()["status"] == "PENDING" + assert "run_id" in resp.json() + assert resp.json()["run_id"] == "12345" From 2f9d0cfe716305c95b539a5bc16af41daa896df4 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 18 Nov 2024 11:19:06 -0500 Subject: [PATCH 09/20] Don't allow duplicate run ids to be active concurrently --- src/anyvar/restapi/main.py | 11 ++++++++++- tests/test_vcf.py | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/anyvar/restapi/main.py b/src/anyvar/restapi/main.py index 09df063..879d750 100644 --- a/src/anyvar/restapi/main.py +++ b/src/anyvar/restapi/main.py @@ -326,8 +326,17 @@ async def _annotate_vcf_async( allow_async_write: bool, assembly: str, run_id: str | None, -) -> RunStatusResponse: +) -> RunStatusResponse | ErrorResponse: """Annotate with VRS IDs asynchronously. See `annotate_vcf()` for parameter definitions.""" + # if run_id is provided, validate it does not already exist + if run_id: + existing_result = AsyncResult(id=run_id) + if existing_result.status != "PENDING": + response.status_code = status.HTTP_400_BAD_REQUEST + return ErrorResponse( + error=f"An existing run with id {run_id} is {existing_result.status}. Fetch the completed run result before submitting with the same run_id." + ) + # write file to shared storage area with a directory for each day and a random file name async_work_dir = os.environ.get("ANYVAR_VCF_ASYNC_WORK_DIR", None) utc_now = datetime.datetime.now(tz=datetime.UTC) diff --git a/tests/test_vcf.py b/tests/test_vcf.py index d5019a0..b0d1578 100644 --- a/tests/test_vcf.py +++ b/tests/test_vcf.py @@ -181,6 +181,44 @@ def test_vcf_registration_async(client, sample_vcf_grch38, mocker): shutil.rmtree("tests/tmp_async_work_dir") +def test_vcf_submit_no_async(client, sample_vcf_grch38, mocker): + """Tests that a 400 is returned when async processing is not enabled""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "", "CELERY_BROKER_URL": ""} + ) + resp = client.put( + "/vcf", + params={"assembly": "GRCh38", "run_id": "12345", "run_async": True}, + files={"vcf": ("test.vcf", sample_vcf_grch38)}, + ) + assert resp.status_code == HTTPStatus.BAD_REQUEST + assert "error" in resp.json() + assert ( + resp.json()["error"] + == "Required modules and/or configurations for asynchronous VCF annotation are missing" + ) + + +def test_vcf_submit_duplicate_run_id(client, sample_vcf_grch38, mocker): + """Tests the submit VCF endpoint when there is already a run for the specified run id""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"} + ) + mock_result = mocker.patch("anyvar.restapi.main.AsyncResult") + mock_result.return_value.status = "SENT" + resp = client.put( + "/vcf", + params={"assembly": "GRCh38", "run_id": "12345", "run_async": True}, + files={"vcf": ("test.vcf", sample_vcf_grch38)}, + ) + assert resp.status_code == HTTPStatus.BAD_REQUEST + assert "error" in resp.json() + assert ( + resp.json()["error"] + == "An existing run with id 12345 is SENT. Fetch the completed run result before submitting with the same run_id." + ) + + def test_vcf_get_result_no_async(client, mocker): """Tests that a 400 is returned when async processing is not enabled""" mocker.patch.dict( From d2232f5775ca007c10ed772cfe069f1af66a0a1b Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 18 Nov 2024 11:22:33 -0500 Subject: [PATCH 10/20] Add documentation for async capabilities --- README-async.md | 121 ++++++++++++++++++++++++++++++++++++++++++++++++ README.md | 16 +++++++ 2 files changed, 137 insertions(+) create mode 100644 README-async.md diff --git a/README-async.md b/README-async.md new file mode 100644 index 0000000..ac6c5a4 --- /dev/null +++ b/README-async.md @@ -0,0 +1,121 @@ +# AnyVar Asynchronous VCF Annotation +AnyVar can use an +[asynchronous request-response pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/async-request-reply) +when annotating VCF files. This can improve reliability when serving remote clients by +eliminating long lived connections and allow AnyVar to scale horizontally instead of vertically +to serve a larger request volume. AnyVar utilizes the [Celery](https://docs.celeryq.dev/) +distributed task queue to manage the asynchronous tasks. + +### How It Works +AnyVar can be run as a FastAPI app that provides a REST API. The REST API is run using +uvicorn or gunicorn, eg: +```shell +% uvicorn anyvar.restapi.main:app +``` + +AnyVar can also be run as a Celery worker app that processes tasks submitted through the REST API, eg: +```shell +% celery -A anyvar.queueing.celery_worker:celery_app worker +``` + +When VCF files are submitted to the `/vcf` endpoint with the `run_async=True` query parameter, +the REST API submits a task to the Celery worker via a queue and immediately returns a `202 Accepted` +response with a `Location` header indicating where the client should poll for status and results. +Once the VCF is annotated and the result is ready, the polling request will return the annotated +VCF file. For example: +``` +> PUT /vcf?run_async=True HTTP/1.1 +> Content-type: multipart/form-data... + +< HTTP/1.1 202 Accepted +< Location: /vcf/a1ac7850-0df7-4db6-82ab-b19bce93faf3 +< Retry-After: 120 + +> GET /vcf/a1ac7850-0df7-4db6-82ab-b19bce93faf3 HTTP/1.1 + +< HTTP/1.1 202 Accepted + +> GET /vcf/a1ac7850-0df7-4db6-82ab-b19bce93faf3 HTTP/1.1 + +> HTTP/1.1 200 OK +> +> ##fileformat=VCFv4.2... +``` + +The client can provide a `run_id=...` query parameter with the initial PUT request. If one is not +provided, a random UUID will be generated (as illustrated above). + +### Setting Up Asynchronous VCF Processing +Enabling asychronous VCF processing requires some additional setup. + +#### Install the Necessary Dependencies +Asynchronous VCF processing requires the installation of additional, optional dependencies: +```shell +% pip install .[queueing] +``` + +#### Start an Instance of Redis +Celery relies on a message broker and result backend to manage the task queue and store results. +The simplest option is to use a single instance of [Redis](https://redis.io) for both purposes. This +documentation and the default settings will both assume this configuration. For other message broker +and result backend options, refer to the Celery documentation. + +If a Docker engine is available, start a local instance of Redis: +```shell +% docker run -d -p 6379:6379 redis:alpine +``` +Or follow the [instructions](https://redis.io/docs/latest/get-started/) to run locally. + +#### Create a Scratch Directory for File Storage +AnyVar does not store the actual VCF files in Redis for asynchronous processing, only paths to the file. +This allows very large VCF files to be asychronously processed. All REST API and worker instances of AnyVar +require access to the same shared file system. + +#### Start the REST API +Start the REST API with environment variables to set shared resource locations: +```shell +% CELERY_BROKER_URL="redis://localhost:6379/0" \ + CELERY_BACKEND_URL="redis://localhost:6379/0" \ + ANYVAR_VCF_ASYNC_WORK_DIR="/path/to/shared/file/system" \ + uvicorn anyvar.restapi.main:app +``` + +#### Start a Celery Worker +Start a Celery worker with environment variables to set shared resource locations: +```shell +% CELERY_BROKER_URL="redis://localhost:6379/0" \ + CELERY_BACKEND_URL="redis://localhost:6379/0" \ + ANYVAR_VCF_ASYNC_WORK_DIR="/path/to/shared/file/system" \ + celery -A anyvar.queueing.celery_worker:celery_app worker +``` +To start multiple Celery workers use the `--concurrency` option. +Note that Celery supports multiple worker pool types (prefork, threads, etc). AnyVar +should only be run using the prefork pool, which is the default, because the AnyVar code +is not thread safe. + +#### Submit an Async VCF Request +Now that the REST API and Celery worker are running, submit an async VCF request with cURL: +```shell +% curl -v -X PUT -F "vcf=@test.vcf" 'https://localhost:8000/vcf?run_async=True&run_id=12345' +``` +And then check its status: +```shell +% curl -v 'https://localhost:8000/vcf/12345' +``` + +### Additional Environment Variables +In addition to the environment variables mentioned previously, the following environment variables +can be set to adjust the behavior of AnyVar and Celery: +| Variable | Description | Default | +| -------- | ------- | ------- | +| CELERY_TASK_DEFAULT_QUEUE | The name of the queue for tasks | anyvar_q | +| CELERY_EVENT_QUEUE_PREFIX | The prefix for event receiver queue names | anyvar_ev | +| CELERY_TIMEZONE | The timezone that Celery operates in | UTC | +| CELERY_RESULT_EXPIRES | Number of seconds after submission before a result expires from the backend | 7200 | +| CELERY_TASK_ACKS_LATE | Whether workers acknowledge tasks before (`false`) or after (`true`) they are run | true | +| CELERY_TASK_REJECT_ON_WORKER_LOST | Whether to reject (`true`) or fail (`false`) a task when a worker dies mid-task | false | +| CELERY_WORKER_PREFETCH_MULTIPLIER | How many tasks a worker should fetch from the queue at a time | 1 | +| CELERY_TASK_TIME_LIMIT | Maximum time a task may run before it is terminated | 3900 | +| CELERY_SOFT_TIME_LIMIT | Amount of time a task can run before an exception is triggered, allowing for cleanup | 3600 | +| CELERY_WORKER_SEND_TASK_EVENTS | Change to `true` to cause Celery workers to emit task events for monitoring purposes | false | +| ANYVAR_VCF_ASYNC_FAILURE_STATUS_CODE | What HTTP status code to return for failed asynchronous tasks | 500 | diff --git a/README.md b/README.md index d278989..d61f841 100644 --- a/README.md +++ b/README.md @@ -178,6 +178,13 @@ CREATE TABLE ... ( ) ``` +### Enabling Asynchronous VCF Annotation +AnyVar can support using the asynchronous request-response pattern when annotating VCF files. +This can improve reliability when serving remote clients by eliminating long lived connections +and allow AnyVar to scale out instead of up to serve a larger request volume. + +See README-async.md for more details. + ### Starting the REST service locally Once the data dependencies are setup, start the REST server with: @@ -214,6 +221,15 @@ able to pass when run in isolation. By default, the tests will use a Postgres da installation. To run the tests against a Snowflake database, change the `ANYVAR_TEST_STORAGE_URI` to a Snowflake URI and run the tests. +For the `tests/test_vcf::test_vcf_registration_async` unit test to pass, a real broker and backend +are required for Celery to interact with. Set the `CELERY_BROKER_URL` and `CELERY_BACKEND_URL` +environment variables. The simplest solution is to run Redis locally and use that for both +the broker and the backend, eg: +```shell +% export CELERY_BROKER_URL="redis://" +% export CELERY_BACKEND_URL="redis://" +``` + ## Logging AnyVar uses the [Python Logging Module](https://docs.python.org/3/howto/logging.html) to output information and diagnostics. By default, log output is directed to standard output From db4b32603ded27a9adb41a3095e5cee9d0d95c74 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 18 Nov 2024 12:13:00 -0500 Subject: [PATCH 11/20] Documentation updates --- README-async.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/README-async.md b/README-async.md index ac6c5a4..4cfa647 100644 --- a/README-async.md +++ b/README-async.md @@ -89,9 +89,6 @@ Start a Celery worker with environment variables to set shared resource location celery -A anyvar.queueing.celery_worker:celery_app worker ``` To start multiple Celery workers use the `--concurrency` option. -Note that Celery supports multiple worker pool types (prefork, threads, etc). AnyVar -should only be run using the prefork pool, which is the default, because the AnyVar code -is not thread safe. #### Submit an Async VCF Request Now that the REST API and Celery worker are running, submit an async VCF request with cURL: From 19a8428f1ca9ac043a14a521c3e86831046e26f8 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 18 Nov 2024 13:40:53 -0500 Subject: [PATCH 12/20] Documentation updates --- README-async.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README-async.md b/README-async.md index 4cfa647..f5ad514 100644 --- a/README-async.md +++ b/README-async.md @@ -53,6 +53,8 @@ Asynchronous VCF processing requires the installation of additional, optional de ```shell % pip install .[queueing] ``` +This will install the `celery[redis]` module and its dependencies. To connect Celery to a different +message broker or backend, install the appropriate extras with Celery. #### Start an Instance of Redis Celery relies on a message broker and result backend to manage the task queue and store results. @@ -102,7 +104,10 @@ And then check its status: ### Additional Environment Variables In addition to the environment variables mentioned previously, the following environment variables -can be set to adjust the behavior of AnyVar and Celery: +are directly supported and applied by AnyVar during startup. It is advisable to understand the underlying +Celery configuration options in more detail before making any changes. The Celery configuration parameter +name corresponding to each environment variable can be derived by removing the leading `CELERY_` and lower +casing the remaining, e.g.: `CELERY_TASK_DEFAULT_QUEUE` -> `task_default_queue`. | Variable | Description | Default | | -------- | ------- | ------- | | CELERY_TASK_DEFAULT_QUEUE | The name of the queue for tasks | anyvar_q | From a59430c23ad01641ba31e2d26765ad6b6ac58b3e Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 18 Nov 2024 14:31:39 -0500 Subject: [PATCH 13/20] Change time estimate to 333/sec for async --- src/anyvar/restapi/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/anyvar/restapi/main.py b/src/anyvar/restapi/main.py index 879d750..8e9dcab 100644 --- a/src/anyvar/restapi/main.py +++ b/src/anyvar/restapi/main.py @@ -376,8 +376,8 @@ async def _annotate_vcf_async( # set response headers response.status_code = status.HTTP_202_ACCEPTED response.headers["Location"] = f"/vcf/{task_result.id}" - # estimate for time is 500 variants per second - retry_after = max(1, round((vcf_site_count * (2 if for_ref else 1)) / 500, 0)) + # low side estimate for time is 333 variants per second + retry_after = max(1, round((vcf_site_count * (2 if for_ref else 1)) / 333, 0)) _logger.debug("%s - retry after is %s", task_result.id, str(retry_after)) response.headers["Retry-After"] = str(retry_after) return RunStatusResponse( From c33b3ae6960926e1603bbe4d9292cf857ff1bce3 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 18 Nov 2024 14:48:15 -0500 Subject: [PATCH 14/20] Documentation format updates --- README-async.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README-async.md b/README-async.md index f5ad514..26493d0 100644 --- a/README-async.md +++ b/README-async.md @@ -6,7 +6,7 @@ eliminating long lived connections and allow AnyVar to scale horizontally instea to serve a larger request volume. AnyVar utilizes the [Celery](https://docs.celeryq.dev/) distributed task queue to manage the asynchronous tasks. -### How It Works +## How It Works AnyVar can be run as a FastAPI app that provides a REST API. The REST API is run using uvicorn or gunicorn, eg: ```shell @@ -45,10 +45,10 @@ VCF file. For example: The client can provide a `run_id=...` query parameter with the initial PUT request. If one is not provided, a random UUID will be generated (as illustrated above). -### Setting Up Asynchronous VCF Processing +## Setting Up Asynchronous VCF Processing Enabling asychronous VCF processing requires some additional setup. -#### Install the Necessary Dependencies +### Install the Necessary Dependencies Asynchronous VCF processing requires the installation of additional, optional dependencies: ```shell % pip install .[queueing] @@ -56,7 +56,7 @@ Asynchronous VCF processing requires the installation of additional, optional de This will install the `celery[redis]` module and its dependencies. To connect Celery to a different message broker or backend, install the appropriate extras with Celery. -#### Start an Instance of Redis +### Start an Instance of Redis Celery relies on a message broker and result backend to manage the task queue and store results. The simplest option is to use a single instance of [Redis](https://redis.io) for both purposes. This documentation and the default settings will both assume this configuration. For other message broker @@ -68,12 +68,12 @@ If a Docker engine is available, start a local instance of Redis: ``` Or follow the [instructions](https://redis.io/docs/latest/get-started/) to run locally. -#### Create a Scratch Directory for File Storage +### Create a Scratch Directory for File Storage AnyVar does not store the actual VCF files in Redis for asynchronous processing, only paths to the file. This allows very large VCF files to be asychronously processed. All REST API and worker instances of AnyVar require access to the same shared file system. -#### Start the REST API +### Start the REST API Start the REST API with environment variables to set shared resource locations: ```shell % CELERY_BROKER_URL="redis://localhost:6379/0" \ @@ -82,7 +82,7 @@ Start the REST API with environment variables to set shared resource locations: uvicorn anyvar.restapi.main:app ``` -#### Start a Celery Worker +### Start a Celery Worker Start a Celery worker with environment variables to set shared resource locations: ```shell % CELERY_BROKER_URL="redis://localhost:6379/0" \ @@ -92,7 +92,7 @@ Start a Celery worker with environment variables to set shared resource location ``` To start multiple Celery workers use the `--concurrency` option. -#### Submit an Async VCF Request +### Submit an Async VCF Request Now that the REST API and Celery worker are running, submit an async VCF request with cURL: ```shell % curl -v -X PUT -F "vcf=@test.vcf" 'https://localhost:8000/vcf?run_async=True&run_id=12345' @@ -102,7 +102,7 @@ And then check its status: % curl -v 'https://localhost:8000/vcf/12345' ``` -### Additional Environment Variables +## Additional Environment Variables In addition to the environment variables mentioned previously, the following environment variables are directly supported and applied by AnyVar during startup. It is advisable to understand the underlying Celery configuration options in more detail before making any changes. The Celery configuration parameter From 237b50bb2d6c32e47a0ed27871ac46fa768c7a86 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 18 Nov 2024 14:54:01 -0500 Subject: [PATCH 15/20] Add link to async README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d61f841..95368df 100644 --- a/README.md +++ b/README.md @@ -183,7 +183,7 @@ AnyVar can support using the asynchronous request-response pattern when annotati This can improve reliability when serving remote clients by eliminating long lived connections and allow AnyVar to scale out instead of up to serve a larger request volume. -See README-async.md for more details. +See [README-async.md](README-async.md) for more details. ### Starting the REST service locally From e96a18677020a09cb51c7d064d1caf818080a3de Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Thu, 21 Nov 2024 09:22:32 -0500 Subject: [PATCH 16/20] Update to worker shutdown and cleanup handling --- src/anyvar/queueing/celery_worker.py | 105 ++++++++++++++++++++++++--- 1 file changed, 94 insertions(+), 11 deletions(-) diff --git a/src/anyvar/queueing/celery_worker.py b/src/anyvar/queueing/celery_worker.py index 424c506..1992c8f 100644 --- a/src/anyvar/queueing/celery_worker.py +++ b/src/anyvar/queueing/celery_worker.py @@ -49,13 +49,20 @@ ) # if this is a celery worker, we need an AnyVar app instance +# and to track how many tasks are concurrently running +# so that clean up can happen cleanly and at the right time +# For the prefork pool, this is not really needed since preforked +# workers are single threaded; but for the threads pool, this +# state is shared by the worker threads _anyvar_app = None -worker_init_lock = threading.Lock() +_current_task_count = 0 +_cleanup_flag = False +_shared_state_lock = threading.Lock() def get_anyvar_app() -> anyvar.AnyVar: """Create AnyVar app associated with the Celery work as necessary and return it""" - with worker_init_lock: + with _shared_state_lock: global _anyvar_app # create anyvar instance if necessary if not _anyvar_app: @@ -68,19 +75,90 @@ def get_anyvar_app() -> anyvar.AnyVar: return _anyvar_app -@celery.signals.worker_shutdown.connect -def teardown_anyvar(**kwargs) -> None: # noqa: ARG001 - """On the `worker_shutdown` signal, destroy the AnyVar app instance""" - with worker_init_lock: - global _anyvar_app - _logger.info("processing signal worker shutdown") - # close storage connector on shutdown - if _anyvar_app: - _logger.info("closing anyvar app in worker process init") +def teardown_anyvar_app() -> None: + """Shutdown the AnyVar app if it is safe to do so""" + global _anyvar_app + global _shared_state_lock + global _current_task_count + global _cleanup_flag + with _shared_state_lock: + # if it is safe to do so + if _cleanup_flag and _current_task_count == 0 and _anyvar_app: + # cleanly shutdown the AnyVar app, waiting for background writes + _logger.info("closing AnyVar app") + _anyvar_app.object_store.wait_for_writes() _anyvar_app.object_store.close() _anyvar_app = None +def enter_task() -> None: + """Increment the task counter""" + global _shared_state_lock + global _current_task_count + _logger.info( + "incrementing current task count from %s to %s", + _current_task_count, + _current_task_count + 1, + ) + with _shared_state_lock: + _current_task_count = _current_task_count + 1 + + +def exit_task() -> None: + """Decrement the task counter""" + global _shared_state_lock + global _current_task_count + _logger.info( + "decrementing current task count from %s to %s", + _current_task_count, + _current_task_count - 1, + ) + with _shared_state_lock: + _current_task_count = _current_task_count - 1 + + +@celery.signals.worker_shutting_down.connect +def on_worker_shutting_down(**kwargs) -> None: # noqa: ARG001 + """On the `worker_shutting_down` signal, set the cleanup flag and attempt tear down. + This signal is dispatched in both the prefork and threads pool types on the main process. + """ + _logger.info("processing signal worker_shutting_down") + global _shared_state_lock + global _cleanup_flag + with _shared_state_lock: + _cleanup_flag = True + + teardown_anyvar_app() + + +@celery.signals.worker_process_shutdown.connect +def on_worker_process_shutdown(**kwargs) -> None: # noqa: ARG001 + """On the `worker_process_shutdown` signal, set the cleanup flag and attempt tear down. + This signal is dispatched in the forked worker processes in the prefork pool. + """ + _logger.info("processing signal worker_process_shutdown") + global _shared_state_lock + global _cleanup_flag + with _shared_state_lock: + _cleanup_flag = True + + teardown_anyvar_app() + + +@celery.signals.worker_shutdown.connect +def on_worker_shutdown(**kwargs) -> None: # noqa: ARG001 + """On the `worker_shutdown` signal, set the cleanup flag and attempt tear down. + This signal is dispatched in both the prefork and threads pool types on the main process. + """ + _logger.info("processing signal worker_shutdown") + global _shared_state_lock + global _cleanup_flag + with _shared_state_lock: + _cleanup_flag = True + + teardown_anyvar_app() + + @celery_app.task(bind=True) def annotate_vcf( self: Task, @@ -98,6 +176,8 @@ def annotate_vcf( :return: path to the annotated VCF file """ try: + enter_task() + # create output file path output_file_path = f"{input_file_path}_outputvcf" _logger.info( @@ -137,6 +217,9 @@ def annotate_vcf( except Exception: _logger.exception("%s - vcf annotation failed with exception", self.request.id) raise + finally: + exit_task() + teardown_anyvar_app() @celery.signals.after_task_publish.connect From 9dfd81d263414fcd1f72c6bfa99b5e04485832f8 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Fri, 22 Nov 2024 09:39:10 -0500 Subject: [PATCH 17/20] Improve logging --- src/anyvar/queueing/celery_worker.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/anyvar/queueing/celery_worker.py b/src/anyvar/queueing/celery_worker.py index 1992c8f..2945d8e 100644 --- a/src/anyvar/queueing/celery_worker.py +++ b/src/anyvar/queueing/celery_worker.py @@ -1,5 +1,6 @@ """Define the Celery app and tasks for asynchronous request-response support""" +import datetime import logging import os import threading @@ -75,7 +76,7 @@ def get_anyvar_app() -> anyvar.AnyVar: return _anyvar_app -def teardown_anyvar_app() -> None: +def maybe_teardown_anyvar_app() -> None: """Shutdown the AnyVar app if it is safe to do so""" global _anyvar_app global _shared_state_lock @@ -89,6 +90,7 @@ def teardown_anyvar_app() -> None: _anyvar_app.object_store.wait_for_writes() _anyvar_app.object_store.close() _anyvar_app = None + _cleanup_flag = False def enter_task() -> None: @@ -128,7 +130,7 @@ def on_worker_shutting_down(**kwargs) -> None: # noqa: ARG001 with _shared_state_lock: _cleanup_flag = True - teardown_anyvar_app() + maybe_teardown_anyvar_app() @celery.signals.worker_process_shutdown.connect @@ -142,7 +144,7 @@ def on_worker_process_shutdown(**kwargs) -> None: # noqa: ARG001 with _shared_state_lock: _cleanup_flag = True - teardown_anyvar_app() + maybe_teardown_anyvar_app() @celery.signals.worker_shutdown.connect @@ -156,7 +158,7 @@ def on_worker_shutdown(**kwargs) -> None: # noqa: ARG001 with _shared_state_lock: _cleanup_flag = True - teardown_anyvar_app() + maybe_teardown_anyvar_app() @celery_app.task(bind=True) @@ -177,6 +179,7 @@ def annotate_vcf( """ try: enter_task() + task_start = datetime.datetime.now(tz=datetime.UTC) # create output file path output_file_path = f"{input_file_path}_outputvcf" @@ -196,18 +199,25 @@ def annotate_vcf( compute_for_ref=for_ref, assembly=assembly, ) + elapsed = datetime.datetime.now(tz=datetime.UTC) - task_start _logger.info( - "%s - annotation completed", - self.request.id, + "%s - annotation completed in %s seconds", self.request.id, elapsed.seconds ) # wait for writes if necessary if not allow_async_write: _logger.info( - "%s - waiting for object store writes from API handler method", + "%s - waiting for object store writes from celery worker method", self.request.id, ) + write_start = datetime.datetime.now(tz=datetime.UTC) anyvar_app.object_store.wait_for_writes() + elapsed = datetime.datetime.now(tz=datetime.UTC) - write_start + _logger.info( + "%s - waited for object store writes for %s seconds", + self.request.id, + elapsed.seconds, + ) # remove input file Path(input_file_path).unlink() @@ -219,7 +229,7 @@ def annotate_vcf( raise finally: exit_task() - teardown_anyvar_app() + maybe_teardown_anyvar_app() @celery.signals.after_task_publish.connect @@ -230,7 +240,7 @@ def update_sent_state(sender: str | None, headers: dict | None, **kwargs) -> Non :param sender: the name of the task :param headers: the task message headers """ - _logger.info("%s - after publish", headers["id"]) + _logger.info("%s - after publish changing status to SENT", headers["id"]) task = celery_app.tasks.get(sender) backend = task.backend if task else celery_app.backend From 60f975ebaf36a1f2f87f5de7b778676b64e33be5 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Fri, 22 Nov 2024 17:03:24 -0500 Subject: [PATCH 18/20] Add pool type warning to README --- README-async.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README-async.md b/README-async.md index 26493d0..66d429f 100644 --- a/README-async.md +++ b/README-async.md @@ -92,6 +92,11 @@ Start a Celery worker with environment variables to set shared resource location ``` To start multiple Celery workers use the `--concurrency` option. +[!CAUTION] +Celery supports different pool types (prefork, threads, etc.). +AnyVar ONLY supports the `prefork` and `solo` pool types. + + ### Submit an Async VCF Request Now that the REST API and Celery worker are running, submit an async VCF request with cURL: ```shell From 4101c1f80d724be53cca34d7091e0ed87b21b60e Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Fri, 22 Nov 2024 17:06:35 -0500 Subject: [PATCH 19/20] Fix caution block formatting --- README-async.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README-async.md b/README-async.md index 66d429f..c35e938 100644 --- a/README-async.md +++ b/README-async.md @@ -92,9 +92,9 @@ Start a Celery worker with environment variables to set shared resource location ``` To start multiple Celery workers use the `--concurrency` option. -[!CAUTION] -Celery supports different pool types (prefork, threads, etc.). -AnyVar ONLY supports the `prefork` and `solo` pool types. +> [!CAUTION] +> Celery supports different pool types (prefork, threads, etc.). +> AnyVar ONLY supports the `prefork` and `solo` pool types. ### Submit an Async VCF Request From 2ad7a2eccb0ac12d6f69afe2ef3405834143b907 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 2 Dec 2024 13:37:17 -0500 Subject: [PATCH 20/20] Switch from pyaml to lower level pyyaml module --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6080fc7..25a2226 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "uvicorn", "ga4gh.vrs[extras]~=2.0.0a11", "sqlalchemy~=1.4.54", - "pyaml", + "pyyaml", ] dynamic = ["version"]