This repository enables one to spin up a FastAPI/xpublish server from either an Intake or STAC catalog.
- Serves cataloged `.zarr` or `.nc` data over a REST API without pre-loading.
- Parses the catalog hierarchy and creates matching URL paths/prefixes (e.g., "https://server.com/catalog1/sub_catalog2/sub_catalog_5/datasets/{dataset_id}").
- Fully compatible with any third-party xpublish dataset router or app router plugin. For example, xpublish-opendap can be included to provide key OPeNDAP capabilities (e.g., slicing).
- Leverages a factory pattern architecture and Python Abstract Base Classes (ABCs) to streamline the addition of more catalog schemas as needs develop.
- Thanks to FastAPI, interactive Swagger and Redoc API documentation endpoints are automatically generated on spin-up.
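To illustrate how a catalog hierarchy maps onto the URL paths/prefixes described in the list above, here is a minimal sketch. The helper `dataset_url` and its arguments are hypothetical and are not part of `catalog_to_xpublish`; the server builds these prefixes itself when it parses the catalog.

```python
from typing import Sequence
from urllib.parse import quote


def dataset_url(base_url: str, catalog_path: Sequence[str], dataset_id: str) -> str:
    """Join nested catalog names into a URL prefix and append the dataset route."""
    # each catalog level becomes one path segment of the prefix
    segments = [quote(name, safe='') for name in catalog_path]
    # the dataset itself is addressed under the "datasets" route
    segments += ['datasets', quote(dataset_id, safe='')]
    return '/'.join([base_url.rstrip('/'), *segments])


print(dataset_url(
    'https://server.com',
    ['catalog1', 'sub_catalog2', 'sub_catalog_5'],
    'my_dataset',
))
```

The dataset name `my_dataset` is a placeholder; real dataset IDs come from the catalog being served.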
- Pip install `catalog_to_xpublish` into your Python >=3.11 environment.
  ```shell
  pip install git+https://github.com/xpublish-experiments/Catalog-To-Xpublish.git
  ```
- Install any `xpublish` plugins you wish to use. For example, to install `xpublish-opendap`:
  ```shell
  pip install xpublish-opendap
  ```
- Select an Intake `.yaml` or STAC `.json` file path or URL to serve data from.
- Use the `catalog_to_xpublish.create_app()` function to spin up a `FastAPI()` application. Note that the plugin objects (e.g., `xpublish_opendap.OpenDapPlugin`) should be passed in via a list to the `xpublish_plugins` parameter. See the example below:
  ```python
  import catalog_to_xpublish
  from fastapi import FastAPI
  from xpublish_opendap import OpenDapPlugin

  # identify a catalog
  CATALOG_URL = 'https://code.usgs.gov/wma/nhgf/stac/-/raw/main/catalog2/catalog.json'

  # create FastAPI app instance
  app = catalog_to_xpublish.create_app(
      catalog_path=CATALOG_URL,
      catalog_type='stac',
      app_name='Test NHGF Catalog Server',
      xpublish_plugins=[OpenDapPlugin],
  )
  ```
- Use uvicorn to deploy the server. For example, assuming the previous code snippet is in `run_server.py`:
  ```python
  import uvicorn

  # define server hosting location
  LOCAL_HOST = '127.0.0.1'
  LOCAL_PORT = 8000


  # wrap uvicorn.run() in main()
  def main() -> None:
      """Main function to run the server."""
      uvicorn.run(
          'run_server:app',
          host=LOCAL_HOST,
          port=LOCAL_PORT,
          reload=True,
      )


  # start the server when run_server.py is executed
  if __name__ == '__main__':
      main()
  ```
- All core `xpublish` endpoints/routes are supported after a `datasets/{dataset_id}` is selected.
- At each level in the catalog hierarchy one can:
  - See the parent catalog via `/parent_catalog`.
  - See all sub-catalogs via `/catalogs`.
  - See all servable datasets via `/datasets`.
  - Get the catalog represented as YAML text via `/yaml`.
  - Get the catalog represented as JSON via `/json`.
  - View the FastAPI documentation via `/docs` (for Swagger) or `/redoc` (for Redoc). Note that `xpublish` endpoints will only appear at a catalog level containing servable datasets.
- After a `datasets/{dataset_id}` is selected, one can also use any additional endpoints added via `xpublish` plugins. These endpoints will appear in the API documentation endpoint `/docs`.
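As a rough sketch of how a client might address the catalog-level routes listed above, the snippet below builds the URL for each route under a catalog prefix. The host, port, and `catalog1` prefix are assumptions for a local deployment, and the helpers `catalog_route_urls` and `fetch_text` are hypothetical, not part of `catalog_to_xpublish`.

```python
from urllib.request import urlopen

# route suffixes from the list above; available at every catalog level
CATALOG_ROUTES = ('parent_catalog', 'catalogs', 'datasets', 'yaml', 'json')


def catalog_route_urls(catalog_url: str) -> dict[str, str]:
    """Map each catalog-level route name to its full URL."""
    base = catalog_url.rstrip('/')
    return {route: f'{base}/{route}' for route in CATALOG_ROUTES}


def fetch_text(url: str, timeout: float = 10.0) -> str:
    """Return the response body as text (requires a running server)."""
    with urlopen(url, timeout=timeout) as response:
        return response.read().decode('utf-8')


# assumed local deployment and catalog prefix
urls = catalog_route_urls('http://127.0.0.1:8000/catalog1')
for name, url in urls.items():
    print(f'{name}: {url}')

# with the server running, one could then call, for example:
# print(fetch_text(urls['datasets']))
```

The shape of each response body is not specified here, so `fetch_text` returns raw text and leaves parsing to the caller.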
By default, `catalog_to_xpublish` will log to the console at the "INFO" level.
One can change the logging behavior by passing a `config_logging_dict` argument to `catalog_to_xpublish.create_app()`, which may contain any of the following keys:
- `log_level`: The logging level. Default is "INFO".
- `log_file_path`: The path to a log file. Default is `None`.
- `date_format`: The date format for log messages. Default is `'%Y-%m-%dT%H:%M:%S'`.
- `log_format`: The log message format. Default is `'[%(asctime)s] %(levelname)s - %(message)s'`.
- `stream_handlers`: Stream handler(s) to use (can be a list). Default is the console.

For example, here is how one can log at the DEBUG level to an existing `logging.StreamHandler` and a log file:
```python
import catalog_to_xpublish
import logging

# identify a catalog
CATALOG_URL = 'https://code.usgs.gov/wma/nhgf/stac/-/raw/main/catalog2/catalog.json'

# set logging config
existing_stream_handler: logging.StreamHandler = ...
LOGGING_CONFIG = {
    'log_level': 'DEBUG',
    'log_file_path': 'my_log_file.log',
    'stream_handlers': [existing_stream_handler],
}

# create FastAPI app instance
app = catalog_to_xpublish.create_app(
    catalog_path=CATALOG_URL,
    catalog_type='stac',
    config_logging_dict=LOGGING_CONFIG,
)
```
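To make the meaning of the logging keys more concrete, here is a minimal sketch of how such a dictionary could be translated into standard `logging` calls. This is not the library's actual implementation: the function `build_logger` and the logger name are hypothetical, and only the key names and default values are taken from the list above.

```python
import logging
from typing import Any, Dict, Optional

# defaults as documented above
DEFAULTS: Dict[str, Any] = {
    'log_level': 'INFO',
    'log_file_path': None,
    'date_format': '%Y-%m-%dT%H:%M:%S',
    'log_format': '[%(asctime)s] %(levelname)s - %(message)s',
    'stream_handlers': None,  # None is treated here as "log to the console"
}


def build_logger(
    config: Optional[Dict[str, Any]] = None,
    name: str = 'example_logger',
) -> logging.Logger:
    """Create a logger from a config dict, filling missing keys with defaults."""
    settings = {**DEFAULTS, **(config or {})}
    formatter = logging.Formatter(
        fmt=settings['log_format'],
        datefmt=settings['date_format'],
    )

    # accept a single handler or a list; fall back to a console handler
    handlers = settings['stream_handlers'] or [logging.StreamHandler()]
    if not isinstance(handlers, (list, tuple)):
        handlers = [handlers]
    handlers = list(handlers)

    # optionally log to a file as well
    if settings['log_file_path'] is not None:
        handlers.append(logging.FileHandler(settings['log_file_path']))

    logger = logging.getLogger(name)
    logger.setLevel(settings['log_level'])
    logger.handlers.clear()
    for handler in handlers:
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


logger = build_logger({'log_level': 'DEBUG'})
logger.debug('debug messages are now emitted')
```

Merging the user dictionary over a defaults dictionary means callers only need to supply the keys they want to change.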
We strongly encourage open-source contributions to this repository! I am new to this tech stack, and likely have much to learn from the wider `xpublish` community.
Please note any bugs or feature requests via our GitHub Issues page.
To contribute code, please follow the steps below:
- Create a conda/mamba environment by cloning our developer environment (`dev_environment.yml`).
  ```shell
  (base) cd {PATH}/{TO}/{REPO}
  (base) conda env create -f dev_environment.yml
  ```
- Fork this repository and create a new branch off of `main` with a descriptive name.
- Make your changes and commit them to your new branch.
- When finished, run our pre-commit hooks and tests locally (see below).
  ```shell
  (base) conda activate catalog_to_xpublish_dev
  (catalog_to_xpublish_dev) pre-commit run --all-files
  ```
- Make new tests for any new features you add. This code will be pip installed by users to run servers, and we want to make sure it is robust. Test files must be named following `pytest` conventions; we encourage the `tests/test_{#}_{new_feature_name}.py` format.
- Run (and pass) our full test suite locally. Note: Some of our test datasets are hosted on AWS and require AWS credentials to run locally. These tests are currently not part of the automated GitHub Actions test suite; however, we aim to add them soon. The tests only access public OSN data, so you can use any AWS credentials.
  ```shell
  (catalog_to_xpublish_dev) pytest
  ```
- Make a pull request to merge your branch into `main`. We will review the pull request and merge it if it is deemed appropriate.
As environmental science progresses, we expect additional catalog schemas beyond Intake and STAC to become relevant. Alternatively, STAC supports extensions, and one may need to build an adjusted STAC implementation for specific needs/desires.
A catalog implementation consists of three core components/classes. To create a new catalog implementation, one must define a concrete implementation of each of the following abstract base classes:
- `base.CatalogSearcher` - Responsible for parsing the catalog hierarchy.
- `base.CatalogToXarray` - Responsible for reading catalog items into `xarray.Dataset` objects and writing attributes.
- `base.CatalogRouter` - Responsible for defining the behavior of our core endpoints and attaching them to a `fastapi.APIRouter`.
The concrete versions of the above classes then need to be decorated as "products" so that `catalog_to_xpublish.CatalogImplementationFactory` can identify, validate, and register the new catalog implementation.
This process is demonstrated below:
- Create a concrete implementation of `base/searcher_base.CatalogSearcher` and decorate it as a `factory.CatalogSearcherClass`. See the `CatalogSearcher` docstring for more detail. Note that one must define every abstract method, even if it only raises a `NotImplementedError`.
  ```python
  from pathlib import Path
  from typing import List, Optional

  from catalog_to_xpublish.base import (
      CatalogSearcher,
      CatalogEndpoint,
  )
  from catalog_to_xpublish.factory import CatalogSearcherClass


  @CatalogSearcherClass
  class NewCatalogTypeSearcher(CatalogSearcher):

      catalog_type: str = 'new_catalog_type'

      @property
      def catalog_path(self) -> Path | str:
          ...

      @property
      def suffixes(self) -> List[str]:
          ...

      @property
      def catalog_object(self) -> object:
          ...

      def parse_catalog(
          self,
          catalog: Optional[object] = None,
          parent_path: Optional[str] = None,
          list_of_catalog_endpoints: Optional[List[CatalogEndpoint]] = None,
      ) -> List[CatalogEndpoint]:
          """Recursively searches a catalog for a search term."""
          ...
  ```
- Create a concrete implementation of `base/io_base.CatalogToXarray` and decorate it as a `factory.CatalogIOClass`. See the `CatalogToXarray` docstring for more detail.
  ```python
  from typing import Any, Dict

  import xarray as xr

  from catalog_to_xpublish.base import CatalogToXarray
  from catalog_to_xpublish.factory import CatalogIOClass


  @CatalogIOClass
  class NewCatalogToXarray(CatalogToXarray):

      catalog_type: str = 'new_catalog_type'

      def write_attributes(
          self,
          ds: xr.Dataset,
          info_dict: Dict[str, Any],
      ) -> xr.Dataset:
          """Write attributes from the catalog object to the dataset.attrs."""
          ...

      def get_dataset_from_catalog(
          self,
          dataset_id: str,
      ) -> xr.Dataset:
          """Get an xarray dataset from the catalog object."""
          ...
  ```
- Finally, create a concrete implementation of `base/router_base.CatalogRouter` and decorate it as a `factory.CatalogRouterClass`. See the `CatalogRouter` docstring for more detail. Note that one can extend this class with additional endpoints as necessary.
  ```python
  from typing import List, Optional

  from fastapi.responses import (
      HTMLResponse,
      PlainTextResponse,
      JSONResponse,
  )
  from catalog_to_xpublish.server_functions import add_base_routes
  from catalog_to_xpublish.base import CatalogEndpoint, CatalogRouter
  from catalog_to_xpublish.factory import CatalogRouterClass


  @CatalogRouterClass
  class NewCatalogRouter(CatalogRouter):

      catalog_type: str = 'new_catalog_type'

      def __init__(
          self,
          catalog_endpoint_obj: CatalogEndpoint,
          prefix: Optional[str] = None,
      ) -> None:
          """An init that contains a call to server_functions.add_base_routes(self)"""
          # add routes for all base CatalogRouter endpoints/class functions
          super().__init__(
              catalog_endpoint_obj=catalog_endpoint_obj,
              prefix=prefix,
          )
          ...

      def list_sub_catalogs(self) -> List[str]:
          """Returns a list of sub-catalogs."""
          ...

      def get_parent_catalog(self) -> str:
          """Returns the parent catalog."""
          ...

      def get_catalog_as_yaml(self) -> PlainTextResponse:
          """Returns the catalog yaml as plain text."""
          ...

      def get_catalog_as_json(self) -> JSONResponse:
          """Returns the catalog as JSON."""
          ...
  ```
- Make sure the catalog searcher, IO class, and router class are included within their respective module `__init__.py` files.
- At this point, upon spin-up, the `factory.CatalogImplementationFactory` will recognize (via the decorators) each component of the catalog implementation and register it as valid.
- Use `catalog_to_xpublish.CatalogImplementationFactory.get_all_implementations()` to return a dictionary of all registered catalog implementations.
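For readers unfamiliar with the pattern, the decorator-based registration described above can be sketched in plain Python as follows. This is a generic illustration of combining ABCs with a registering class decorator, not the actual internals of `CatalogImplementationFactory`; the names `SearcherBase`, `SEARCHER_REGISTRY`, `register_searcher`, and `DemoSearcher` are all hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Type


class SearcherBase(ABC):
    """Stand-in for an abstract base class such as CatalogSearcher."""

    catalog_type: str

    @abstractmethod
    def parse_catalog(self) -> List[str]:
        """Return the endpoints found in the catalog."""


# hypothetical registry: catalog_type -> concrete searcher class
SEARCHER_REGISTRY: Dict[str, Type[SearcherBase]] = {}


def register_searcher(cls: Type[SearcherBase]) -> Type[SearcherBase]:
    """Class decorator that validates a searcher and registers it."""
    if not issubclass(cls, SearcherBase):
        raise TypeError(f'{cls.__name__} must subclass SearcherBase')
    # a class with unimplemented abstract methods is rejected up front
    if getattr(cls, '__abstractmethods__', None):
        missing = ', '.join(sorted(cls.__abstractmethods__))
        raise TypeError(f'{cls.__name__} is missing: {missing}')
    SEARCHER_REGISTRY[cls.catalog_type] = cls
    return cls


@register_searcher
class DemoSearcher(SearcherBase):
    catalog_type = 'demo'

    def parse_catalog(self) -> List[str]:
        return ['/demo/datasets']


print(sorted(SEARCHER_REGISTRY))
```

Validating at decoration time means an incomplete implementation fails when its module is imported rather than later, when a request first reaches it.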