Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding feature to intercept errors and save information to files #45

Merged
merged 12 commits into from
Feb 21, 2024
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ reports
STACpopulator.egg-info/
build
*.pyc

## Logs
*.jsonl
*.json
12 changes: 2 additions & 10 deletions STACpopulator/api_requests.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
import logging
import os
from typing import Any, Optional
from typing import Any, Optional, Union

import requests
from requests import Session
from colorlog import ColoredFormatter

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


def stac_host_reachable(url: str, session: Optional[Session] = None) -> bool:
Expand Down Expand Up @@ -109,6 +101,6 @@ def post_stac_item(
r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data)
r.raise_for_status()
else:
LOGGER.info(f"Item {item_id} already exists.")
LOGGER.warn(f"Item {item_id} already exists.")
else:
r.raise_for_status()
60 changes: 43 additions & 17 deletions STACpopulator/cli.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import argparse
import glob
import importlib
import logging
import os
import sys
from datetime import datetime
from http import cookiejar
from typing import Callable, Optional

import requests
from http import cookiejar
from requests.auth import AuthBase, HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth
from requests.sessions import Session

from STACpopulator import __version__
from STACpopulator.logging import setup_logging

POPULATORS = {}

Expand Down Expand Up @@ -54,19 +57,24 @@ def add_request_options(parser: argparse.ArgumentParser) -> None:
Adds arguments to a parser to allow update of a request session definition used across a populator procedure.
"""
parser.add_argument(
"--no-verify", "--no-ssl", "--no-ssl-verify", dest="verify", action="store_false",
help="Disable SSL verification (not recommended unless for development/test servers)."
)
parser.add_argument(
"--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use."
"--no-verify",
"--no-ssl",
"--no-ssl-verify",
dest="verify",
action="store_false",
help="Disable SSL verification (not recommended unless for development/test servers).",
)
parser.add_argument("--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use.")
parser.add_argument(
"--auth-handler", choices=["basic", "digest", "bearer", "proxy", "cookie"], required=False,
help="Authentication strategy to employ for the requests session."
"--auth-handler",
choices=["basic", "digest", "bearer", "proxy", "cookie"],
required=False,
help="Authentication strategy to employ for the requests session.",
)
parser.add_argument(
"--auth-identity", required=False,
help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler."
"--auth-identity",
required=False,
help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler.",
)


Expand All @@ -93,16 +101,25 @@ def apply_request_options(session: Session, namespace: argparse.Namespace) -> No

def make_main_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="stac-populator", description="STACpopulator operations.")
parser.add_argument("--version", "-V", action="version", version=f"%(prog)s {__version__}",
help="prints the version of the library and exits")
parser.add_argument(
"--version",
"-V",
action="version",
version=f"%(prog)s {__version__}",
help="prints the version of the library and exits",
)
commands = parser.add_subparsers(title="command", dest="command", description="STAC populator command to execute.")

run_cmd_parser = make_run_command_parser(parser.prog)
commands.add_parser(
"run",
prog=f"{parser.prog} {run_cmd_parser.prog}", parents=[run_cmd_parser],
formatter_class=run_cmd_parser.formatter_class, usage=run_cmd_parser.usage,
add_help=False, help=run_cmd_parser.description, description=run_cmd_parser.description
prog=f"{parser.prog} {run_cmd_parser.prog}",
parents=[run_cmd_parser],
formatter_class=run_cmd_parser.formatter_class,
usage=run_cmd_parser.usage,
add_help=False,
help=run_cmd_parser.description,
description=run_cmd_parser.description,
)

# add more commands as needed...
Expand Down Expand Up @@ -142,9 +159,12 @@ def make_run_command_parser(parent) -> argparse.ArgumentParser:
populator_prog = f"{parent} {parser.prog} {populator_name}"
subparsers.add_parser(
populator_name,
prog=populator_prog, parents=[populator_parser], formatter_class=populator_parser.formatter_class,
prog=populator_prog,
parents=[populator_parser],
formatter_class=populator_parser.formatter_class,
add_help=False, # add help disabled otherwise conflicts with this main populator help
help=populator_parser.description, description=populator_parser.description,
help=populator_parser.description,
description=populator_parser.description,
usage=populator_parser.usage,
)
POPULATORS[populator_name] = {
Expand All @@ -168,6 +188,12 @@ def main(*args: str) -> Optional[int]:
result = None
if populator_cmd == "run":
populator_name = params.pop("populator")

# Setup the application logger:
fname = f"{populator_name}_log_{datetime.utcnow().isoformat() + 'Z'}.jsonl"
log_level = logging.DEBUG if ns.debug else logging.INFO
setup_logging(fname, log_level)

if not populator_name:
parser.print_help()
return 0
Expand Down
57 changes: 31 additions & 26 deletions STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import argparse
import json
import logging
import os
from typing import Any, MutableMapping, NoReturn, Optional, Union

from requests.sessions import Session
from pystac.extensions.datacube import DatacubeExtension
from requests.sessions import Session

from STACpopulator.cli import add_request_options, apply_request_options
from STACpopulator.extensions.cmip6 import CMIP6Properties, CMIP6Helper
from STACpopulator.extensions.cmip6 import CMIP6Helper, CMIP6Properties
from STACpopulator.extensions.datacube import DataCubeHelper
from STACpopulator.extensions.thredds import THREDDSHelper, THREDDSExtension
from STACpopulator.input import GenericLoader, ErrorLoader, THREDDSLoader
from STACpopulator.extensions.thredds import THREDDSExtension, THREDDSHelper
from STACpopulator.input import ErrorLoader, GenericLoader, THREDDSLoader
from STACpopulator.models import GeoJSONPolygon
from STACpopulator.populator_base import STACpopulatorBase
from STACpopulator.stac_utils import get_logger

LOGGER = get_logger(__name__)
LOGGER = logging.getLogger(__name__)


class CMIP6populator(STACpopulatorBase):
Expand All @@ -29,6 +29,7 @@ def __init__(
update: Optional[bool] = False,
session: Optional[Session] = None,
config_file: Optional[Union[os.PathLike[str], str]] = None,
log_debug: Optional[bool] = False,
) -> None:
"""Constructor

Expand All @@ -37,14 +38,12 @@ def __init__(
:param data_loader: loader to iterate over ingestion data.
"""
super().__init__(
stac_host,
data_loader,
update=update,
session=session,
config_file=config_file,
stac_host, data_loader, update=update, session=session, config_file=config_file, log_debug=log_debug
)

def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
def create_stac_item(
self, item_name: str, item_data: MutableMapping[str, Any]
) -> Union[None, MutableMapping[str, Any]]:
"""Creates the STAC item.

:param item_name: name of the STAC item. Interpretation of name is left to the input loader implementation
Expand All @@ -58,26 +57,23 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any])
try:
cmip_helper = CMIP6Helper(item_data, self.item_geometry_model)
item = cmip_helper.stac_item()
except Exception:
LOGGER.error("Failed to add CMIP6 extension to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add CMIP6 extension") from e

# Add datacube extension
try:
dc_helper = DataCubeHelper(item_data)
dc_ext = DatacubeExtension.ext(item, add_if_missing=True)
dc_ext.apply(dimensions=dc_helper.dimensions, variables=dc_helper.variables)
except Exception:
LOGGER.error("Failed to add Datacube extension to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add Datacube extension") from e

try:
thredds_helper = THREDDSHelper(item_data["access_urls"])
thredds_ext = THREDDSExtension.ext(item)
thredds_ext.apply(thredds_helper.services, thredds_helper.links)
except Exception:
LOGGER.error("Failed to add THREDDS references to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add THREDDS extension") from e

# print(json.dumps(item.to_dict()))
return json.loads(json.dumps(item.to_dict()))
Expand All @@ -88,14 +84,21 @@ def make_parser() -> argparse.ArgumentParser:
parser.add_argument("stac_host", type=str, help="STAC API address")
parser.add_argument("href", type=str, help="URL to a THREDDS catalog or a NCML XML with CMIP6 metadata.")
parser.add_argument("--update", action="store_true", help="Update collection and its items")
parser.add_argument("--mode", choices=["full", "single"], default="full",
help="Operation mode, processing the full dataset or only the single reference.")
parser.add_argument(
"--config", type=str, help=(
"--mode",
choices=["full", "single"],
default="full",
help="Operation mode, processing the full dataset or only the single reference.",
)
parser.add_argument(
"--config",
type=str,
help=(
"Override configuration file for the populator. "
"By default, uses the adjacent configuration to the implementation class."
)
),
)
parser.add_argument("--debug", action="store_true", help="Set logger level to debug")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be in the parent argparser (generic one before specific implementation is called).
ns.debug will not exist if using other implementations than CMIP6

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I've moved it. I'm not sure I like the current position much. Right now, one has to do:

stac-populator --debug run CMIP6_UofT stac-host catalog-url --auth-handler cookie --auth-identity ~/daccs_cookie.txt

I'd prefer if one could instead just do:

stac-populator run CMIP6_UofT stac-host catalog-url --auth-handler cookie --auth-identity ~/daccs_cookie.txt --debug

By the way, this might be related to #46. I noticed that there is something off with the CLI parser system you've put into place. I'll let you look into it since you put it in and hence you probably understand it better than me.

Copy link
Collaborator

@fmigneault fmigneault Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To allow stac-populator run CMIP6_UofT --debug ..., you could define the logging options as their own log_parser = argparse.ArgumentParser(...) and pass that log_parser in parents:

parents=[populator_parser],

Need to pass it down also to parser_maker so it can add log_parser to its own parents as well.

populator_parser = parser_maker()

add_request_options(parser)
return parser

Expand All @@ -111,7 +114,9 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:
# To be implemented
data_loader = ErrorLoader()

c = CMIP6populator(ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config)
c = CMIP6populator(
ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config, log_debug=ns.debug
)
c.ingest()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import logging
import os.path
from typing import Any, MutableMapping, NoReturn, Optional

Expand All @@ -8,9 +9,8 @@
from STACpopulator.input import STACDirectoryLoader
from STACpopulator.models import GeoJSONPolygon
from STACpopulator.populator_base import STACpopulatorBase
from STACpopulator.stac_utils import get_logger

LOGGER = get_logger(__name__)
LOGGER = logging.getLogger(__name__)


class DirectoryPopulator(STACpopulatorBase):
Expand Down
12 changes: 3 additions & 9 deletions STACpopulator/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,12 @@
import requests
import siphon
import xncml
from colorlog import ColoredFormatter
from requests.sessions import Session
from siphon.catalog import TDSCatalog, session_manager

from STACpopulator.stac_utils import numpy_to_python_datatypes, url_validate

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


class GenericLoader(ABC):
Expand Down Expand Up @@ -149,7 +141,9 @@ def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
if self.catalog_head.datasets.items():
for item_name, ds in self.catalog_head.datasets.items():
attrs = self.extract_metadata(ds)
yield item_name, ds.url_path, attrs
filename = ds.url_path[ds.url_path.rfind("/") :]
url = self.catalog_head.catalog_url[: self.catalog_head.catalog_url.rfind("/")] + filename
yield item_name, url, attrs

for name, ref in self.catalog_head.catalog_refs.items():
self.catalog_head = ref.follow()
Expand Down
Loading
Loading