diff --git a/raster2dggs/rHP.py b/raster2dggs/rHP.py index a909f16..a066559 100644 --- a/raster2dggs/rHP.py +++ b/raster2dggs/rHP.py @@ -1,12 +1,17 @@ -# TODO: integrate rHPpandas into click import logging import multiprocessing import tempfile -from pathlib import Path -from typing import Callable, Tuple, Union - import click import click_log + +import numpy as np +import xarray as xr +import pandas as pd +import pyarrow as pa + +from numbers import Number +from pathlib import Path +from typing import Callable, Tuple, Union from rasterio.enums import Resampling import raster2dggs.rHPpandas @@ -16,6 +21,59 @@ from raster2dggs import __version__ +def _rhpfunc( + sdf: xr.DataArray, + resolution: int, + parent_res: int, + nodata: Number = np.nan, + band_labels: Tuple[str] = None, +) -> pa.Table: + """ + Index a raster window to rHEALPix. + Subsequent steps are necessary to resolve issues at the boundaries of windows. + If windows are very small, or in strips rather than blocks, processing may be slower + than necessary and the recommendation is to write different windows in the source raster. + """ + sdf: pd.DataFrame = sdf.to_dataframe().drop(columns=["spatial_ref"]).reset_index() + subset: pd.DataFrame = sdf.dropna() + subset = subset[subset.value != nodata] + subset = pd.pivot_table( + subset, values=const.DEFAULT_NAME, index=["x", "y"], columns=["band"] + ).reset_index() + # Primary H3 index + rhpindex = subset.rHP.geo_to_rhp(resolution, lat_col="y", lng_col="x").drop( + columns=["x", "y"] + ) + # Secondary (parent) H3 index, used later for partitioning + rhpindex = rhpindex.rHP.rhp_to_parent(parent_res).reset_index() + # Renaming columns to actual band labels + bands = sdf["band"].unique() + band_names = dict(zip(bands, map(lambda i: band_labels[i - 1], bands))) + for k, v in band_names.items(): + if band_names[k] is None: + band_names[k] = str(bands[k - 1]) + else: + band_names = band_names + rhpindex = rhpindex.rename(columns=band_names) + return pa.Table.from_pandas(rhpindex) + + +def _rhp_parent_groupby( + df, resolution: int, aggfunc: Union[str, Callable], decimals: int +): + """ + Function for aggregating the h3 resolution values per parent partition. Each partition will be run through with a + pandas .groupby function. This step is to ensure there are no duplicate rHEALPix values, which will happen when indexing a + high resolution raster at a coarser resolution. + """ + if decimals > 0: + return df.groupby(f"rhp_{resolution}").agg(aggfunc).round(decimals) + else: + return ( + df.groupby(f"rhp_{resolution}").agg(aggfunc).round(decimals).astype("Int64") + ) + + @click.command(context_settings={"show_default": True}) @click_log.simple_verbosity_option(common.LOGGER) @click.argument("raster_input", type=click.Path(), nargs=1) @@ -110,8 +168,6 @@ def rhp( RASTER_INPUT is the path to input raster data; prepend with protocol like s3:// or hdfs:// for remote data. OUTPUT_DIRECTORY should be a directory, not a file, as it will be the write location for an Apache Parquet data store, with partitions equivalent to parent cells of target cells at a fixed offset. However, this can also be remote (use the appropriate prefix, e.g. s3://). - - TODO: loads! """ tempfile.tempdir = tempdir if tempdir is not None else tempfile.tempdir @@ -131,5 +187,14 @@ def rhp( overwrite, ) - # TODO: the actual heavy lifting! - print("Called rhp top-level function") + common.initial_index( + "rhp", + _rhpfunc, + _rhp_parent_groupby, + raster_input, + output_directory, + int(resolution), + parent_res, + warp_args, + **kwargs, + ) diff --git a/raster2dggs/rHPpandas.py b/raster2dggs/rHPpandas.py index 2c84dc3..c9c252a 100644 --- a/raster2dggs/rHPpandas.py +++ b/raster2dggs/rHPpandas.py @@ -1,19 +1,31 @@ -from typing import Union +from typing import Union, Callable, Any +from functools import partial, update_wrapper import pandas as pd import geopandas as gpd -import raster2dggs.rhealpixdggs_py.rhealpixdggs as rhp_py +import raster2dggs.rhealpixdggs_py.rhealpixdggs.wrappers as rhp_py AnyDataFrame = Union[pd.DataFrame, gpd.GeoDataFrame] +def wrapped_partial(func, *args, **kwargs): + """ + Properly wrapped partial function + + Appropriated from h3pandas.util package + """ + partial_func = partial(func, *args, **kwargs) + update_wrapper(partial_func, func) + return partial_func + + @pd.api.extensions.register_dataframe_accessor("rHP") class rHPAccessor: def __init__(self, df: pd.DataFrame) -> None: self._df = df - def geo_to_hrp( + def geo_to_rhp( self, resolution: int, lat_col: str = "lat", @@ -54,9 +66,30 @@ def geo_to_hrp( ] # Add results to DataFrame - colname = f"rhp_{resolution}" + colname = f"rhp_{resolution:02}" assign_arg = {colname: rhpaddresses} df = self._df.assign(**assign_arg) if set_index: return df.set_index(colname) return df + + def rhp_to_parent(self, resolution: int = None) -> AnyDataFrame: + column = f"rhp_{resolution:02}" if resolution else "rhp_parent" + + return self._apply_index_assign( + wrapped_partial(rhp_py.rhp_to_parent, res=resolution), column + ) + + def _apply_index_assign( + self, + func: Callable, + column_name: str, + processor: Callable = lambda x: x, + finalizer: Callable = lambda x: x, + ) -> Any: + """ + Appropriated from h3pandas package + """ + result = [processor(func(rhpaddress)) for rhpaddress in self._df.index] + assign_args = {column_name: result} + return finalizer(self._df.assign(**assign_args))