Adding feature to intercept errors and save information to files #45

Merged
merged 12 commits on Feb 21, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -19,3 +19,6 @@ reports
STACpopulator.egg-info/
build
*.pyc

## Logs
*.jsonl
22 changes: 9 additions & 13 deletions STACpopulator/api_requests.py
@@ -1,19 +1,11 @@
import logging
import os
from typing import Any, Optional
from typing import Any, Optional, Union

import requests
from requests import Session
from colorlog import ColoredFormatter

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


def stac_host_reachable(url: str, session: Optional[Session] = None) -> bool:
@@ -81,7 +73,7 @@ def post_stac_item(
json_data: dict[str, dict],
update: Optional[bool] = True,
session: Optional[Session] = None,
) -> None:
) -> Union[None, str]:
"""Post a STAC item to the host server.

:param stac_host: address of the STAC host
@@ -107,8 +99,12 @@ def post_stac_item(
if update:
LOGGER.info(f"Item {item_id} already exists. Updating.")
r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data)
r.raise_for_status()
return f"Requests: {r.reason}"
# r.raise_for_status()
else:
LOGGER.info(f"Item {item_id} already exists.")
LOGGER.warning(f"Item {item_id} already exists.")
else:
r.raise_for_status()
return f"Requests: {r.reason}"
# r.raise_for_status()

return None
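
For context, a minimal sketch (not part of this diff) of how a caller might consume the new Union[None, str] return value; the surrounding variables are assumed to come from the ingestion loop:

import logging

from STACpopulator.api_requests import post_stac_item

LOGGER = logging.getLogger(__name__)

# stac_host, collection_id, item_id, json_data, session are assumed to be
# defined by the surrounding ingestion loop (illustrative only).
status = post_stac_item(stac_host, collection_id, item_id, json_data, update=True, session=session)
if status is not None:
    # "Requests: <reason>" is returned after the POST/PUT completes;
    # None means an existing item was left untouched.
    LOGGER.info("Item %s: %s", item_id, status)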
57 changes: 31 additions & 26 deletions STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py
@@ -1,21 +1,21 @@
import argparse
import json
import logging
import os
from typing import Any, MutableMapping, NoReturn, Optional, Union

from requests.sessions import Session
from pystac.extensions.datacube import DatacubeExtension
from requests.sessions import Session

from STACpopulator.cli import add_request_options, apply_request_options
from STACpopulator.extensions.cmip6 import CMIP6Properties, CMIP6Helper
from STACpopulator.extensions.cmip6 import CMIP6Helper, CMIP6Properties
from STACpopulator.extensions.datacube import DataCubeHelper
from STACpopulator.extensions.thredds import THREDDSHelper, THREDDSExtension
from STACpopulator.input import GenericLoader, ErrorLoader, THREDDSLoader
from STACpopulator.extensions.thredds import THREDDSExtension, THREDDSHelper
from STACpopulator.input import ErrorLoader, GenericLoader, THREDDSLoader
from STACpopulator.models import GeoJSONPolygon
from STACpopulator.populator_base import STACpopulatorBase
from STACpopulator.stac_utils import get_logger

LOGGER = get_logger(__name__)
LOGGER = logging.getLogger(__name__)


class CMIP6populator(STACpopulatorBase):
@@ -29,6 +29,7 @@ def __init__(
update: Optional[bool] = False,
session: Optional[Session] = None,
config_file: Optional[Union[os.PathLike[str], str]] = None,
log_debug: Optional[bool] = False,
) -> None:
"""Constructor

@@ -37,14 +38,12 @@ def __init__(
:param data_loader: loader to iterate over ingestion data.
"""
super().__init__(
stac_host,
data_loader,
update=update,
session=session,
config_file=config_file,
stac_host, data_loader, update=update, session=session, config_file=config_file, log_debug=log_debug
)

def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
def create_stac_item(
self, item_name: str, item_data: MutableMapping[str, Any]
) -> Union[None, MutableMapping[str, Any]]:
"""Creates the STAC item.

:param item_name: name of the STAC item. Interpretation of name is left to the input loader implementation
@@ -58,26 +57,23 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any])
try:
cmip_helper = CMIP6Helper(item_data, self.item_geometry_model)
item = cmip_helper.stac_item()
except Exception:
LOGGER.error("Failed to add CMIP6 extension to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add CMIP6 extension") from e

# Add datacube extension
try:
dc_helper = DataCubeHelper(item_data)
dc_ext = DatacubeExtension.ext(item, add_if_missing=True)
dc_ext.apply(dimensions=dc_helper.dimensions, variables=dc_helper.variables)
except Exception:
LOGGER.error("Failed to add Datacube extension to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add Datacube extension") from e

try:
thredds_helper = THREDDSHelper(item_data["access_urls"])
thredds_ext = THREDDSExtension.ext(item)
thredds_ext.apply(thredds_helper.services, thredds_helper.links)
except Exception:
LOGGER.error("Failed to add THREDDS references to item %s", item_name)
raise
except Exception as e:
raise Exception("Failed to add THREDDS extension") from e

# print(json.dumps(item.to_dict()))
return json.loads(json.dumps(item.to_dict()))
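
A hypothetical sketch of how a caller could surface these chained exceptions; populator, name, and data are illustrative names, not code from this PR:

import logging

LOGGER = logging.getLogger(__name__)

# populator, name, data come from the surrounding ingestion loop
# (illustrative only).
try:
    stac_item = populator.create_stac_item(name, data)
except Exception as err:
    # "raise ... from e" above stores the root cause on __cause__, so the
    # wrapper message and the original error can be logged together.
    LOGGER.exception("Item creation failed: %s (cause: %r)", err, err.__cause__)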
@@ -88,14 +84,21 @@ def make_parser() -> argparse.ArgumentParser:
parser.add_argument("stac_host", type=str, help="STAC API address")
parser.add_argument("href", type=str, help="URL to a THREDDS catalog or a NCML XML with CMIP6 metadata.")
parser.add_argument("--update", action="store_true", help="Update collection and its items")
parser.add_argument("--mode", choices=["full", "single"], default="full",
help="Operation mode, processing the full dataset or only the single reference.")
parser.add_argument(
"--config", type=str, help=(
"--mode",
choices=["full", "single"],
default="full",
help="Operation mode, processing the full dataset or only the single reference.",
)
parser.add_argument(
"--config",
type=str,
help=(
"Override configuration file for the populator. "
"By default, uses the adjacent configuration to the implementation class."
)
),
)
parser.add_argument("--debug", action="store_true", help="Set logger level to debug")
Collaborator:
I think it should be in the parent argparser (the generic one, before the specific implementation is called).
ns.debug will not exist when using implementations other than CMIP6.

Collaborator (Author):
Okay I've moved it. I'm not sure I like the current position much. Right now, one has to do:

stac-populator --debug run CMIP6_UofT stac-host catalog-url --auth-handler cookie --auth-identity ~/daccs_cookie.txt

I'd prefer if one could instead just do:

stac-populator run CMIP6_UofT stac-host catalog-url --auth-handler cookie --auth-identity ~/daccs_cookie.txt --debug

By the way, this might be related to #46. I noticed that there is something off with the CLI parser system you've put into place. I'll let you look into it since you put it in and probably understand it better than I do.

Collaborator (@fmigneault, Feb 21, 2024):
To allow stac-populator run CMIP6_UofT --debug ..., you could define the logging options as their own log_parser = argparse.ArgumentParser(...) and pass that log_parser in parents:

parents=[populator_parser],

It needs to be passed down to parser_maker as well, so it can add log_parser to its own parents:

populator_parser = parser_maker()
add_request_options(parser)
return parser
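
For reference, a minimal runnable sketch of the suggested parents wiring; the parser names and arguments are illustrative rather than the repo's actual CLI code:

import argparse

# Shared options live in a parent parser; add_help=False avoids a
# duplicate -h when the parser is reused via parents=[...].
log_parser = argparse.ArgumentParser(add_help=False)
log_parser.add_argument("--debug", action="store_true", help="Set logger level to debug")

# Each implementation's parser inherits the shared options.
parser = argparse.ArgumentParser(prog="CMIP6_UofT", parents=[log_parser])
parser.add_argument("stac_host", type=str, help="STAC API address")

ns = parser.parse_args(["my-stac-host", "--debug"])
assert ns.debug is True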

@@ -111,7 +114,9 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:
# To be implemented
data_loader = ErrorLoader()

c = CMIP6populator(ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config)
c = CMIP6populator(
ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config, log_debug=ns.debug
)
c.ingest()


@@ -1,4 +1,5 @@
import argparse
import logging
import os.path
from typing import Any, MutableMapping, NoReturn, Optional

@@ -8,9 +9,8 @@
from STACpopulator.input import STACDirectoryLoader
from STACpopulator.models import GeoJSONPolygon
from STACpopulator.populator_base import STACpopulatorBase
from STACpopulator.stac_utils import get_logger

LOGGER = get_logger(__name__)
LOGGER = logging.getLogger(__name__)


class DirectoryPopulator(STACpopulatorBase):
12 changes: 3 additions & 9 deletions STACpopulator/input.py
@@ -8,20 +8,12 @@
import requests
import siphon
import xncml
from colorlog import ColoredFormatter
from requests.sessions import Session
from siphon.catalog import TDSCatalog, session_manager

from STACpopulator.stac_utils import numpy_to_python_datatypes, url_validate

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


class GenericLoader(ABC):
@@ -149,7 +141,9 @@ def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
if self.catalog_head.datasets.items():
for item_name, ds in self.catalog_head.datasets.items():
attrs = self.extract_metadata(ds)
yield item_name, ds.url_path, attrs
filename = ds.url_path[ds.url_path.rfind("/") :]
url = self.catalog_head.catalog_url[: self.catalog_head.catalog_url.rfind("/")] + filename
yield item_name, url, attrs

for name, ref in self.catalog_head.catalog_refs.items():
self.catalog_head = ref.follow()
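
To illustrate the slicing above, a small sketch with made-up values (the paths are illustrative, not from a real catalog):

# Illustrative values only: build the data URL next to the catalog URL
# by swapping the catalog's file name for the dataset's file name.
url_path = "datasets/cmip6/sic_SImon_CCCma_model.nc"
catalog_url = "https://example.org/thredds/catalog/datasets/cmip6/catalog.xml"

filename = url_path[url_path.rfind("/"):]              # "/sic_SImon_CCCma_model.nc"
url = catalog_url[:catalog_url.rfind("/")] + filename  # ".../cmip6/sic_SImon_CCCma_model.nc"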
128 changes: 128 additions & 0 deletions STACpopulator/logging.py
@@ -0,0 +1,128 @@
import datetime as dt
import json
import logging
import logging.config  # dictConfig below lives in logging.config, which needs its own import

LOG_RECORD_BUILTIN_ATTRS = {
"args",
"asctime",
"created",
"exc_info",
"exc_text",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"module",
"msecs",
"message",
"msg",
"name",
"pathname",
"process",
"processName",
"relativeCreated",
"stack_info",
"thread",
"threadName",
"taskName",
}


def setup_logging(logfname: str, log_level: str) -> None:
"""Setup the logger for the app.

:param logfname: name of the file to which to write log outputs
:type logfname: str
:param log_level: base logging level (e.g. "INFO")
:type log_level: str
"""
config = logconfig
config["handlers"]["file"]["filename"] = logfname
for handler in config["handlers"]:
config["handlers"][handler]["level"] = log_level
logging.config.dictConfig(config)


class JSONLogFormatter(logging.Formatter):
# From: https://github.com/mCodingLLC/VideosSampleCode/tree/master/videos/135_modern_logging
def __init__(
self,
*,
fmt_keys: dict[str, str] | None = None,
):
super().__init__()
self.fmt_keys = fmt_keys if fmt_keys is not None else {}

def format(self, record: logging.LogRecord) -> str:
message = self._prepare_log_dict(record)
return json.dumps(message, default=str)

def _prepare_log_dict(self, record: logging.LogRecord):
always_fields = {
"message": record.getMessage(),
"timestamp": dt.datetime.fromtimestamp(record.created, tz=dt.timezone.utc).isoformat(),
}
if record.exc_info is not None:
always_fields["exc_info"] = self.formatException(record.exc_info)

if record.stack_info is not None:
always_fields["stack_info"] = self.formatStack(record.stack_info)

message = {
key: msg_val if (msg_val := always_fields.pop(val, None)) is not None else getattr(record, val)
for key, val in self.fmt_keys.items()
}
message.update(always_fields)

for key, val in record.__dict__.items():
if key not in LOG_RECORD_BUILTIN_ATTRS:
message[key] = val

return message


class NonErrorFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool | logging.LogRecord:
return record.levelno <= logging.INFO


logconfig = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"simple": {
"()": "colorlog.ColoredFormatter",
"format": " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S%z",
},
"json": {
"()": "STACpopulator.logging.JSONLogFormatter",
"fmt_keys": {
"level": "levelname",
"message": "message",
"timestamp": "timestamp",
"logger": "name",
"module": "module",
"function": "funcName",
"line": "lineno",
"thread_name": "threadName",
},
},
},
"handlers": {
"stderr": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stderr",
},
"file": {
"class": "logging.FileHandler",
"level": "INFO",
"formatter": "json",
"filename": "__added_dynamically__",
},
},
"loggers": {"root": {"level": "DEBUG", "handlers": ["stderr", "file"]}},
}
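
A hypothetical usage sketch of the new module, assuming it is importable as STACpopulator.logging; the file name and log message are illustrative:

import logging

from STACpopulator.logging import setup_logging

# Wire the dictConfig above to a concrete log file; every handler
# gets the requested level.
setup_logging("ingestion_log.jsonl", "INFO")

LOGGER = logging.getLogger(__name__)
# Extra attributes are not in LOG_RECORD_BUILTIN_ATTRS, so the JSON
# formatter appends them to the emitted record, e.g.:
# {"level": "ERROR", "message": "Failed to post item", ..., "item_id": "abc"}
LOGGER.error("Failed to post item", extra={"item_id": "abc"})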