Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VCF annotation via asynchronous request-response pattern #108

Merged
merged 20 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions README-async.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# AnyVar Asynchronous VCF Annotation
AnyVar can use an
[asynchronous request-response pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/async-request-reply)
when annotating VCF files. This can improve reliability when serving remote clients by
eliminating long lived connections and allow AnyVar to scale horizontally instead of vertically
to serve a larger request volume. AnyVar utilizes the [Celery](https://docs.celeryq.dev/)
distributed task queue to manage the asynchronous tasks.

## How It Works
AnyVar can be run as a FastAPI app that provides a REST API. The REST API is run using
uvicorn or gunicorn, eg:
```shell
% uvicorn anyvar.restapi.main:app
```

AnyVar can also be run as a Celery worker app that processes tasks submitted through the REST API, eg:
```shell
% celery -A anyvar.queueing.celery_worker:celery_app worker
```

When VCF files are submitted to the `/vcf` endpoint with the `run_async=True` query parameter,
the REST API submits a task to the Celery worker via a queue and immediately returns a `202 Accepted`
response with a `Location` header indicating where the client should poll for status and results.
Once the VCF is annotated and the result is ready, the polling request will return the annotated
VCF file. For example:
```
> PUT /vcf?run_async=True HTTP/1.1
> Content-type: multipart/form-data...

< HTTP/1.1 202 Accepted
< Location: /vcf/a1ac7850-0df7-4db6-82ab-b19bce93faf3
< Retry-After: 120

> GET /vcf/a1ac7850-0df7-4db6-82ab-b19bce93faf3 HTTP/1.1

< HTTP/1.1 202 Accepted

> GET /vcf/a1ac7850-0df7-4db6-82ab-b19bce93faf3 HTTP/1.1

> HTTP/1.1 200 OK
>
> ##fileformat=VCFv4.2...
```

The client can provide a `run_id=...` query parameter with the initial PUT request. If one is not
provided, a random UUID will be generated (as illustrated above).

## Setting Up Asynchronous VCF Processing
Enabling asychronous VCF processing requires some additional setup.

### Install the Necessary Dependencies
Asynchronous VCF processing requires the installation of additional, optional dependencies:
```shell
% pip install .[queueing]
```
This will install the `celery[redis]` module and its dependencies. To connect Celery to a different
message broker or backend, install the appropriate extras with Celery.

### Start an Instance of Redis
Celery relies on a message broker and result backend to manage the task queue and store results.
The simplest option is to use a single instance of [Redis](https://redis.io) for both purposes. This
documentation and the default settings will both assume this configuration. For other message broker
and result backend options, refer to the Celery documentation.

If a Docker engine is available, start a local instance of Redis:
```shell
% docker run -d -p 6379:6379 redis:alpine
```
Or follow the [instructions](https://redis.io/docs/latest/get-started/) to run locally.

### Create a Scratch Directory for File Storage
AnyVar does not store the actual VCF files in Redis for asynchronous processing, only paths to the file.
This allows very large VCF files to be asychronously processed. All REST API and worker instances of AnyVar
require access to the same shared file system.

### Start the REST API
Start the REST API with environment variables to set shared resource locations:
```shell
% CELERY_BROKER_URL="redis://localhost:6379/0" \
CELERY_BACKEND_URL="redis://localhost:6379/0" \
ANYVAR_VCF_ASYNC_WORK_DIR="/path/to/shared/file/system" \
uvicorn anyvar.restapi.main:app
```

### Start a Celery Worker
Start a Celery worker with environment variables to set shared resource locations:
```shell
% CELERY_BROKER_URL="redis://localhost:6379/0" \
CELERY_BACKEND_URL="redis://localhost:6379/0" \
ANYVAR_VCF_ASYNC_WORK_DIR="/path/to/shared/file/system" \
celery -A anyvar.queueing.celery_worker:celery_app worker
```
To start multiple Celery workers use the `--concurrency` option.

> [!CAUTION]
> Celery supports different pool types (prefork, threads, etc.).
> AnyVar ONLY supports the `prefork` and `solo` pool types.


### Submit an Async VCF Request
Now that the REST API and Celery worker are running, submit an async VCF request with cURL:
```shell
% curl -v -X PUT -F "vcf=@test.vcf" 'https://localhost:8000/vcf?run_async=True&run_id=12345'
```
And then check its status:
```shell
% curl -v 'https://localhost:8000/vcf/12345'
```

## Additional Environment Variables
In addition to the environment variables mentioned previously, the following environment variables
are directly supported and applied by AnyVar during startup. It is advisable to understand the underlying
Celery configuration options in more detail before making any changes. The Celery configuration parameter
name corresponding to each environment variable can be derived by removing the leading `CELERY_` and lower
casing the remaining, e.g.: `CELERY_TASK_DEFAULT_QUEUE` -> `task_default_queue`.
| Variable | Description | Default |
| -------- | ------- | ------- |
| CELERY_TASK_DEFAULT_QUEUE | The name of the queue for tasks | anyvar_q |
| CELERY_EVENT_QUEUE_PREFIX | The prefix for event receiver queue names | anyvar_ev |
| CELERY_TIMEZONE | The timezone that Celery operates in | UTC |
| CELERY_RESULT_EXPIRES | Number of seconds after submission before a result expires from the backend | 7200 |
| CELERY_TASK_ACKS_LATE | Whether workers acknowledge tasks before (`false`) or after (`true`) they are run | true |
| CELERY_TASK_REJECT_ON_WORKER_LOST | Whether to reject (`true`) or fail (`false`) a task when a worker dies mid-task | false |
| CELERY_WORKER_PREFETCH_MULTIPLIER | How many tasks a worker should fetch from the queue at a time | 1 |
| CELERY_TASK_TIME_LIMIT | Maximum time a task may run before it is terminated | 3900 |
| CELERY_SOFT_TIME_LIMIT | Amount of time a task can run before an exception is triggered, allowing for cleanup | 3600 |
| CELERY_WORKER_SEND_TASK_EVENTS | Change to `true` to cause Celery workers to emit task events for monitoring purposes | false |
| ANYVAR_VCF_ASYNC_FAILURE_STATUS_CODE | What HTTP status code to return for failed asynchronous tasks | 500 |
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ CREATE TABLE ... (
)
```

### Enabling Asynchronous VCF Annotation
AnyVar can support using the asynchronous request-response pattern when annotating VCF files.
This can improve reliability when serving remote clients by eliminating long lived connections
and allow AnyVar to scale out instead of up to serve a larger request volume.

See [README-async.md](README-async.md) for more details.

### Starting the REST service locally

Once the data dependencies are setup, start the REST server with:
Expand Down Expand Up @@ -214,6 +221,15 @@ able to pass when run in isolation. By default, the tests will use a Postgres da
installation. To run the tests against a Snowflake database, change the
`ANYVAR_TEST_STORAGE_URI` to a Snowflake URI and run the tests.

For the `tests/test_vcf::test_vcf_registration_async` unit test to pass, a real broker and backend
are required for Celery to interact with. Set the `CELERY_BROKER_URL` and `CELERY_BACKEND_URL`
environment variables. The simplest solution is to run Redis locally and use that for both
the broker and the backend, eg:
```shell
% export CELERY_BROKER_URL="redis://"
% export CELERY_BACKEND_URL="redis://"
```

## Logging
AnyVar uses the [Python Logging Module](https://docs.python.org/3/howto/logging.html) to
output information and diagnostics. By default, log output is directed to standard output
Expand Down
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@ dependencies = [
"uvicorn",
"ga4gh.vrs[extras]~=2.0.0a11",
"sqlalchemy~=1.4.54",
"pyaml",
ehclark marked this conversation as resolved.
Show resolved Hide resolved
]
dynamic = ["version"]

[project.optional-dependencies]
postgres = ["psycopg[binary]"]
snowflake = ["snowflake-sqlalchemy~=1.5.1"]
tests = [
queueing = [
"celery[redis]~=5.4.0",
"aiofiles",
]
test = [
"pytest",
"pytest-cov",
"pytest-mock",
"httpx"
"httpx",
"celery[pytest]",
]
dev = [
"ruff==0.5.0",
Expand Down
24 changes: 24 additions & 0 deletions src/anyvar/anyvar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,32 @@

"""

import importlib.util
import logging
import logging.config
import os
import pathlib
from collections.abc import MutableMapping
from urllib.parse import urlparse

import yaml
from ga4gh.vrs import vrs_deref, vrs_enref

from anyvar.storage import DEFAULT_STORAGE_URI, _Storage
from anyvar.translate.translate import _Translator
from anyvar.translate.vrs_python import VrsPythonTranslator
from anyvar.utils.types import VrsObject

# Configure logging from file or use default
logging_config_file = os.environ.get("ANYVAR_LOGGING_CONFIG", None)
if logging_config_file and pathlib.Path(logging_config_file).is_file():
with pathlib.Path(logging_config_file).open() as fd:
try:
config = yaml.safe_load(fd.read())
logging.config.dictConfig(config)
except Exception:
logging.exception("Error in Logging Configuration. Using default configs")

_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -60,6 +74,16 @@ def create_translator() -> _Translator:
return VrsPythonTranslator()


def has_queueing_enabled() -> bool:
"""Determine whether or not asynchronous task queueing is enabled"""
return (
importlib.util.find_spec("aiofiles") is not None
and importlib.util.find_spec("celery") is not None
and os.environ.get("CELERY_BROKER_URL", "") != ""
and os.environ.get("ANYVAR_VCF_ASYNC_WORK_DIR", "") != ""
)


class AnyVar:
"""Define core AnyVar class."""

Expand Down
1 change: 1 addition & 0 deletions src/anyvar/queueing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Provides asynchronous tasks via Celery integration"""
Loading