diff --git a/mikeio/__init__.py b/mikeio/__init__.py index 7b9add6c8..3f8ec8cb1 100644 --- a/mikeio/__init__.py +++ b/mikeio/__init__.py @@ -2,6 +2,7 @@ from pathlib import Path from platform import architecture from collections.abc import Sequence +from typing import Any # PEP0440 compatible formatted version, see: # https://www.python.org/dev/peps/pep-0440/ @@ -135,7 +136,7 @@ def read( return dfs.read(items=items, time=time, keepdims=keepdims, **kwargs) -def open(filename: str | Path, **kwargs): +def open(filename: str | Path, **kwargs: Any) -> Any: """Open a dfs/mesh file (and read the header) The typical workflow for small dfs files is to read all data diff --git a/mikeio/dfs/_dfs.py b/mikeio/dfs/_dfs.py index 68c2435ff..5d2e5e942 100644 --- a/mikeio/dfs/_dfs.py +++ b/mikeio/dfs/_dfs.py @@ -271,16 +271,16 @@ class _Dfs123: show_progress = False - def __init__(self, filename=None): + def __init__(self, filename: str | Path) -> None: path = Path(filename) if not path.exists(): raise FileNotFoundError(path) self._filename = str(filename) if filename else None self._end_time = None self._is_equidistant = True - self._geometry = None # handled by subclass dfs = DfsFileFactory.DfsGenericOpen(self._filename) self._dfs = dfs + self._geometry: Any = None # Handled by subclass n_items = len(dfs.ItemInfo) self._items = self._get_item_info(list(range(n_items))) if dfs.FileInfo.TimeAxis.TimeAxisType in { @@ -310,17 +310,17 @@ def __init__(self, filename=None): else: self._timestep = None self._time = None - self._n_timesteps = dfs.FileInfo.TimeAxis.NumberOfTimeSteps + self._n_timesteps: int = dfs.FileInfo.TimeAxis.NumberOfTimeSteps projstr = dfs.FileInfo.Projection.WKTString - self._projstr = "NON-UTM" if not projstr else projstr - self._longitude = dfs.FileInfo.Projection.Longitude - self._latitude = dfs.FileInfo.Projection.Latitude - self._orientation = dfs.FileInfo.Projection.Orientation - self._deletevalue = dfs.FileInfo.DeleteValueFloat + self._projstr: str = "NON-UTM" if not projstr else projstr + self._longitude: float = dfs.FileInfo.Projection.Longitude + self._latitude: float = dfs.FileInfo.Projection.Latitude + self._orientation: float = dfs.FileInfo.Projection.Orientation + self._deletevalue: float = dfs.FileInfo.DeleteValueFloat dfs.Close() - def __repr__(self): + def __repr__(self) -> str: name = self.__class__.__name__ out = [f""] @@ -411,20 +411,17 @@ def read( t_seconds[i] = itemdata.Time - time = pd.to_datetime(t_seconds, unit="s", origin=self.start_time) # type: ignore + time = pd.to_datetime(t_seconds, unit="s", origin=self.start_time) items = _get_item_info(self._dfs.ItemInfo, item_numbers) self._dfs.Close() return Dataset(data_list, time, items, geometry=self.geometry, validate=False) - def _open(self): + def _open(self) -> None: raise NotImplementedError("Should be implemented by subclass") - def _set_spatial_axis(self): - raise NotImplementedError("Should be implemented by subclass") - - def _get_item_info(self, item_numbers): + def _get_item_info(self, item_numbers: Sequence[int]) -> List[ItemInfo]: """Read DFS ItemInfo Parameters @@ -444,8 +441,8 @@ def _get_item_info(self, item_numbers): itemtype = EUMType(eumItem) unit = EUMUnit(eumUnit) data_value_type = self._dfs.ItemInfo[item].ValueType - item = ItemInfo(name, itemtype, unit, data_value_type) - items.append(item) + item_info = ItemInfo(name, itemtype, unit, data_value_type) + items.append(item_info) return items @property @@ -490,37 +487,37 @@ def n_timesteps(self) -> int: return self._n_timesteps @property - def timestep(self) -> float: + def timestep(self) -> Any: """Time step size in seconds""" # this will fail if the TimeAxisType is not calendar and equidistant, but that is ok return self._dfs.FileInfo.TimeAxis.TimeStepInSeconds() @property - def projection_string(self): + def projection_string(self) -> str: return self._projstr @property - def longitude(self): + def longitude(self) -> float: """Origin longitude""" return self._longitude @property - def latitude(self): + def latitude(self) -> float: """Origin latitude""" return self._latitude @property - def origin(self): + def origin(self) -> Any: """Origin (in own CRS)""" return self.geometry.origin @property - def orientation(self): + def orientation(self) -> Any: """Orientation (in own CRS)""" return self.geometry.orientation @property - def is_geo(self): + def is_geo(self) -> bool: """Are coordinates geographical (LONG/LAT)?""" return self._projstr == "LONG/LAT" @@ -530,17 +527,11 @@ def shape(self) -> Tuple[int, ...]: """Shape of the data array""" pass - @property - @abstractmethod - def dx(self): - """Step size in x direction""" - pass - - def _validate_no_orientation_in_geo(self): + def _validate_no_orientation_in_geo(self) -> None: if self.is_geo and abs(self._orientation) > 1e-6: raise ValueError("Orientation is not supported for LONG/LAT coordinates") - def _origin_and_orientation_in_CRS(self): + def _origin_and_orientation_in_CRS(self) -> Tuple[Any, float]: """Project origin and orientation to projected CRS (if not LONG/LAT)""" if self.is_geo: origin = self._longitude, self._latitude @@ -554,6 +545,6 @@ def _origin_and_orientation_in_CRS(self): orientation=self._orientation, ) # convert origin and orientation to projected CRS - origin = tuple(np.round(cart.Geo2Proj(lon, lat), 6)) + origin = tuple(np.round(cart.Geo2Proj(lon, lat), 6)) # type: ignore orientation = cart.OrientationProj return origin, orientation diff --git a/mikeio/dfs/_dfs0.py b/mikeio/dfs/_dfs0.py index 2ae9bcc58..7e28fe094 100644 --- a/mikeio/dfs/_dfs0.py +++ b/mikeio/dfs/_dfs0.py @@ -36,7 +36,7 @@ def _write_dfs0( if len(dataset.time) == 1: dt = 1.0 # TODO else: - dt = (dataset.time[1] - dataset.time[0]).total_seconds() # type: ignore + dt = (dataset.time[1] - dataset.time[0]).total_seconds() temporal_axis = factory.CreateTemporalEqCalendarAxis( TimeStepUnit.SECOND, system_start_time, 0, dt @@ -65,7 +65,7 @@ def _write_dfs0( delete_value = dfs.FileInfo.DeleteValueFloat - t_seconds = (dataset.time - dataset.time[0]).total_seconds().values # type: ignore + t_seconds = (dataset.time - dataset.time[0]).total_seconds().values data = np.array(dataset.to_numpy(), order="F").astype(np.float64).T data[np.isnan(data)] = delete_value @@ -109,16 +109,16 @@ def __init__(self, filename: str | Path): TimeAxisType.CalendarEquidistant, TimeAxisType.CalendarNonEquidistant, }: - self._start_time = dfs.FileInfo.TimeAxis.StartDateTime + self._start_time: datetime = dfs.FileInfo.TimeAxis.StartDateTime else: # relative time axis self._start_time = datetime(1970, 1, 1) # time - self._n_timesteps = dfs.FileInfo.TimeAxis.NumberOfTimeSteps + self._n_timesteps: int = dfs.FileInfo.TimeAxis.NumberOfTimeSteps dfs.Close() - def __repr__(self): + def __repr__(self) -> str: out = [""] out.append(f"timeaxis: {repr(self._timeaxistype)}") @@ -199,7 +199,9 @@ def read( return ds - def _read(self, filename): + def _read( + self, filename: str + ) -> tuple[list[np.ndarray], pd.DatetimeIndex, list[ItemInfo]]: """ Read all data from a dfs0 file. """ @@ -233,7 +235,7 @@ def _read(self, filename): return data, time, items @staticmethod - def _to_dfs_datatype(dtype): + def _to_dfs_datatype(dtype: Any = None) -> DfsSimpleType: if dtype is None: return DfsSimpleType.Float @@ -339,7 +341,7 @@ def n_timesteps(self) -> int: def timestep(self) -> float: """Time step size in seconds""" if self._timeaxistype == TimeAxisType.CalendarEquidistant: - return self._source.FileInfo.TimeAxis.TimeStep + return self._source.FileInfo.TimeAxis.TimeStep # type: ignore else: raise ValueError("Time axis type not supported") @@ -361,28 +363,28 @@ def time(self) -> pd.DatetimeIndex: def series_to_dfs0( - self, - filename, - itemtype=None, - unit=None, - items=None, - title=None, - dtype=None, -): + self: pd.Series, + filename: str, + itemtype: EUMType | None = None, + unit: EUMUnit | None = None, + items: Sequence[ItemInfo] | None = None, + title: str | None = None, + dtype: Any | None = None, +) -> None: df = pd.DataFrame(self) df.to_dfs0(filename, itemtype, unit, items, title, dtype) def dataframe_to_dfs0( - self, - filename, - itemtype=None, - unit=None, - items=None, - title=None, - dtype=None, -): + self: pd.DataFrame, + filename: str, + itemtype: EUMType | None = None, + unit: EUMUnit | None = None, + items: Sequence[ItemInfo] | None = None, + title: str = "", + dtype: Any | None = None, +) -> None: """ Create a dfs0 @@ -427,6 +429,6 @@ def dataframe_to_dfs0( # Monkey patching onto Pandas classes -pd.DataFrame.to_dfs0 = dataframe_to_dfs0 # type: ignore +pd.DataFrame.to_dfs0 = dataframe_to_dfs0 -pd.Series.to_dfs0 = series_to_dfs0 # type: ignore +pd.Series.to_dfs0 = series_to_dfs0 diff --git a/mikeio/dfs/_dfs1.py b/mikeio/dfs/_dfs1.py index afaf1e9ae..ecfac48ef 100644 --- a/mikeio/dfs/_dfs1.py +++ b/mikeio/dfs/_dfs1.py @@ -68,13 +68,13 @@ def _write_dfs1_header(filename: str | Path, ds: Dataset, title: str) -> DfsFile class Dfs1(_Dfs123): _ndim = 1 - def __init__(self, filename): + def __init__(self, filename: str | Path) -> None: super().__init__(filename) self._dfs = DfsFileFactory.Dfs1FileOpen(str(filename)) - self._x0 = self._dfs.SpatialAxis.X0 - self._dx = self._dfs.SpatialAxis.Dx - self._nx = self._dfs.SpatialAxis.XCount + self._x0: float = self._dfs.SpatialAxis.X0 + self._dx: float = self._dfs.SpatialAxis.Dx + self._nx: int = self._dfs.SpatialAxis.XCount origin = self._longitude, self._latitude self._geometry = Grid1D( @@ -86,31 +86,25 @@ def __init__(self, filename): orientation=self._orientation, ) - def _open(self): + def _open(self) -> None: self._dfs = DfsFileFactory.Dfs1FileOpen(self._filename) - def _set_spatial_axis(self): - self._builder.SetSpatialAxis( - self._factory.CreateAxisEqD1( - eumUnit.eumUmeter, self._nx, self._x0, self._dx - ) - ) - @property - def geometry(self): + def geometry(self) -> Grid1D: + assert isinstance(self._geometry, Grid1D) return self._geometry @property - def x0(self): + def x0(self) -> float: """Start point of x values (often 0)""" return self._x0 @property - def dx(self): + def dx(self) -> float: """Step size in x direction""" return self._dx @property - def nx(self): + def nx(self) -> int: """Number of node values""" return self._nx diff --git a/mikeio/dfs/_dfs2.py b/mikeio/dfs/_dfs2.py index b25a81269..ba4218466 100644 --- a/mikeio/dfs/_dfs2.py +++ b/mikeio/dfs/_dfs2.py @@ -91,7 +91,9 @@ def _write_dfs2_header(filename: str | Path, ds: Dataset, title: str = "") -> Df return builder.GetFile() -def _write_dfs2_spatial_axis(builder, factory, geometry): +def _write_dfs2_spatial_axis( + builder: DfsBuilder, factory: DfsFactory, geometry: Grid2D +) -> None: builder.SetSpatialAxis( factory.CreateAxisEqD2( eumUnit.eumUmeter, @@ -183,7 +185,7 @@ def read( if area is not None: take_subset = True - ii, jj = self.geometry.find_index(area=area) + ii, jj = self.geometry.find_index(area=area) # type: ignore shape = (nt, len(jj), len(ii)) geometry = self.geometry._index_to_Grid2D(ii, jj) else: @@ -220,7 +222,7 @@ def read( self._dfs.Close() - time = pd.to_datetime(t_seconds, unit="s", origin=self.start_time) # type: ignore + time = pd.to_datetime(t_seconds, unit="s", origin=self.start_time) dims: Tuple[str, ...] @@ -238,45 +240,33 @@ def read( validate=False, ) - def _open(self): + def _open(self) -> None: self._dfs = DfsFileFactory.Dfs2FileOpen(self._filename) self._source = self._dfs - def _set_spatial_axis(self): - self._builder.SetSpatialAxis( - self._factory.CreateAxisEqD2( - eumUnit.eumUmeter, - self._nx, - self._x0, - self._dx, - self._ny, - self._y0, - self._dy, - ) - ) - @property def geometry(self) -> Grid2D: """Spatial information""" + assert isinstance(self._geometry, Grid2D) return self._geometry @property - def x0(self): + def x0(self) -> Any: """Start point of x values (often 0)""" return self.geometry.x[0] @property - def y0(self): + def y0(self) -> Any: """Start point of y values (often 0)""" return self.geometry.y[0] @property - def dx(self): + def dx(self) -> float: """Step size in x direction""" return self.geometry.dx @property - def dy(self): + def dy(self) -> float: """Step size in y direction""" return self.geometry.dy @@ -286,11 +276,11 @@ def shape(self) -> Tuple[int, ...]: return (self._n_timesteps, self.geometry.ny, self.geometry.nx) @property - def nx(self): + def nx(self) -> int: """Number of values in the x-direction""" return self.geometry.nx @property - def ny(self): + def ny(self) -> int: """Number of values in the y-direction""" return self.geometry.ny diff --git a/mikeio/dfs/_dfs3.py b/mikeio/dfs/_dfs3.py index a34e9e315..b35334a74 100644 --- a/mikeio/dfs/_dfs3.py +++ b/mikeio/dfs/_dfs3.py @@ -215,7 +215,7 @@ def read( dims = ("time", "y", "x") shape = (nt, ny, nx) else: - geometry = self.geometry._geometry_for_layers(layers, keepdims) + geometry = self.geometry._geometry_for_layers(layers, keepdims) # type: ignore dims = ("time", "z", "y", "x") shape = (nt, nzl, ny, nx) @@ -267,22 +267,6 @@ def read( validate=False, ) - def _set_spatial_axis(self): - self._builder.SetSpatialAxis( - self._factory.CreateAxisEqD3( - eumUnit.eumUmeter, - self._nx, - self._x0, - self._dx, - self._ny, - self._y0, - self._dy, - self._nz, - self._z0, - self._dz, - ) - ) - @staticmethod def _get_bottom_values(data): diff --git a/mikeio/dfsu/_mesh.py b/mikeio/dfsu/_mesh.py index 68cff0191..463162354 100644 --- a/mikeio/dfsu/_mesh.py +++ b/mikeio/dfsu/_mesh.py @@ -113,7 +113,7 @@ def zn(self) -> np.ndarray: def zn(self, v: np.ndarray) -> None: if len(v) != self.n_nodes: raise ValueError(f"zn must have length of nodes ({self.n_nodes})") - self.geometry._nc[:, 2] = v + self.geometry.node_coordinates[:, 2] = v def write( self, diff --git a/mikeio/exceptions.py b/mikeio/exceptions.py index cac5c4855..30013eb62 100644 --- a/mikeio/exceptions.py +++ b/mikeio/exceptions.py @@ -1,3 +1,7 @@ +from __future__ import annotations +from typing import Any + + class DataDimensionMismatch(ValueError): def __init__(self): self.message = ( @@ -29,7 +33,15 @@ def __init__(self): class OutsideModelDomainError(ValueError): - def __init__(self, *, x, y, z=None, indices=None, message=None): + def __init__( + self, + *, + x: Any, + y: Any = None, + z: Any = None, + indices: Any = None, + message: str | None = None, + ) -> None: self.x = x self.y = y self.z = z diff --git a/mikeio/spatial/_FM_geometry.py b/mikeio/spatial/_FM_geometry.py index 7cc5183e4..1845b8e13 100644 --- a/mikeio/spatial/_FM_geometry.py +++ b/mikeio/spatial/_FM_geometry.py @@ -1,10 +1,12 @@ from __future__ import annotations from collections import namedtuple from functools import cached_property +from pathlib import Path from typing import ( Collection, List, Any, + Literal, Sequence, Sized, Tuple, @@ -36,12 +38,13 @@ if TYPE_CHECKING: from ._FM_geometry_layered import GeometryFM3D + from matplotlib.axes import Axes class GeometryFMPointSpectrum(_Geometry): def __init__( self, - frequencies: np.ndarray, + frequencies: np.ndarray | None = None, directions: np.ndarray | None = None, x: float | None = None, y: float | None = None, @@ -61,39 +64,34 @@ def __init__( def is_layered(self) -> bool: return False - @property - def type_name(self): - """Type name: DfsuSpectral0D""" - return self._type.name # TODO there is no self._type?? - - def __repr__(self): + def __repr__(self) -> str: txt = f"Point Spectrum Geometry(frequency:{self.n_frequencies}, direction:{self.n_directions}" if self.x is not None: txt = txt + f", x:{self.x:.5f}, y:{self.y:.5f}" return txt + ")" @property - def ndim(self): + def ndim(self) -> int: # TODO: 0, 1 or 2 ? return 0 @property - def n_frequencies(self): + def n_frequencies(self) -> int: """Number of frequencies""" return 0 if self.frequencies is None else len(self.frequencies) @property - def frequencies(self): + def frequencies(self) -> np.ndarray | None: """Frequency axis""" return self._frequencies @property - def n_directions(self): + def n_directions(self) -> int: """Number of directions""" return 0 if self.directions is None else len(self.directions) @property - def directions(self): + def directions(self) -> np.ndarray | None: """Directional axis""" return self._directions @@ -116,33 +114,51 @@ class _GeometryFMPlotter: def __init__(self, geometry: GeometryFM2D | GeometryFM3D) -> None: self.g = geometry - def __call__(self, ax=None, figsize=None, **kwargs): + def __call__( + self, + ax: Axes | None = None, + figsize: Tuple[float, float] | None = None, + **kwargs: Any, + ) -> Axes: """Plot bathymetry as coloured patches""" ax = self._get_ax(ax, figsize) kwargs["plot_type"] = kwargs.get("plot_type") or "patch" return self._plot_FM_map(ax, **kwargs) - def contour(self, ax=None, figsize=None, **kwargs): + def contour( + self, + ax: Axes | None = None, + figsize: Tuple[float, float] | None = None, + **kwargs: Any, + ) -> Axes: """Plot bathymetry as contour lines""" ax = self._get_ax(ax, figsize) kwargs["plot_type"] = "contour" return self._plot_FM_map(ax, **kwargs) - def contourf(self, ax=None, figsize=None, **kwargs): + def contourf( + self, + ax: Axes | None = None, + figsize: Tuple[float, float] | None = None, + **kwargs: Any, + ) -> Axes: """Plot bathymetry as filled contours""" ax = self._get_ax(ax, figsize) kwargs["plot_type"] = "contourf" return self._plot_FM_map(ax, **kwargs) @staticmethod - def _get_ax(ax=None, figsize=None): - import matplotlib.pyplot as plt # type: ignore + def _get_ax( + ax: Axes | None = None, + figsize: Tuple[float, float] | None = None, + ) -> Axes: + import matplotlib.pyplot as plt if ax is None: _, ax = plt.subplots(figsize=figsize) return ax - def _plot_FM_map(self, ax, **kwargs): + def _plot_FM_map(self, ax: Axes, **kwargs: Any) -> Axes: if "title" not in kwargs: kwargs["title"] = "Bathymetry" @@ -162,7 +178,12 @@ def _plot_FM_map(self, ax, **kwargs): **kwargs, ) - def mesh(self, title="Mesh", figsize=None, ax=None): + def mesh( + self, + title: str = "Mesh", + figsize: Tuple[float, float] | None = None, + ax: Axes | None = None, + ) -> Axes: """Plot mesh only""" # TODO this must be a duplicate, delegate @@ -183,10 +204,14 @@ def mesh(self, title="Mesh", figsize=None, ax=None): _set_xy_label_by_projection(ax, self.g.projection) return ax - def outline(self, title="Outline", figsize=None, ax=None): + def outline( + self, + title: str = "Outline", + figsize: Tuple[float, float] | None = None, + ax: Axes | None = None, + ) -> Axes: """Plot domain outline (using the boundary_polylines property)""" - # TODO this must be a duplicate, delegate ax = self._get_ax(ax=ax, figsize=figsize) ax.set_aspect(self._plot_aspect()) @@ -201,7 +226,12 @@ def outline(self, title="Outline", figsize=None, ax=None): ax = self._set_plot_limits(ax) return ax - def boundary_nodes(self, boundary_names=None, figsize=None, ax=None): + def boundary_nodes( + self, + boundary_names: Sequence[str] | None = None, + figsize: Tuple[float, float] | None = None, + ax: Axes | None = None, + ) -> Axes: """Plot mesh boundary nodes and their code values""" import matplotlib.pyplot as plt @@ -216,7 +246,7 @@ def boundary_nodes(self, boundary_names=None, figsize=None, ax=None): if boundary_names is not None: if len(boundary_codes) != len(boundary_names): raise Exception( - f"Number of boundary names ({len(boundary_names)}) inconsistent with number of boundaries ({len(self.g.boundary_codes)})" + f"Number of boundary names ({len(boundary_names)}) inconsistent with number of boundaries ({len(boundary_codes)})" ) user_defined_labels = dict(zip(boundary_codes, boundary_names)) @@ -234,16 +264,14 @@ def boundary_nodes(self, boundary_names=None, figsize=None, ax=None): ax = self._set_plot_limits(ax) return ax - def _set_plot_limits(self, ax): - # TODO this must be a duplicate, delegate + def _set_plot_limits(self, ax: Axes) -> Axes: bbox = xy_to_bbox(self.g.node_coordinates) xybuf = 6e-3 * (bbox.right - bbox.left) ax.set_xlim(bbox.left - xybuf, bbox.right + xybuf) ax.set_ylim(bbox.bottom - xybuf, bbox.top + xybuf) return ax - def _plot_aspect(self): - # TODO this must be a duplicate, delegate + def _plot_aspect(self) -> Literal["equal"] | float: if self.g.is_geo: mean_lat = np.mean(self.g.node_coordinates[:, 1]) return 1.0 / np.cos(np.pi * mean_lat / 180) @@ -254,15 +282,16 @@ def _plot_aspect(self): class _GeometryFM(_Geometry): def __init__( self, - node_coordinates, - element_table, - codes=None, - projection: str = "LONG/LAT", - dfsu_type=None, # TODO should this be mandatory? - element_ids=None, - node_ids=None, - validate=True, - reindex=False, + *, + node_coordinates: np.ndarray, + element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray], + projection: str, + codes: np.ndarray | None = None, + dfsu_type: DfsuFileType, + element_ids: np.ndarray | None = None, + node_ids: np.ndarray | None = None, + validate: bool = True, + reindex: bool = False, ) -> None: super().__init__(projection=projection) self.node_coordinates = np.asarray(node_coordinates) @@ -287,7 +316,12 @@ def __init__( if reindex: self._reindex() - def _check_elements(self, element_table, element_ids=None, validate=True): + def _check_elements( + self, + element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray], + element_ids: np.ndarray | None = None, + validate: bool = True, + ) -> tuple[Any, Any]: if validate: max_node_id = self._node_ids.max() for i, e in enumerate(element_table): @@ -306,9 +340,10 @@ def _check_elements(self, element_table, element_ids=None, validate=True): element_ids = np.arange(len(element_table)) element_ids = np.asarray(element_ids) + # TODO make sure return type is np.ndarray return element_table, element_ids - def _reindex(self): + def _reindex(self) -> None: new_node_ids = np.arange(self.n_nodes) new_element_ids = np.arange(self.n_elements) node_dict = dict(zip(self._node_ids, new_node_ids)) @@ -332,7 +367,7 @@ def n_nodes(self) -> int: return len(self._node_ids) @property - def node_ids(self): + def node_ids(self) -> np.ndarray: return self._node_ids @property @@ -341,15 +376,11 @@ def n_elements(self) -> int: return len(self._element_ids) @property - def element_ids(self): + def element_ids(self) -> np.ndarray: return self._element_ids - @property - def _nc(self): - return self.node_coordinates - @cached_property - def max_nodes_per_element(self): + def max_nodes_per_element(self) -> int: """The maximum number of nodes for an element""" maxnodes = 0 for local_nodes in self.element_table: @@ -359,12 +390,12 @@ def max_nodes_per_element(self): return maxnodes @property - def codes(self): + def codes(self) -> np.ndarray: """Node codes of all nodes (0=water, 1=land, 2...=open boundaries)""" return self._codes @codes.setter - def codes(self, v): + def codes(self, v: np.ndarray) -> None: if len(v) != self.n_nodes: raise ValueError(f"codes must have length of nodes ({self.n_nodes})") self._codes = np.array(v, dtype=np.int32) @@ -374,7 +405,8 @@ class GeometryFM2D(_GeometryFM): def __init__( self, node_coordinates: np.ndarray, - element_table: np.ndarray | Sequence[Sequence[int]] | Sequence[np.ndarray], + # TODO settle on type for element_table + element_table: Any, codes: np.ndarray | None = None, projection: str = "LONG/LAT", dfsu_type: DfsuFileType = DfsuFileType.Dfsu2D, # Reasonable default? @@ -385,7 +417,7 @@ def __init__( ) -> None: super().__init__( node_coordinates=node_coordinates, - element_table=element_table, + element_table=element_table, # type: ignore codes=codes, projection=projection, dfsu_type=dfsu_type, @@ -450,7 +482,7 @@ def _area_is_polygon(area: Sequence[Tuple[float, float]] | Sequence[float]) -> b return True @property - def type_name(self): + def type_name(self) -> str: """Type name, e.g. Mesh, Dfsu2D""" return self._type.name if self._type else "Mesh" @@ -492,16 +524,16 @@ def is_tri_only(self) -> bool: return self.max_nodes_per_element == 3 or self.max_nodes_per_element == 6 @cached_property - def element_coordinates(self): + def element_coordinates(self) -> np.ndarray: """Center coordinates of each element""" return self._calc_element_coordinates() @cached_property - def _tree2d(self): + def _tree2d(self) -> cKDTree: xy = self.element_coordinates[:, :2] return cKDTree(xy) - def _calc_element_coordinates(self): + def _calc_element_coordinates(self) -> np.ndarray: element_table = self.element_table n_elements = len(element_table) @@ -793,7 +825,7 @@ def get_overset_grid( bbox = xy_to_bbox(nc, buffer=buffer) return Grid2D(bbox=bbox, dx=dx, dy=dy, nx=nx, ny=ny, projection=self.projection) - def get_element_area(self): + def get_element_area(self) -> np.ndarray: """Calculate the horizontal area of each element. Returns @@ -865,7 +897,7 @@ def boundary_polylines(self) -> BoundaryPolylines: """Lists of closed polylines defining domain outline""" return self._get_boundary_polylines() - def contains(self, points): + def contains(self, points: np.ndarray) -> np.ndarray: """test if a list of points are contained by mesh Parameters @@ -898,10 +930,10 @@ def contains(self, points): return cnts - def __contains__(self, pt) -> bool: + def __contains__(self, pt: np.ndarray) -> bool: return self.contains(pt)[0] - def _get_boundary_polylines_uncategorized(self): + def _get_boundary_polylines_uncategorized(self) -> List[List[np.int64]]: """Construct closed polylines for all boundary faces""" boundary_faces = self._get_boundary_faces() face_remains = boundary_faces.copy() @@ -955,7 +987,7 @@ def _get_boundary_polylines(self) -> BoundaryPolylines: n_int = len(poly_lines_int) return BoundaryPolylines(n_ext, poly_lines_ext, n_int, poly_lines_int) - def _get_boundary_faces(self): + def _get_boundary_faces(self) -> np.ndarray: """Construct list of faces""" element_table = self.element_table @@ -977,7 +1009,7 @@ def _get_boundary_faces(self): return all_faces[uf_id[bnd_face_id]] def isel( - self, idx: Collection[int], keepdims=False, **kwargs + self, idx: Collection[int], keepdims: bool = False, **kwargs: Any ) -> "GeometryFM2D" | GeometryPoint2D: """export a selection of elements to a new geometry @@ -1008,7 +1040,15 @@ def isel( else: return self.elements_to_geometry(elements=idx, keepdims=keepdims) - def find_index(self, x=None, y=None, coords=None, area=None) -> np.ndarray: + def find_index( + self, + x: float | np.ndarray | None = None, + y: float | np.ndarray | None = None, + coords: np.ndarray | None = None, + area: ( + Tuple[float, float, float, float] | Sequence[Tuple[float, float]] | None + ) = None, + ) -> np.ndarray: """Find a *set* of element indicies for a number of points or within an area. The returned indices returned are the unique, unordered set of element indices that contain the points or area. @@ -1061,9 +1101,9 @@ def find_index(self, x=None, y=None, coords=None, area=None) -> np.ndarray: ) if coords is not None: coords = np.atleast_2d(coords) - xy = coords[:, :2] + xy = coords[:, :2] # type: ignore else: - xy = np.vstack((x, y)).T + xy = np.vstack((x, y)).T # type: ignore idx = self._find_element_2d(coords=xy) return idx elif area is not None: @@ -1072,14 +1112,16 @@ def find_index(self, x=None, y=None, coords=None, area=None) -> np.ndarray: raise ValueError("Provide either coordinates or area") @staticmethod - def _inside_polygon(polygon, xy): + def _inside_polygon(polygon: np.ndarray, xy: np.ndarray) -> np.ndarray: import matplotlib.path as mp if polygon.ndim == 1: polygon = np.column_stack((polygon[0::2], polygon[1::2])) return mp.Path(polygon).contains_points(xy) - def _elements_in_area(self, area: Sequence[float] | Sequence[Tuple[float, float]]): + def _elements_in_area( + self, area: Sequence[float] | Sequence[Tuple[float, float]] + ) -> np.ndarray: """Find 2d element ids of elements inside area""" if self._area_is_bbox(area): x0, y0, x1, y1 = area @@ -1095,7 +1137,9 @@ def _elements_in_area(self, area: Sequence[float] | Sequence[Tuple[float, float] else: raise ValueError("'area' must be bbox [x0,y0,x1,y1] or polygon") - def _nodes_to_geometry(self, nodes) -> "GeometryFM2D" | GeometryPoint2D: + def _nodes_to_geometry( + self, nodes: Collection[int] + ) -> "GeometryFM2D" | GeometryPoint2D: """export a selection of nodes to new flexible file geometry Note: takes only the elements for which all nodes are selected @@ -1110,7 +1154,7 @@ def _nodes_to_geometry(self, nodes) -> "GeometryFM2D" | GeometryPoint2D: UnstructuredGeometry which can be used for further extraction or saved to file """ - nodes = np.atleast_1d(nodes) + nodes = np.atleast_1d(nodes) # type: ignore if len(nodes) == 1: xy = self.node_coordinates[nodes[0], :2] return GeometryPoint2D(xy[0], xy[1]) @@ -1139,7 +1183,7 @@ def _nodes_to_geometry(self, nodes) -> "GeometryFM2D" | GeometryPoint2D: ) def elements_to_geometry( - self, elements: int | Collection[int], keepdims=False + self, elements: int | Collection[int], keepdims: bool = False ) -> "GeometryFM2D" | GeometryPoint2D: if isinstance(elements, (int, np.integer)): sel_elements: List[int] = [elements] @@ -1172,7 +1216,9 @@ def elements_to_geometry( reindex=True, ) - def _get_nodes_and_table_for_elements(self, elements): + def _get_nodes_and_table_for_elements( + self, elements: np.ndarray | List[int] + ) -> tuple[Any, Any]: """list of nodes and element table for a list of elements Parameters @@ -1192,10 +1238,12 @@ def _get_nodes_and_table_for_elements(self, elements): for j, eid in enumerate(elements): elem_tbl[j] = np.asarray(self.element_table[eid]) - nodes = np.unique(np.hstack(elem_tbl)) + nodes = np.unique(np.hstack(elem_tbl)) # type: ignore return nodes, elem_tbl - def get_node_centered_data(self, data, extrapolate=True): + def get_node_centered_data( + self, data: np.ndarray, extrapolate: bool = True + ) -> np.ndarray: """convert cell-centered data to node-centered by pseudo-laplacian method Parameters @@ -1216,7 +1264,7 @@ def get_node_centered_data(self, data, extrapolate=True): elem_table = geometry.element_table return _get_node_centered_data(nc, elem_table, ec, data, extrapolate) - def to_shapely(self): + def to_shapely(self) -> Any: """Export mesh as shapely MultiPolygon Returns @@ -1239,7 +1287,7 @@ def to_shapely(self): return mp - def to_mesh(self, outfilename): + def to_mesh(self, outfilename: str | Path) -> None: """Export geometry to new mesh file Parameters @@ -1248,6 +1296,7 @@ def to_mesh(self, outfilename): path to file to be written """ builder = MeshBuilder() + outfilename = str(outfilename) nc = self.node_coordinates builder.SetNodes(nc[:, 0], nc[:, 1], nc[:, 2], self.codes) @@ -1265,17 +1314,17 @@ def to_mesh(self, outfilename): class _GeometryFMSpectrum(GeometryFM2D): def __init__( self, - node_coordinates, - element_table, - codes=None, + node_coordinates: np.ndarray, + element_table: Any, + codes: np.ndarray | None = None, projection: str = "LONG/LAT", - dfsu_type=None, - element_ids=None, - node_ids=None, - validate=True, - frequencies=None, - directions=None, - reindex=False, + dfsu_type: DfsuFileType | None = None, + element_ids: np.ndarray | None = None, + node_ids: np.ndarray | None = None, + validate: bool = True, + frequencies: np.ndarray | None = None, + directions: np.ndarray | None = None, + reindex: bool = False, ) -> None: super().__init__( node_coordinates=node_coordinates, @@ -1293,32 +1342,36 @@ def __init__( self._directions = directions @property - def n_frequencies(self): + def n_frequencies(self) -> int: """Number of frequencies""" return 0 if self.frequencies is None else len(self.frequencies) @property - def frequencies(self): + def frequencies(self) -> np.ndarray | None: """Frequency axis""" return self._frequencies @property - def n_directions(self): + def n_directions(self) -> int: """Number of directions""" return 0 if self.directions is None else len(self.directions) @property - def directions(self): + def directions(self) -> np.ndarray | None: """Directional axis""" return self._directions # TODO reconsider inheritance to avoid overriding method signature class GeometryFMAreaSpectrum(_GeometryFMSpectrum): - def isel(self, idx=None, axis="elements"): + def isel( # type: ignore + self, idx: Collection[int], **kwargs: Any + ) -> "GeometryFMPointSpectrum" | "GeometryFMAreaSpectrum": return self.elements_to_geometry(elements=idx) - def elements_to_geometry(self, elements, keepdims=False): + def elements_to_geometry( # type: ignore + self, elements: Collection[int], keepdims: bool = False + ) -> "GeometryFMPointSpectrum" | "GeometryFMAreaSpectrum": """export a selection of elements to new flexible file geometry Parameters ---------- @@ -1331,7 +1384,7 @@ def elements_to_geometry(self, elements, keepdims=False): GeometryFMAreaSpectrum or GeometryFMPointSpectrum which can be used for further extraction or saved to file """ - elements = np.atleast_1d(elements) + elements = np.atleast_1d(elements) # type: ignore if len(elements) == 1: coords = self.element_coordinates[elements[0], :] return GeometryFMPointSpectrum( diff --git a/mikeio/spatial/_grid_geometry.py b/mikeio/spatial/_grid_geometry.py index 755c8ce8f..bd6425aae 100644 --- a/mikeio/spatial/_grid_geometry.py +++ b/mikeio/spatial/_grid_geometry.py @@ -1,6 +1,8 @@ from __future__ import annotations +from functools import cached_property +from pathlib import Path import warnings -from typing import Any, Tuple, TYPE_CHECKING, List, overload +from typing import Any, Sequence, Tuple, TYPE_CHECKING, List, overload from dataclasses import dataclass import numpy as np @@ -333,13 +335,13 @@ def _plot_grid( def outline( self, - title="Outline", - ax=None, - figsize=None, - color="0.4", - linewidth=1.2, - **kwargs, - ): + title: str = "Outline", + ax: Axes | None = None, + figsize: Tuple[float, float] | None = None, + color: str = "0.4", + linewidth: float = 1.2, + **kwargs: Any, + ) -> Axes: """Plot Grid2D outline Examples @@ -367,7 +369,7 @@ def outline( return ax - def _set_aspect_and_labels(self, ax): + def _set_aspect_and_labels(self, ax: Axes) -> None: g = self.g if g.is_spectral: ax.set_xlabel("Frequency [Hz]") @@ -611,7 +613,7 @@ def __repr__(self) -> str: return "\n".join(out) - def __str__(self): + def __str__(self) -> str: return f"Grid2D (ny={self.ny}, nx={self.nx})" @property @@ -638,7 +640,9 @@ def x(self) -> np.ndarray: return x_local + self._origin[0] @staticmethod - def _logarithmic_f(n=25, f0=0.055, freq_factor=1.1) -> np.ndarray: + def _logarithmic_f( + n: int = 25, f0: float = 0.055, freq_factor: float = 1.1 + ) -> np.ndarray: """Generate logarithmic frequency axis Parameters @@ -702,36 +706,14 @@ def bbox(self) -> BoundingBox: top = self.y[-1] + self.dy / 2 return BoundingBox(left, bottom, right, top) - @property - def _xx(self): - """2d array of all x-coordinates""" - if self.__xx is None: - self._create_meshgrid(self.x, self.y) - return self.__xx - - @property - def _yy(self): - """2d array of all y-coordinates""" - if self.__yy is None: - self._create_meshgrid(self.x, self.y) - return self.__yy - - def _create_meshgrid(self, x, y): - self.__xx, self.__yy = np.meshgrid(x, y) - - @property - def xy(self): + @cached_property + def xy(self) -> np.ndarray: """n-by-2 array of x- and y-coordinates""" - xcol = self._xx.reshape(-1, 1) - ycol = self._yy.reshape(-1, 1) + xx, yy = np.meshgrid(self.x, self.y) + xcol = xx.reshape(-1, 1) + ycol = yy.reshape(-1, 1) return np.column_stack([xcol, ycol]) - @property - def coordinates(self): - # TODO: remove this? - """n-by-2 array of x- and y-coordinates""" - return self.xy - @property def _cart(self) -> Cartography: """MIKE Core Cartography object""" @@ -778,16 +760,16 @@ def contains(self, coords: np.ndarray) -> Any: yinside = (self.bbox.bottom <= y) & (y <= self.bbox.top) return xinside & yinside - def __contains__(self, pt) -> Any: + def __contains__(self, pt: Any) -> Any: return self.contains(pt) def find_index( self, x: float | None = None, y: float | None = None, - coords=None, - area=None, - ): + coords: np.ndarray | None = None, + area: Tuple[float, float, float, float] | None = None, + ) -> Tuple[Any, Any] | None: """Find nearest index (i,j) of point(s) Parameters @@ -833,8 +815,10 @@ def find_index( return self._xy_to_index(coords) elif area is not None: return self._bbox_to_index(area) + else: + return None - def _xy_to_index(self, xy): + def _xy_to_index(self, xy: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Find specific points in this geometry""" xy = np.atleast_2d(xy) y = xy[:, 1] @@ -1059,7 +1043,9 @@ def to_geometryFM( projection=self.projection, ) - def to_mesh(self, outfilename, z=None): + def to_mesh( + self, outfilename: str | Path, z: np.ndarray | float | None = None + ) -> None: """export grid to mesh file Parameters @@ -1074,6 +1060,7 @@ def to_mesh(self, outfilename, z=None): if z is not None: if not np.isscalar(z): + assert isinstance(z, np.ndarray), "z must be a numpy array" if len(z) != g.n_nodes: raise ValueError( "z must either be scalar or have length of nodes ((nx+1)*(ny+1))" @@ -1105,21 +1092,21 @@ class Grid3D(_Geometry): def __init__( self, *, - x=None, - x0=0.0, - dx=None, - nx=None, - y=None, - y0=0.0, - dy=None, - ny=None, - z=None, - z0=0.0, - dz=None, - nz=None, - projection="NON-UTM", + x: np.ndarray | None = None, + x0: float = 0.0, + dx: float | None = None, + nx: int | None = None, + y: np.ndarray | None = None, + y0: float = 0.0, + dy: float | None = None, + ny: int | None = None, + z: np.ndarray | None = None, + z0: float = 0.0, + dz: float | None = None, + nz: int | None = None, + projection: str = "NON-UTM", # TODO LONG/LAT origin: Tuple[float, float] = (0.0, 0.0), - orientation=0.0, + orientation: float = 0.0, ) -> None: super().__init__(projection=projection) self._origin = (0.0, 0.0) if origin is None else (origin[0], origin[1]) @@ -1137,11 +1124,11 @@ def ndim(self) -> int: return 3 @property - def _is_rotated(self): + def _is_rotated(self) -> Any: return np.abs(self._orientation) > 1e-5 @property - def x(self): + def x(self) -> np.ndarray: """array of x-axis coordinates (element center)""" x1 = self._x0 + self.dx * (self.nx - 1) x_local = np.linspace(self._x0, x1, self.nx) @@ -1158,7 +1145,7 @@ def nx(self) -> int: return self._nx @property - def y(self): + def y(self) -> np.ndarray: """array of y-axis coordinates (element center)""" y1 = self._y0 + self.dy * (self.ny - 1) y_local = np.linspace(self._y0, y1, self.ny) @@ -1175,7 +1162,7 @@ def ny(self) -> int: return self._ny @property - def z(self): + def z(self) -> np.ndarray: """array of z-axis node coordinates""" z1 = self._z0 + self.dz * (self.nz - 1) return np.linspace(self._z0, z1, self.nz) @@ -1200,7 +1187,9 @@ def orientation(self) -> float: """Grid orientation""" return self._orientation - def find_index(self, coords=None, layers=None, area=None): + def find_index( + self, coords: Any = None, layers: Any = None, area: Any = None + ) -> Any: if layers is not None: raise NotImplementedError( f"Layer slicing is not yet implemented. Use the mikeio.read('file.dfs3', layers='{layers}')" @@ -1209,31 +1198,36 @@ def find_index(self, coords=None, layers=None, area=None): "Not yet implemented for Grid3D. Please use mikeio.read('file.dfs3') and its arguments instead." ) - def isel(self, idx, axis): + def isel( + self, idx: int | np.ndarray, axis: str | int + ) -> Grid3D | Grid2D | GeometryUndefined: """Get a subset geometry from this geometry""" if isinstance(axis, str): if axis == "z": - axis = 0 + axis_nr = 0 elif axis == "y": - axis = 1 + axis_nr = 1 elif axis == "x": - axis = 2 + axis_nr = 2 else: raise ValueError(f"axis must be 'x', 'y' or 'z', not {axis}") - assert isinstance(axis, int), "axis must be an integer (or 'x', 'y' or 'z')" - axis = axis + 3 if axis < 0 else axis + elif isinstance(axis, int): + axis_nr = axis + assert isinstance(axis_nr, int), "axis must be an integer (or 'x', 'y' or 'z')" + axis_nr = axis_nr + 3 if axis_nr < 0 else axis_nr if not np.isscalar(idx): + assert isinstance(idx, np.ndarray), "idx must be a numpy array" d = np.diff(idx) if np.any(d < 1) or not np.allclose(d, d[0]): return GeometryUndefined() else: - ii = idx if axis == 2 else None - jj = idx if axis == 1 else None - kk = idx if axis == 0 else None + ii = idx if axis_nr == 2 else None + jj = idx if axis_nr == 1 else None + kk = idx if axis_nr == 0 else None return self._index_to_Grid3D(ii, jj, kk) - if axis == 0: + if axis_nr == 0: # z is the first axis! return x-y Grid2D # TODO: origin, how to pass self.z[idx]? return Grid2D( @@ -1247,7 +1241,7 @@ def isel(self, idx, axis): origin=self.origin, orientation=self.orientation, ) - elif axis == 1: + elif axis_nr == 1: # y is the second axis! return x-z Grid2D # TODO: origin, how to pass self.y[idx]? return Grid2D( @@ -1255,7 +1249,7 @@ def isel(self, idx, axis): y=self.z, # projection=self._projection, ) - elif axis == 2: + elif axis_nr == 2: # x is the last axis! return y-z Grid2D # TODO: origin, how to pass self.x[idx]? return Grid2D( @@ -1264,9 +1258,16 @@ def isel(self, idx, axis): # projection=self._projection, ) else: - raise ValueError(f"axis must be 0, 1 or 2 (or 'x', 'y' or 'z'), not {axis}") + raise ValueError( + f"axis must be 0, 1 or 2 (or 'x', 'y' or 'z'), not {axis_nr}" + ) - def _index_to_Grid3D(self, ii=None, jj=None, kk=None): + def _index_to_Grid3D( + self, + ii: np.ndarray | range | None = None, + jj: np.ndarray | range | None = None, + kk: np.ndarray | range | None = None, + ) -> Grid3D | GeometryUndefined: ii = range(self.nx) if ii is None else ii jj = range(self.ny) if jj is None else jj kk = range(self.nz) if kk is None else kk @@ -1319,7 +1320,7 @@ def _index_to_Grid3D(self, ii=None, jj=None, kk=None): origin=origin, ) - def __repr__(self): + def __repr__(self) -> str: out = [""] out.append(_print_axis_txt("x", self.x, self.dx)) out.append(_print_axis_txt("y", self.y, self.dy)) @@ -1331,11 +1332,11 @@ def __repr__(self): out.append(f"projection: {self.projection_string}") return "\n".join(out) - def __str__(self): + def __str__(self) -> str: return f"Grid3D(nz={self.nz}, ny={self.ny}, nx={self.nx})" def _geometry_for_layers( - self, layers, keepdims=False + self, layers: Sequence[int] | None, keepdims: bool = False ) -> Grid2D | "Grid3D" | "GeometryUndefined": if layers is None: return self diff --git a/tests/test_geometry_grid.py b/tests/test_geometry_grid.py index cc1312389..06d0d230d 100644 --- a/tests/test_geometry_grid.py +++ b/tests/test_geometry_grid.py @@ -144,25 +144,6 @@ def test_x_y_is_increasing(): assert "increasing" in str(excinfo.value) -def test_xx_yy(): - nx = 4 - ny = 3 - x = np.linspace(1, 7, nx) - y = np.linspace(3, 5, ny) - g = Grid2D(x=x, y=y) - # assert g.n == nx * ny - assert g._xx[0, 0] == 1.0 - assert g._yy[-1, -1] == 5.0 - assert np.all(g.xy[1] == [3.0, 3.0]) - assert np.all(g.coordinates[1] == [3.0, 3.0]) - - g2 = Grid2D(x=x, y=y) - - # Reverse order compared to above makes no difference - assert g2._yy[-1, -1] == 5.0 - assert g2._xx[0, 0] == 1.0 - - def test_create_in_bbox(): bbox = [0, 0, 1, 5] g = Grid2D(bbox=bbox, nx=2, ny=5)