Merge pull request #151 from Deltares/#115-Check-and-update-hazard-model
#115 check and update hazard model
frederique-hub authored Oct 16, 2023
2 parents b6399cf + 536211b commit 1befd34
Showing 4 changed files with 235 additions and 617 deletions. Two of the four changed files are rendered below.
160 changes: 65 additions & 95 deletions hydromt_fiat/fiat.py
@@ -18,12 +18,11 @@
from hydromt_fiat import DATADIR
from hydromt_fiat.config import Config
from hydromt_fiat.workflows.exposure_vector import ExposureVector
-from hydromt_fiat.workflows.hazard import *  # TODO: when the hazard module is updated, explicitly mention the functions that are imported
+from hydromt_fiat.workflows.hazard import create_lists, check_lists_size, read_maps, check_maps_metadata, check_maps_rp, check_map_uniqueness, create_risk_dataset
from hydromt_fiat.workflows.social_vulnerability_index import SocialVulnerabilityIndex
from hydromt_fiat.workflows.vulnerability import Vulnerability
from hydromt_fiat.workflows.aggregation_areas import join_exposure_aggregation_areas


__all__ = ["FiatModel"]

_logger = logging.getLogger(__name__)
@@ -335,7 +334,8 @@ def setup_hazard(
risk_output: bool = False,
unit_conversion_factor: float = 1.0,
) -> None:
"""Set up hazard maps. This component integrates multiple checks for the maps
"""Set up hazard maps. This component integrates multiple checks for the hazard
maps.
Parameters
----------
@@ -372,58 +372,24 @@
risk_output : bool, optional
The parameter that defines if a risk analysis is required, by default False
"""
-# check parameters types and size, and existance of provided files of maps

-params = check_parameters_type(map_fn, map_type, rp, crs, nodata, var, chunks)
-check_parameters_size(params)
-check_files(params, self.root)

+# create lists of maps and their parameters to be able to iterate over them
+params = create_lists(map_fn, map_type, rp, crs, nodata, var, chunks)
+check_lists_size(params)
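create_lists and check_lists_size live in hydromt_fiat.workflows.hazard and are not part of this diff; judging from how params is indexed below, create_lists plausibly normalises each argument into equal-length lists, roughly:

    # assumed shape of `params`; only "map_fn_lst" and "map_type_lst" are
    # confirmed by the code in this diff, the other keys are guesses
    params = {
        "map_fn_lst": [Path("hazard/flood_rp010.tif"), Path("hazard/flood_rp100.tif")],
        "map_type_lst": ["water_depth", "water_depth"],
        "rp_lst": [10, 100],
        # ... plus matching lists for crs, nodata, var and chunks
    }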

rp_list = []
map_name_lst = []

-# retrieve maps information from parameters and datacatalog
-# load maps in memory and check them and save the with st_map function
for idx, da_map_fn in enumerate(params["map_fn_lst"]):
-da_map_fn, da_name, da_type = read_floodmaps(params, da_map_fn, idx)

-# load flood maps to memory
-# da = load_floodmaps(self.data_catalog, self.region, da_map_fn, da_name, name_catalog)
-# reading from path
-if isinstance(da_map_fn, Path):
-    if da_map_fn.stem == "sfincs_map":
-        sfincs_root = os.path.dirname(da_map_fn)
-        sfincs_model = SfincsModel(
-            sfincs_root, mode="r", logger=self.logger
-        )
-        sfincs_model.read_results()
-        # save sfincs map as GeoTIFF
-        # result_list = list(sfincs_model.results.keys())
-        # sfincs_model.write_raster("results.zsmax", compress="LZW")
-        da = sfincs_model.results["zsmax"]
-        # da = da.squeeze('timemax').drop('timemax')
-        da = da.isel(timemax=0).drop("timemax")

-    else:
-        if not self.region.empty:
-            # da = self.data_catalog.get_rasterdataset(
-            #     da_map_fn, geom=self.region
-            # )
-            da = self.data_catalog.get_rasterdataset(da_map_fn)
-        else:
-            da = self.data_catalog.get_rasterdataset(da_map_fn)
-# reading from the datacatalog
-else:
-    if not self.region.empty:
-        # da = self.data_catalog.get_rasterdataset(
-        #     name_catalog, variables=da_name, geom=self.region
-        # )
-        da = self.data_catalog.get_rasterdataset(map_fn, variables=da_name)
-    else:
-        da = self.data_catalog.get_rasterdataset(map_fn, variables=da_name)
-# Convert to units of the exposure data if required
-if self.exposure.unit != da.units:
-    da = da * unit_conversion_factor
+# read maps and retrieve their attributes
+da_map_fn, da_name, da_type = read_maps(params, da_map_fn, idx)

+da = self.data_catalog.get_rasterdataset(da_map_fn, geom=self.region)

+# Convert to units of the exposure data if required
+if hasattr(self, "exposure") and self.exposure is not None:  # make sure the unit information is available from the exposure dataset
+    if self.exposure.unit != da.units:
+        da = da * unit_conversion_factor

da.encoding["_FillValue"] = None
da = da.raster.gdal_compliant()
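The unit conversion above is a plain multiplication, so unit_conversion_factor is simply the ratio between the hazard and exposure units; for example (values illustrative, not from this commit):

    unit_conversion_factor = 0.3048  # hazard depths in feet, exposure in metres
    da = da * unit_conversion_factor  # depths now in metres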

@@ -433,84 +399,86 @@ def setup_hazard(
# check maps return periods
da_rp = check_maps_rp(params, da, da_name, idx, risk_output)

-# chek if maps are unique
-# TODO: check if providing methods like self.get_config can be used
-# TODO: create a new funtion to check uniqueness trhough files names
-# check_maps_uniquenes(self.get_config,self.staticmaps,params,da,da_map_fn,da_name,da_type,da_rp,idx)
+if risk_output and da_map_fn.stem == "sfincs_map":
+    da_name = da_name + f"_{str(da_rp)}"

post = f"(rp {da_rp})" if risk_output else ""
self.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}")

rp_list.append(da_rp)

-# If a risk calculation is required and the map comes from sfincs, they
-# have the same name so give another name
-if risk_output and da_map_fn.stem == "sfincs_map":
-    da_name = da_name + f"_{str(da_rp)}"
map_name_lst.append(da_name)

+da.attrs = {
+    "returnperiod": str(da_rp),
+    "type": da_type,
+    "name": da_name,
+    "analysis": "event",
+}

+da = da.to_dataset(name=da_name)

self.set_maps(da, da_name)

+check_map_uniqueness(map_name_lst)
-# in case risk_output is required maps are put in a netcdf with a raster with
-# an extra dimension 'rp' accounting for return period
-# select first risk maps

+# in case of risk analysis, create a single multiband netcdf with one band per return period
if risk_output:
-list_keys = list(self.maps.keys())
-first_map = self.maps[list_keys[0]].rename("risk_datarray")
-list_keys.pop(0)

-# add additional risk maps
-for idx, x in enumerate(list_keys):
-    key_name = list_keys[idx]
-    layer = self.maps[key_name]
-    first_map = xr.concat([first_map, layer], dim="rp")

-# convert to a dataset to be able to write attributes when writing the maps
-# in the ouput folders. If datarray is provided attributes will not be
-# shown in the output netcdf dataset
-da = first_map.to_dataset(name="risk_maps")
-da.attrs = {
-    "returnperiod": list(rp_list),
-    "type": params["map_type_lst"],
-    "name": map_name_lst,
-    "Analysis": "risk",

+da, sorted_rp, sorted_names = create_risk_dataset(params, rp_list, map_name_lst, self.maps)

+self.set_grid(da)

+self.grid.attrs = {
+    "rp": sorted_rp,
+    "type": params["map_type_lst"],  # TODO: change this in case a list with different hazard types per map is provided
+    "name": sorted_names,
+    "analysis": "risk",
}
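create_risk_dataset is imported from hydromt_fiat.workflows.hazard and not shown in this diff. Given its call signature and the inlined xr.concat logic the commit removes, its behaviour is plausibly along these lines (a sketch, not the actual implementation):

    import xarray as xr

    def create_risk_dataset(params, rp_list, map_name_lst, maps):
        # sort the event layers by return period
        order = sorted(range(len(rp_list)), key=lambda i: rp_list[i])
        sorted_rp = [rp_list[i] for i in order]
        sorted_names = [map_name_lst[i] for i in order]
        # stack the maps along a new "rp" dimension
        da = xr.concat([maps[name] for name in sorted_names], dim="rp")
        da = da.assign_coords(rp=sorted_rp)
        return da, sorted_rp, sorted_names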
-# load merged map into self.maps
-self.set_maps(da)

list_maps = list(self.maps.keys())

# erase the individual maps from self.maps (for risk runs the merged dataset now lives in self.grid)
-for item in list_maps[:-1]:
+for item in list_maps[:]:
self.maps.pop(item)

self.set_config("hazard.return_periods", rp_list)
# set configuration .toml file
self.set_config("hazard.return_periods",
str(da_rp) if not risk_output else sorted_rp
)

# the metadata of the hazard maps is saved in the configuration toml file;
# this component was modified to provide element [0] of the list,
# in case multiple maps are required, remove the [0]
self.set_config(
"hazard.file",
[
str(Path("hazard") / (hazard_map + ".nc"))
-    for hazard_map in self.maps.keys()
+    for hazard_map in self.maps.keys()
+][0] if not risk_output else
+[
+    str(Path("hazard") / ("risk_map" + ".nc"))
+][0],
)
self.set_config(
"hazard.crs",
[
"EPSG:" + str((self.maps[hazard_map].rio.crs.to_epsg()))
"EPSG:" + str((self.maps[hazard_map].raster.crs.to_epsg()))
for hazard_map in self.maps.keys()
-][0],
+][0] if not risk_output else
+[
+    "EPSG:" + str(self.crs.to_epsg())
+][0],
)

self.set_config(
"hazard.elevation_reference", "dem" if da_type == "water_depth" else "datum"
"hazard.elevation_reference",
"dem" if da_type == "water_depth" else "datum"
)

# Set the configurations for a multiband netcdf
self.set_config(
"hazard.settings.subset",
-[(self.maps[hazard_map].name) for hazard_map in self.maps.keys()][0],
+[
+    self.maps[hazard_map].name
+    for hazard_map in self.maps.keys()
+][0] if not risk_output else sorted_rp,
)
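Taken together, the set_config calls above amount to a hazard section along these lines for a risk run (shown as a Python dict; the values are illustrative):

    config["hazard"] = {
        "return_periods": [10, 100],        # sorted_rp
        "file": "hazard/risk_map.nc",
        "crs": "EPSG:4326",
        "elevation_reference": "dem",       # because da_type == "water_depth"
        "settings": {"subset": [10, 100]},
    }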

self.set_config(
@@ -742,6 +710,8 @@ def write(self):
self.write_config()
if self.maps:
self.write_maps(fn="hazard/{name}.nc")
+if self.grid:
+    self.write_grid(fn="hazard/risk_map.nc")
if self.geoms:
self.write_geoms(fn="exposure/{name}.gpkg", driver="GPKG")
if self._tables:
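With the new write_grid branch, writing the model is expected to produce output along these lines (file names follow from the code above; the exact set depends on which components are populated):

    model.write()
    # expected outputs, assuming all components are set:
    #   hazard/<map_name>.nc   one file per event map in self.maps
    #   hazard/risk_map.nc     the combined risk grid in self.grid
    #   exposure/<name>.gpkg   exposure geometries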
136 changes: 0 additions & 136 deletions hydromt_fiat/validation.py
@@ -9,97 +9,6 @@ def check_dir_exist(dir, name=None):
f"The directory indicated by the '{name}' parameter does not exist."
)


-def check_file_exist(root, param_lst, name=None):
-    root = Path(root)
-    param_lst = [Path(p) for p in param_lst]
-    for param_idx, param in enumerate(param_lst):
-        if isinstance(param, dict):
-            fn_lst = list(param.values())
-        else:
-            fn_lst = [param]
-        for fn_idx, fn in enumerate(fn_lst):
-            if not Path(fn).is_file():
-                if root.joinpath(fn).is_file():
-                    if isinstance(param, dict):
-                        param_lst[param_idx][
-                            list(param.keys())[fn_idx]
-                        ] = root.joinpath(fn)
-                    else:
-                        param_lst[param_idx] = root.joinpath(fn)
-            else:
-                if isinstance(param, dict):
-                    param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn)
-                else:
-                    param_lst[param_idx] = Path(fn)
-            try:
-                if isinstance(param, dict):
-                    assert isinstance(
-                        param_lst[param_idx][list(param.keys())[fn_idx]], Path
-                    )
-                else:
-                    assert isinstance(param_lst[param_idx], Path)
-            except AssertionError:
-                raise TypeError(
-                    f"The file indicated by the '{name}' parameter does not"
-                    f" exist in the directory '{root}'."
-                )


-# TODO: Improve this tool without calling model.get_congif(input_dir)
-# def check_file_exist(get_config, root, param_lst, name=None, input_dir=None):
-#     root = Path(root)
-#     param_lst = [Path(p) for p in param_lst]
-#     for param_idx, param in enumerate(param_lst):
-#         if isinstance(param, dict):
-#             fn_lst = list(param.values())
-#         else:
-#             fn_lst = [param]
-#         for fn_idx, fn in enumerate(fn_lst):
-#             if not Path(fn).is_file():
-#                 if root.joinpath(fn).is_file():
-#                     if isinstance(param, dict):
-#                         param_lst[param_idx][
-#                             list(param.keys())[fn_idx]
-#                         ] = root.joinpath(fn)
-#                     else:
-#                         param_lst[param_idx] = root.joinpath(fn)
-#                 if input_dir is not None:
-#                     if get_config(input_dir).joinpath(fn).is_file():
-#                         if isinstance(param, dict):
-#                             param_lst[param_idx][
-#                                 list(param.keys())[fn_idx]
-#                             ] = get_config(input_dir).joinpath(fn)
-#                         else:
-#                             param_lst[param_idx] = get_config(
-#                                 input_dir
-#                             ).joinpath(fn)
-#             else:
-#                 if isinstance(param, dict):
-#                     param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn)
-#                 else:
-#                     param_lst[param_idx] = Path(fn)
-#             try:
-#                 if isinstance(param, dict):
-#                     assert isinstance(
-#                         param_lst[param_idx][list(param.keys())[fn_idx]], Path
-#                     )
-#                 else:
-#                     assert isinstance(param_lst[param_idx], Path)
-#             except AssertionError:
-#                 if input_dir is None:
-#                     raise TypeError(
-#                         f"The file indicated by the '{name}' parameter does not"
-#                         f" exist in the directory '{root}'."
-#                     )
-#                 else:
-#                     raise TypeError(
-#                         f"The file indicated by the '{name}' parameter does not"
-#                         f" exist in either of the directories '{root}' or "
-#                         f"'{get_config(input_dir)}'."
-#                     )


def check_uniqueness(map_name_lst):
def check_duplicates(lst):
unique_elements = set()
@@ -114,51 +23,6 @@ def check_duplicates(lst):
if check:
raise ValueError(f"The filenames of the hazard maps should be unique.")
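The body of check_duplicates is collapsed in this view; a sketch consistent with the visible lines (an assumption, not necessarily the original body) is:

    def check_duplicates(lst):
        # return True as soon as any name repeats
        unique_elements = set()
        for element in lst:
            if element in unique_elements:
                return True
            unique_elements.add(element)
        return False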


-# TODO: Improve this tool without calling model. Just checking the maps names
-# def check_uniqueness(model, *args, file_type=None, filename=None):
-#     """ """

-#     args = list(args)
-#     if len(args) == 1 and "." in args[0]:
-#         args = args[0].split(".") + args[1:]
-#     branch = args.pop(-1)
-#     for key in args[::-1]:
-#         branch = {key: branch}

-#     if model.get_config(args[0], args[1]):
-#         for key in model.staticmaps.data_vars:
-#             if filename == key:
-#                 raise ValueError(
-#                     f"The filenames of the {file_type} maps should be unique."
-#                 )
-#             if (
-#                 model.get_config(args[0], args[1], key)
-#                 == list(branch[args[0]][args[1]].values())[0]
-#             ):
-#                 raise ValueError(f"Each model input layers must be unique.")


-def check_param_type(param, name=None, types=None):
-    """ """

-    if not isinstance(param, list):
-        raise TypeError(
-            f"The '{name}_lst' parameter should be a of {list}, received a "
-            f"{type(param)} instead."
-        )
-    for i in param:
-        if not isinstance(i, types):
-            if isinstance(types, tuple):
-                types = " or ".join([str(j) for j in types])
-            else:
-                types = types
-            raise TypeError(
-                f"The '{name}' parameter should be a of {types}, received a "
-                f"{type(i)} instead."
-            )


def get_param(param_lst, map_fn_lst, file_type, filename, i, param_name):
""" """
