release(earthdaily) : v0.2.12
v0.2.12
nkarasiak authored Oct 31, 2024
2 parents c0a9bc8 + 21729b5 commit 6f66291
Showing 4 changed files with 330 additions and 63 deletions.
earthdaily/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -7,7 +7,7 @@
 # to hide warnings from rioxarray or nano seconds conversion
 # warnings.filterwarnings("ignore")
 
-__version__ = "0.2.11"
+__version__ = "0.2.12"
 
 
 def EarthDataStore(
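As a quick post-upgrade check, the bumped version string can be read directly off the package, since __version__ is set in the file above:

import earthdaily

print(earthdaily.__version__)  # expected: 0.2.12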
earthdaily/earthdatastore/__init__.py (66 changes: 4 additions & 62 deletions)
@@ -21,73 +21,14 @@
 from itertools import chain
 from odc import stac
 from . import _scales_collections, cube_utils, mask
 
+from .parallel_search import parallel_search
 from .cube_utils import datacube, metacube, _datacubes, asset_mapper
 
 __all__ = ["datacube", "metacube", "xr", "stac"]
 
 logging.getLogger("earthdaily-earthdatastore")
 
 
-class NoItems(Warning):
-    pass
-
-
-_no_item_msg = NoItems("No item has been found for your query.")
-
-
-def _datetime_to_str(datetime):
-    start, end = ItemSearch(url=None)._format_datetime(datetime).split("/")
-    return start, end
-
-
-def _datetime_split(datetime, freq="auto"):
-    start, end = [pd.Timestamp(date) for date in _datetime_to_str(datetime)]
-    diff = end - start
-    if freq == "auto":
-        # freq increases of 5 days every 6 months
-        freq = diff // (5 + 5 * (diff.days // 183))
-    else:
-        freq = pd.Timedelta(days=freq)
-    if diff.days < freq.days:
-        return datetime
-    logging.info(f"Parallel search with datetime split every {freq.days} days.")
-    return [
-        (chunk, min(chunk + freq, end))
-        for chunk in pd.date_range(start, end, freq=freq)[:-1]
-    ]
-
-
-def _parallel_search(func):
-    def _search(*args, **kwargs):
-        from joblib import Parallel, delayed
-
-        t0 = time.time()
-        kwargs.setdefault("batch_days", "auto")
-        batch_days = kwargs.get("batch_days", None)
-        datetime = kwargs.get("datetime", None)
-        need_parallel = False
-        if datetime and batch_days is not None:
-            datetimes = _datetime_split(datetime, batch_days)
-            need_parallel = True if len(datetimes) > 1 else False
-            if need_parallel:
-                kwargs.pop("datetime")
-                kwargs["raise_no_items"] = False
-                items = Parallel(n_jobs=10, backend="threading")(
-                    delayed(func)(*args, datetime=datetime, **kwargs)
-                    for datetime in datetimes
-                )
-                items = ItemCollection(chain(*items))
-                if len(items) == 0:
-                    raise _no_item_msg
-        if not need_parallel:
-            items = func(*args, **kwargs)
-        logging.info(f"Search/load items : {np.round(time.time()-t0,3)}s.")
-        return items
-
-    return _search
-
-
 def post_query_items(items, query):
     """Applies query to items fetched from the STAC catalog.
@@ -1112,7 +1053,7 @@ def _update_search_for_assets(self, assets):
         fields["include"].extend([f"assets.{asset}" for asset in assets])
         return fields
 
-    @_parallel_search
+    @parallel_search
     def search(
         self,
         collections: str | list,
@@ -1124,6 +1065,7 @@
         assets=None,
         raise_no_items=True,
         batch_days="auto",
+        n_jobs=-1,
         **kwargs,
     ):
         """
@@ -1268,7 +1210,7 @@
         if post_query:
             items_collection = post_query_items(items_collection, post_query)
         if len(items_collection) == 0 and raise_no_items:
-            raise _no_item_msg
+            raise parallel_search.NoItemsFoundError("No items found.")
         return items_collection
 
     def find_cloud_mask_items(
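Taken together, the three hunks above show the public effect of the refactor: search is now decorated by parallel_search from the new module, accepts an n_jobs parameter, and raises NoItemsFoundError instead of the old warning-based _no_item_msg. A hedged usage sketch; the collection name and date range are placeholders, and the credential-free constructor call is an assumption:

from earthdaily import EarthDataStore

eds = EarthDataStore()  # assumes credentials come from the environment
items = eds.search(
    collections="sentinel-2-l2a",      # placeholder collection
    datetime="2023-01-01/2023-12-31",  # a long range gets split into batches
    batch_days="auto",                 # batching heuristic sketched earlier
    n_jobs=-1,                         # new in v0.2.12: parallel worker count
)
print(len(items))

With raise_no_items left at its default of True, an empty result raises the NoItemsFoundError seen in the last hunk rather than returning an empty collection.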
earthdaily/earthdatastore/cube_utils/__init__.py (2 changes: 2 additions & 0 deletions)
@@ -144,6 +144,8 @@ def _cube_odc(
     metadata = {k: ("time", v.tolist()) for k, v in df.items()}
     # assign metadata as coords
     ds = ds.assign_coords(**metadata)
+    if "latitude" in ds.coords and "longitude" in ds.coords:
+        ds = ds.rename({"latitude": "y", "longitude": "x"})
     ds = ds.chunk(kwargs["chunks"])
 
     return ds
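This addition normalizes geographic dimension names coming out of odc-stac so the rest of the cube pipeline can rely on x/y. A minimal sketch of the same rename on a toy dataset (the variable name and coordinate values are invented for illustration):

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {"reflectance": (("latitude", "longitude"), np.zeros((2, 3)))},
    coords={"latitude": [48.0, 48.1], "longitude": [2.0, 2.1, 2.2]},
)
# Same guard and rename as the diff above
if "latitude" in ds.coords and "longitude" in ds.coords:
    ds = ds.rename({"latitude": "y", "longitude": "x"})
print(list(ds.dims))  # ['y', 'x']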