-
Notifications
You must be signed in to change notification settings - Fork 2
/
add_CMIP6.py
139 lines (117 loc) · 4.85 KB
/
add_CMIP6.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import argparse
import json
import logging
import os
import sys
from typing import Any, MutableMapping, Optional, Union
from pystac import STACValidationError
from pystac.extensions.datacube import DatacubeExtension
from requests.sessions import Session
from STACpopulator import cli
from STACpopulator.log import add_logging_options
from STACpopulator.request_utils import add_request_options, apply_request_options
from STACpopulator.extensions.cmip6 import CMIP6Helper, CMIP6Properties
from STACpopulator.extensions.datacube import DataCubeHelper
from STACpopulator.extensions.thredds import THREDDSExtension, THREDDSHelper
from STACpopulator.input import ErrorLoader, GenericLoader, THREDDSLoader
from STACpopulator.models import GeoJSONPolygon
from STACpopulator.populator_base import STACpopulatorBase
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class CMIP6populator(STACpopulatorBase):
    """STAC populator for CMIP6 data.

    Every item is first built by ``CMIP6Helper`` and is then enriched with the
    datacube and THREDDS extensions and validated before being returned.
    """

    # ``item_geometry_model`` is passed to ``CMIP6Helper`` in
    # ``create_stac_item``; ``item_properties_model`` is not read in this
    # class (presumably consumed elsewhere in the package -- TODO confirm).
    item_properties_model = CMIP6Properties
    item_geometry_model = GeoJSONPolygon

    def __init__(
        self,
        stac_host: str,
        data_loader: GenericLoader,
        update: Optional[bool] = False,
        session: Optional[Session] = None,
        config_file: Optional[Union[os.PathLike[str], str]] = None,
    ) -> None:
        """Set up the populator by delegating everything to the base class.

        :param stac_host: URL to the STAC API
        :type stac_host: str
        :param data_loader: loader to iterate over ingestion data.
        :param update: forwarded unchanged to the base class constructor.
        :param session: forwarded unchanged to the base class constructor.
        :param config_file: forwarded unchanged to the base class constructor.
        """
        super().__init__(
            stac_host,
            data_loader,
            update=update,
            session=session,
            config_file=config_file,
        )

    def create_stac_item(
        self, item_name: str, item_data: MutableMapping[str, Any]
    ) -> Union[None, MutableMapping[str, Any]]:
        """Build the STAC item for one dataset and return it as plain JSON data.

        The steps run in a fixed order: CMIP6 item creation, datacube
        extension, THREDDS extension, validation. A failure in any step is
        re-raised as a generic ``Exception`` naming that step, with the
        original error chained as its cause.

        :param item_name: name of the STAC item. Interpretation of name is left
            to the input loader implementation. It is not used in this method.
        :type item_name: str
        :param item_data: dictionary like representation of all information on
            the item; its ``"access_urls"`` entry feeds the THREDDS extension.
        :type item_data: MutableMapping[str, Any]
        :return: the item's dictionary form after a round trip through JSON.
        :rtype: MutableMapping[str, Any]
        :raises Exception: when any of the steps above fails.
        """
        # Step 1: base item carrying the CMIP6 extension.
        try:
            item = CMIP6Helper(item_data, self.item_geometry_model).stac_item()
        except Exception as err:
            raise Exception("Failed to add CMIP6 extension") from err

        # Step 2: datacube extension (dimensions and variables).
        try:
            cube = DataCubeHelper(item_data)
            DatacubeExtension.ext(item, add_if_missing=True).apply(
                dimensions=cube.dimensions, variables=cube.variables
            )
        except Exception as err:
            raise Exception("Failed to add Datacube extension") from err

        # Step 3: THREDDS extension (services and links from the access URLs).
        try:
            thredds = THREDDSHelper(item_data["access_urls"])
            THREDDSExtension.ext(item).apply(thredds.services, thredds.links)
        except Exception as err:
            raise Exception("Failed to add THREDDS extension") from err

        # Step 4: validate the fully extended item.
        try:
            item.validate()
        except STACValidationError as err:
            raise Exception("Failed to validate STAC item") from err

        # Serialise and parse again so the returned mapping holds only what
        # survives JSON encoding.
        serialized = json.dumps(item.to_dict())
        return json.loads(serialized)
def add_parser_args(parser: argparse.ArgumentParser) -> None:
    """Register the CMIP6 populator's command-line interface on ``parser``.

    Sets the parser description, adds the positional ``stac_host`` and ``href``
    arguments and the ``--update``, ``--mode`` and ``--config`` options, then
    lets ``add_request_options`` and ``add_logging_options`` add theirs.

    :param parser: parser to configure in place.
    """
    parser.description = "CMIP6 STAC populator from a THREDDS catalog or NCML XML."

    # Positional arguments.
    parser.add_argument("stac_host", help="STAC API URL")
    parser.add_argument(
        "href",
        help="URL to a THREDDS catalog or a NCML XML with CMIP6 metadata.",
    )

    # Optional arguments.
    parser.add_argument(
        "--update",
        action="store_true",
        help="Update collection and its items",
    )
    mode_choices = ["full", "single"]
    parser.add_argument(
        "--mode",
        choices=mode_choices,
        default="full",
        help="Operation mode, processing the full dataset or only the single reference.",
    )
    config_help = (
        "Override configuration file for the populator. "
        "By default, uses the adjacent configuration to the implementation class."
    )
    parser.add_argument("--config", type=str, help=config_help)

    # Shared option groups provided by the package.
    add_request_options(parser)
    add_logging_options(parser)
def runner(ns: argparse.Namespace) -> int:
    """Run a CMIP6 ingestion for the parsed command-line arguments.

    Opens a ``Session``, applies the request options from ``ns`` to it, picks
    a data loader according to ``ns.mode`` and runs the populator's
    ``ingest``.

    :param ns: parsed arguments; reads ``mode``, ``href``, ``stac_host``,
        ``update`` and ``config``.
    :return: always ``0`` when ingestion finishes without raising.
    """
    LOGGER.info(f"Arguments to call: {vars(ns)}")

    with Session() as http_session:
        apply_request_options(http_session, ns)

        # Only "full" mode has a real loader; the other mode is still to be
        # implemented and gets an ErrorLoader instead.
        loader = (
            THREDDSLoader(ns.href, session=http_session)
            if ns.mode == "full"
            else ErrorLoader()
        )

        populator = CMIP6populator(
            ns.stac_host,
            loader,
            update=ns.update,
            session=http_session,
            config_file=ns.config,
        )
        populator.ingest()

    return 0
def main(*args: str) -> int:
    """Parse command-line arguments and hand them to ``cli.run``.

    :param args: argument strings to parse. When none are given, ``None`` is
        passed to ``parse_args`` so argparse reads ``sys.argv`` instead.
    :return: whatever ``cli.run`` returns.
    """
    cli_parser = argparse.ArgumentParser()
    add_parser_args(cli_parser)

    argv = args if args else None
    namespace = cli_parser.parse_args(argv)

    # The populator name is the name of the directory containing this file.
    containing_dir = os.path.dirname(__file__)
    namespace.populator = os.path.basename(containing_dir)

    return cli.run(namespace)
if __name__ == "__main__":
    # Use the value returned by main() as the process exit status.
    raise SystemExit(main())