Skip to content

Commit

Permalink
Initial integration of rHEALPix wrapper stubs into rHPpandas and rast…
Browse files Browse the repository at this point in the history
…er2dggs (runs through a test but still somewhat speculative)
  • Loading branch information
ndemaio committed Oct 25, 2023
1 parent e253247 commit 0049b17
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 12 deletions.
81 changes: 73 additions & 8 deletions raster2dggs/rHP.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
# TODO: integrate rHPpandas into click
import logging
import multiprocessing
import tempfile
from pathlib import Path
from typing import Callable, Tuple, Union

import click
import click_log

import numpy as np
import xarray as xr
import pandas as pd
import pyarrow as pa

from numbers import Number
from pathlib import Path
from typing import Callable, Tuple, Union
from rasterio.enums import Resampling

import raster2dggs.rHPpandas
Expand All @@ -16,6 +21,59 @@
from raster2dggs import __version__


def _rhpfunc(
sdf: xr.DataArray,
resolution: int,
parent_res: int,
nodata: Number = np.nan,
band_labels: Tuple[str] = None,
) -> pa.Table:
"""
Index a raster window to rHEALPix.
Subsequent steps are necessary to resolve issues at the boundaries of windows.
If windows are very small, or in strips rather than blocks, processing may be slower
than necessary and the recommendation is to write different windows in the source raster.
"""
sdf: pd.DataFrame = sdf.to_dataframe().drop(columns=["spatial_ref"]).reset_index()
subset: pd.DataFrame = sdf.dropna()
subset = subset[subset.value != nodata]
subset = pd.pivot_table(
subset, values=const.DEFAULT_NAME, index=["x", "y"], columns=["band"]
).reset_index()
# Primary H3 index
rhpindex = subset.rHP.geo_to_rhp(resolution, lat_col="y", lng_col="x").drop(
columns=["x", "y"]
)
# Secondary (parent) H3 index, used later for partitioning
rhpindex = rhpindex.rHP.rhp_to_parent(parent_res).reset_index()
# Renaming columns to actual band labels
bands = sdf["band"].unique()
band_names = dict(zip(bands, map(lambda i: band_labels[i - 1], bands)))
for k, v in band_names.items():
if band_names[k] is None:
band_names[k] = str(bands[k - 1])
else:
band_names = band_names
rhpindex = rhpindex.rename(columns=band_names)
return pa.Table.from_pandas(rhpindex)


def _rhp_parent_groupby(
df, resolution: int, aggfunc: Union[str, Callable], decimals: int
):
"""
Function for aggregating the h3 resolution values per parent partition. Each partition will be run through with a
pandas .groupby function. This step is to ensure there are no duplicate rHEALPix values, which will happen when indexing a
high resolution raster at a coarser resolution.
"""
if decimals > 0:
return df.groupby(f"rhp_{resolution}").agg(aggfunc).round(decimals)
else:
return (
df.groupby(f"rhp_{resolution}").agg(aggfunc).round(decimals).astype("Int64")
)


@click.command(context_settings={"show_default": True})
@click_log.simple_verbosity_option(common.LOGGER)
@click.argument("raster_input", type=click.Path(), nargs=1)
Expand Down Expand Up @@ -110,8 +168,6 @@ def rhp(
RASTER_INPUT is the path to input raster data; prepend with protocol like s3:// or hdfs:// for remote data.
OUTPUT_DIRECTORY should be a directory, not a file, as it will be the write location for an Apache Parquet data store, with partitions equivalent to parent cells of target cells at a fixed offset. However, this can also be remote (use the appropriate prefix, e.g. s3://).
TODO: loads!
"""
tempfile.tempdir = tempdir if tempdir is not None else tempfile.tempdir

Expand All @@ -131,5 +187,14 @@ def rhp(
overwrite,
)

# TODO: the actual heavy lifting!
print("Called rhp top-level function")
common.initial_index(
"rhp",
_rhpfunc,
_rhp_parent_groupby,
raster_input,
output_directory,
int(resolution),
parent_res,
warp_args,
**kwargs,
)
41 changes: 37 additions & 4 deletions raster2dggs/rHPpandas.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
from typing import Union
from typing import Union, Callable, Any
from functools import partial, update_wrapper

import pandas as pd
import geopandas as gpd

import raster2dggs.rhealpixdggs_py.rhealpixdggs as rhp_py
import raster2dggs.rhealpixdggs_py.rhealpixdggs.wrappers as rhp_py

AnyDataFrame = Union[pd.DataFrame, gpd.GeoDataFrame]


def wrapped_partial(func, *args, **kwargs):
"""
Properly wrapped partial function
Appropriated from h3pandas.util package
"""
partial_func = partial(func, *args, **kwargs)
update_wrapper(partial_func, func)
return partial_func


@pd.api.extensions.register_dataframe_accessor("rHP")
class rHPAccessor:
def __init__(self, df: pd.DataFrame) -> None:
self._df = df

def geo_to_hrp(
def geo_to_rhp(
self,
resolution: int,
lat_col: str = "lat",
Expand Down Expand Up @@ -54,9 +66,30 @@ def geo_to_hrp(
]

# Add results to DataFrame
colname = f"rhp_{resolution}"
colname = f"rhp_{resolution:02}"
assign_arg = {colname: rhpaddresses}
df = self._df.assign(**assign_arg)
if set_index:
return df.set_index(colname)
return df

def rhp_to_parent(self, resolution: int = None) -> AnyDataFrame:
column = f"rhp_{resolution:02}" if resolution else "rhp_parent"

return self._apply_index_assign(
wrapped_partial(rhp_py.rhp_to_parent, res=resolution), column
)

def _apply_index_assign(
self,
func: Callable,
column_name: str,
processor: Callable = lambda x: x,
finalizer: Callable = lambda x: x,
) -> Any:
"""
Appropriated from h3pandas package
"""
result = [processor(func(rhpaddress)) for rhpaddress in self._df.index]
assign_args = {column_name: result}
return finalizer(self._df.assign(**assign_args))

0 comments on commit 0049b17

Please sign in to comment.