Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supporting collection of local files #144

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.4.0a0] - 2024-11-xx

### Added
- Support for a local directory of IC files (`DataArrayDirectory` data source)

### Changed

Expand Down
2 changes: 1 addition & 1 deletion earth2studio/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
from .rx import CosineSolarZenith, LandSeaMask, SurfaceGeoPotential
from .utils import datasource_to_file, fetch_data, prep_data_array
from .wb2 import WB2ERA5, WB2Climatology, WB2ERA5_32x64, WB2ERA5_121x240
from .xr import DataArrayFile, DataSetFile
from .xr import DataArrayFile, DataSetFile, DataArrayDirectory
6 changes: 4 additions & 2 deletions earth2studio/data/arco.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class ARCO:
ARCO_LAT = np.linspace(90, -90, 721)
ARCO_LON = np.linspace(0, 359.75, 1440)

def __init__(self, cache: bool = True, verbose: bool = True):
def __init__(self, cache: bool = True,
verbose: bool = True,
async_timeout: int = 600):
self._cache = cache
self._verbose = verbose

Expand All @@ -90,7 +92,7 @@ def __init__(self, cache: bool = True, verbose: bool = True):
"gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3", fs
)
self.zarr_group = zarr.open(fs_map, mode="r")
self.async_timeout = 600
self.async_timeout = async_timeout
self.async_process_limit = 4

def __call__(
Expand Down
4 changes: 4 additions & 0 deletions earth2studio/data/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def datasource_to_file(
lead_time: LeadTimeArray = np.array([np.timedelta64(0, "h")]),
backend: Literal["netcdf", "zarr"] = "netcdf",
chunks: dict[str, int] = {"variable": 1},
dtype: np.dtype | None = None,
) -> None:
"""Utility function that can be used for building a local data store needed
for an inference request. This file can then be used with the
Expand Down Expand Up @@ -221,6 +222,9 @@ def datasource_to_file(
da = da.assign_coords(time=time)
da = da.chunk(chunks=chunks)

if dtype is not None:
da = da.astype(dtype=dtype)

match backend:
case "netcdf":
da.to_netcdf(file_name)
Expand Down
71 changes: 71 additions & 0 deletions earth2studio/data/xr.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DataArrayDirectory needs a unit test, see earth2studio/test/data/test_xr.py for examples.

Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from datetime import datetime
from typing import Any
from numpy import ndarray
from pandas import to_datetime
from datetime import datetime

import xarray as xr

Expand Down Expand Up @@ -94,3 +98,70 @@ def __call__(
Loaded data array
"""
return self.da.sel(time=time, variable=variable)


class DataArrayDirectory:
    """A local directory of xarray data array files used as a data source.

    Every file must be readable by ``xarray.open_dataarray`` (for example a
    netCDF file). Files are expected to live in one sub-directory per year, with
    one file per month; the month is taken from the last ``_``-separated token
    of the file name (before the first ``.``)::

        path/to/monthly/files
        |___2020
        |   |___2020_01.nc
        |   |___2020_02.nc
        |   |___ ...
        |
        |___2021
            |___2021_01.nc
            |___ ...

    Parameters
    ----------
    dir_path : str
        Path to the root directory that holds the yearly sub-directories.
    **xr_args : Any
        Keyword arguments forwarded to ``xarray.open_dataarray`` for every file.
    """

    def __init__(self, dir_path: str, **xr_args: Any):
        self.dir_path = dir_path
        # Opened arrays, indexed as das[<year directory name>][<two-digit month>]
        self.das: dict[str, dict[str, xr.DataArray]] = {}
        for yr in os.listdir(self.dir_path):
            yr_dir = os.path.join(self.dir_path, yr)
            if not os.path.isdir(yr_dir):
                continue
            self.das[yr] = {}
            for fl in os.listdir(yr_dir):
                pth = os.path.join(yr_dir, fl)
                if not os.path.isfile(pth):
                    continue
                try:
                    arr = xr.open_dataarray(pth, **xr_args)
                except Exception:
                    # Best effort: files xarray cannot open (README, partial
                    # downloads, ...) are skipped. Only Exception is caught so
                    # KeyboardInterrupt / SystemExit still propagate.
                    continue
                # Lookups in __call__ always use a zero-padded month, so pad the
                # key here too ("2020_1.nc" -> "01").
                mon = fl.split(".")[0].split("_")[-1].zfill(2)
                self.das[yr][mon] = arr

    def __call__(
        self,
        time: datetime | list[datetime] | TimeArray,
        variable: str | list[str] | VariableArray,
    ) -> xr.DataArray:
        """Function to get data.

        Parameters
        ----------
        time : datetime | list[datetime] | TimeArray
            Timestamps to return data for.
        variable : str | list[str] | VariableArray
            Strings or list of strings that refer to variables to return.

        Returns
        -------
        xr.DataArray
            Data selected for each requested timestamp, concatenated along the
            ``time`` dimension.

        Raises
        ------
        KeyError
            If no file was loaded for the year / month of a requested timestamp.
        """
        if not isinstance(time, (list, ndarray)):
            time = [time]

        arrs = []
        for tt in time:
            stamp = to_datetime(tt)
            yr = str(stamp.year)
            mon = str(stamp.month).zfill(2)
            # Keep the try body to the lookup only, so a KeyError raised by
            # .sel() below (e.g. timestamp absent from the file) is not masked.
            try:
                monthly = self.das[yr][mon]
            except KeyError as err:
                raise KeyError(
                    f"No data file loaded for {yr}-{mon} under {self.dir_path}"
                ) from err
            arrs.append(monthly.sel(time=tt, variable=variable))

        return xr.concat(arrs, dim="time")
2 changes: 1 addition & 1 deletion earth2studio/models/auto/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def default_timeout(cls) -> int:
int
Time out in seconds
"""
default_timeout = 300
default_timeout = 3000
try:
timeout = os.environ.get("EARTH2STUDIO_PACKAGE_TIMEOUT", default_timeout)
default_timeout = int(timeout)
Expand Down