Skip to content

Commit

Permalink
feat(repositories): Working MetOffice Datahub Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 15, 2024
1 parent bcbcb96 commit fd7362c
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ docs/
# Environments
.venv
uv.lock
.env

# mypy
.mypy_cache/
Expand Down
2 changes: 1 addition & 1 deletion src/nwp_consumer/internal/entities/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def metadata(self) -> ParameterData:
"to the equilibrium vapour pressure of water",
units="%",
limits=ParameterLimits(upper=100, lower=0),
alternate_shortnames=["r"],
alternate_shortnames=["r", "r2"],
)
case self.VISIBILITY_SL.name:
return ParameterData(
Expand Down
9 changes: 9 additions & 0 deletions src/nwp_consumer/internal/entities/repometadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import dataclasses
import datetime as dt
import os

import pandas as pd

Expand Down Expand Up @@ -144,6 +145,14 @@ def month_its(self, year: int, month: int) -> list[dt.datetime]:
its.append(dt.datetime(year, month, day, hour, tzinfo=dt.UTC))
return its

def missing_required_envs(self) -> list[str]:
"""Get a list of unset required environment variables.

A variable counts as present whenever its name is a key of ``os.environ``,
so a variable that is set to an empty string is not reported as missing.

Returns:
A list of the names in ``self.required_env`` that are absent from the
process environment, in the iteration order of ``self.required_env``.
The list is empty when every required variable is set.
"""
return [var for var in self.required_env if var not in os.environ]

def __str__(self) -> str:
"""Return a pretty-printed string representation of the metadata."""
pretty: str = "".join((
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,11 @@ def authenticate(cls) -> ResultE["CEDAFTPModelRepository"]:
Returns:
A Result containing the instantiated class if successful, or an error if not.
"""
if all(k not in os.environ for k in cls.repository().required_env):
return Failure(ValueError(
f"Missing required environment variables: {cls.repository().required_env}",
missing_envs = cls.repository().missing_required_envs()
if len(missing_envs) > 0:
return Failure(OSError(
f"Cannot authenticate with CEDA FTP service due to "
f"missing required environment variables: {', '.join(missing_envs)}",
))
username: str = urllib.parse.quote(os.environ["CEDA_FTP_USER"])
password: str = urllib.parse.quote(os.environ["CEDA_FTP_PASS"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ def fetch_init_data(self, it: dt.datetime) \
@classmethod
@override
def authenticate(cls) -> ResultE["ECMWFRealTimeS3ModelRepository"]:
missing_envs = cls.repository().missing_required_envs()
if len(missing_envs) > 0:
return Failure(OSError(
f"Cannot authenticate with ECMWF Realtime S3 service due to "
f"missing required environment variables: {', '.join(missing_envs)}",
))
try:
bucket: str = os.environ["ECMWF_REALTIME_S3_BUCKET"]
_fs: s3fs.S3FileSystem = s3fs.S3FileSystem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,24 @@
Documented Structure
--------------------
TODO: Document filestructure
MetOffice provide a number of models, a few of which OCF consume. Their flagship deterministic
model is called the "Unified Model" (UM) and is run in two configurations: "Global" and "UK".
The "Global" model has a resolution of 10km and the "UK" model has a resolution of 2km.
See `https://datahub.metoffice.gov.uk/docs/f/category/atmospheric/overview`_ for more information.
Data is provided on a per-order basis, so the file structure depends on the order ID.
For OCF's purposes, one file per parameter per step is requested.
Actual Structure
----------------
The latitude and longitude increments are ascertained from the GRIB2 file's metadata:
.. code-block:: none
iDirectionIncrementInDegrees: 0.140625
jDirectionIncrementInDegrees: 0.09375
"""

import datetime as dt
Expand Down Expand Up @@ -53,6 +70,7 @@ def __init__(self, order_id: str, api_key: str) -> None:
self.order_id = order_id
self.request_url = f"{self.base_url}/{self.order_id}/latest"


@staticmethod
@override
def repository() -> entities.ModelRepositoryMetadata:
Expand All @@ -73,7 +91,7 @@ def repository() -> entities.ModelRepositoryMetadata:
def model() -> entities.ModelMetadata:
return entities.ModelMetadata(
name="UM-Global",
resolution="17km",
resolution="10km",
expected_coordinates=entities.NWPDimensionCoordinateMap(
init_time=[],
step=list(range(0, 55)),
Expand All @@ -93,10 +111,12 @@ def model() -> entities.ModelMetadata:
],
),
latitude=[
float(f"{lat:.4f}") for lat in np.arange(89.856, -89.856 - 0.156, -0.156)
float(f"{lat:.4f}")
for lat in np.arange(89.953125, -89.953125 - 0.09375, -0.09375)
],
longitude=[
float(f"{lon:.4f}") for lon in np.concatenate([np.arange(-179.87, 180, 0.234)])
float(f"{lon:.4f}")
for lon in np.arange(-179.929687, 179.929688 + 0.140625, 0.140625)
],
),
)
Expand All @@ -105,9 +125,11 @@ def model() -> entities.ModelMetadata:
@override
def authenticate(cls) -> ResultE["MetOfficeDatahubModelRepository"]:
"""Authenticate with the MetOffice DataHub service."""
if all(k not in os.environ for k in cls.repository().required_env):
return Failure(ValueError(
f"Missing required environment variables: {cls.repository().required_env}",
missing_envs = cls.repository().missing_required_envs()
if len(missing_envs) > 0:
return Failure(OSError(
f"Cannot authenticate with MetOffice DataHub service due to "
f"missing required environment variables: {', '.join(missing_envs)}",
))
api_key: str = os.environ["METOFFICE_API_KEY"]
order_id: str = os.environ["METOFFICE_ORDER_ID"]
Expand Down Expand Up @@ -250,10 +272,10 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
),
)

# Wind parameters are surfaced in the dataset as 'unknown'
# Some parameters are surfaced in the dataset as 'unknown'
# and have to be differentiated via the parameterNumber attribute
# which lines up with the last number in the GRIB2 code specified below
# https://datahub.metoffice.gov.uk/docs/glossary?groups=Wind&sortOrder=GRIB2_CODE
# https://datahub.metoffice.gov.uk/docs/glossary?sortOrder=GRIB2_CODE
name = next(iter(ds.data_vars))
parameter_number = ds[name].attrs["GRIB_parameterNumber"]
match name, parameter_number:
Expand All @@ -265,10 +287,12 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
ds = ds.rename({name: "wdir"})
case "unknown", 195:
ds = ds.rename({name: "wdir10"})
case "unknown", 1:
ds = ds.rename({name: "tcc"})
case "unknown", _:
log.warning(
"Encountered unknown parameter with parameterNumber %s",
parameter_number,
f"Encountered unknown parameter with parameterNumber {parameter_number} "
f"in file '{path}'.",
)

try:
Expand Down Expand Up @@ -297,8 +321,9 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
except Exception as e:
return Failure(
ValueError(
f"Error processing {path} to DataArray: {e}",
f"Error processing DataArray for path '{path}'. Error context: {e}",
),
)


return Success([da])

0 comments on commit fd7362c

Please sign in to comment.