Moving HRRR to Grib files #165

Merged 9 commits on Dec 18, 2024
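This change replaces the `hrrrzarr` Zarr store with byte-range reads against NOAA's grib archive on AWS (`noaa-hrrr-bdp-pds`), using the `.idx` sidecar files to locate individual grib messages. A rough usage sketch of the updated data sources (interfaces as they appear in the diff below; the variable names and exact call pattern are illustrative):

```python
from datetime import datetime, timedelta

from earth2studio.data import HRRR, HRRR_FX

# Analysis fields, fetched as grib byte ranges from the AWS archive
ds = HRRR(cache=True, verbose=True)
da = ds(datetime(2024, 12, 1), ["t2m", "u10m"])

# Forecasts: hourly lead times to 18 hours; 6-hourly initializations go to 48
ds_fx = HRRR_FX(cache=True, verbose=True)
da_fx = ds_fx(
    datetime(2024, 12, 1),
    [timedelta(hours=h) for h in range(19)],
    ["t2m"],
)
```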
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -11,9 +11,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
### Added

- Interpolation between arbitrary lat-lon grids
- Added hybrid level support to HRRR data source

### Changed

- Switched HRRR data source back to AWS grib

### Deprecated

### Removed
300 changes: 243 additions & 57 deletions earth2studio/data/hrrr.py
@@ -15,8 +15,10 @@
# limitations under the License.

import asyncio
import hashlib
import os
import pathlib
import re
import shutil
from datetime import datetime, timedelta

@@ -36,7 +38,6 @@
prep_forecast_inputs,
)
from earth2studio.lexicon import HRRRFXLexicon, HRRRLexicon
- from earth2studio.lexicon.base import LexiconType
from earth2studio.utils.type import LeadTimeArray, TimeArray, VariableArray

logger.remove()
@@ -45,30 +46,31 @@

class _HRRRBase:

HRRR_BUCKET_NAME = "hrrrzarr"
HRRR_BUCKET_NAME = "noaa-hrrr-bdp-pds"
HRRR_X = np.arange(1799)
HRRR_Y = np.arange(1059)
MAX_BYTE_SIZE = 5000000

- def __init__(self, lexicon: LexiconType, cache: bool = True, verbose: bool = True):
def __init__(
self,
cache: bool = True,
verbose: bool = True,
):
self._cache = cache
- self._lexicon = lexicon
self._verbose = verbose

- fs = s3fs.S3FileSystem(
self.fs = s3fs.S3FileSystem(
anon=True,
default_block_size=2**20,
client_kwargs={},
)

- if self._cache:
- cache_options = {
- "cache_storage": self.cache,
- "expiry_time": 31622400,  # 1 year
- }
- fs = WholeFileCacheFileSystem(fs=fs, **cache_options)
-
- fs_map = fsspec.FSMap(f"s3://{self.HRRR_BUCKET_NAME}", fs)
- self.zarr_group = zarr.open(fs_map, mode="r")
# Doesn't work with read_block; only use with the index file
cache_options = {
"cache_storage": self.cache,
"expiry_time": 31622400,  # 1 year
}
self.fsc = WholeFileCacheFileSystem(fs=self.fs, **cache_options)

async def async_fetch(
self,
@@ -111,47 +113,100 @@ async def async_fetch(
"variable": variable,
"hrrr_x": self.HRRR_X,
"hrrr_y": self.HRRR_Y,
"lat": (
["hrrr_y", "hrrr_x"],
self.zarr_group["grid"]["HRRR_chunk_index.zarr"]["latitude"][:],
),
"lon": (
["hrrr_y", "hrrr_x"],
self.zarr_group["grid"]["HRRR_chunk_index.zarr"]["longitude"][:]
+ 360,
), # Change to [0,360]
},
)
- # Banking on async calls in zarr 3.0
- for i, t in enumerate(time):
- for j, ld in enumerate(lead_time):
- for k, v in enumerate(variable):
- try:
- hrrr_str, modifier = self._lexicon[v]
- hrrr_class, hrrr_product, hrrr_level, hrrr_var = hrrr_str.split(
- "::"
- )
- except KeyError as e:
- logger.error(f"variable id {v} not found in HRRR lexicon")
- raise e
-
- date_group = t.strftime("%Y%m%d")
- time_group = t.strftime(f"%Y%m%d_%Hz_{hrrr_product}.zarr")
args = [
(t, i, ld, j, v, k)
for k, v in enumerate(variable)
for j, ld in enumerate(lead_time) # noqa
for i, t in enumerate(time)
]
pbar = tqdm(
total=len(args), desc="Fetching HRRR data", disable=(not self._verbose)
)

- logger.debug(
- f"Fetching HRRR {hrrr_product} variable {v} at {t.isoformat()}"
- )
for (t, i, ld, j, v, k) in args:
logger.debug(
f"Fetching HRRR data for variable: {v} at {t.isoformat()} lead time {ld}"
)
try:
# Set lexicon based on lead
lexicon: type[HRRRLexicon] | type[HRRRFXLexicon] = HRRRLexicon
if ld.total_seconds() != 0:
lexicon = HRRRFXLexicon
hrrr_str, modifier = lexicon[v]
hrrr_class, hrrr_product, hrrr_level, hrrr_var = hrrr_str.split("::")
hrrr_regex = lexicon.index_regex(v, t, ld)
except KeyError as e:
logger.error(f"variable id {v} not found in HRRR lexicon")
raise e

date_group = t.strftime("%Y%m%d")
forecast_hour = t.strftime("%H")
lead_index = int(ld.total_seconds() // 3600)
s3_grib_uri = f"{self.HRRR_BUCKET_NAME}/hrrr.{date_group}/conus/hrrr.t{forecast_hour}z.wrf{hrrr_class}f{lead_index:02}.grib2"

# Download the grib index file and parse
with self.fsc.open(f"{s3_grib_uri}.idx") as file:
index_lines = [line.decode("utf-8").rstrip() for line in file]
# Add dummy variable at end of file with max offset
index_lines.append(
f"xx:{self.fsc.size(s3_grib_uri)}:d=xx:NULL:NULL:NULL:NULL"
)
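# For reference, each line of the NOAA grib index follows the convention
#   <message #>:<byte offset>:d=<YYYYMMDDHH>:<variable>:<level>:<forecast>:
# so a message's length is the next entry's offset minus its own, which is
# why the sentinel entry carrying the total file size is appended above.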

- data = self.zarr_group[hrrr_class][date_group][time_group][
- hrrr_level
- ][hrrr_var][hrrr_level][hrrr_var]
- if hrrr_product == "fcst":
- # Minus 1 here because index 0 is forecast with leadtime 1hr
- # forecast_period coordinate system tells what the lead times are in hours
- lead_index = int(ld.total_seconds() // 3600) - 1
- data = data[lead_index]
byte_offset = None
byte_length = None
for line_index, line in enumerate(index_lines[:-1]):
lsplit = line.split(":")
if len(lsplit) < 7:
continue
# On a match, record the byte offset and length
if bool(re.search(hrrr_regex, line)):
nlsplit = index_lines[line_index + 1].split(":")
byte_length = int(nlsplit[1]) - int(lsplit[1])
byte_offset = int(lsplit[1])
key = f"{lsplit[3]}::{lsplit[4]}"
if byte_length > self.MAX_BYTE_SIZE:
raise ValueError(
f"Byte length, {byte_length}, of variable {key} larger than safe threshold of {self.MAX_BYTE_SIZE}"
)
# If the byte offset was not found, raise an error
if byte_offset is None or byte_length is None:
raise KeyError(
f"Could not find variable {hrrr_var} level {hrrr_level} in index file"
)
# Read grib block into cache location
sha = hashlib.sha256(
(s3_grib_uri + str(byte_offset) + str(byte_length)).encode()
)
filename = sha.hexdigest()
cache_path = os.path.join(self.cache, filename)
if not pathlib.Path(cache_path).is_file():
grib_buffer = self.fs.read_block(
s3_grib_uri, offset=byte_offset, length=byte_length
)
with open(cache_path, "wb") as file:
file.write(grib_buffer)

da = xr.open_dataarray(
cache_path,
engine="cfgrib",
backend_kwargs={"indexpath": ""},
)
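# Note: indexpath="" above disables cfgrib's sidecar index file; each
# cached file holds exactly one grib message, so the index adds nothing.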
hrrr_da[i, j, k] = modifier(da.values)
# Add lat lon coords if not present
if "lat" not in hrrr_da.coords:
hrrr_da.coords["lat"] = (
["hrrr_y", "hrrr_x"],
da.coords["latitude"].values,
)
hrrr_da.coords["lon"] = (
["hrrr_y", "hrrr_x"],
da.coords["longitude"].values,
)

- hrrr_da[i, j, k] = data
pbar.update(1)

# Delete cache if needed
if not self._cache:
@@ -224,11 +279,136 @@ def available(

# Object store directory for given date
date_group = time.strftime("%Y%m%d")
- time_group = time.strftime("%Y%m%d_%Hz_anl.zarr")
- s3_uri = f"s3://{cls.HRRR_BUCKET_NAME}/sfc/{date_group}/{time_group}"
- exists = fs.exists(s3_uri)
- return exists
forecast_hour = time.strftime("%H")
s3_uri = f"{cls.HRRR_BUCKET_NAME}/hrrr.{date_group}/conus/hrrr.t{forecast_hour}z.wrfsfcf00.grib2.idx"

return fs.exists(s3_uri)


class _HRRRZarrBase(_HRRRBase):
# Not used but keeping here for now for future reference
HRRR_BUCKET_NAME = "hrrrzarr"
HRRR_X = np.arange(1799)
HRRR_Y = np.arange(1059)

def __init__(
self,
lexicon: HRRRLexicon | HRRRFXLexicon,
cache: bool = True,
verbose: bool = True,
):
self._cache = cache
self._lexicon = lexicon
self._verbose = verbose

fs = s3fs.S3FileSystem(
anon=True,
default_block_size=2**20,
client_kwargs={},
)

if self._cache:
cache_options = {
"cache_storage": self.cache,
"expiry_time": 31622400, # 1 year
}
fs = WholeFileCacheFileSystem(fs=fs, **cache_options)

fs_map = fsspec.FSMap(f"s3://{self.HRRR_BUCKET_NAME}", fs)
self.zarr_group = zarr.open(fs_map, mode="r")

async def async_fetch(
self,
time: list[datetime],
lead_time: list[timedelta],
variable: list[str],
) -> xr.DataArray:
"""Async function to retrieve HRRR forecast data into a single Xarray data array

Parameters
----------
time : list[datetime]
Timestamps to return data for (UTC).
lead_time: list[timedelta]
List of forecast lead times to fetch
variable : str | list[str] | VariableArray
String, list of strings or array of strings that refer to variables to
return. Must be in the HRRR lexicon.

Returns
-------
xr.DataArray
HRRR weather data array
"""

hrrr_da = xr.DataArray(
data=np.empty(
(
len(time),
len(lead_time),
len(variable),
len(self.HRRR_Y),
len(self.HRRR_X),
)
),
dims=["time", "lead_time", "variable", "hrrr_y", "hrrr_x"],
coords={
"time": time,
"lead_time": lead_time,
"variable": variable,
"hrrr_x": self.HRRR_X,
"hrrr_y": self.HRRR_Y,
"lat": (
["hrrr_y", "hrrr_x"],
self.zarr_group["grid"]["HRRR_chunk_index.zarr"]["latitude"][:],
),
"lon": (
["hrrr_y", "hrrr_x"],
self.zarr_group["grid"]["HRRR_chunk_index.zarr"]["longitude"][:]
+ 360,
), # Change to [0,360]
},
)
# Banking on async calls in zarr 3.0
for i, t in enumerate(time):
for j, ld in enumerate(lead_time):
# Set lexicon based on lead
lexicon: type[HRRRLexicon] | type[HRRRFXLexicon] = HRRRLexicon
if ld.total_seconds() != 0:
lexicon = HRRRFXLexicon
for k, v in enumerate(variable):
try:
hrrr_str, modifier = lexicon[v]
hrrr_class, hrrr_product, hrrr_level, hrrr_var = hrrr_str.split(
"::"
)
except KeyError as e:
logger.error(f"variable id {v} not found in HRRR lexicon")
raise e

date_group = t.strftime("%Y%m%d")
time_group = t.strftime(f"%Y%m%d_%Hz_{hrrr_product}.zarr")

logger.debug(
f"Fetching HRRR {hrrr_product} variable {v} at {t.isoformat()}"
)

data = self.zarr_group[hrrr_class][date_group][time_group][
hrrr_level
][hrrr_var][hrrr_level][hrrr_var]
if hrrr_product == "fcst":
# Minus 1 here because index 0 is forecast with leadtime 1hr
# forecast_period coordinate system tells what the lead times are in hours
lead_index = int(ld.total_seconds() // 3600) - 1
data = data[lead_index]

hrrr_da[i, j, k] = modifier(data)

# Delete cache if needed
if not self._cache:
shutil.rmtree(self.cache)

return hrrr_da


class HRRR(_HRRRBase):
@@ -261,7 +441,7 @@ class HRRR(_HRRRBase):
"""

def __init__(self, cache: bool = True, verbose: bool = True):
- super().__init__(HRRRLexicon, cache, verbose)
super().__init__(cache, verbose)

def __call__(
self,
@@ -317,6 +497,11 @@ class HRRR_FX(_HRRRBase):
This is a remote data source and can potentially download a large amount of data
to your local machine for large requests.

Note
----
48 hour forecasts are provided at 6 hour intervals; 18 hour forecasts are
generated hourly.

Note
----
Additional information on the data repository can be referenced here:
@@ -327,7 +512,7 @@
"""

def __init__(self, cache: bool = True, verbose: bool = True):
- super().__init__(HRRRFXLexicon, cache, verbose)
super().__init__(cache, verbose)

def __call__(
self,
@@ -380,7 +565,8 @@ def _validate_leadtime(cls, lead_times: list[timedelta]) -> None:
f"Requested lead time {delta} needs to be 1 hour interval for HRRR"
)
hours = int(delta.total_seconds() // 3600)
- if hours > 18 or hours < 1:
# Note: forecasts initialized every 6 hours carry 48 hour lead times; the rest go to 18 hours
if hours > 48 or hours < 0:
raise ValueError(
- f"Requested lead time {delta} can only be between [1,18] hours for HRRR forecast"
f"Requested lead time {delta} can only be between [0,48] hours for HRRR forecast"
)
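For reviewers who want to exercise the underlying mechanism outside earth2studio, here is a minimal standalone sketch of the idx-driven byte-range read this PR adopts (the bucket layout matches the diff; the TMP/2 m regex is a hand-written stand-in for the lexicon's `index_regex`):

```python
import re

import s3fs
import xarray as xr

fs = s3fs.S3FileSystem(anon=True)
uri = "noaa-hrrr-bdp-pds/hrrr.20241201/conus/hrrr.t00z.wrfsfcf00.grib2"

# Each index line reads "<msg>:<offset>:d=<date>:<var>:<level>:<forecast>:"
with fs.open(uri + ".idx") as f:
    lines = [line.decode("utf-8").rstrip() for line in f]
offsets = [int(line.split(":")[1]) for line in lines] + [fs.size(uri)]

# Locate 2 m temperature and read just that message's byte range
n = next(i for i, line in enumerate(lines) if re.search(r":TMP:2 m above ground:", line))
buf = fs.read_block(uri, offset=offsets[n], length=offsets[n + 1] - offsets[n])

with open("t2m.grib2", "wb") as f:
    f.write(buf)
da = xr.open_dataarray("t2m.grib2", engine="cfgrib", backend_kwargs={"indexpath": ""})
print(da.shape)  # (1059, 1799) on the HRRR CONUS grid
```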