From 4646dfe5ff2454e1e71215d23ad950a6639ad9b1 Mon Sep 17 00:00:00 2001 From: Mares2022 Date: Thu, 1 Jun 2023 15:36:50 +0200 Subject: [PATCH 1/4] Update hazard Update hazard --- hydromt_fiat/workflows/hazard.py | 667 +++++++++++++++---------------- 1 file changed, 313 insertions(+), 354 deletions(-) diff --git a/hydromt_fiat/workflows/hazard.py b/hydromt_fiat/workflows/hazard.py index 2c2953f3..3354f6c1 100644 --- a/hydromt_fiat/workflows/hazard.py +++ b/hydromt_fiat/workflows/hazard.py @@ -1,392 +1,351 @@ -from hydromt_fiat.validation import Validation -from pathlib import Path +"""Implement fiat model class""" + +from hydromt_fiat.workflows.vulnerability import Vulnerability +from hydromt_fiat.workflows.hazard import Hazard +from hydromt.models.model_grid import GridModel +from hydromt_fiat.workflows.exposure_vector import ExposureVector +import logging +from hydromt_fiat.config import Config import geopandas as gpd -from ast import literal_eval -import os -import xarray as xr +import pandas as pd +import hydromt +from pathlib import Path + +from shapely.geometry import box +from typing import Union +from hydromt_fiat.workflows.social_vulnerability_index import SocialVulnerabilityIndex + + +from . import DATADIR + +__all__ = ["FiatModel"] + +_logger = logging.getLogger(__name__) + +class FiatModel(GridModel): + """General and basic API for the FIAT model in hydroMT.""" -class Hazard: - def __init__(self): - self.crs = "" - self.check = Validation() + _NAME = "fiat" + _CONF = "fiat_configuration.ini" + _GEOMS = {} # FIXME Mapping from hydromt names to model specific names + _MAPS = {} # FIXME Mapping from hydromt names to model specific names + _FOLDERS = ["hazard", "exposure", "vulnerability", "output"] + _DATADIR = DATADIR - def checkInputs( + def __init__( self, - model_fiat, - map_fn, - map_type, - chunks, - rp, - crs, - nodata, - var, + root=None, + mode="w", + config_fn=None, + data_libs=None, + logger=_logger, ): - # Checks the hazard input parameter types. - # Checks map path list - self.map_fn_lst = [map_fn] if isinstance(map_fn, (str, Path)) else map_fn - self.check.check_param_type(self.map_fn_lst, name="map_fn", types=(str, Path)) - # Checks map path list - self.map_type_lst = ( - [map_type] if isinstance(map_type, (str, Path)) else map_type + super().__init__( + root=root, + mode=mode, + config_fn=config_fn, + data_libs=data_libs, + logger=logger, ) - self.check.check_param_type(self.map_type_lst, name="map_type", types=str) - if not len(self.map_type_lst) == 1 and not len(self.map_type_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'map_type' parameters should match with the number of " - "'map_fn' parameters." - ) - # Checks the chunk list. The list must be equal to the number of maps. - if chunks != "auto": - self.chunks_lst = [chunks] if isinstance(chunks, (int, dict)) else chunks - self.check.check_param_type( - self.chunks_lst, name="chunks", types=(int, dict) - ) - if not len(self.chunks_lst) == 1 and not len(self.chunks_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'chunks' parameters should match with the number of " - "'map_fn' parameters." - ) - # Checks the return period list. The list must be equal to the number of maps. - if rp is not None: - self.rp_lst = [rp] if isinstance(rp, (int, float)) else rp - self.check.check_param_type(self.rp_lst, name="rp", types=(float, int)) - if not len(self.rp_lst) == len(self.map_fn_lst): - raise IndexError( - "The number of 'rp' parameters should match with the number of " - "'map_fn' parameters." - ) - # Checks the projection list - if crs is not None: - self.crs_lst = [str(crs)] if isinstance(crs, (int, str)) else crs - self.check.check_param_type(self.crs_lst, name="crs", types=(int, str)) - if not len(self.crs_lst) == 1 and not len(self.crs_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'crs' parameters should match with the number of " - "'map_fn' parameters." - ) - # Checks the no data list - if nodata is not None: - self.nodata_lst = [nodata] if isinstance(nodata, (float, int)) else nodata - self.check.check_param_type( - self.nodata_lst, name="nodata", types=(float, int) - ) - if not len(self.nodata_lst) == 1 and not len(self.nodata_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'nodata' parameters should match with the number of " - "'map_fn' parameters." - ) - # Checks the var list - if var is not None: - self.var_lst = [var] if isinstance(var, str) else var - self.check.check_param_type(self.var_lst, name="var", types=str) - if not len(self.var_lst) == 1 and not len(self.var_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'var' parameters should match with the number of " - "'map_fn' parameters." - ) + self.tables = [] # List of tables to write + self.exposure = None - # Check if the hazard input files exist. - self.check.check_file_exist( - model_fiat.root, param_lst=self.map_fn_lst, name="map_fn" - ) + def setup_config(self, **kwargs): + """_summary_""" + # Setup config from HydroMT FIAT ini file + global_settings = {} + for k in kwargs.keys(): + global_settings[k] = kwargs[k] - def readMaps( + self.config["global"] = global_settings + + def setup_basemaps( self, - model_fiat, - name_catalog, - hazard_type, - risk_output, - crs, - nodata, - var, - chunks, - region=gpd.GeoDataFrame(), + region, **kwargs, ): - # This list of names should be provided to have the maps in order - # catalog = model_fiat.data_catalog.get_rasterdataset(name_catalog) - # self.map_fn_lst = [i for i in list(catalog.variables) if maps_id in i] + """Define the model domain that is used to clip the raster layers. - # Read the hazard map(s) and add to config and staticmaps. - list_rp = [] - list_names = [] + Adds model layer: - for idx, da_map_fn in enumerate(self.map_fn_lst): - # Check if it is a path or a name from the catalog + * **region** geom: A geometry with the nomenclature 'region'. - if os.path.exists(da_map_fn): - da_map_fn = Path(da_map_fn) - da_name = da_map_fn.stem - da_suffix = da_map_fn.suffix - list_names.append(da_name) + Parameters + ---------- + region: dict + Dictionary describing region of interest, e.g. {'bbox': [xmin, ymin, xmax, ymax]}. See :py:meth:`~hydromt.workflows.parse_region()` for all options. + """ - else: - da_name = da_map_fn - list_names.append(da_name) - - da_type = self.check.get_param( - self.map_type_lst, self.map_fn_lst, "hazard", da_name, idx, "map type" + kind, region = hydromt.workflows.parse_region(region, logger=self.logger) + if kind == "bbox": + geom = gpd.GeoDataFrame(geometry=[box(*region["bbox"])], crs=4326) + elif kind == "grid": + geom = region["grid"].raster.box + elif kind == "geom": + geom = region["geom"] + else: + raise ValueError( + f"Unknown region kind {kind} for FIAT, expected one of ['bbox', 'grid', 'geom']." ) - # Get the local hazard map. - kwargs.update(chunks=chunks if chunks == "auto" else self.chunks_lst[idx]) - - if "da_suffix" in locals() and da_suffix == ".nc": - if var is None: - raise ValueError( - "The 'var' parameter is required when reading NetCDF data." - ) - kwargs.update( - variables=self.check.get_param( - self.var_lst, - self.map_fn_lst, - "hazard", - da_name, - idx, - "NetCDF variable", - ) - ) - - if os.path.exists(da_map_fn): - if not region.empty: - da = model_fiat.data_catalog.get_rasterdataset( - da_map_fn, geom=region, **kwargs - ) - else: - da = model_fiat.data_catalog.get_rasterdataset(da_map_fn, **kwargs) - else: - if not region.empty: - da = model_fiat.data_catalog.get_rasterdataset( - name_catalog, variables=da_name, geom=region - ) - else: - da = model_fiat.data_catalog.get_rasterdataset( - name_catalog, variables=da_name - ) - - # Set (if necessary) the coordinate reference system. - # if crs is not None and not da.raster.crs.is_epsg_code: - # if crs is not None and not da.raster.crs: - # Change to be defined by the user - if crs is not None: - da_crs = self.check.get_param( - self.crs_lst, - self.map_fn_lst, - "hazard", - da_name, - idx, - "coordinate reference system", - ) - da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}" - da.raster.set_crs(da_crs_str) - # elif crs is None and not da.raster.crs.is_epsg_code: - elif crs is None and not da.raster.crs: - raise ValueError( - "The hazard map has no coordinate reference system assigned." - ) + # Set the model region geometry (to be accessed through the shortcut self.region). + self.set_geoms(geom, "region") - # Set (if necessary) and mask the nodata value. - if nodata is not None: - da_nodata = self.check.get_param( - self.nodata_lst, self.map_fn_lst, "hazard", da_name, idx, "nodata" - ) - da.raster.set_nodata(nodata=da_nodata) - elif nodata is None and da.raster.nodata is None: - raise ValueError("The hazard map has no nodata value assigned.") - - # Correct (if necessary) the grid orientation from the lower to the upper left corner. - if da.raster.res[1] > 0: - da = da.reindex({da.raster.y_dim: list(reversed(da.raster.ycoords))}) - - # Check if the obtained hazard map is identical. - if ( - model_fiat.staticmaps - and not model_fiat.staticmaps.raster.identical_grid(da) - ): - raise ValueError("The hazard maps should have identical grids.") - - # Get the return period input parameter. - da_rp = ( - self.check.get_param( - self.rp_lst, - self.map_fn_lst, - "hazard", - da_name, - idx, - "return period", - ) - if hasattr(self, "rp_lst ") - else None + def setup_vulnerability( + self, + vulnerability_source: str, + vulnerability_identifiers_and_linking: str, + unit: str, + ) -> None: + """Setup the vulnerability curves from various possible inputs. + + Parameters + ---------- + vulnerability_source : str + The (relative) path or ID from the data catalog to the source of the vulnerability functions. + vulnerability_identifiers_and_linking : str + The (relative) path to the table that links the vulnerability functions and exposure categories. + unit : str + The unit of the vulnerability functions. + """ + + if not Path(vulnerability_identifiers_and_linking): + logging.error( + f"Vulnerability identifiers and linking table does not exist at: {vulnerability_identifiers_and_linking}" ) - if risk_output and da_rp is None: - # Get (if possible) the return period from dataset names if the input parameter is None. - if "rp" in da_name.lower(): - - def fstrip(x): - return x in "0123456789." - - rp_str = "".join( - filter(fstrip, da_name.lower().split("rp")[-1]) - ).lstrip("0") - - try: - assert isinstance( - literal_eval(rp_str) if rp_str else None, (int, float) - ) - da_rp = literal_eval(rp_str) - list_rp.append(da_rp) - - except AssertionError: - raise ValueError( - f"Could not derive the return period for hazard map: {da_name}." - ) - else: - raise ValueError( - "The hazard map must contain a return period in order to conduct a risk calculation." - ) - - # Add the hazard map to config and staticmaps. - self.check.check_uniqueness( - model_fiat, - "hazard", - da_type, - da_name, - { - "usage": True, - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], - "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], - }, - file_type="hazard", - filename=da_name, + self.vulnerability = Vulnerability(unit, self.data_catalog) + vf_source_df = self.vulnerability.get_vulnerability_source(vulnerability_source) + self.vf_ids_and_linking_df = ( + self.vulnerability.get_vulnerability_identifiers_and_linking_source( + vulnerability_identifiers_and_linking ) + ) + self.vulnerability.get_vulnerability_functions_from_one_file( + vf_source_df, self.vf_ids_and_linking_df + ) - model_fiat.set_config( - "hazard", - # da_type, - da_name, - { - "usage": "True", - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], - "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], - }, + def setup_exposure_vector( + self, + asset_locations: str, + occupancy_type: str, + max_potential_damage: str, + ground_floor_height: Union[int, float, str, None], + ground_flood_height_unit: str, + ) -> None: + self.exposure = ExposureVector(self.data_catalog, self.region) + + if asset_locations == occupancy_type == max_potential_damage: + # The source for the asset locations, occupancy type and maximum potential + # damage is the same, use one source to create the exposure data. + self.exposure.setup_from_single_source(asset_locations, ground_floor_height) + + # Link the damage functions to assets + try: + assert not self.vf_ids_and_linking_df.empty + except AssertionError: + logging.error( + "Please call the 'setup_vulnerability' function before " + "the 'setup_exposure_vector' function. Error message: {e}" ) + self.exposure.link_exposure_vulnerability(self.vf_ids_and_linking_df) + self.exposure.check_required_columns() - # extra functionalities - # da = da.rename(f"map_{da_rp}") - # da.attrs = { - # "usage": "True", - # "map_fn": da_map_fn, - # "map_type": da_type, - # "rp": da_rp, - # "crs": da.raster.crs, - # "nodata": da.raster.nodata, - # # "var": None if "var_lst" not in locals() else self.var_lst[idx], - # "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], - # "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], - # } - - da = da.expand_dims({"rp": [da_rp]}, axis=0) - - model_fiat.set_maps(da, da_name) - # post = f"(rp {da_rp})" if rp is not None and risk_output else "" - post = f"(rp {da_rp})" if risk_output else "" - model_fiat.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}") - - # new_da = xr.concat([new_da, da], dim='rp') - - # import numpy as np - # new_da = xr.DataArray(np.zeros_like(da), dims=da.dims, coords=da.coords).rename('new_dataframe') - - # # model_fiat.maps.to_netcdf("test_hazard_1/test.nc") - - # #model_fiat.set_maps(catalog, "all") - maps = model_fiat.maps - list_keys = list(maps.keys()) - maps_0 = maps[list_keys[0]].rename("risk") - list_keys.pop(0) - - for idx, x in enumerate(list_keys): - key_name = list_keys[idx] - layer = maps[key_name] - maps_0 = xr.concat([maps_0, layer], dim="rp") - - # new_da = maps_0.to_dataset(name='RISK') - # new_da.attrs = { "returnperiod": list(list_rp), - # "type":self.map_type_lst, - # 'name':list_names} - - if not risk_output: - # new_da = maps_0.drop_dims('rp') - new_da = maps_0.to_dataset(name="EVENT") - new_da.attrs = { - "returnperiod": list(list_rp), - "type": self.map_type_lst, - "name": list_names, - "Analysis": "Event base", - } + def setup_exposure_raster(self): + NotImplemented + + def setup_hazard( + self, + map_fn: str, + map_type: str, + rp, + crs, + nodata, + var, + chunks, + risk_output: bool = True, + hazard_type: str = "flooding", + name_catalog: str = "flood_maps", + maps_id: str = "RP", - else: - new_da = maps_0.to_dataset(name='RISK') - new_da.attrs = { "returnperiod": list(list_rp), - "type":self.map_type_lst, - 'name':list_names, - "Analysis": "Risk"} + ): + hazard = Hazard() + + hazard.checkInputs( + self, + map_fn=map_fn, + map_type=map_type, + chunks=chunks, + rp=rp, + crs=crs, + nodata=nodata, + var=var, + ) - model_fiat.hazard = new_da - model_fiat.set_maps(model_fiat.hazard, 'HydroMT_Fiat_hazard') + hazard.readMaps( + self, + name_catalog=name_catalog, + hazard_type=hazard_type, + risk_output=risk_output, + crs=crs, + nodata=nodata, + var=var, + chunks=chunks, + ) - list_maps = list(model_fiat.maps.keys()) + # Store the hazard settings. + hazard_settings = {} + hazard_maps = [] + for hazard_map in self.maps.keys(): + hazard_maps.append( + str(Path("hazard") / (self.maps[hazard_map].name + ".nc")) + ) - if risk_output: - for item in list_maps[:-1]: - model_fiat.maps.pop(item) + hazard_settings["grid_file"] = hazard_maps + if not isinstance(rp, list): + rp = "Event" + hazard_settings["return_period"] = rp - # ds = xr.Dataset(maps) - # ds.raster.set_crs(da.raster.crs) + hazard_settings["crs"] = hazard.crs + hazard_settings["spatial_reference"] = map_type + self.config["hazard"] = hazard_settings - # # new_da= ds.to_array(dim='rp') - # # new_da = new_da.to_dataset(name='Full_stack') - # # new_da.attrs = { "returnperiod": list_rp, - # # "type":self.map_type_lst, - # # 'name':list_names} + def setup_social_vulnerability_index( + self, census_key: str, path: str, state_abbreviation: str + ): + # Create SVI object + svi = SocialVulnerabilityIndex(self.data_catalog, self.config) + + # Call functionalities of SVI + svi.set_up_census_key(census_key) + svi.variable_code_csv_to_pd_df(path) + svi.set_up_download_codes() + svi.set_up_state_code(state_abbreviation) + svi.download_census_data() + svi.rename_census_data("Census_code_withE", "Census_variable_name") + svi.create_indicator_groups("Census_variable_name", "Indicator_code") + svi.processing_svi_data() + svi.normalization_svi_data() + svi.domain_scores() + svi.composite_scores() + + # TO DO: JOIN WITH GEOMETRIES. FOR MAPPING. + # this link can be used: https://github.com/datamade/census + + def read(self): + """Method to read the complete model schematization and configuration from file.""" + self.logger.info(f"Reading model data from {self.root}") + + # Read the configuration file + self.read_config(config_fn=str(Path(self.root).joinpath("settings.toml"))) + + # TODO: determine if it is required to read the hazard files + # hazard_maps = self.config["hazard"]["grid_file"] + # self.read_grid(fn="hazard/{name}.nc") + + # Read the exposure data + self.read_exposure(Path(self.root).joinpath("exposure", "exposure.csv")) + + # Read the vulnerability data + self.read_vulnerability( + Path(self.root).joinpath("vulnerability", "vulnerability_curves.csv") + ) - # # for var_name, var_data in ds.variables.items(): - # # new_da = xr.concat([new_da, ds[var_name]], dim='rp') + def _configread(self, fn): + """Parse fiat configuration toml file to dict.""" + # Read the fiat configuration toml file. + config = Config() + return config.load_file(fn) + + def check_path_exists(self, fn): + """TODO: decide to use this or another function (check_file_exist in validation.py)""" + path = Path(fn) + self.logger.debug(f"Reading file {str(path.name)}") + if not fn.is_file(): + logging.warning(f"File {fn} does not exist!") + + def read_exposure(self, fn): + """_summary_""" + self.check_path_exists(fn) + self.exposure = ExposureVector(crs=self.config["exposure"]["crs"]) + self.exposure.read(fn) + + def read_vulnerability(self, fn): + self.check_path_exists(fn) + self.vulnerability = Vulnerability() + self.vulnerability.read(fn) + + def write(self): + """Method to write the complete model schematization and configuration to file.""" + self.logger.info(f"Writing model data to {self.root}") + + # Save the vulnerability and exposure data in the tables variable. + # Check if data exist + if self.vulnerability: + vulnerability_output_path = "./vulnerability/vulnerability_curves.csv" + self.tables.append( + ( + self.vulnerability.get_table(), + vulnerability_output_path, + {"index": False, "header": False}, + ) + ) - # for layer_name in ds: - # layer = ds[layer_name] - # new_da = xr.concat([new_da, layer], dim='rp') + # Store the vulnerability settings in the config file. + self.config["vulnerability"] = {"dbase_file": vulnerability_output_path} - # # new_da = new_da.to_dataset() + if self.exposure: + exposure_output_path = "./exposure/exposure.csv" + self.tables.append( + (self.exposure.exposure_db, exposure_output_path, {"index": False}) + ) - # # config = model_fiat.config - # # new_da.to_netcdf("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database/test_hazard_1/hazard/test_final_v2.nc") - # #C:\Users\fuentesm\CISNE\HydroMT_sprint_sessions - return model_fiat.maps + # Store the exposure settings in the config file. + self.config["exposure"] = [ + { + "type": "vector", + "dbase_file": exposure_output_path, + "crs": self.exposure.crs, + } + ] + + if self.config: # try to read default if not yet set + self.write_config() + if self.maps: + self.write_maps(fn="hazard/{name}.nc") + if self.geoms: + self.write_geoms(fn="exposure/{name}.geojson") + if self.tables: + self.write_tables() + + def write_tables(self) -> None: + if len(self.tables) == 0: + self.logger.debug("No table data found, skip writing.") + return + self._assert_write_mode + for data, path, kwargs in self.tables: + path = Path(path) + if not isinstance(data, (pd.DataFrame)) or len(data.index) == 0: + self.logger.warning( + f"{path.name} object of type {type(data).__name__} not recognized" + ) + continue + self.logger.debug(f"Writing file {str(path)}") + _fn = Path(self.root) / path + if not _fn.parent.is_dir(): + _fn.parent.mkdir(parents=True) + + if path.name.endswith("csv"): + data.to_csv(_fn, **kwargs) + elif path.name.endswith("xlsx"): + data.to_excel(_fn, **kwargs) + + def _configwrite(self, fn): + """Write config to Delft-FIAT configuration toml file.""" + # Save the configuration file. + Config().save(self.config, Path(self.root).joinpath("settings.toml")) + \ No newline at end of file From 36211d499d187c9d67ec9a25c4ceb1755a70dbcf Mon Sep 17 00:00:00 2001 From: Mares2022 Date: Thu, 1 Jun 2023 15:48:02 +0200 Subject: [PATCH 2/4] Update hazard Update hazard --- fiat.py | 369 +++++++++++ hydromt_fiat/workflows/hazard.py | 723 ++++++++++++---------- tests/test_SVI.py | 2 +- tests/test_exposure.py | 2 +- tests/test_hazard.py | 2 +- tests/test_integration.py | 2 +- tests/test_integrations_hazard.py | 4 +- tests/test_raise_ground_floor_height.py | 2 +- tests/test_reader.py | 2 +- tests/test_setup_new_composite_areas.py | 2 +- tests/test_truncate_damage_function.py | 2 +- tests/test_update_max_potential_damage.py | 2 +- tests/test_vulnerability.py | 2 +- 13 files changed, 785 insertions(+), 331 deletions(-) create mode 100644 fiat.py diff --git a/fiat.py b/fiat.py new file mode 100644 index 00000000..6ddabd44 --- /dev/null +++ b/fiat.py @@ -0,0 +1,369 @@ +"""Implement fiat model class""" + +from hydromt_fiat.workflows.vulnerability import Vulnerability +from hydromt_fiat.workflows.hazard import Hazard +from hydromt.models.model_grid import GridModel +from hydromt_fiat.workflows.exposure_vector import ExposureVector +import logging +from hydromt_fiat.config import Config +import geopandas as gpd +import pandas as pd +import hydromt +from pathlib import Path + +from shapely.geometry import box +from typing import Union +from hydromt_fiat.workflows.social_vulnerability_index import SocialVulnerabilityIndex + + +from . import DATADIR + +__all__ = ["FiatModel"] + +_logger = logging.getLogger(__name__) + + +class FiatModel(GridModel): + """General and basic API for the FIAT model in hydroMT.""" + + _NAME = "fiat" + _CONF = "fiat_configuration.ini" + _GEOMS = {} # FIXME Mapping from hydromt names to model specific names + _MAPS = {} # FIXME Mapping from hydromt names to model specific names + _FOLDERS = ["hazard", "exposure", "vulnerability", "output"] + _DATADIR = DATADIR + + def __init__( + self, + root=None, + mode="w", + config_fn=None, + data_libs=None, + logger=_logger, + ): + super().__init__( + root=root, + mode=mode, + config_fn=config_fn, + data_libs=data_libs, + logger=logger, + ) + self.tables = [] # List of tables to write + self.exposure = None + + def setup_config(self, **kwargs): + """_summary_""" + # Setup config from HydroMT FIAT ini file + global_settings = {} + for k in kwargs.keys(): + global_settings[k] = kwargs[k] + + self.config["global"] = global_settings + + def setup_basemaps( + self, + region, + **kwargs, + ): + """Define the model domain that is used to clip the raster layers. + + Adds model layer: + + * **region** geom: A geometry with the nomenclature 'region'. + + Parameters + ---------- + region: dict + Dictionary describing region of interest, e.g. {'bbox': [xmin, ymin, xmax, ymax]}. See :py:meth:`~hydromt.workflows.parse_region()` for all options. + """ + + kind, region = hydromt.workflows.parse_region(region, logger=self.logger) + if kind == "bbox": + geom = gpd.GeoDataFrame(geometry=[box(*region["bbox"])], crs=4326) + elif kind == "grid": + geom = region["grid"].raster.box + elif kind == "geom": + geom = region["geom"] + else: + raise ValueError( + f"Unknown region kind {kind} for FIAT, expected one of ['bbox', 'grid', 'geom']." + ) + + # Set the model region geometry (to be accessed through the shortcut self.region). + self.set_geoms(geom, "region") + + def setup_vulnerability( + self, + vulnerability_source: str, + vulnerability_identifiers_and_linking: str, + unit: str, + ) -> None: + """Setup the vulnerability curves from various possible inputs. + + Parameters + ---------- + vulnerability_source : str + The (relative) path or ID from the data catalog to the source of the vulnerability functions. + vulnerability_identifiers_and_linking : str + The (relative) path to the table that links the vulnerability functions and exposure categories. + unit : str + The unit of the vulnerability functions. + """ + + if not Path(vulnerability_identifiers_and_linking): + logging.error( + f"Vulnerability identifiers and linking table does not exist at: {vulnerability_identifiers_and_linking}" + ) + + self.vulnerability = Vulnerability(unit, self.data_catalog) + vf_source_df = self.vulnerability.get_vulnerability_source(vulnerability_source) + self.vf_ids_and_linking_df = ( + self.vulnerability.get_vulnerability_identifiers_and_linking_source( + vulnerability_identifiers_and_linking + ) + ) + self.vulnerability.get_vulnerability_functions_from_one_file( + vf_source_df, self.vf_ids_and_linking_df + ) + + def setup_exposure_vector( + self, + asset_locations: str, + occupancy_type: str, + max_potential_damage: str, + ground_floor_height: Union[int, float, str, None], + ground_flood_height_unit: str, + ) -> None: + self.exposure = ExposureVector(self.data_catalog, self.region) + + if asset_locations == occupancy_type == max_potential_damage: + # The source for the asset locations, occupancy type and maximum potential + # damage is the same, use one source to create the exposure data. + self.exposure.setup_from_single_source(asset_locations, ground_floor_height) + + # Link the damage functions to assets + try: + assert not self.vf_ids_and_linking_df.empty + except AssertionError: + logging.error( + "Please call the 'setup_vulnerability' function before " + "the 'setup_exposure_vector' function. Error message: {e}" + ) + self.exposure.link_exposure_vulnerability(self.vf_ids_and_linking_df) + self.exposure.check_required_columns() + + def setup_exposure_raster(self): + NotImplemented + + def setup_hazard( + self, + map_fn: str, + map_type: str, + rp, + crs, + nodata, + var, + chunks, + risk_output: bool = True, + hazard_type: str = "flooding", + name_catalog: str = "flood_maps", + maps_id: str = "RP", + + ): + hazard = Hazard() + + hazard.checkInputs( + self, + map_fn=map_fn, + map_type=map_type, + chunks=chunks, + rp=rp, + crs=crs, + nodata=nodata, + var=var, + ) + + hazard.readMaps( + self, + name_catalog=name_catalog, + hazard_type=hazard_type, + risk_output=risk_output, + crs=crs, + nodata=nodata, + var=var, + chunks=chunks, + ) + + # Store the hazard settings. + hazard_settings = {} + hazard_maps = [] + for hazard_map in self.maps.keys(): + hazard_maps.append( + str(Path("hazard") / (self.maps[hazard_map].name + ".nc")) + ) + + hazard_settings["grid_file"] = hazard_maps + + if not isinstance(rp, list): + rp = "Event" + hazard_settings["return_period"] = rp + + hazard_settings["crs"] = hazard.crs + hazard_settings["spatial_reference"] = map_type + self.config["hazard"] = hazard_settings + + + + def setup_social_vulnerability_index( + self, census_key: str, path: str, state_abbreviation: str + ): + # Create SVI object + svi = SocialVulnerabilityIndex(self.data_catalog, self.config) + + # Call functionalities of SVI + svi.set_up_census_key(census_key) + svi.variable_code_csv_to_pd_df(path) + svi.set_up_download_codes() + svi.set_up_state_code(state_abbreviation) + svi.download_census_data() + svi.rename_census_data("Census_code_withE", "Census_variable_name") + svi.create_indicator_groups("Census_variable_name", "Indicator_code") + svi.processing_svi_data() + svi.normalization_svi_data() + svi.domain_scores() + svi.composite_scores() + + # TO DO: JOIN WITH GEOMETRIES. FOR MAPPING. + # this link can be used: https://github.com/datamade/census + + def read(self): + """Method to read the complete model schematization and configuration from file.""" + self.logger.info(f"Reading model data from {self.root}") + + # Read the configuration file + self.read_config(config_fn=str(Path(self.root).joinpath("settings.toml"))) + + # TODO: determine if it is required to read the hazard files + # hazard_maps = self.config["hazard"]["grid_file"] + # self.read_grid(fn="hazard/{name}.nc") + + # Read the exposure data + self.read_exposure(Path(self.root).joinpath("exposure", "exposure.csv")) + + # Read the vulnerability data + self.read_vulnerability( + Path(self.root).joinpath("vulnerability", "vulnerability_curves.csv") + ) + + def _configread(self, fn): + """Parse fiat configuration toml file to dict.""" + # TODO: update to FIAT toml file + + # Read the fiat configuration toml file. + config = Config() + return config.load_file(fn) + + def check_path_exists(self, fn): + """TODO: decide to use this or another function (check_file_exist in validation.py)""" + path = Path(fn) + self.logger.debug(f"Reading file {str(path.name)}") + if not fn.is_file(): + logging.warning(f"File {fn} does not exist!") + + def read_exposure(self, fn): + """_summary_""" + self.check_path_exists(fn) + self.exposure = ExposureVector(crs=self.config["exposure"]["crs"]) + self.exposure.read(fn) + + def read_vulnerability(self, fn): + self.check_path_exists(fn) + self.vulnerability = Vulnerability() + self.vulnerability.read(fn) + + def write(self): + """Method to write the complete model schematization and configuration to file.""" + self.logger.info(f"Writing model data to {self.root}") + + # Save the vulnerability and exposure data in the tables variable. + # Check if data exist + if self.vulnerability: + vulnerability_output_path = "./vulnerability/vulnerability_curves.csv" + self.tables.append( + ( + self.vulnerability.get_table(), + vulnerability_output_path, + {"index": False, "header": False}, + ) + ) + + # Store the vulnerability settings in the config file. + self.config["vulnerability"] = {"dbase_file": vulnerability_output_path} + + if self.exposure: + exposure_output_path = "./exposure/exposure.csv" + self.tables.append( + (self.exposure.exposure_db, exposure_output_path, {"index": False}) + ) + # self.tables.append( + # ( + # self.vulnerability.vulnerability, + # vulnerability_output_path, + # {"index": False, "header": False}, + # ) + # ) + # self.tables.append( + # (self.exposure.exposure_db, exposure_output_path, {"index": False}) + # ) + + # # Store the vulnerability settings in the config file. + # self.config["vulnerability"] = {"dbase_file": vulnerability_output_path} + + # Store the exposure settings in the config file. + self.config["exposure"] = { + "dbase_file": exposure_output_path, + "crs": self.exposure.crs, + } + # # Store the exposure settings in the config file. + # self.config["exposure"] = { + # "dbase_file": exposure_output_path, + # "crs": self.exposure.crs, + # } + + if self.config: # try to read default if not yet set + self.write_config() + if self.maps: + self.write_maps(fn="hazard/{name}.nc") + if self.geoms: + self.write_geoms(fn="exposure/{name}.geojson") + if self.tables: + self.write_tables() + + def write_tables(self) -> None: + if len(self.tables) == 0: + self.logger.debug("No table data found, skip writing.") + return + self._assert_write_mode + for data, path, kwargs in self.tables: + path = Path(path) + if not isinstance(data, (pd.DataFrame)) or len(data.index) == 0: + self.logger.warning( + f"{path.name} object of type {type(data).__name__} not recognized" + ) + continue + self.logger.debug(f"Writing file {str(path)}") + _fn = Path(self.root) / path + if not _fn.parent.is_dir(): + _fn.parent.mkdir(parents=True) + + if path.name.endswith("csv"): + data.to_csv(_fn, **kwargs) + elif path.name.endswith("xlsx"): + data.to_excel(_fn, **kwargs) + + def _configwrite(self, fn): + """Write config to Delft-FIAT configuration toml file.""" + # Save the configuration file. + Config().save(self.config, Path(self.root).joinpath("settings.toml")) + + \ No newline at end of file diff --git a/hydromt_fiat/workflows/hazard.py b/hydromt_fiat/workflows/hazard.py index 3354f6c1..30bb79ff 100644 --- a/hydromt_fiat/workflows/hazard.py +++ b/hydromt_fiat/workflows/hazard.py @@ -1,351 +1,436 @@ -"""Implement fiat model class""" - -from hydromt_fiat.workflows.vulnerability import Vulnerability -from hydromt_fiat.workflows.hazard import Hazard -from hydromt.models.model_grid import GridModel -from hydromt_fiat.workflows.exposure_vector import ExposureVector -import logging -from hydromt_fiat.config import Config -import geopandas as gpd -import pandas as pd -import hydromt +from hydromt_fiat.validation import Validation from pathlib import Path +import geopandas as gpd +from ast import literal_eval +import os +import xarray as xr -from shapely.geometry import box -from typing import Union -from hydromt_fiat.workflows.social_vulnerability_index import SocialVulnerabilityIndex - - -from . import DATADIR - -__all__ = ["FiatModel"] - -_logger = logging.getLogger(__name__) - - -class FiatModel(GridModel): - """General and basic API for the FIAT model in hydroMT.""" - _NAME = "fiat" - _CONF = "fiat_configuration.ini" - _GEOMS = {} # FIXME Mapping from hydromt names to model specific names - _MAPS = {} # FIXME Mapping from hydromt names to model specific names - _FOLDERS = ["hazard", "exposure", "vulnerability", "output"] - _DATADIR = DATADIR +class Hazard: + def __init__(self): + self.crs = "" + self.check = Validation() - def __init__( + def checkInputs( self, - root=None, - mode="w", - config_fn=None, - data_libs=None, - logger=_logger, + model_fiat, + map_fn, + map_type, + chunks, + rp, + crs, + nodata, + var, ): - super().__init__( - root=root, - mode=mode, - config_fn=config_fn, - data_libs=data_libs, - logger=logger, - ) - self.tables = [] # List of tables to write - self.exposure = None - - def setup_config(self, **kwargs): - """_summary_""" - # Setup config from HydroMT FIAT ini file - global_settings = {} - for k in kwargs.keys(): - global_settings[k] = kwargs[k] - - self.config["global"] = global_settings - def setup_basemaps( - self, - region, - **kwargs, - ): - """Define the model domain that is used to clip the raster layers. - - Adds model layer: - - * **region** geom: A geometry with the nomenclature 'region'. - - Parameters - ---------- - region: dict - Dictionary describing region of interest, e.g. {'bbox': [xmin, ymin, xmax, ymax]}. See :py:meth:`~hydromt.workflows.parse_region()` for all options. - """ - - kind, region = hydromt.workflows.parse_region(region, logger=self.logger) - if kind == "bbox": - geom = gpd.GeoDataFrame(geometry=[box(*region["bbox"])], crs=4326) - elif kind == "grid": - geom = region["grid"].raster.box - elif kind == "geom": - geom = region["geom"] - else: - raise ValueError( - f"Unknown region kind {kind} for FIAT, expected one of ['bbox', 'grid', 'geom']." + + # Checks the hazard input parameter types. + # Checks map path list + self.map_fn_lst = [map_fn] if isinstance(map_fn, (str, Path)) else map_fn + self.check.check_param_type(self.map_fn_lst, name="map_fn", types=(str, Path)) + # Checks map path list + self.map_type_lst = ( + [map_type] if isinstance(map_type, (str, Path)) else map_type + ) + self.check.check_param_type(self.map_type_lst, name="map_type", types=str) + if not len(self.map_type_lst) == 1 and not len(self.map_type_lst) == len( + self.map_fn_lst + ): + raise IndexError( + "The number of 'map_type' parameters should match with the number of " + "'map_fn' parameters." ) - - # Set the model region geometry (to be accessed through the shortcut self.region). - self.set_geoms(geom, "region") - - def setup_vulnerability( - self, - vulnerability_source: str, - vulnerability_identifiers_and_linking: str, - unit: str, - ) -> None: - """Setup the vulnerability curves from various possible inputs. - - Parameters - ---------- - vulnerability_source : str - The (relative) path or ID from the data catalog to the source of the vulnerability functions. - vulnerability_identifiers_and_linking : str - The (relative) path to the table that links the vulnerability functions and exposure categories. - unit : str - The unit of the vulnerability functions. - """ - - if not Path(vulnerability_identifiers_and_linking): - logging.error( - f"Vulnerability identifiers and linking table does not exist at: {vulnerability_identifiers_and_linking}" + # Checks the chunk list. The list must be equal to the number of maps. + if chunks != "auto": + self.chunks_lst = [chunks] if isinstance(chunks, (int, dict)) else chunks + self.check.check_param_type( + self.chunks_lst, name="chunks", types=(int, dict) ) - - self.vulnerability = Vulnerability(unit, self.data_catalog) - vf_source_df = self.vulnerability.get_vulnerability_source(vulnerability_source) - self.vf_ids_and_linking_df = ( - self.vulnerability.get_vulnerability_identifiers_and_linking_source( - vulnerability_identifiers_and_linking + if not len(self.chunks_lst) == 1 and not len(self.chunks_lst) == len( + self.map_fn_lst + ): + raise IndexError( + "The number of 'chunks' parameters should match with the number of " + "'map_fn' parameters." + ) + # Checks the return period list. The list must be equal to the number of maps. + if rp is not None: + self.rp_lst = [rp] if isinstance(rp, (int, float)) else rp + self.check.check_param_type(self.rp_lst, name="rp", types=(float, int)) + if not len(self.rp_lst) == len(self.map_fn_lst): + raise IndexError( + "The number of 'rp' parameters should match with the number of " + "'map_fn' parameters." + ) + # Checks the projection list + if crs is not None: + self.crs_lst = [str(crs)] if isinstance(crs, (int, str)) else crs + self.check.check_param_type(self.crs_lst, name="crs", types=(int, str)) + if not len(self.crs_lst) == 1 and not len(self.crs_lst) == len( + self.map_fn_lst + ): + raise IndexError( + "The number of 'crs' parameters should match with the number of " + "'map_fn' parameters." + ) + # Checks the no data list + if nodata is not None: + self.nodata_lst = [nodata] if isinstance(nodata, (float, int)) else nodata + self.check.check_param_type( + self.nodata_lst, name="nodata", types=(float, int) ) - ) - self.vulnerability.get_vulnerability_functions_from_one_file( - vf_source_df, self.vf_ids_and_linking_df - ) + if not len(self.nodata_lst) == 1 and not len(self.nodata_lst) == len( + self.map_fn_lst + ): + raise IndexError( + "The number of 'nodata' parameters should match with the number of " + "'map_fn' parameters." + ) + # Checks the var list + if var is not None: + self.var_lst = [var] if isinstance(var, str) else var + self.check.check_param_type(self.var_lst, name="var", types=str) + if not len(self.var_lst) == 1 and not len(self.var_lst) == len( + self.map_fn_lst + ): + raise IndexError( + "The number of 'var' parameters should match with the number of " + "'map_fn' parameters." + ) - def setup_exposure_vector( - self, - asset_locations: str, - occupancy_type: str, - max_potential_damage: str, - ground_floor_height: Union[int, float, str, None], - ground_flood_height_unit: str, - ) -> None: - self.exposure = ExposureVector(self.data_catalog, self.region) - - if asset_locations == occupancy_type == max_potential_damage: - # The source for the asset locations, occupancy type and maximum potential - # damage is the same, use one source to create the exposure data. - self.exposure.setup_from_single_source(asset_locations, ground_floor_height) - - # Link the damage functions to assets - try: - assert not self.vf_ids_and_linking_df.empty - except AssertionError: - logging.error( - "Please call the 'setup_vulnerability' function before " - "the 'setup_exposure_vector' function. Error message: {e}" - ) - self.exposure.link_exposure_vulnerability(self.vf_ids_and_linking_df) - self.exposure.check_required_columns() + # Check if the hazard input files exist. + self.check.check_file_exist( + model_fiat.root, param_lst=self.map_fn_lst, name="map_fn" + ) - def setup_exposure_raster(self): - NotImplemented - - def setup_hazard( + def readMaps( self, - map_fn: str, - map_type: str, - rp, + model_fiat, + name_catalog, + hazard_type, + risk_output, crs, nodata, var, chunks, - risk_output: bool = True, - hazard_type: str = "flooding", - name_catalog: str = "flood_maps", - maps_id: str = "RP", - + region=gpd.GeoDataFrame(), + **kwargs, ): - hazard = Hazard() - - hazard.checkInputs( - self, - map_fn=map_fn, - map_type=map_type, - chunks=chunks, - rp=rp, - crs=crs, - nodata=nodata, - var=var, - ) + # This list of names should be provided to have the maps in order + # catalog = model_fiat.data_catalog.get_rasterdataset(name_catalog) + # self.map_fn_lst = [i for i in list(catalog.variables) if maps_id in i] - hazard.readMaps( - self, - name_catalog=name_catalog, - hazard_type=hazard_type, - risk_output=risk_output, - crs=crs, - nodata=nodata, - var=var, - chunks=chunks, - ) - - # Store the hazard settings. - hazard_settings = {} - hazard_maps = [] - for hazard_map in self.maps.keys(): - hazard_maps.append( - str(Path("hazard") / (self.maps[hazard_map].name + ".nc")) - ) + # Read the hazard map(s) and add to config and staticmaps. + list_rp = [] + list_names = [] - hazard_settings["grid_file"] = hazard_maps + for idx, da_map_fn in enumerate(self.map_fn_lst): + # Check if it is a path or a name from the catalog - if not isinstance(rp, list): - rp = "Event" - hazard_settings["return_period"] = rp + if os.path.exists(da_map_fn): + da_map_fn = Path(da_map_fn) + da_name = da_map_fn.stem + da_suffix = da_map_fn.suffix + list_names.append(da_name) - hazard_settings["crs"] = hazard.crs - hazard_settings["spatial_reference"] = map_type - self.config["hazard"] = hazard_settings + else: + da_name = da_map_fn + list_names.append(da_name) + da_type = self.check.get_param( + self.map_type_lst, self.map_fn_lst, "hazard", da_name, idx, "map type" + ) + # Get the local hazard map. + kwargs.update(chunks=chunks if chunks == "auto" else self.chunks_lst[idx]) + + if "da_suffix" in locals() and da_suffix == ".nc": + if var is None: + raise ValueError( + "The 'var' parameter is required when reading NetCDF data." + ) + kwargs.update( + variables=self.check.get_param( + self.var_lst, + self.map_fn_lst, + "hazard", + da_name, + idx, + "NetCDF variable", + ) + ) - def setup_social_vulnerability_index( - self, census_key: str, path: str, state_abbreviation: str - ): - # Create SVI object - svi = SocialVulnerabilityIndex(self.data_catalog, self.config) - - # Call functionalities of SVI - svi.set_up_census_key(census_key) - svi.variable_code_csv_to_pd_df(path) - svi.set_up_download_codes() - svi.set_up_state_code(state_abbreviation) - svi.download_census_data() - svi.rename_census_data("Census_code_withE", "Census_variable_name") - svi.create_indicator_groups("Census_variable_name", "Indicator_code") - svi.processing_svi_data() - svi.normalization_svi_data() - svi.domain_scores() - svi.composite_scores() - - # TO DO: JOIN WITH GEOMETRIES. FOR MAPPING. - # this link can be used: https://github.com/datamade/census - - def read(self): - """Method to read the complete model schematization and configuration from file.""" - self.logger.info(f"Reading model data from {self.root}") - - # Read the configuration file - self.read_config(config_fn=str(Path(self.root).joinpath("settings.toml"))) - - # TODO: determine if it is required to read the hazard files - # hazard_maps = self.config["hazard"]["grid_file"] - # self.read_grid(fn="hazard/{name}.nc") - - # Read the exposure data - self.read_exposure(Path(self.root).joinpath("exposure", "exposure.csv")) - - # Read the vulnerability data - self.read_vulnerability( - Path(self.root).joinpath("vulnerability", "vulnerability_curves.csv") - ) + if os.path.exists(da_map_fn): + if da_map_fn.stem == "sfincs_map": + #da = model_fiat.data_catalog.get_rasterdataset(da_map_fn) The file cannot be directly read by HydroMT + + ds_map = xr.open_dataset(da_map_fn) + da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") + da.raster.set_crs(ds_map.crs.epsg_code) + da.raster.set_nodata(nodata=ds_map.encoding.get("_FillValue")) + da.reset_coords(['spatial_ref'], drop=False) + da.encoding["_FillValue"] = None + #da_name = kwargs["variables"] + else: + if not region.empty: + da = model_fiat.data_catalog.get_rasterdataset( + da_map_fn, geom=region, **kwargs + ) + else: + da = model_fiat.data_catalog.get_rasterdataset(da_map_fn, **kwargs) + else: + if not region.empty: + da = model_fiat.data_catalog.get_rasterdataset( + name_catalog, variables=da_name, geom=region + ) + else: + da = model_fiat.data_catalog.get_rasterdataset( + name_catalog, variables=da_name + ) + + # Set (if necessary) the coordinate reference system. + # if crs is not None and not da.raster.crs.is_epsg_code: + # if crs is not None and not da.raster.crs: + # Change to be defined by the user + if crs is not None: + da_crs = self.check.get_param( + self.crs_lst, + self.map_fn_lst, + "hazard", + da_name, + idx, + "coordinate reference system", + ) + da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}" + da.raster.set_crs(da_crs_str) + # elif crs is None and not da.raster.crs.is_epsg_code: + elif crs is None and not da.raster.crs: + raise ValueError( + "The hazard map has no coordinate reference system assigned." + ) - def _configread(self, fn): - """Parse fiat configuration toml file to dict.""" - # Read the fiat configuration toml file. - config = Config() - return config.load_file(fn) - - def check_path_exists(self, fn): - """TODO: decide to use this or another function (check_file_exist in validation.py)""" - path = Path(fn) - self.logger.debug(f"Reading file {str(path.name)}") - if not fn.is_file(): - logging.warning(f"File {fn} does not exist!") - - def read_exposure(self, fn): - """_summary_""" - self.check_path_exists(fn) - self.exposure = ExposureVector(crs=self.config["exposure"]["crs"]) - self.exposure.read(fn) - - def read_vulnerability(self, fn): - self.check_path_exists(fn) - self.vulnerability = Vulnerability() - self.vulnerability.read(fn) - - def write(self): - """Method to write the complete model schematization and configuration to file.""" - self.logger.info(f"Writing model data to {self.root}") - - # Save the vulnerability and exposure data in the tables variable. - # Check if data exist - if self.vulnerability: - vulnerability_output_path = "./vulnerability/vulnerability_curves.csv" - self.tables.append( - ( - self.vulnerability.get_table(), - vulnerability_output_path, - {"index": False, "header": False}, + # Set (if necessary) and mask the nodata value. + if nodata is not None: + da_nodata = self.check.get_param( + self.nodata_lst, self.map_fn_lst, "hazard", da_name, idx, "nodata" + ) + da.raster.set_nodata(nodata=da_nodata) + elif nodata is None and da.raster.nodata is None: + raise ValueError("The hazard map has no nodata value assigned.") + + # Correct (if necessary) the grid orientation from the lower to the upper left corner. + if da_name != "sfincs_map": #This check could not be implemented into the sfincs_map outputs + if da.raster.res[1] > 0: + da = da.reindex( + {da.raster.y_dim: list(reversed(da.raster.ycoords))} + ) + + # Check if the obtained hazard map is identical. + if ( + model_fiat.staticmaps + and not model_fiat.staticmaps.raster.identical_grid(da) + ): + raise ValueError("The hazard maps should have identical grids.") + + # Get the return period input parameter. + da_rp = ( + self.check.get_param( + self.rp_lst, + self.map_fn_lst, + "hazard", + da_name, + idx, + "return period", ) + if hasattr(self, "rp_lst ") + else None ) - # Store the vulnerability settings in the config file. - self.config["vulnerability"] = {"dbase_file": vulnerability_output_path} - - if self.exposure: - exposure_output_path = "./exposure/exposure.csv" - self.tables.append( - (self.exposure.exposure_db, exposure_output_path, {"index": False}) + if risk_output: + da = da.expand_dims({'rp': [da_rp]}, axis=0) + + if risk_output and da_rp is None: + # Get (if possible) the return period from dataset names if the input parameter is None. + if "rp" in da_name.lower(): + + def fstrip(x): + return x in "0123456789." + + rp_str = "".join( + filter(fstrip, da_name.lower().split("rp")[-1]) + ).lstrip("0") + + try: + assert isinstance( + literal_eval(rp_str) if rp_str else None, (int, float) + ) + da_rp = literal_eval(rp_str) + list_rp.append(da_rp) + + except AssertionError: + raise ValueError( + f"Could not derive the return period for hazard map: {da_name}." + ) + else: + raise ValueError( + "The hazard map must contain a return period in order to conduct a risk calculation." + ) + + # Add the hazard map to config and staticmaps. + self.check.check_uniqueness( + model_fiat, + "hazard", + da_type, + da_name, + { + "usage": True, + "map_fn": da_map_fn, + "map_type": da_type, + "rp": da_rp, + "crs": da.raster.crs, + "nodata": da.raster.nodata, + # "var": None if "var_lst" not in locals() else self.var_lst[idx], + "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], + "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], + }, + file_type="hazard", + filename=da_name, ) - # Store the exposure settings in the config file. - self.config["exposure"] = [ + model_fiat.set_config( + "hazard", + # da_type, + da_name, { - "type": "vector", - "dbase_file": exposure_output_path, - "crs": self.exposure.crs, - } - ] - - if self.config: # try to read default if not yet set - self.write_config() - if self.maps: - self.write_maps(fn="hazard/{name}.nc") - if self.geoms: - self.write_geoms(fn="exposure/{name}.geojson") - if self.tables: - self.write_tables() - - def write_tables(self) -> None: - if len(self.tables) == 0: - self.logger.debug("No table data found, skip writing.") - return - self._assert_write_mode - for data, path, kwargs in self.tables: - path = Path(path) - if not isinstance(data, (pd.DataFrame)) or len(data.index) == 0: - self.logger.warning( - f"{path.name} object of type {type(data).__name__} not recognized" - ) - continue - self.logger.debug(f"Writing file {str(path)}") - _fn = Path(self.root) / path - if not _fn.parent.is_dir(): - _fn.parent.mkdir(parents=True) - - if path.name.endswith("csv"): - data.to_csv(_fn, **kwargs) - elif path.name.endswith("xlsx"): - data.to_excel(_fn, **kwargs) - - def _configwrite(self, fn): - """Write config to Delft-FIAT configuration toml file.""" - # Save the configuration file. - Config().save(self.config, Path(self.root).joinpath("settings.toml")) - \ No newline at end of file + "usage": "True", + "map_fn": da_map_fn, + "map_type": da_type, + "rp": da_rp, + "crs": da.raster.crs, + "nodata": da.raster.nodata, + # "var": None if "var_lst" not in locals() else self.var_lst[idx], + "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], + "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], + }, + ) + + # extra functionalities + # da = da.rename(f"map_{da_rp}") + # da.attrs = { + # "usage": "True", + # "map_fn": da_map_fn, + # "map_type": da_type, + # "rp": da_rp, + # "crs": da.raster.crs, + # "nodata": da.raster.nodata, + # # "var": None if "var_lst" not in locals() else self.var_lst[idx], + # "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], + # "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], + # } + + + + model_fiat.set_maps(da, da_name) + # post = f"(rp {da_rp})" if rp is not None and risk_output else "" + post = f"(rp {da_rp})" if risk_output else "" + model_fiat.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}") + + # new_da = xr.concat([new_da, da], dim='rp') + + # import numpy as np + # new_da = xr.DataArray(np.zeros_like(da), dims=da.dims, coords=da.coords).rename('new_dataframe') + + + if risk_output: + maps = model_fiat.maps + list_keys = list(maps.keys()) + maps_0 = maps[list_keys[0]].rename('risk') + list_keys.pop(0) + + for idx, x in enumerate(list_keys): + key_name = list_keys[idx] + layer = maps[key_name] + maps_0 = xr.concat([maps_0, layer], dim='rp') + + new_da = maps_0.to_dataset(name='RISK') + new_da.attrs = { "returnperiod": list(list_rp), + "type":self.map_type_lst, + 'name':list_names, + "Analysis": "Risk"} + + model_fiat.hazard = new_da + model_fiat.set_maps(model_fiat.hazard, 'HydroMT_Fiat_hazard') + + list_maps = list(model_fiat.maps.keys()) + + if risk_output: + for item in list_maps[:-1]: + model_fiat.maps.pop(item) + + # else: + # model_fiat.hazard = new_da + + + # # model_fiat.maps.to_netcdf("test_hazard_1/test.nc") + + # #model_fiat.set_maps(catalog, "all") + # maps = model_fiat.maps + # list_keys = list(maps.keys()) + # maps_0 = maps[list_keys[0]].rename('risk') + # list_keys.pop(0) + + # for idx, x in enumerate(list_keys): + # key_name = list_keys[idx] + # layer = maps[key_name] + # maps_0 = xr.concat([maps_0, layer], dim='rp') + + # if not risk_output: + # # new_da = maps_0.drop_dims('rp') + # new_da = maps_0.to_dataset(name='EVENT') + # new_da.attrs = { "returnperiod": list(list_rp), + # "type":self.map_type_lst, + # 'name':list_names, + # "Analysis": "Event base"} + #TODO: A netcdf is not required when working with evetns + # else: + # new_da = maps_0.to_dataset(name='RISK') + # new_da.attrs = { "returnperiod": list(list_rp), + # "type":self.map_type_lst, + # 'name':list_names, + # "Analysis": "Risk"} + + # if risk_output: + # model_fiat.hazard = new_da + # model_fiat.set_maps(model_fiat.hazard, 'HydroMT_Fiat_hazard') + # else: + # model_fiat.hazard = new_da + + # list_maps = list(model_fiat.maps.keys()) + + # if risk_output: + # for item in list_maps[:-1]: + # model_fiat.maps.pop(item) + + + # ds = xr.Dataset(maps) + # ds.raster.set_crs(da.raster.crs) + + # # new_da= ds.to_array(dim='rp') + + # # new_da = new_da.to_dataset(name='Full_stack') + # # new_da.attrs = { "returnperiod": list_rp, + # # "type":self.map_type_lst, + # # 'name':list_names} + + # # for var_name, var_data in ds.variables.items(): + # # new_da = xr.concat([new_da, ds[var_name]], dim='rp') + + # for layer_name in ds: + # layer = ds[layer_name] + # new_da = xr.concat([new_da, layer], dim='rp') + + # # new_da = new_da.to_dataset() + # # config = model_fiat.config + # # new_da.to_netcdf("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database/test_hazard_1/hazard/test_final_v2.nc") + # #C:\Users\fuentesm\CISNE\HydroMT_sprint_sessions + return model_fiat.maps \ No newline at end of file diff --git a/tests/test_SVI.py b/tests/test_SVI.py index 930509ee..8afd3ee7 100644 --- a/tests/test_SVI.py +++ b/tests/test_SVI.py @@ -4,7 +4,7 @@ import pytest -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "fiat_objects": { "folder": "test_hazard", diff --git a/tests/test_exposure.py b/tests/test_exposure.py index 6e8d07ad..ba41f34e 100644 --- a/tests/test_exposure.py +++ b/tests/test_exposure.py @@ -5,7 +5,7 @@ import shutil -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "vulnerability_and_exposure": { diff --git a/tests/test_hazard.py b/tests/test_hazard.py index 78a4f578..d1dfead4 100644 --- a/tests/test_hazard.py +++ b/tests/test_hazard.py @@ -17,7 +17,7 @@ "fiat_objects": { "folder" : "test_hazard_1", "ini" : "test_hazard.ini", - "catalog" : "fiat_catalog_hazard.yml", + "catalog" : "fiat_catalog.yml", } } diff --git a/tests/test_integration.py b/tests/test_integration.py index f5d0f94f..6f49dd2a 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -4,7 +4,7 @@ import pytest import shutil -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "integration": { diff --git a/tests/test_integrations_hazard.py b/tests/test_integrations_hazard.py index 9c1e0007..d6adfccf 100644 --- a/tests/test_integrations_hazard.py +++ b/tests/test_integrations_hazard.py @@ -8,8 +8,8 @@ _cases = { "integration": { - "data_catalogue": EXAMPLEDIR / "fiat_catalog_hazard.yml", - "dir": "test_hazard", + "data_catalogue": EXAMPLEDIR / "fiat_catalog.yml", + "dir": "test_hazard_integration", "ini": EXAMPLEDIR / "test_hazard.ini", }, } diff --git a/tests/test_raise_ground_floor_height.py b/tests/test_raise_ground_floor_height.py index 4ebaa697..5c0c9d9d 100644 --- a/tests/test_raise_ground_floor_height.py +++ b/tests/test_raise_ground_floor_height.py @@ -3,7 +3,7 @@ import pytest import shutil -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "raise_ground_floor_height": { diff --git a/tests/test_reader.py b/tests/test_reader.py index afd7a62d..1a2d00e3 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -3,7 +3,7 @@ import pytest -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "read": { diff --git a/tests/test_setup_new_composite_areas.py b/tests/test_setup_new_composite_areas.py index 5c13b9c7..1122ba72 100644 --- a/tests/test_setup_new_composite_areas.py +++ b/tests/test_setup_new_composite_areas.py @@ -3,7 +3,7 @@ import pytest import shutil -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "setup_new_composite_area_datum": { diff --git a/tests/test_truncate_damage_function.py b/tests/test_truncate_damage_function.py index 2ccc152f..81e344d8 100644 --- a/tests/test_truncate_damage_function.py +++ b/tests/test_truncate_damage_function.py @@ -3,7 +3,7 @@ import pytest import shutil -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "truncate_damage_function": { diff --git a/tests/test_update_max_potential_damage.py b/tests/test_update_max_potential_damage.py index 38303a0a..60166628 100644 --- a/tests/test_update_max_potential_damage.py +++ b/tests/test_update_max_potential_damage.py @@ -3,7 +3,7 @@ import pytest import shutil -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "update_max_potential_damage": { diff --git a/tests/test_vulnerability.py b/tests/test_vulnerability.py index 41cfae3c..776b0cd5 100644 --- a/tests/test_vulnerability.py +++ b/tests/test_vulnerability.py @@ -4,7 +4,7 @@ import pytest import shutil -EXAMPLEDIR = Path().absolute() / "local_test_database" +EXAMPLEDIR = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { "vulnerability": { From 4dfdb6dc72ffac966e0e7f6ebca57a21dea89b37 Mon Sep 17 00:00:00 2001 From: Mares2022 Date: Thu, 1 Jun 2023 15:56:32 +0200 Subject: [PATCH 3/4] Modifying fiat component of hazard Modifying fiat component of hazard --- hydromt_fiat/fiat.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/hydromt_fiat/fiat.py b/hydromt_fiat/fiat.py index 64b183b5..86287fa8 100644 --- a/hydromt_fiat/fiat.py +++ b/hydromt_fiat/fiat.py @@ -157,17 +157,17 @@ def setup_exposure_raster(self): def setup_hazard( self, - map_fn: str, - map_type: str, - rp, - crs, - nodata, - var, - chunks, - risk_output: bool = True, - hazard_type: str = "flooding", + map_fn: Union[str, list[str]], + map_type: Union[str, list[str]], + rp: Union[int, list[int], None] = None , + crs: Union[str, list[str], None] = None, + nodata: Union[int, list[int]] = None, + var: Union[str, list[str], None] = None, + chunks: Union[str, int, list[int]] = 'auto', + risk_output: bool = True, + hazard_type: str = "flooding", name_catalog: str = "flood_maps", - maps_id: str = "RP", + maps_id: str = "RP", ): hazard = Hazard() @@ -199,18 +199,19 @@ def setup_hazard( hazard_maps = [] for hazard_map in self.maps.keys(): hazard_maps.append( - str(Path("hazard") / (self.maps[hazard_map].name + ".nc")) + str(Path("hazard") / (hazard_map + ".nc")) ) hazard_settings["grid_file"] = hazard_maps - + hazard_settings["crs"] = hazard.crs if not isinstance(rp, list): rp = "Event" hazard_settings["return_period"] = rp - - hazard_settings["crs"] = hazard.crs - hazard_settings["spatial_reference"] = map_type - self.config["hazard"] = hazard_settings + if map_type == "water_depth": + hazard_settings["spatial_reference"] = "DEM" + # else: + # hazard_settings["spatial_reference"] = "DATUM" + self.config["hazard"] = hazard_settings From 9b18b26a7b9eb50dd4101914aba2ba318b1dedeb Mon Sep 17 00:00:00 2001 From: Mares2022 Date: Thu, 1 Jun 2023 16:12:34 +0200 Subject: [PATCH 4/4] Remove extra fiat file Remove extra fiat file --- fiat.py | 369 -------------------------------------------------------- 1 file changed, 369 deletions(-) delete mode 100644 fiat.py diff --git a/fiat.py b/fiat.py deleted file mode 100644 index 6ddabd44..00000000 --- a/fiat.py +++ /dev/null @@ -1,369 +0,0 @@ -"""Implement fiat model class""" - -from hydromt_fiat.workflows.vulnerability import Vulnerability -from hydromt_fiat.workflows.hazard import Hazard -from hydromt.models.model_grid import GridModel -from hydromt_fiat.workflows.exposure_vector import ExposureVector -import logging -from hydromt_fiat.config import Config -import geopandas as gpd -import pandas as pd -import hydromt -from pathlib import Path - -from shapely.geometry import box -from typing import Union -from hydromt_fiat.workflows.social_vulnerability_index import SocialVulnerabilityIndex - - -from . import DATADIR - -__all__ = ["FiatModel"] - -_logger = logging.getLogger(__name__) - - -class FiatModel(GridModel): - """General and basic API for the FIAT model in hydroMT.""" - - _NAME = "fiat" - _CONF = "fiat_configuration.ini" - _GEOMS = {} # FIXME Mapping from hydromt names to model specific names - _MAPS = {} # FIXME Mapping from hydromt names to model specific names - _FOLDERS = ["hazard", "exposure", "vulnerability", "output"] - _DATADIR = DATADIR - - def __init__( - self, - root=None, - mode="w", - config_fn=None, - data_libs=None, - logger=_logger, - ): - super().__init__( - root=root, - mode=mode, - config_fn=config_fn, - data_libs=data_libs, - logger=logger, - ) - self.tables = [] # List of tables to write - self.exposure = None - - def setup_config(self, **kwargs): - """_summary_""" - # Setup config from HydroMT FIAT ini file - global_settings = {} - for k in kwargs.keys(): - global_settings[k] = kwargs[k] - - self.config["global"] = global_settings - - def setup_basemaps( - self, - region, - **kwargs, - ): - """Define the model domain that is used to clip the raster layers. - - Adds model layer: - - * **region** geom: A geometry with the nomenclature 'region'. - - Parameters - ---------- - region: dict - Dictionary describing region of interest, e.g. {'bbox': [xmin, ymin, xmax, ymax]}. See :py:meth:`~hydromt.workflows.parse_region()` for all options. - """ - - kind, region = hydromt.workflows.parse_region(region, logger=self.logger) - if kind == "bbox": - geom = gpd.GeoDataFrame(geometry=[box(*region["bbox"])], crs=4326) - elif kind == "grid": - geom = region["grid"].raster.box - elif kind == "geom": - geom = region["geom"] - else: - raise ValueError( - f"Unknown region kind {kind} for FIAT, expected one of ['bbox', 'grid', 'geom']." - ) - - # Set the model region geometry (to be accessed through the shortcut self.region). - self.set_geoms(geom, "region") - - def setup_vulnerability( - self, - vulnerability_source: str, - vulnerability_identifiers_and_linking: str, - unit: str, - ) -> None: - """Setup the vulnerability curves from various possible inputs. - - Parameters - ---------- - vulnerability_source : str - The (relative) path or ID from the data catalog to the source of the vulnerability functions. - vulnerability_identifiers_and_linking : str - The (relative) path to the table that links the vulnerability functions and exposure categories. - unit : str - The unit of the vulnerability functions. - """ - - if not Path(vulnerability_identifiers_and_linking): - logging.error( - f"Vulnerability identifiers and linking table does not exist at: {vulnerability_identifiers_and_linking}" - ) - - self.vulnerability = Vulnerability(unit, self.data_catalog) - vf_source_df = self.vulnerability.get_vulnerability_source(vulnerability_source) - self.vf_ids_and_linking_df = ( - self.vulnerability.get_vulnerability_identifiers_and_linking_source( - vulnerability_identifiers_and_linking - ) - ) - self.vulnerability.get_vulnerability_functions_from_one_file( - vf_source_df, self.vf_ids_and_linking_df - ) - - def setup_exposure_vector( - self, - asset_locations: str, - occupancy_type: str, - max_potential_damage: str, - ground_floor_height: Union[int, float, str, None], - ground_flood_height_unit: str, - ) -> None: - self.exposure = ExposureVector(self.data_catalog, self.region) - - if asset_locations == occupancy_type == max_potential_damage: - # The source for the asset locations, occupancy type and maximum potential - # damage is the same, use one source to create the exposure data. - self.exposure.setup_from_single_source(asset_locations, ground_floor_height) - - # Link the damage functions to assets - try: - assert not self.vf_ids_and_linking_df.empty - except AssertionError: - logging.error( - "Please call the 'setup_vulnerability' function before " - "the 'setup_exposure_vector' function. Error message: {e}" - ) - self.exposure.link_exposure_vulnerability(self.vf_ids_and_linking_df) - self.exposure.check_required_columns() - - def setup_exposure_raster(self): - NotImplemented - - def setup_hazard( - self, - map_fn: str, - map_type: str, - rp, - crs, - nodata, - var, - chunks, - risk_output: bool = True, - hazard_type: str = "flooding", - name_catalog: str = "flood_maps", - maps_id: str = "RP", - - ): - hazard = Hazard() - - hazard.checkInputs( - self, - map_fn=map_fn, - map_type=map_type, - chunks=chunks, - rp=rp, - crs=crs, - nodata=nodata, - var=var, - ) - - hazard.readMaps( - self, - name_catalog=name_catalog, - hazard_type=hazard_type, - risk_output=risk_output, - crs=crs, - nodata=nodata, - var=var, - chunks=chunks, - ) - - # Store the hazard settings. - hazard_settings = {} - hazard_maps = [] - for hazard_map in self.maps.keys(): - hazard_maps.append( - str(Path("hazard") / (self.maps[hazard_map].name + ".nc")) - ) - - hazard_settings["grid_file"] = hazard_maps - - if not isinstance(rp, list): - rp = "Event" - hazard_settings["return_period"] = rp - - hazard_settings["crs"] = hazard.crs - hazard_settings["spatial_reference"] = map_type - self.config["hazard"] = hazard_settings - - - - def setup_social_vulnerability_index( - self, census_key: str, path: str, state_abbreviation: str - ): - # Create SVI object - svi = SocialVulnerabilityIndex(self.data_catalog, self.config) - - # Call functionalities of SVI - svi.set_up_census_key(census_key) - svi.variable_code_csv_to_pd_df(path) - svi.set_up_download_codes() - svi.set_up_state_code(state_abbreviation) - svi.download_census_data() - svi.rename_census_data("Census_code_withE", "Census_variable_name") - svi.create_indicator_groups("Census_variable_name", "Indicator_code") - svi.processing_svi_data() - svi.normalization_svi_data() - svi.domain_scores() - svi.composite_scores() - - # TO DO: JOIN WITH GEOMETRIES. FOR MAPPING. - # this link can be used: https://github.com/datamade/census - - def read(self): - """Method to read the complete model schematization and configuration from file.""" - self.logger.info(f"Reading model data from {self.root}") - - # Read the configuration file - self.read_config(config_fn=str(Path(self.root).joinpath("settings.toml"))) - - # TODO: determine if it is required to read the hazard files - # hazard_maps = self.config["hazard"]["grid_file"] - # self.read_grid(fn="hazard/{name}.nc") - - # Read the exposure data - self.read_exposure(Path(self.root).joinpath("exposure", "exposure.csv")) - - # Read the vulnerability data - self.read_vulnerability( - Path(self.root).joinpath("vulnerability", "vulnerability_curves.csv") - ) - - def _configread(self, fn): - """Parse fiat configuration toml file to dict.""" - # TODO: update to FIAT toml file - - # Read the fiat configuration toml file. - config = Config() - return config.load_file(fn) - - def check_path_exists(self, fn): - """TODO: decide to use this or another function (check_file_exist in validation.py)""" - path = Path(fn) - self.logger.debug(f"Reading file {str(path.name)}") - if not fn.is_file(): - logging.warning(f"File {fn} does not exist!") - - def read_exposure(self, fn): - """_summary_""" - self.check_path_exists(fn) - self.exposure = ExposureVector(crs=self.config["exposure"]["crs"]) - self.exposure.read(fn) - - def read_vulnerability(self, fn): - self.check_path_exists(fn) - self.vulnerability = Vulnerability() - self.vulnerability.read(fn) - - def write(self): - """Method to write the complete model schematization and configuration to file.""" - self.logger.info(f"Writing model data to {self.root}") - - # Save the vulnerability and exposure data in the tables variable. - # Check if data exist - if self.vulnerability: - vulnerability_output_path = "./vulnerability/vulnerability_curves.csv" - self.tables.append( - ( - self.vulnerability.get_table(), - vulnerability_output_path, - {"index": False, "header": False}, - ) - ) - - # Store the vulnerability settings in the config file. - self.config["vulnerability"] = {"dbase_file": vulnerability_output_path} - - if self.exposure: - exposure_output_path = "./exposure/exposure.csv" - self.tables.append( - (self.exposure.exposure_db, exposure_output_path, {"index": False}) - ) - # self.tables.append( - # ( - # self.vulnerability.vulnerability, - # vulnerability_output_path, - # {"index": False, "header": False}, - # ) - # ) - # self.tables.append( - # (self.exposure.exposure_db, exposure_output_path, {"index": False}) - # ) - - # # Store the vulnerability settings in the config file. - # self.config["vulnerability"] = {"dbase_file": vulnerability_output_path} - - # Store the exposure settings in the config file. - self.config["exposure"] = { - "dbase_file": exposure_output_path, - "crs": self.exposure.crs, - } - # # Store the exposure settings in the config file. - # self.config["exposure"] = { - # "dbase_file": exposure_output_path, - # "crs": self.exposure.crs, - # } - - if self.config: # try to read default if not yet set - self.write_config() - if self.maps: - self.write_maps(fn="hazard/{name}.nc") - if self.geoms: - self.write_geoms(fn="exposure/{name}.geojson") - if self.tables: - self.write_tables() - - def write_tables(self) -> None: - if len(self.tables) == 0: - self.logger.debug("No table data found, skip writing.") - return - self._assert_write_mode - for data, path, kwargs in self.tables: - path = Path(path) - if not isinstance(data, (pd.DataFrame)) or len(data.index) == 0: - self.logger.warning( - f"{path.name} object of type {type(data).__name__} not recognized" - ) - continue - self.logger.debug(f"Writing file {str(path)}") - _fn = Path(self.root) / path - if not _fn.parent.is_dir(): - _fn.parent.mkdir(parents=True) - - if path.name.endswith("csv"): - data.to_csv(_fn, **kwargs) - elif path.name.endswith("xlsx"): - data.to_excel(_fn, **kwargs) - - def _configwrite(self, fn): - """Write config to Delft-FIAT configuration toml file.""" - # Save the configuration file. - Config().save(self.config, Path(self.root).joinpath("settings.toml")) - - \ No newline at end of file