From 6e9e03c079e7ecc991ab0b22c8e5ee955e739aca Mon Sep 17 00:00:00 2001 From: Henrik Andersson Date: Fri, 22 Mar 2024 08:31:17 +0100 Subject: [PATCH 1/3] Minimize passing and returning None and Dfsu refactor --- mikeio/dataset/_dataarray.py | 12 +- mikeio/dfsu/_dfsu.py | 231 +++++++++---------------- mikeio/dfsu/_layered.py | 36 ++-- mikeio/dfsu/_spectral.py | 42 +++++ mikeio/pfs/_pfsdocument.py | 24 +-- mikeio/spatial/_FM_geometry_layered.py | 6 +- mikeio/spatial/_grid_geometry.py | 4 +- 7 files changed, 171 insertions(+), 184 deletions(-) diff --git a/mikeio/dataset/_dataarray.py b/mikeio/dataset/_dataarray.py index 9d669126c..f851b9dae 100644 --- a/mikeio/dataset/_dataarray.py +++ b/mikeio/dataset/_dataarray.py @@ -837,8 +837,16 @@ def sel( def _sel_with_slice(self, kwargs: Mapping[str, slice]) -> "DataArray": for k, v in kwargs.items(): if isinstance(v, slice): - idx_start = self.geometry.find_index(**{k: v.start}) - idx_stop = self.geometry.find_index(**{k: v.stop}) + idx_start = ( + self.geometry.find_index(**{k: v.start}) + if v.start is not None + else None + ) + idx_stop = ( + self.geometry.find_index(**{k: v.stop}) + if v.stop is not None + else None + ) pos = 0 if isinstance(idx_start, tuple): if k == "x": diff --git a/mikeio/dfsu/_dfsu.py b/mikeio/dfsu/_dfsu.py index 008c3006b..81cb47b9d 100644 --- a/mikeio/dfsu/_dfsu.py +++ b/mikeio/dfsu/_dfsu.py @@ -351,7 +351,59 @@ def end_time(self) -> pd.Timestamp: def time(self) -> pd.DatetimeIndex: return self._time - def _read( + def _validate_elements_and_geometry_sel(self, elements: Any, **kwargs: Any) -> None: + """Check that only one of elements, area, x, y is selected""" + used_kwargs = [] + for kw, val in kwargs.items(): + if val is not None: + used_kwargs.append(kw) + + if elements is not None: + for kw in used_kwargs: + raise ValueError(f"Cannot select both {kw} and elements!") + + if "area" in used_kwargs and ("x" in used_kwargs or "y" in used_kwargs): + raise ValueError("Cannot select both x,y and area!") + + def to_mesh(self, outfilename): + """write object to mesh file + + Parameters + ---------- + outfilename : str + path to file to be written + """ + self.geometry.geometry2d.to_mesh(outfilename) + + +class Dfsu2DH(_Dfsu): + + def __init__(self, filename: str | Path) -> None: + super().__init__(filename) + self._geometry = self._read_geometry(self._filename) + + @staticmethod + def _read_geometry(filename: str) -> GeometryFM2D: + dfs = DfsuFile.Open(filename) + dfsu_type = DfsuFileType(dfs.DfsuFileType) + + node_table = get_nodes_from_source(dfs) + el_table = get_elements_from_source(dfs) + + geometry = GeometryFM2D( + node_coordinates=node_table.coordinates, + element_table=el_table.connectivity, + codes=node_table.codes, + projection=dfs.Projection.WKTString, + dfsu_type=dfsu_type, + element_ids=el_table.ids, + node_ids=node_table.ids, + validate=False, + ) + dfs.Close() + return geometry + + def read( self, *, items: str | int | Sequence[str | int] | None = None, @@ -365,16 +417,42 @@ def _read( error_bad_data: bool = True, fill_bad_data_value: float = np.nan, ) -> Dataset: + """ + Read data from a dfsu file + + Parameters + --------- + items: list[int] or list[str], optional + Read only selected items, by number (0-based), or by name + time: int, str, datetime, pd.TimeStamp, sequence, slice or pd.DatetimeIndex, optional + Read only selected time steps, by default None (=all) + keepdims: bool, optional + When reading a single time step only, should the time-dimension be kept + in the returned Dataset? by default: False + area: list[float], optional + Read only data inside (horizontal) area given as a + bounding box (tuple with left, lower, right, upper) + or as list of coordinates for a polygon, by default None + x, y: float, optional + Read only data for elements containing the (x,y) points(s), + by default None + elements: list[int], optional + Read only selected element ids, by default None + error_bad_data: bool, optional + raise error if data is corrupt, by default True, + fill_bad_data_value: + fill value for to impute corrupt data, used in conjunction with error_bad_data=False + default np.nan + + Returns + ------- + Dataset + A Dataset with data dimensions [t,elements] + """ + if dtype not in [np.float32, np.float64]: raise ValueError("Invalid data type. Choose np.float32 or np.float64") - - # Open the dfs file for reading - # self._read_dfsu_header(self._filename) dfs = DfsuFile.Open(self._filename) - # time may have changes since we read the header - # (if engine is continuously writing to this file) - # TODO: add more checks that this is actually still the same file - # (could have been replaced in the meantime) single_time_selected, time_steps = _valid_timesteps(dfs, time) @@ -459,45 +537,6 @@ def _read( data_list, time, items, geometry=geometry, dims=dims, validate=False ) - def _validate_elements_and_geometry_sel(self, elements, **kwargs): - """Check that only one of elements, area, x, y is selected - - Parameters - ---------- - elements : list[int], optional - Read only selected element ids, by default None - area : list[float], optional - Read only data inside (horizontal) area given as a - bounding box (tuple with left, lower, right, upper) - or as list of coordinates for a polygon, by default None - x : float, optional - Read only data for elements containing the (x,y) points(s), - by default None - y : float, optional - Read only data for elements containing the (x,y) points(s), - by default None - - Returns - ------- - None - - Raises - ------ - ValueError - If more than one of elements, area, x, y is selected - """ - used_kwargs = [] - for kw, val in kwargs.items(): - if val is not None: - used_kwargs.append(kw) - - if elements is not None: - for kw in used_kwargs: - raise ValueError(f"Cannot select both {kw} and elements!") - - if "area" in used_kwargs and ("x" in used_kwargs or "y" in used_kwargs): - raise ValueError("Cannot select both x,y and area!") - def _parse_geometry_sel(self, area, x, y): """Parse geometry selection @@ -539,104 +578,6 @@ def _parse_geometry_sel(self, area, x, y): return elements - def to_mesh(self, outfilename): - """write object to mesh file - - Parameters - ---------- - outfilename : str - path to file to be written - """ - self.geometry.geometry2d.to_mesh(outfilename) - - -class Dfsu2DH(_Dfsu): - - def __init__(self, filename: str | Path) -> None: - super().__init__(filename) - self._geometry = self._read_geometry(self._filename) - - @staticmethod - def _read_geometry(filename: str) -> GeometryFM2D: - dfs = DfsuFile.Open(filename) - dfsu_type = DfsuFileType(dfs.DfsuFileType) - - node_table = get_nodes_from_source(dfs) - el_table = get_elements_from_source(dfs) - - geometry = GeometryFM2D( - node_coordinates=node_table.coordinates, - element_table=el_table.connectivity, - codes=node_table.codes, - projection=dfs.Projection.WKTString, - dfsu_type=dfsu_type, - element_ids=el_table.ids, - node_ids=node_table.ids, - validate=False, - ) - dfs.Close() - return geometry - - def read( - self, - *, - items: str | int | Sequence[str | int] | None = None, - time: int | str | slice | None = None, - elements: Collection[int] | None = None, - area: Tuple[float, float, float, float] | None = None, - x: float | None = None, - y: float | None = None, - keepdims: bool = False, - dtype: Any = np.float32, - error_bad_data: bool = True, - fill_bad_data_value: float = np.nan, - ) -> Dataset: - """ - Read data from a dfsu file - - Parameters - --------- - items: list[int] or list[str], optional - Read only selected items, by number (0-based), or by name - time: int, str, datetime, pd.TimeStamp, sequence, slice or pd.DatetimeIndex, optional - Read only selected time steps, by default None (=all) - keepdims: bool, optional - When reading a single time step only, should the time-dimension be kept - in the returned Dataset? by default: False - area: list[float], optional - Read only data inside (horizontal) area given as a - bounding box (tuple with left, lower, right, upper) - or as list of coordinates for a polygon, by default None - x, y: float, optional - Read only data for elements containing the (x,y) points(s), - by default None - elements: list[int], optional - Read only selected element ids, by default None - error_bad_data: bool, optional - raise error if data is corrupt, by default True, - fill_bad_data_value: - fill value for to impute corrupt data, used in conjunction with error_bad_data=False - default np.nan - - Returns - ------- - Dataset - A Dataset with data dimensions [t,elements] - """ - - return self._read( - items=items, - time=time, - elements=elements, - area=area, - x=x, - y=y, - keepdims=keepdims, - dtype=dtype, - error_bad_data=error_bad_data, - fill_bad_data_value=fill_bad_data_value, - ) - def get_overset_grid(self, dx=None, dy=None, nx=None, ny=None, buffer=0.0): """get a 2d grid that covers the domain by specifying spacing or shape diff --git a/mikeio/dfsu/_layered.py b/mikeio/dfsu/_layered.py index 2c6e14199..f40261bbb 100644 --- a/mikeio/dfsu/_layered.py +++ b/mikeio/dfsu/_layered.py @@ -16,7 +16,7 @@ ) from ..eum import EUMType, ItemInfo from .._interpolation import get_idw_interpolant, interp2d -from ..spatial import GeometryFM3D, GeometryFMVerticalProfile +from ..spatial import GeometryFM3D, GeometryFMVerticalProfile, GeometryPoint3D from ..spatial._FM_utils import _plot_vertical_profile from ._dfsu import _Dfsu, get_nodes_from_source, get_elements_from_source @@ -71,20 +71,18 @@ def _read_geometry(filename: str) -> GeometryFM3D | GeometryFMVerticalProfile: return geometry @property - def n_layers(self): + def n_layers(self) -> int: """Maximum number of layers""" return self.geometry._n_layers @property - def n_sigma_layers(self): + def n_sigma_layers(self) -> int: """Number of sigma layers""" return self.geometry.n_sigma_layers @property - def n_z_layers(self): + def n_z_layers(self) -> int: """Maximum number of z-layers""" - if self.n_layers is None: - return None return self.n_layers - self.n_sigma_layers def read( @@ -153,16 +151,19 @@ def read( if elements is None: elements = self._parse_geometry_sel(area=area, layers=layers, x=x, y=y, z=z) - if elements is None: - n_elems = self.n_elements - n_nodes = self.n_nodes - geometry = self.geometry + geometry = ( + self.geometry + if elements is None + else self.geometry.elements_to_geometry(elements) + ) + + if isinstance(geometry, GeometryPoint3D): + n_elems = 1 + n_nodes = 1 else: - elements = list(elements) - n_elems = len(elements) - geometry = self.geometry.elements_to_geometry(elements) - node_ids, _ = self.geometry._get_nodes_and_table_for_elements(elements) - n_nodes = len(node_ids) + n_elems = geometry.n_elements + n_nodes = geometry.n_nodes + node_ids = geometry.node_ids item_numbers = _valid_item_numbers( dfs.ItemInfo, items, ignore_first=self.is_layered @@ -212,7 +213,7 @@ def read( if item == 0 and item0_is_node_based: d = d[node_ids] else: - d = d[elements] + d = d[elements] # type: ignore if single_time_selected and not keepdims: data_list[item] = d @@ -249,7 +250,7 @@ def read( data_list, time, items, geometry=geometry, dims=dims, validate=False ) - def _parse_geometry_sel(self, area, layers, x, y, z): + def _parse_geometry_sel(self, area, layers, x, y, z) -> list[int] | None: if ( (x is not None) or (y is not None) @@ -261,6 +262,7 @@ def _parse_geometry_sel(self, area, layers, x, y, z): raise ValueError("No elements in selection!") return elements + # TODO refactor to avoid returning None return None diff --git a/mikeio/dfsu/_spectral.py b/mikeio/dfsu/_spectral.py index 51196528f..62012ac9b 100644 --- a/mikeio/dfsu/_spectral.py +++ b/mikeio/dfsu/_spectral.py @@ -206,6 +206,7 @@ def read( if elements is None: elements = self._parse_geometry_sel(area=area, x=x, y=y) else: + # TODO move to _parse_geometry_sel if (area is not None) or (x is not None) or (y is not None): raise ValueError( f"Arguments area/x/y are not supported for {self._type}" @@ -266,6 +267,47 @@ def read( data_list, time, items, geometry=geometry, dims=dims, validate=False ) + def _parse_geometry_sel(self, area, x, y): + """Parse geometry selection + + Parameters + ---------- + area : list[float], optional + Read only data inside (horizontal) area given as a + bounding box (tuple with left, lower, right, upper) + or as list of coordinates for a polygon, by default None + x : float, optional + Read only data for elements containing the (x,y) points(s), + by default None + y : float, optional + Read only data for elements containing the (x,y) points(s), + by default None + + Returns + ------- + list[int] + List of element ids + + Raises + ------ + ValueError + If no elements are found in selection + """ + elements = None + + if area is not None: + elements = self.geometry._elements_in_area(area) + + if (x is not None) or (y is not None): + elements = self.geometry.find_index(x=x, y=y) + + if (x is not None) or (y is not None) or (area is not None): + # selection was attempted + if (elements is None) or len(elements) == 0: + raise ValueError("No elements in selection!") + + return elements + def _parse_elements_nodes(self, elements, nodes): if self._type == DfsuFileType.DfsuSpectral0D: if elements is not None or nodes is not None: diff --git a/mikeio/pfs/_pfsdocument.py b/mikeio/pfs/_pfsdocument.py index b3770a992..ae63ab6c8 100644 --- a/mikeio/pfs/_pfsdocument.py +++ b/mikeio/pfs/_pfsdocument.py @@ -5,7 +5,7 @@ from collections.abc import Mapping, Sequence from datetime import datetime from pathlib import Path -from typing import Callable, Dict, List, TextIO, Tuple, overload +from typing import Callable, Dict, List, TextIO, Tuple import yaml @@ -181,8 +181,7 @@ def names(self) -> List[str]: def copy(self) -> PfsDocument: """Return a deep copy of the PfsDocument""" - lines = self.write() - text = "\n".join(lines) + text = repr(self) return PfsDocument.from_text(text) @@ -373,26 +372,22 @@ def _parse_token(self, token: str, context: str = "") -> str: return s - @overload - def write(self) -> list[str]: ... - - @overload - def write(self, filename: str) -> None: ... - - def write(self, filename: str | None = None) -> list[str] | None: + def write(self, filename: str) -> None: """Write object to a pfs file Parameters ---------- filename: str, optional Full path and filename of pfs to be created. - If None, the content will be returned - as a list of strings. + + Notes + ----- + To return the content as a string, use repr() """ from mikeio import __version__ as mikeio_version - if filename is None: - return self._to_txt_lines() + # if filename is None: + # return self._to_txt_lines() with open(filename, "w") as f: now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -403,7 +398,6 @@ def write(self, filename: str | None = None) -> list[str] | None: f.write("\n\n") self._write_with_func(f.write, level=0) - return None # TODO remove this alias diff --git a/mikeio/spatial/_FM_geometry_layered.py b/mikeio/spatial/_FM_geometry_layered.py index 36f9b7513..f606be7a1 100644 --- a/mikeio/spatial/_FM_geometry_layered.py +++ b/mikeio/spatial/_FM_geometry_layered.py @@ -300,17 +300,17 @@ def layer_ids(self): return self._layer_ids @property - def n_layers(self): + def n_layers(self) -> int: """Maximum number of layers""" return self._n_layers @property - def n_sigma_layers(self): + def n_sigma_layers(self) -> int: """Number of sigma layers""" return self._n_sigma @property - def n_z_layers(self): + def n_z_layers(self) -> int: """Maximum number of z-layers""" return self.n_layers - self.n_sigma_layers diff --git a/mikeio/spatial/_grid_geometry.py b/mikeio/spatial/_grid_geometry.py index 8edfb168d..59b0e74c7 100644 --- a/mikeio/spatial/_grid_geometry.py +++ b/mikeio/spatial/_grid_geometry.py @@ -777,7 +777,7 @@ def find_index( y: float | None = None, coords: np.ndarray | None = None, area: Tuple[float, float, float, float] | None = None, - ) -> Tuple[Any, Any] | None: + ) -> Tuple[Any, Any]: """Find nearest index (i,j) of point(s) Parameters @@ -824,7 +824,7 @@ def find_index( elif area is not None: return self._bbox_to_index(area) else: - return None + raise ValueError("Provide x,y or coords") def _xy_to_index(self, xy: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Find specific points in this geometry""" From 3ebc3da53c33e30f817b10b3afedf3776a01134a Mon Sep 17 00:00:00 2001 From: Henrik Andersson Date: Fri, 22 Mar 2024 09:44:29 +0100 Subject: [PATCH 2/3] List comprehension --- mikeio/dfsu/_dfsu.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/mikeio/dfsu/_dfsu.py b/mikeio/dfsu/_dfsu.py index 81cb47b9d..68051da99 100644 --- a/mikeio/dfsu/_dfsu.py +++ b/mikeio/dfsu/_dfsu.py @@ -353,14 +353,10 @@ def time(self) -> pd.DatetimeIndex: def _validate_elements_and_geometry_sel(self, elements: Any, **kwargs: Any) -> None: """Check that only one of elements, area, x, y is selected""" - used_kwargs = [] - for kw, val in kwargs.items(): - if val is not None: - used_kwargs.append(kw) - - if elements is not None: - for kw in used_kwargs: - raise ValueError(f"Cannot select both {kw} and elements!") + used_kwargs = [key for key, val in kwargs.items() if val is not None] + + if elements is not None and len(used_kwargs) > 0: + raise ValueError(f"Cannot select both {used_kwargs} and elements!") if "area" in used_kwargs and ("x" in used_kwargs or "y" in used_kwargs): raise ValueError("Cannot select both x,y and area!") From 9cfb29e3c27419d5c7f9fe2f187dd3785be0d214 Mon Sep 17 00:00:00 2001 From: Henrik Andersson Date: Fri, 22 Mar 2024 11:12:36 +0100 Subject: [PATCH 3/3] Inline method to avoid returning None --- mikeio/dfsu/_layered.py | 34 ++++++++++++-------------- mikeio/spatial/_FM_geometry_layered.py | 16 +++++++++--- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/mikeio/dfsu/_layered.py b/mikeio/dfsu/_layered.py index f40261bbb..01c8ff274 100644 --- a/mikeio/dfsu/_layered.py +++ b/mikeio/dfsu/_layered.py @@ -1,6 +1,6 @@ from __future__ import annotations from pathlib import Path -from typing import Any, Collection, Sequence, Tuple +from typing import Any, Collection, Sequence, Tuple, TYPE_CHECKING import numpy as np from mikecore.DfsuFile import DfsuFile, DfsuFileType @@ -20,6 +20,9 @@ from ..spatial._FM_utils import _plot_vertical_profile from ._dfsu import _Dfsu, get_nodes_from_source, get_elements_from_source +if TYPE_CHECKING: + from ..spatial._FM_geometry_layered import Layer + class DfsuLayered(_Dfsu): def __init__(self, filename: str | Path) -> None: @@ -95,7 +98,7 @@ def read( x: float | None = None, y: float | None = None, z: float | None = None, - layers: int | str | Sequence[int] | None = None, + layers: int | Layer | Sequence[int] | None = None, keepdims: bool = False, dtype: Any = np.float32, error_bad_data: bool = True, @@ -149,7 +152,17 @@ def read( elements, area=area, layers=layers, x=x, y=y, z=z ) if elements is None: - elements = self._parse_geometry_sel(area=area, layers=layers, x=x, y=y, z=z) + if ( + (x is not None) + or (y is not None) + or (area is not None) + or (layers is not None) + ): + elements = self.geometry.find_index( + x=x, y=y, z=z, area=area, layers=layers + ) + if len(elements) == 0: + raise ValueError("No elements in selection!") geometry = ( self.geometry @@ -250,21 +263,6 @@ def read( data_list, time, items, geometry=geometry, dims=dims, validate=False ) - def _parse_geometry_sel(self, area, layers, x, y, z) -> list[int] | None: - if ( - (x is not None) - or (y is not None) - or (area is not None) - or (layers is not None) - ): - elements = self.geometry.find_index(x=x, y=y, z=z, area=area, layers=layers) - if (elements is None) or len(elements) == 0: - raise ValueError("No elements in selection!") - return elements - - # TODO refactor to avoid returning None - return None - class Dfsu2DV(DfsuLayered): def plot_vertical_profile( diff --git a/mikeio/spatial/_FM_geometry_layered.py b/mikeio/spatial/_FM_geometry_layered.py index f606be7a1..79b3543f4 100644 --- a/mikeio/spatial/_FM_geometry_layered.py +++ b/mikeio/spatial/_FM_geometry_layered.py @@ -655,7 +655,15 @@ def contains(self, points) -> np.ndarray: def to_mesh(self, outfilename): return self.geometry2d.to_mesh(outfilename) - def find_index(self, x=None, y=None, z=None, coords=None, area=None, layers=None): + def find_index( + self, + x: float | None = None, + y: float | None = None, + z: float | None = None, + coords: np.ndarray | None = None, + area: Tuple[float, float, float, float] | None = None, + layers: int | Layer | Sequence[int] | None = None, + ) -> np.ndarray: if layers is not None: idx = self.get_layer_elements(layers) @@ -674,10 +682,10 @@ def find_index(self, x=None, y=None, z=None, coords=None, area=None, layers=None ) if coords is not None: coords = np.atleast_2d(coords) - xy = coords[:, :2] - z = coords[:, 2] if coords.shape[1] == 3 else None + xy = coords[:, :2] # type: ignore + z = coords[:, 2] if coords.shape[1] == 3 else None # type: ignore else: - xy = np.vstack((x, y)).T + xy = np.vstack((x, y)).T # type: ignore idx_2d = self.geometry2d._find_element_2d(coords=xy) assert len(idx_2d) == len(xy) if z is None: