Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement NoDataStrategy to deal with optional setup_functions #229

Merged
merged 6 commits into from
Feb 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ Fixed
- **setup_config_output_timeseries**: bugfix for reducer.
- update hydromt configuration files from ini to yml format. PR #230
- remove or update calls to check if source in self.data_catalog `Issue #501 <https://github.com/Deltares/hydromt/issues/501>`_
- Included NoDataStrategy from hydromt-core: setup functions for lakes, reservoirs, glaciers, and gauges are skipped when no data is found withing the model region (same behavior as before) PR #229

Deprecated
----------
171 changes: 103 additions & 68 deletions hydromt_wflow/wflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Implement Wflow model class."""

# Implement model class following model API

import codecs
@@ -22,6 +23,7 @@
from dask.diagnostics import ProgressBar
from hydromt import flw
from hydromt.models.model_grid import GridModel
from hydromt.nodata import NoDataStrategy
from pyflwdir import core_conversion, core_d8, core_ldd
from shapely.geometry import box

@@ -1186,14 +1188,27 @@ def setup_gauges(
code = self.crs
kwargs.update(crs=code)
gdf_gauges = self.data_catalog.get_geodataframe(
gauges_fn, geom=self.basins, assert_gtype="Point", **kwargs
gauges_fn,
geom=self.basins,
assert_gtype="Point",
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
elif self.data_catalog[gauges_fn].data_type == "GeoDataFrame":
gdf_gauges = self.data_catalog.get_geodataframe(
gauges_fn, geom=self.basins, assert_gtype="Point", **kwargs
gauges_fn,
geom=self.basins,
assert_gtype="Point",
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
elif self.data_catalog[gauges_fn].data_type == "GeoDataset":
da = self.data_catalog.get_geodataset(gauges_fn, geom=self.basins, **kwargs)
da = self.data_catalog.get_geodataset(
gauges_fn,
geom=self.basins,
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
gdf_gauges = da.vector.to_gdf()
# Check for point geometry
if not np.all(np.isin(gdf_gauges.geometry.type, "Point")):
@@ -1208,10 +1223,9 @@ def setup_gauges(
if basename is None:
basename = os.path.basename(gauges_fn).split(".")[0].replace("_", "-")

# Create gauge map, subcatch map and update toml
if gdf_gauges.index.size == 0:
self.logger.warning(f"No {gauges_fn} gauge locations found within domain")

# Check if there is data found
if gdf_gauges is None:
self.logger.info("Skipping method, as no data has been found")
return

# Create the gauges map
@@ -1424,6 +1438,7 @@ def setup_lakes(
lakes_fn, "lake", min_area, **kwargs
)
if ds_lakes is None:
self.logger.info("Skipping method, as no data has been found")
return
rmdict = {k: v for k, v in self._MAPS.items() if k in ds_lakes.data_vars}
ds_lakes = ds_lakes.rename(rmdict)
@@ -1612,71 +1627,75 @@ def setup_reservoirs(
# the parameters tbls =
# if everything is present, skip calculate reservoirattrs() and
# directly make the maps
if ds_res is not None:
rmdict = {k: v for k, v in self._MAPS.items() if k in ds_res.data_vars}
self.set_grid(ds_res.rename(rmdict))

# add attributes
# if present use directly
resattributes = [
"waterbody_id",
"ResSimpleArea",
"ResMaxVolume",
"ResTargetMinFrac",
"ResTargetFullFrac",
"ResDemand",
"ResMaxRelease",
]
if np.all(np.isin(resattributes, gdf_org.columns)):
intbl_reservoirs = gdf_org[resattributes]
reservoir_accuracy = None
reservoir_timeseries = None
# else compute
else:
(
intbl_reservoirs,
reservoir_accuracy,
reservoir_timeseries,
) = workflows.reservoirattrs(
gdf=gdf_org, timeseries_fn=timeseries_fn, logger=self.logger
)
intbl_reservoirs = intbl_reservoirs.rename(columns=tbls)

# create a geodf with id of reservoir and gemoetry at outflow location
gdf_org_points = gpd.GeoDataFrame(
gdf_org["waterbody_id"],
geometry=gpd.points_from_xy(gdf_org.xout, gdf_org.yout),
# Skip method if no data is returned
if ds_res is None:
self.logger.info("Skipping method, as no data has been found")
return

# Continue method if data has been found
rmdict = {k: v for k, v in self._MAPS.items() if k in ds_res.data_vars}
self.set_grid(ds_res.rename(rmdict))

# add attributes
# if present use directly
resattributes = [
"waterbody_id",
"ResSimpleArea",
"ResMaxVolume",
"ResTargetMinFrac",
"ResTargetFullFrac",
"ResDemand",
"ResMaxRelease",
]
if np.all(np.isin(resattributes, gdf_org.columns)):
intbl_reservoirs = gdf_org[resattributes]
reservoir_accuracy = None
reservoir_timeseries = None
# else compute
else:
(
intbl_reservoirs,
reservoir_accuracy,
reservoir_timeseries,
) = workflows.reservoirattrs(
gdf=gdf_org, timeseries_fn=timeseries_fn, logger=self.logger
)
intbl_reservoirs = intbl_reservoirs.rename(
columns={"expr1": "waterbody_id"}
intbl_reservoirs = intbl_reservoirs.rename(columns=tbls)

# create a geodf with id of reservoir and gemoetry at outflow location
gdf_org_points = gpd.GeoDataFrame(
gdf_org["waterbody_id"],
geometry=gpd.points_from_xy(gdf_org.xout, gdf_org.yout),
)
intbl_reservoirs = intbl_reservoirs.rename(columns={"expr1": "waterbody_id"})
gdf_org_points = gdf_org_points.merge(
intbl_reservoirs, on="waterbody_id"
) # merge
# add parameter attributes to polygone gdf:
gdf_org = gdf_org.merge(intbl_reservoirs, on="waterbody_id")

# write reservoirs with param values to geoms
self.set_geoms(gdf_org, name="reservoirs")

for name in gdf_org_points.columns[2:]:
gdf_org_points[name] = gdf_org_points[name].astype("float32")
da_res = ds_res.raster.rasterize(
gdf_org_points, col_name=name, dtype="float32", nodata=-999
)
gdf_org_points = gdf_org_points.merge(
intbl_reservoirs, on="waterbody_id"
) # merge
# add parameter attributes to polygone gdf:
gdf_org = gdf_org.merge(intbl_reservoirs, on="waterbody_id")

# write reservoirs with param values to geoms
self.set_geoms(gdf_org, name="reservoirs")

for name in gdf_org_points.columns[2:]:
gdf_org_points[name] = gdf_org_points[name].astype("float32")
da_res = ds_res.raster.rasterize(
gdf_org_points, col_name=name, dtype="float32", nodata=-999
)
self.set_grid(da_res)
self.set_grid(da_res)

# Save accuracy information on reservoir parameters
if reservoir_accuracy is not None:
reservoir_accuracy.to_csv(join(self.root, "reservoir_accuracy.csv"))
# Save accuracy information on reservoir parameters
if reservoir_accuracy is not None:
reservoir_accuracy.to_csv(join(self.root, "reservoir_accuracy.csv"))

if reservoir_timeseries is not None:
reservoir_timeseries.to_csv(
join(self.root, f"reservoir_timeseries_{timeseries_fn}.csv")
)
if reservoir_timeseries is not None:
reservoir_timeseries.to_csv(
join(self.root, f"reservoir_timeseries_{timeseries_fn}.csv")
)

for option in res_toml:
self.set_config(option, res_toml[option])
for option in res_toml:
self.set_config(option, res_toml[option])

def _setup_waterbodies(self, waterbodies_fn, wb_type, min_area=0.0, **kwargs):
"""Help with common workflow of setup_lakes and setup_reservoir.
@@ -1688,8 +1707,16 @@ def _setup_waterbodies(self, waterbodies_fn, wb_type, min_area=0.0, **kwargs):
if "predicate" not in kwargs:
kwargs.update(predicate="contains")
gdf_org = self.data_catalog.get_geodataframe(
waterbodies_fn, geom=self.basins, **kwargs
waterbodies_fn,
geom=self.basins,
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
if gdf_org is None:
# Return two times None (similair to main function output), if there is no
# data found
return None, None

# skip small size waterbodies
if "Area_avg" in gdf_org.columns and gdf_org.geometry.size > 0:
min_area_m2 = min_area * 1e6
@@ -1859,8 +1886,16 @@ def setup_glaciers(self, glaciers_fn="rgi", min_area=1):
# retrieve data for basin
self.logger.info("Preparing glacier maps.")
gdf_org = self.data_catalog.get_geodataframe(
glaciers_fn, geom=self.basins, predicate="intersects"
glaciers_fn,
geom=self.basins,
predicate="intersects",
handle_nodata=NoDataStrategy.IGNORE,
)
# Check if there are glaciers found
if gdf_org is None:
self.logger.info("Skipping method, as no data has been found")
return

# skip small size glacier
if "AREA" in gdf_org.columns and gdf_org.geometry.size > 0:
gdf_org = gdf_org[gdf_org["AREA"] >= min_area]
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""add global fixtures."""

import logging
import platform
from os.path import abspath, dirname, join
@@ -72,7 +73,7 @@ def example_wflow_results():
@pytest.fixture()
def clipped_wflow_model():
root = join(EXAMPLEDIR, "wflow_piave_clip")
mod = WflowModel(root=root, mode="r")
mod = WflowModel(root=root, mode="r", data_libs="artifact_data")
return mod


15 changes: 15 additions & 0 deletions tests/test_model_methods.py
Original file line number Diff line number Diff line change
@@ -516,3 +516,18 @@ def test_setup_floodplains_2d(elevtn_map, example_wflow_model, floodplain1d_test
.raster.mask_nodata()
.equals(floodplain1d_testdata[f"{mapname}_D4"])
)


def test_skip_nodata_reservoir(clipped_wflow_model):
# Using the clipped_wflow_model as the reservoirs are not in this model
clipped_wflow_model.setup_reservoirs(
reservoirs_fn="hydro_reservoirs",
min_area=0.0,
)
assert clipped_wflow_model.config["model"]["reservoirs"] == False
# Get names for two reservoir layers
for mapname in ["resareas", "reslocs"]:
# Check if layers are indeed not present in the model
assert (
clipped_wflow_model._MAPS[mapname] not in clipped_wflow_model.grid.data_vars
)
Loading