diff --git a/post_processing/pylbo/data_containers.py b/post_processing/pylbo/data_containers.py index cb007f9e..6c4f050b 100644 --- a/post_processing/pylbo/data_containers.py +++ b/post_processing/pylbo/data_containers.py @@ -1,8 +1,10 @@ +from __future__ import annotations + from abc import ABC, abstractmethod from pathlib import Path +from typing import Union import numpy as np - from pylbo.exceptions import ( EigenfunctionsNotPresent, EigenvectorsNotPresent, @@ -10,7 +12,6 @@ ResidualsNotPresent, ) from pylbo.utilities.datfile_utils import ( - get_header, read_derived_eigenfunction, read_ef_grid, read_eigenfunction, @@ -23,8 +24,10 @@ read_matrix_B, read_residuals, ) +from pylbo.utilities.datfiles.file_manager import LegolasFileManager +from pylbo.utilities.datfiles.header_legacy import LegolasLegacyHeader from pylbo.utilities.logger import pylboLogger -from pylbo.utilities.toolbox import transform_to_numpy +from pylbo.utilities.toolbox import get_values, transform_to_numpy from pylbo.visualisation.continua import calculate_continua @@ -45,61 +48,16 @@ class LegolasDataContainer(ABC): Baseclass for a Legolas data container. """ - @property + @abstractmethod def continua(self): - """ - Retrieves the continua based on the type of subclass, returns a dictionary - with the names of the continua as keys. - - - :class:`LegolasDataSet`: the values are single Numpy arrays. - - :class:`LegolasDataSeries`: every value is an array of Numpy arrays, - one for each dataset in the series. - - Returns - ------- - continua : dict - Dictionary containing the continua values. 
- """ - if isinstance(self, LegolasDataSet): - return self._continua - elif isinstance(self, LegolasDataSeries): - keys = self[0].continua.keys() - _continua = {key: [] for key in keys} - for ds in self: - for key in keys: - _continua[key].append(ds.continua[key]) - return {key: np.array(values) for key, values in _continua.items()} - else: - raise TypeError(f"unexpected instance: {type(self)}") + pass - @property + @abstractmethod def parameters(self): - """ - Retrieves the parameters based on the type of subclass, returns a dictionary - with the parameter names as keys. - - - :class:`LegolasDataSet`: values are single items. - - :class:`LegolasDataSeries`: values are Numpy arrays. - - Returns - ------- - parameters : dict - Dictionary containing the parameters. - """ - if isinstance(self, LegolasDataSet): - return self._parameters - elif isinstance(self, LegolasDataSeries): - keys = self[0].parameters.keys() - _params = {key: [] for key in keys} - for ds in self: - for key in keys: - _params[key].append(ds.parameters[key]) - return {key: np.array(values) for key, values in _params.items()} - else: - raise TypeError(f"unexpected instance: {type(self)}") + pass @abstractmethod - def efs_written(self): + def has_efs(self): pass @abstractmethod @@ -111,13 +69,29 @@ def ef_names(self): pass @abstractmethod - def derived_efs_written(self): + def has_derived_efs(self): pass @abstractmethod def derived_ef_names(self): pass + @abstractmethod + def has_ef_subset(self): + pass + + @abstractmethod + def has_matrices(self): + pass + + @abstractmethod + def has_eigenvectors(self): + pass + + @abstractmethod + def has_residuals(self): + pass + @abstractmethod def get_sound_speed(self, which_values=None): pass @@ -197,7 +171,10 @@ class LegolasDataSet(LegolasDataContainer): def __init__(self, datfile): self.datfile = Path(datfile) with open(self.datfile, "rb") as istream: - self.header = get_header(istream) + filemanager = LegolasFileManager(istream) + self.legolas_version = 
filemanager.legolas_version + if self.legolas_version < "2.0": + self.header = LegolasLegacyHeader(filemanager) self.grid = read_grid(istream, self.header) self.grid_gauss = read_grid_gauss(istream, self.header) self.equilibria = read_equilibrium_arrays(istream, self.header) @@ -215,17 +192,16 @@ def __init__(self, datfile): self.x_start = self.header["x_start"] self.x_end = self.header["x_end"] - self.gridpoints = self.header["gridpts"] - self.gauss_gridpoints = self.header["gauss_gridpts"] - self.matrix_gridpoints = self.header["matrix_gridpts"] - self.ef_gridpoints = self.header["ef_gridpts"] + self.gridpoints = self.header["gridpoints"] + self.gauss_gridpoints = self.header["gauss_gridpoints"] + self.matrix_gridpoints = self.header["matrix_gridpoints"] + self.ef_gridpoints = self.header["ef_gridpoints"] self.gamma = self.header["gamma"] self.eq_type = self.header["eq_type"] - self._parameters = self.header["params"] - self.cgs = self.header["cgs"] + self._parameters = self.header["parameters"] self.units = self.header["units"] - self.eq_names = self.header["equil_names"] - self.legolas_version = self.header["legolas_version"] + self.cgs = self.units["cgs"] + self.eq_names = self.header["equilibrium_names"] self._continua = calculate_continua(self) @@ -275,107 +251,66 @@ def efs_written(self): return self.header["eigenfuncs_written"] @property - def ef_names(self): - """ - Retrieves the eigenfunction names. + def continua(self) -> dict: + """Returns the continua in a dict with the continua names as keys.""" + return self._continua - Returns - ------- - ef_names : numpy.ndarray - Array containing the names of the eigenfunctions as strings. - None if no eigenfunctions are present. - """ - return self.header.get("ef_names", None) + @property + def parameters(self) -> dict: + """Returns the parameters in a dict with the parameter names as keys""" + return self._parameters @property - def ef_grid(self): - """ - Retrieves the eigenfunction grid. 
+ def has_efs(self) -> bool: + """Returns `True` if eigenfunctions are present.""" + return self.header["has_efs"] - Returns - ------- - ef_grid : numpy.ndarray - The eigenfunction grid. Returns None if eigenfunctions are not present. - """ - if self.efs_written: - with open(self.datfile, "rb") as istream: - grid = read_ef_grid(istream, self.header) - return grid - else: + @property + def ef_grid(self) -> np.ndarray: + """Returns the eigenfunction grid, None if eigenfunctions are not present.""" + if not self.has_efs: return None + if getattr(self, "_ef_grid", None) is None: + with open(self.datfile, "rb") as istream: + self._ef_grid = read_ef_grid(istream, self.header) + return self._ef_grid @property - def ef_subset(self): - """ - Checks if dataset contains a subset of the eigenfunctions - - Returns - ------- - bool - `True` if subset present, `False` otherwise - """ - return self.header["eigenfunction_subset_used"] + def ef_names(self) -> np.ndarray: + """Retrieves the eigenfunction names, None if eigenfunctions are not present.""" + return self.header.get("ef_names", None) @property - def derived_efs_written(self): - """ - Checks if derived eigenfunctions are present. - - Returns - ------- - bool - If `True`, derived eigenfunctions are present in the datfile. - """ - return self.header["derived_eigenfuncs_written"] + def has_derived_efs(self) -> bool: + """Returns `True` if derived eigenfunctions are present.""" + return self.header["has_derived_efs"] @property - def derived_ef_names(self): - """ - Retrieves the derived eigenfunction names. - - Returns - ------- - numpy.ndarray - Array containing the names of the derived eigenfunctions as strings. - None if no derived eigenfunctions are present. 
- """ + def derived_ef_names(self) -> np.ndarray: + """Retrieves the derived eigenfunction names, None if not present.""" return self.header.get("derived_ef_names", None) - @staticmethod - def _get_values(array, which_values): - """ - Determines which values to retrieve from an array. - - Parameters - ---------- - array : numpy.ndarray - The array with values. - which_values : str - Can be one of the following: + @property + def has_ef_subset(self) -> bool: + """Returns `True` if the dataset contains a subset of the eigenfunctions.""" + return self.header["ef_subset_used"] - - "average": returns the average of the array - - "minimum": returns the minimum of the array - - "maximum": returns the maximum of the array + @property + def has_matrices(self) -> bool: + """Checks if matrices are present.""" + return self.header["has_matrices"] - If not supplied or equal to None, simply returns the array. + @property + def has_eigenvectors(self) -> bool: + """Checks if eigenvectors are present.""" + return self.header["has_eigenvectors"] - Returns - ------- - array : numpy.ndarray - Numpy array with values depending on the argument provided. - """ - if which_values is None: - return array - elif which_values == "average": - return np.average(array) - elif which_values == "minimum": - return np.min(array) - elif which_values == "maximum": - return np.max(array) - else: - raise ValueError(f"unknown argument which_values: {which_values}") + @property + def has_residuals(self) -> bool: + """Checks if residuals are present.""" + return self.header["has_residuals"] - def get_sound_speed(self, which_values=None): + def get_sound_speed(self, which_values=None) -> Union[float, np.ndarray]: """ Calculates the sound speed based on the equilibrium arrays, given by :math:`c_s = \\sqrt{\\frac{\\gamma p_0}{\\rho_0}}`. 
@@ -383,23 +318,19 @@ def get_sound_speed(self, which_values=None): Parameters ---------- which_values : str - Can be one of the following: - - None : returns the sound speed as a function of the grid. - - "average": returns the average sound speed over the grid. - - "minimum": returns the minimum sound speed over the grid. - - "maximum": returns the maximum sound speed over the grid. + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - cs : float, numpy.ndarray + float or numpy.ndarray Array with the sound speed at every grid point, or a float corresponding to the value of `which_values` if provided. """ pressure = self.equilibria["T0"] * self.equilibria["rho0"] cs = np.sqrt(self.gamma * pressure / self.equilibria["rho0"]) - return self._get_values(cs, which_values) + return get_values(cs, which_values) - def get_alfven_speed(self, which_values=None): + def get_alfven_speed(self, which_values=None) -> Union[float, np.ndarray]: """ Calculates the Alfvén speed based on the equilibrium arrays, given by :math:`c_A = \\sqrt{\\frac{B_0^2}{\\rho_0}}`. @@ -407,22 +338,18 @@ def get_alfven_speed(self, which_values=None): Parameters ---------- which_values : str - Can be one of the following: - - None : returns the Alfvén speed as a function of the grid. - - "average": returns the average Alfvén speed over the grid. - - "minimum": returns the minimum Alfvén speed over the grid. - - "maximum": returns the maximum Alfvén speed over the grid. + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - cA : float, numpy.ndarray + float or numpy.ndarray Array with the Alfvén speed at every grid point, or a float corresponding to the value of `which_values` if provided. 
""" cA = self.equilibria["B0"] / np.sqrt(self.equilibria["rho0"]) - return self._get_values(cA, which_values) + return get_values(cA, which_values) - def get_tube_speed(self, which_values=None): + def get_tube_speed(self, which_values=None) -> Union[float, np.ndarray]: """ Calculates the tube speed for a cylinder, given by :math:`c_t = \\frac{c_s c_A}{\\sqrt{c_s^2 + c_A^2}}`. @@ -430,15 +357,11 @@ def get_tube_speed(self, which_values=None): Parameters ---------- which_values : str - Can be one of the following: - - None : returns the tube speed as a function of the grid. - - "average": returns the average tube speed over the grid. - - "minimum": returns the minimum tube speed over the grid. - - "maximum": returns the maximum tube speed over the grid. + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - ct = float, numpy.ndarray + float or numpy.ndarray Array with the tube speed at every grid point, or a float corresponding to the value of `which_values` if provided. Returns `None` if the geometry is not cylindrical. @@ -452,9 +375,9 @@ def get_tube_speed(self, which_values=None): cA = self.get_alfven_speed() cs = self.get_sound_speed() ct = cs * cA / np.sqrt(cs**2 + cA**2) - return self._get_values(ct, which_values) + return get_values(ct, which_values) - def get_reynolds_nb(self, which_values=None): + def get_reynolds_nb(self, which_values=None) -> Union[float, np.ndarray]: """ Calculates the Reynolds number, defined as :math:`R_e = \\frac{ac_s}{\\eta}` where the slabsize is given by @@ -463,15 +386,11 @@ def get_reynolds_nb(self, which_values=None): Parameters ---------- which_values : str - Can be one of the following: - - None : returns the Reynolds number as a function of the grid. - - "average": returns the average Reynolds number over the grid. - - "minimum": returns the minimum Reynolds number over the grid. 
- - "maximum": returns the maximum Reynolds number the grid + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - Re : float, numpy.ndarray + float or numpy.ndarray Array with the Reynolds number at every grid point, or a float corresponding to the value of `which_values` if provided. Returns `None` if the resistivity is zero somewhere on the domain. @@ -487,9 +406,9 @@ def get_reynolds_nb(self, which_values=None): return None else: Re = a * cs / eta - return self._get_values(Re, which_values) + return get_values(Re, which_values) - def get_magnetic_reynolds_nb(self, which_values=None): + def get_magnetic_reynolds_nb(self, which_values=None) -> Union[float, np.ndarray]: """ Calculates the magnetic Reynolds number, defined as :math:`R_m = \\frac{ac_A}{\\eta}` where the slabsize is given by @@ -498,15 +417,11 @@ def get_magnetic_reynolds_nb(self, which_values=None): Parameters ---------- which_values : str - Can be one of the following: - - None : returns the magnetic Reynolds number as a function of the grid. - - "average": returns the average magnetic Reynolds number over the grid. - - "minimum": returns the minimum magnetic Reynolds number over the grid. - - "maximum": returns the maximum magnetic Reynolds number over the grid + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - Rm : float, numpy.ndarray(dtype=float, ndim=1) + float or numpy.ndarray Array with the magnetic Reynolds number at every grid point, or a float corresponding to the value of `which_values` if provided. Returns `None` if the resistivity is zero somewhere on the domain. @@ -522,29 +437,22 @@ def get_magnetic_reynolds_nb(self, which_values=None): return None else: Rm = a * cA / eta - return self._get_values(Rm, which_values) + return get_values(Rm, which_values) - def get_k0_squared(self): + def get_k0_squared(self) -> float: """ Calculates the squared wave number, defined as :math:`k_0^2 = k_2^2 + k_3^2`. 
- - Returns - ------- - k0_sq : float - The wave number squared. - """ - k0_sq = self.parameters.get("k2") ** 2 + self.parameters.get("k3") ** 2 - return k0_sq + return self.parameters.get("k2") ** 2 + self.parameters.get("k3") ** 2 - def get_matrix_B(self): + def get_matrix_B(self) -> tuple(np.ndarray, np.ndarray, np.ndarray): """ Retrieves the matrix B from the datfile. Returns ------- - 3-tuple of numpy.ndarray(dtype=int, ndim=1) + Tuple(rows: numpy.ndarray, cols: numpy.ndarray, vals: numpy.ndarray) Tuple containing the rows, columns and values of the non-zero B-matrix elements. Rows and columns are integers, values are real. @@ -553,19 +461,19 @@ def get_matrix_B(self): MatricesNotPresent If the matrices were not saved to the datfile. """ - if not self.header["matrices_written"]: + if not self.has_matrices: raise MatricesNotPresent(self.datfile) with open(self.datfile, "rb") as istream: rows, cols, vals = read_matrix_B(istream, self.header) return rows, cols, vals - def get_matrix_A(self): + def get_matrix_A(self) -> tuple(np.ndarray, np.ndarray, np.ndarray): """ Retrieves the matrix A from the datfile. Returns ------- - 3-tuple of numpy.ndarray(dtype=int, ndim=1) + Tuple(rows: numpy.ndarray, cols: numpy.ndarray, vals: numpy.ndarray) Tuple containing the rows, columns and values of the non-zero A-matrix elements. Rows and columns are integers, values are complex. @@ -574,19 +482,19 @@ def get_matrix_A(self): MatricesNotPresent If the matrices were not saved to the datfile. """ - if not self.header["matrices_written"]: + if not self.has_matrices: raise MatricesNotPresent(self.datfile) with open(self.datfile, "rb") as istream: rows, cols, vals = read_matrix_A(istream, self.header) return rows, cols, vals - def get_eigenvectors(self): + def get_eigenvectors(self) -> np.ndarray: """ Retrieves the eigenvectors from the datfile. Returns ------- - eigenvectors : numpy.ndarray(dtype=complex, ndim=2) + numpy.ndarray Array containing the eigenvectors. 
One eigenvector in each column. @@ -595,19 +503,19 @@ def get_eigenvectors(self): EigenvectorsNotPresent If the eigenvectors were not saved to the datfile. """ - if not self.header["eigenvecs_written"]: + if not self.has_eigenvectors: raise EigenvectorsNotPresent(self.datfile) with open(self.datfile, "rb") as istream: evs = read_eigenvectors(istream, self.header) return evs - def get_residuals(self): + def get_residuals(self) -> np.ndarray: """ Retrieves the residuals from the datfile. Returns ------- - residuals : numpy.ndarray(dtype=double, ndim=1) + numpy.ndarray Array containing the residuals. Raises @@ -621,7 +529,7 @@ def get_residuals(self): res = read_residuals(istream, self.header) return res - def get_eigenfunctions(self, ev_guesses=None, ev_idxs=None): + def get_eigenfunctions(self, ev_guesses=None, ev_idxs=None) -> np.ndarray: """ Returns the eigenfunctions based on given eigenvalue guesses or their indices. An array will be returned where every item is a dictionary, containing @@ -637,13 +545,13 @@ def get_eigenfunctions(self, ev_guesses=None, ev_idxs=None): Returns ------- - eigenfuncs : numpy.ndarray(dtype=dict, ndim=1) + numpy.ndarray Array containing the eigenfunctions and eigenvalues corresponding to the supplied indices. Every index in this array contains a dictionary with the eigenfunctions and corresponding eigenvalue. The keys of each dictionary are the eigenfunction names. """ - if not self.efs_written: + if not self.has_efs: raise EigenfunctionsNotPresent("eigenfunctions not written to datfile") if ev_guesses is not None and ev_idxs is not None: raise ValueError( @@ -665,7 +573,7 @@ def get_eigenfunctions(self, ev_guesses=None, ev_idxs=None): eigenfuncs[dict_idx] = efs return eigenfuncs - def get_derived_eigenfunctions(self, ev_guesses=None, ev_idxs=None): + def get_derived_eigenfunctions(self, ev_guesses=None, ev_idxs=None) -> np.ndarray: """ Returns the derived eigenfunctions based on given eigenvalue guesses or their indices. 
An array will be returned where every item is a dictionary, containing @@ -681,14 +589,14 @@ def get_derived_eigenfunctions(self, ev_guesses=None, ev_idxs=None): Returns ------- - numpy.ndarray(dtype=dict, ndim=1) + numpy.ndarray Array containing the derived eigenfunctions and eigenvalues corresponding to the supplied indices. Every index in this array contains a dictionary with the derived eigenfunctions and corresponding eigenvalue. The keys of each dictionary are the corresponding eigenfunction names. """ - if not self.derived_efs_written: + if not self.has_derived_efs: raise EigenfunctionsNotPresent( "derived eigenfunctions not written to datfile" ) @@ -710,7 +618,7 @@ def get_derived_eigenfunctions(self, ev_guesses=None, ev_idxs=None): derived_efs[dict_idx] = defs return derived_efs - def get_nearest_eigenvalues(self, ev_guesses): + def get_nearest_eigenvalues(self, ev_guesses) -> tuple(np.ndarray, np.ndarray): """ Calculates the eigenvalues nearest to a given guess. This calculates the nearest eigenvalue based on the distance between two points. @@ -723,9 +631,8 @@ def get_nearest_eigenvalues(self, ev_guesses): Returns ------- - idxs : numpy.ndarray(dtype=int, ndim=1) + Tuple(numpy.ndarray, numpy.ndarray) The indices of the nearest eigenvalues in the :attr:`eigenvalues` array. - eigenvalues: numpy.ndarray(dtype=complex, ndim=1) The nearest eigenvalues to the provided guesses, corresponding with the indices `idxs`. """ @@ -767,150 +674,124 @@ def __len__(self): return len(self.datasets) @property - def efs_written(self): + def continua(self) -> dict: """ - Checks if the eigenfunctions are written. - - Returns - ------- - efs_written : numpy.ndarray - An array of bools corresponding to the various datasets, `True` if a - dataset has eigenfunctions present. + Returns the continua. Each key corresponds to a multiple Numpy arrays, + one for each dataset. 
""" - return np.array([ds.efs_written for ds in self.datasets]) + keys = self[0].continua.keys() + _continua = {key: [] for key in keys} + for ds in self: + for key in keys: + _continua[key].append(ds.continua[key]) + return {key: np.array(values) for key, values in _continua.items()} @property - def ef_names(self): + def parameters(self) -> dict: """ - Returns the eigenfunction names. + Returns the parameters. Each key corresponds to multiple Numpy arrays, + one for each dataset. + """ + keys = self[0].parameters.keys() + _params = {key: [] for key in keys} + for ds in self: + for key in keys: + _params[key].append(ds.parameters[key]) + return {key: np.array(values) for key, values in _params.items()} - Returns - ------- - ef_names : numpy.ndarray - An array with the eigenfunction names as strings. - """ - names = np.array([ds.ef_names for ds in self.datasets], dtype=object) - for item in names: - if item is None: - continue - else: - return item - else: - return None + @property + def has_efs(self) -> np.ndarray: + """Returns `True` if eigenfunctions are present.""" + return np.array([ds.has_efs for ds in self.datasets], dtype=bool) @property - def ef_grid(self): - """ - Retrieves the eigenfunction grid. + def ef_names(self) -> np.ndarray: + """Returns the eigenfunction names.""" + return np.array([ds.ef_names for ds in self.datasets], dtype=object) - Returns - ------- - ef_grid : numpy.ndarray - An array with arrays, containing the eigenfunction grids for each dataset. - """ + @property + def ef_grid(self) -> np.ndarray: + """Returns the eigenfunction grid.""" return np.array([ds.ef_grid for ds in self.datasets], dtype=object) @property - def derived_efs_written(self): - """ - Checks if the derived eigenfunctions are written. 
+ def has_derived_efs(self) -> np.ndarray: + """Returns `True` at index `i` if eigenfunctions are present in dataset `i`.""" + return np.array([ds.has_derived_efs for ds in self.datasets]) - Returns - ------- - numpy.ndarray(dtype=bool) - An array of bools corresponding to the various datasets, `True` if a - dataset has derived eigenfunctions present. - """ - return np.array([ds.derived_efs_written for ds in self.datasets]) + @property + def derived_ef_names(self) -> np.ndarray: + """Returns the derived eigenfunction names.""" + return np.array([ds.derived_ef_names for ds in self.datasets], dtype=object) @property - def derived_ef_names(self): - """ - Returns the derived eigenfunction names. + def has_ef_subset(self) -> np.ndarray: + """Returns `True` at index `i` if the `i`-th dataset contains a subset.""" + return np.array([ds.has_ef_subset for ds in self.datasets], dtype=object) - Returns - ------- - numpy.ndarray(dtype=str) - An array with the derived eigenfunction names as strings. - """ - names = np.array([ds.derived_ef_names for ds in self.datasets], dtype=object) - for item in names: - if item is None: - continue - else: - return item - else: - return None + @property + def has_matrices(self) -> np.ndarray: + """Returns `True` at index `i` if the `i`-th dataset contains matrices.""" + return np.array([ds.has_matrices for ds in self.datasets], dtype=object) - def get_sound_speed(self, which_values=None): + @property + def has_eigenvectors(self) -> np.ndarray: + """Returns `True` at index `i` if the `i`-th dataset contains eigenvectorst.""" + return np.array([ds.has_eigenvectors for ds in self.datasets], dtype=object) + + @property + def has_residuals(self) -> np.ndarray: + """Returns `True` at index `i` if the `i`-th dataset contains residuals.""" + return np.array([ds.has_residuals for ds in self.datasets], dtype=object) + + def get_sound_speed(self, which_values=None) -> np.ndarray: """ Calculates the sound speed for the various datasets. 
Parameters ---------- which_values : str - Which values to return, can be one of the following: - - None: every element of the return array will be an array in itself. - - "average": every element of the return array is a float, equal to - the average sound speed for that dataset. - - "minimum": every element of the return array is a float, equal to - the minimum sound speed for that dataset. - - "maximum": every element of the return array is a float, equal to - the minimum sound speed for that dataset. + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - cs : numpy.ndarray + numpy.ndarray A Numpy array of same length as the number of datasets, containing the sound speeds. Elements are either arrays themselves or floats, depending on the value of `which_values`. """ return np.array([ds.get_sound_speed(which_values) for ds in self.datasets]) - def get_alfven_speed(self, which_values=None): + def get_alfven_speed(self, which_values=None) -> np.ndarray: """ Calculates the Alfvén speed for the various datasets. Parameters ---------- which_values : str - Which values to return, can be one of the following: - - None: every element of the return array will be an array in itself. - - "average": every element of the return array is a float, equal to - the average Alfvén speed for that dataset. - - "minimum": every element of the return array is a float, equal to - the minimum Alfvén speed for that dataset. - - "maximum": every element of the return array is a float, equal to - the minimum Alfvén speed for that dataset. + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - cA : numpy.ndarray + numpy.ndarray A Numpy array of same length as the number of datasets, containing the Alfvén speeds. Elements are either arrays themselves or floats, depending on the value of `which_values`. 
""" return np.array([ds.get_alfven_speed(which_values) for ds in self.datasets]) - def get_tube_speed(self, which_values=None): + def get_tube_speed(self, which_values=None) -> np.ndarray: """ Calculates the tube speed for the various datasets. Parameters ---------- which_values : str - Which values to return, can be one of the following: - - None: every element of the return array will be an array in itself. - - "average": every element of the return array is a float, equal to - the average tube speed for that dataset. - - "minimum": every element of the return array is a float, equal to - the minimum tube speed for that dataset. - - "maximum": every element of the return array is a float, equal to - the minimum tube speed for that dataset. + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - cA : numpy.ndarray + numpy.ndarray A Numpy array of same length as the number of datasets, containing the tube speeds. Elements are either arrays themselves or floats, depending on the value of `which_values`. Elements are None if the geometry is @@ -918,25 +799,18 @@ def get_tube_speed(self, which_values=None): """ return np.array([ds.get_tube_speed(which_values) for ds in self.datasets]) - def get_reynolds_nb(self, which_values=None): + def get_reynolds_nb(self, which_values=None) -> np.ndarray: """ Calculates the Reynolds number for the various datasets. Parameters ---------- which_values : str - Which values to return, can be one of the following: - - None: every element of the return array will be an array in itself. - - "average": every element of the return array is a float, equal to - the average Reynolds number for that dataset. - - "minimum": every element of the return array is a float, equal to - the minimum Reynolds number for that dataset. - - "maximum": every element of the return array is a float, equal to - the minimum Reynolds number for that dataset. + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". 
Returns ------- - Re : numpy.ndarray + numpy.ndarray A Numpy array of same length as the number of datasets, containing the Reynolds number. Elements are either arrays themselves or floats, depending on the value of `which_values`. @@ -944,25 +818,18 @@ def get_reynolds_nb(self, which_values=None): """ return np.array([ds.get_reynolds_nb(which_values) for ds in self.datasets]) - def get_magnetic_reynolds_nb(self, which_values=None): + def get_magnetic_reynolds_nb(self, which_values=None) -> np.ndarray: """ Calculates the magnetic Reynolds number for the various datasets. Parameters ---------- which_values : str - Which values to return, can be one of the following: - - None: every element of the return array will be an array in itself. - - "average": every element of the return array is a float, equal to - the average magnetic Reynolds number for that dataset. - - "minimum": every element of the return array is a float, equal to - the minimum magnetic Reynolds number for that dataset. - - "maximum": every element of the return array is a float, equal to - the minimum magnetic Reynolds number for that dataset. + Callback to :meth:`get_values`, either "average"/"minimum"/"maximum". Returns ------- - Re : numpy.ndarray + numpy.ndarray A Numpy array of same length as the number of datasets, containing the magnetic Reynolds number. Elements are either arrays themselves or floats, depending on the value of `which_values`. @@ -972,14 +839,14 @@ def get_magnetic_reynolds_nb(self, which_values=None): [ds.get_magnetic_reynolds_nb(which_values) for ds in self.datasets] ) - def get_k0_squared(self): + def get_k0_squared(self) -> np.ndarray: """ Calculates the squared wave number for the various datasets. Returns ------- - k0_sq : numpy.ndarray + numpy.ndarray A Numpy array of same length as the number of datasets, containing the squared wavenumber for each. 
""" - return np.array([ds.get_k0_squared() for ds in self.datasets]) + return np.array([ds.get_k0_squared() for ds in self.datasets], dtype=float) diff --git a/post_processing/pylbo/file_handler.py b/post_processing/pylbo/file_handler.py index d91c0883..17fad6e4 100644 --- a/post_processing/pylbo/file_handler.py +++ b/post_processing/pylbo/file_handler.py @@ -1,12 +1,13 @@ import os -import numpy as np import tkinter as tk from pathlib import Path from tkinter import filedialog + +import numpy as np +from pylbo.data_containers import LegolasDataSeries, LegolasDataSet from pylbo.exceptions import InvalidLegolasFile -from pylbo.data_containers import LegolasDataSet, LegolasDataSeries -from pylbo.utilities.toolbox import transform_to_list from pylbo.utilities.logger import pylboLogger +from pylbo.utilities.toolbox import transform_to_list def _validate_file(file): @@ -33,7 +34,7 @@ def _validate_file(file): raise InvalidLegolasFile(path_to_file) -def load(datfile, display_info=True): +def load(datfile): """ Loads a single Legolas datfile. @@ -41,8 +42,6 @@ def load(datfile, display_info=True): ---------- datfile : str, ~os.PathLike Path to the datfile. - display_info : bool - If `True`, datfile information is written to terminal. 
Raises ------ @@ -58,29 +57,30 @@ def load(datfile, display_info=True): raise ValueError("load() takes a single datfile.") _validate_file(datfile) ds = LegolasDataSet(datfile) - if display_info: - pylboLogger.info(f"Legolas version : {ds.legolas_version}") - pylboLogger.info(f"file loaded : {ds.datfile.parent} -- {ds.datfile.name}") - pylboLogger.info(f"gridpoints : {ds.gridpoints}") - pylboLogger.info(f"geometry : {ds.geometry} in {ds.x_start, ds.x_end}") - pylboLogger.info(f"equilibrium : {ds.eq_type}") - if ds.header["matrices_written"]: - pylboLogger.info("matrices present in datfile") - if ds.header["eigenfuncs_written"]: - pylboLogger.info("eigenfunctions present in datfile") - if ds.header.get("derived_eigenfuncs_written", False): - pylboLogger.info("derived eigenfunctions present in datfile") - if ds.header.get("eigenfunction_subset_used", False): - saved_efs = len(ds.header["ef_written_idxs"]) - total_efs = len(ds.eigenvalues) - pylboLogger.info( - f"subset saved: {saved_efs}/{total_efs} eigenvalues have eigenfunctions" - ) - pylboLogger.info("-" * 75) + pylboLogger.info(f"Legolas version : {ds.legolas_version}") + pylboLogger.info(f"file loaded : {ds.datfile.parent} -- {ds.datfile.name}") + pylboLogger.info(f"gridpoints : {ds.gridpoints}") + pylboLogger.info(f"geometry : {ds.geometry} in {ds.x_start, ds.x_end}") + pylboLogger.info(f"equilibrium : {ds.eq_type}") + if ds.has_matrices: + pylboLogger.info("matrices present in datfile") + if ds.has_eigenvectors: + pylboLogger.info("eigenvectors present in datfile") + if ds.has_efs: + pylboLogger.info("eigenfunctions present in datfile") + if ds.has_derived_efs: + pylboLogger.info("derived eigenfunctions present in datfile") + if ds.has_ef_subset: + saved_efs = len(ds.header["ef_written_idxs"]) + total_efs = len(ds.eigenvalues) + pylboLogger.info( + f"subset saved: {saved_efs}/{total_efs} eigenvalues have eigenfunctions" + ) + pylboLogger.info("-" * 75) return ds -def load_series(datfiles, display_info=True): 
+def load_series(datfiles): """ Loads multiple Legolas datfiles. @@ -89,8 +89,6 @@ def load_series(datfiles, display_info=True): datfiles : list, numpy.ndarray Paths to the datfiles that should be loaded, in list/array form. Every element should be a string or a ~os.PathLike object. - display_info : bool - If `True`, datfile information is written to terminal. Raises ------ @@ -109,78 +107,68 @@ def load_series(datfiles, display_info=True): _validate_file(datfile) series = LegolasDataSeries(datfiles) - if display_info: - # handle version printing - versions = [ds.legolas_version.parse() for ds in series.datasets] - minversion, maxversion = min(versions), max(versions) - if minversion == maxversion: - info_msg = str(minversion) - else: - info_msg = f"{minversion} --> {maxversion}" - pylboLogger.info(f"Legolas_version : {info_msg}") - - # handle file information printing - names = sorted([ds.datfile.name for ds in series.datasets]) - pylboLogger.info(f"files loaded : {names[0]} --> {names[-1]}") - - # handle gridpoints printing - pts = [ds.gridpoints for ds in series.datasets] - minpts, maxpts = min(pts), max(pts) - if minpts == maxpts: - info_msg = str(minpts) - else: - info_msg = f"{minpts} --> {maxpts}" - pylboLogger.info(f"gridpoints : {info_msg}") - - # handle geometry printing - if not isinstance(series.geometry, str) and len(series.geometry) > 1: - pylboLogger.warning("multiple geometries detected!") - else: - pylboLogger.info(f"geometries : {series.geometry}") - - # handle equilibrium printing - equils = set([ds.eq_type for ds in series.datasets]) - if len(equils) > 1: - pylboLogger.error(f"multiple equilibria detected! 
-- {equils}") - raise ValueError - else: - pylboLogger.info(f"equilibria : {equils.pop()}") - - # check presence of matrices - matrices_present = set( - [ds.header["matrices_written"] for ds in series.datasets] - ) - if len(matrices_present) > 1: - pylboLogger.info("matrices present in some datfiles, but not all") - else: - if matrices_present.pop(): - pylboLogger.info("matrices present in all datfiles") - - # check presence of eigenfunctions - efs_present = set([ds.header["eigenfuncs_written"] for ds in series.datasets]) - if len(efs_present) > 1: - pylboLogger.info("eigenfunctions present in some datfiles, but not all") - else: - if efs_present.pop(): - pylboLogger.info("eigenfunctions present in all datfiles") - - # check presence of derived eigenfunctions - defs_present = set( - [ - ds.header.get("derived_eigenfuncs_written", False) - for ds in series.datasets - ] - ) - if len(defs_present) == 0: - pylboLogger.info("no derived eigenfunctions present") - elif len(defs_present) > 1: - pylboLogger.info( - "derived eigenfunctions present in some datfiles, but not all" - ) - else: - if defs_present.pop(): - pylboLogger.info("derived eigenfunctions present in all datfiles") - pylboLogger.info("-" * 75) + # handle version printing + versions = [ds.legolas_version.parse() for ds in series.datasets] + minversion, maxversion = min(versions), max(versions) + if minversion == maxversion: + info_msg = str(minversion) + else: + info_msg = f"{minversion} --> {maxversion}" + pylboLogger.info(f"Legolas_version : {info_msg}") + + # handle file information printing + names = sorted([ds.datfile.name for ds in series.datasets]) + pylboLogger.info(f"files loaded : {names[0]} --> {names[-1]}") + + # handle gridpoints printing + pts = [ds.gridpoints for ds in series.datasets] + minpts, maxpts = min(pts), max(pts) + if minpts == maxpts: + info_msg = str(minpts) + else: + info_msg = f"{minpts} --> {maxpts}" + pylboLogger.info(f"gridpoints : {info_msg}") + + # handle geometry printing + 
if not isinstance(series.geometry, str) and len(series.geometry) > 1: + pylboLogger.warning("multiple geometries detected!") + else: + pylboLogger.info(f"geometries : {series.geometry}") + + # handle equilibrium printing + equils = set([ds.eq_type for ds in series.datasets]) + if len(equils) > 1: + pylboLogger.error(f"multiple equilibria detected! -- {equils}") + raise ValueError + else: + pylboLogger.info(f"equilibria : {equils.pop()}") + + # check presence of matrices + matrices_present = set(series.has_matrices) + if len(matrices_present) > 1: + pylboLogger.info("matrices present in some datfiles, but not all") + else: + if matrices_present.pop(): + pylboLogger.info("matrices present in all datfiles") + + # check presence of eigenfunctions + efs_present = set(series.has_efs) + if len(efs_present) > 1: + pylboLogger.info("eigenfunctions present in some datfiles, but not all") + else: + if efs_present.pop(): + pylboLogger.info("eigenfunctions present in all datfiles") + + # check presence of derived eigenfunctions + defs_present = set(series.has_derived_efs) + if len(defs_present) == 0: + pylboLogger.info("no derived eigenfunctions present") + elif len(defs_present) > 1: + pylboLogger.info("derived eigenfunctions present in some datfiles, but not all") + else: + if defs_present.pop(): + pylboLogger.info("derived eigenfunctions present in all datfiles") + pylboLogger.info("-" * 75) return series diff --git a/post_processing/pylbo/utilities/datfile_utils.py b/post_processing/pylbo/utilities/datfile_utils.py index b9bfac5d..1ef0ff44 100644 --- a/post_processing/pylbo/utilities/datfile_utils.py +++ b/post_processing/pylbo/utilities/datfile_utils.py @@ -349,7 +349,7 @@ def read_grid(istream, header): The base grid from the datfile. 
""" istream.seek(header["offsets"]["grid"]) - fmt = ALIGN + header["gridpts"] * "d" + fmt = ALIGN + header["gridpoints"] * "d" grid = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) return np.asarray(grid) @@ -371,7 +371,7 @@ def read_grid_gauss(istream, header): The Gaussian grid from the datfile. """ istream.seek(header["offsets"]["grid_gauss"]) - fmt = ALIGN + header["gauss_gridpts"] * "d" + fmt = ALIGN + header["gauss_gridpoints"] * "d" grid_gauss = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) return np.asarray(grid_gauss) @@ -393,7 +393,7 @@ def read_ef_grid(istream, header): The eigenfunction grid from the datfile. """ istream.seek(header["offsets"]["ef_grid"]) - fmt = ALIGN + header["ef_gridpts"] * "d" + fmt = ALIGN + header["ef_gridpoints"] * "d" ef_grid = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) return np.asarray(ef_grid) @@ -414,14 +414,17 @@ def read_eigenvectors(istream, header): eigenvectors : numpy.ndarray(dtype=complex, ndim=2) The eigenvectors from the datfile, one in each column. """ - istream.seek(header["offsets"]["eigenvectors"]) - fmt = ALIGN + (2 * "d") * header["eigenvec_len"] * header["nb_eigenvecs"] + offsets = header["offsets"] + ev_length = offsets["eigenvector_length"] + nb_evs = offsets["nb_eigenvectors"] + istream.seek(offsets["eigenvectors"]) + fmt = ALIGN + (2 * "d") * ev_length * nb_evs hdr = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) reals = hdr[::2] imags = hdr[1::2] return np.reshape( np.asarray([complex(x, y) for x, y in zip(reals, imags)]), - (header["eigenvec_len"], header["nb_eigenvecs"]), + (ev_length, nb_evs), order="F", ) @@ -468,7 +471,7 @@ def read_eigenvalues(istream, header, omit_large_evs=True): The eigenvalues from the datfile, with optionally omitted large values. 
""" istream.seek(header["offsets"]["eigenvalues"]) - fmt = ALIGN + 2 * header["nb_eigenvals"] * "d" + fmt = ALIGN + 2 * header["nb_eigenvalues"] * "d" hdr = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) # hdr is a 1D list with [real, imag, real, imag, real, imag...] reals = hdr[::2] @@ -496,10 +499,10 @@ def read_equilibrium_arrays(istream, header): Dictionary containing the equilibrium arrays, with keys given by `header['equil_names']`. """ - istream.seek(header["offsets"]["equil_arrays"]) + istream.seek(header["offsets"]["equilibrium_arrays"]) equil_arrays = {} - for name in header["equil_names"]: - fmt = ALIGN + header["gauss_gridpts"] * "d" + for name in header["equilibrium_names"]: + fmt = ALIGN + header["gauss_gridpoints"] * "d" equil_array = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) equil_arrays.update({name: np.asarray(equil_array)}) return equil_arrays @@ -530,7 +533,7 @@ def read_eigenfunction(istream, header, ev_index): eigenvalue, associated with the same `ef_index`. """ ef_offset = header["offsets"]["ef_arrays"] - ef_gridpts = header["ef_gridpts"] + ef_gridpts = header["ef_gridpoints"] nb_eigenfuncs = len(header["ef_written_idxs"]) eigenfunctions = {} @@ -587,7 +590,7 @@ def read_derived_eigenfunction(istream, header, ev_index): eigenvalue, associated with the same `ev_index`. """ ef_offset = header["offsets"]["derived_ef_arrays"] - ef_gridpts = header["ef_gridpts"] + ef_gridpts = header["ef_gridpoints"] nb_eigenfuncs = len(header["ef_written_idxs"]) eigenfunctions = {} @@ -636,7 +639,7 @@ def read_matrix_B(istream, header): the rows and column indices. 
""" istream.seek(header["offsets"]["matrix_B"]) - fmt = ALIGN + (2 * "i" + "d") * header["nonzero_B_elements"] + fmt = ALIGN + (2 * "i" + "d") * header["offsets"]["nonzero_B_elements"] hdr = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) rows = np.asarray(hdr[::3]) # rows are 1, 4, 7, 10 etc (Fortran indexing) cols = np.asarray(hdr[1::3]) # columns are 2, 5, 8, 11 etc (Fortran indexing) @@ -666,7 +669,7 @@ def read_matrix_A(istream, header): the row and column indices. """ istream.seek(header["offsets"]["matrix_A"]) - fmt = ALIGN + (2 * "i" + 2 * "d") * header["nonzero_A_elements"] + fmt = ALIGN + (2 * "i" + 2 * "d") * header["offsets"]["nonzero_A_elements"] hdr = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) rows = np.asarray(hdr[::4]) cols = np.asarray(hdr[1::4]) diff --git a/post_processing/pylbo/utilities/datfiles/__init__.py b/post_processing/pylbo/utilities/datfiles/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/post_processing/pylbo/utilities/datfiles/file_manager.py b/post_processing/pylbo/utilities/datfiles/file_manager.py new file mode 100644 index 00000000..07b3357b --- /dev/null +++ b/post_processing/pylbo/utilities/datfiles/file_manager.py @@ -0,0 +1,140 @@ +import struct +from functools import wraps +from typing import BinaryIO, List, Union + +from pylbo._version import VersionHandler + + +def requires_version(version_needed, default=None): + def check_version(func): + @wraps(func) + def wrapper(*args, **kwargs): + if args[0].legolas_version < version_needed: + return default + return func(*args, **kwargs) + + return wrapper + + return check_version + + +class LegolasFileManager: + SIZE_CHAR = struct.calcsize("c") + SIZE_INT = struct.calcsize("i") + SIZE_BOOL = SIZE_INT # fortran logical is 4-byte integer + SIZE_DOUBLE = struct.calcsize("d") + SIZE_COMPLEX = struct.calcsize(2 * "d") + ALIGN = "=" + + def __init__(self, istream: BinaryIO): + self._istream = istream + self.istream.seek(0) + self.legolas_version = 
self._read_legolas_version()
+
+    @property
+    def istream(self) -> BinaryIO:
+        """Returns the input stream of the file manager."""
+        return self._istream
+
+    def read_string_from_istream(
+        self, length: int, amount: int = 1
+    ) -> Union[str, List[str]]:
+        """
+        Reads a string from the input stream.
+
+        Parameters
+        ----------
+        length : int
+            The length of the string to read.
+        amount : int, optional
+            The amount of strings to read, by default 1
+
+        Returns
+        -------
+        str, list of str
+            The string or list of strings read from the input stream.
+        """
+        fmt = self.ALIGN + amount * length * "c"
+        hdr = struct.unpack(fmt, self.istream.read(struct.calcsize(fmt)))
+        if amount == 1:
+            return b"".join(hdr).strip().decode()
+        else:
+            return [
+                b"".join(hdr[i : i + length]).strip().decode()
+                for i in range(0, amount * length, length)
+            ]
+
+    def _read_number_from_istream(self, kind: str, amount: int = 1) -> complex:
+        """
+        Reads a number from the input stream.
+
+        Parameters
+        ----------
+        kind : str
+            The kind of number to read.
+            Can be "i" for integer, "d" for double or "c" for complex.
+        amount : int, optional
+            The amount of numbers to read, by default 1
+
+        Returns
+        -------
+        int, float, complex
+            The number or list of numbers read from the input stream.
+        """
+        fmt = self.ALIGN + amount * kind
+        hdr = struct.unpack(fmt, self.istream.read(struct.calcsize(fmt)))
+        if amount == 1:
+            # for single values unpack and return single number
+            (hdr,) = hdr
+        # NOTE(review): return must be OUTSIDE the if-branch, otherwise calls
+        # with amount > 1 (e.g. read_float_from_istream(amount=2)) return None
+        return hdr
+
+    def read_int_from_istream(self, amount: int = 1) -> int:
+        """
+        Reads an integer from the input stream.
+
+        Parameters
+        ----------
+        amount : int, optional
+            The amount of integers to read, by default 1
+        """
+        return self._read_number_from_istream(kind="i", amount=amount)
+
+    def read_float_from_istream(self, amount: int = 1) -> float:
+        """
+        Reads a float from the input stream.
+ + Parameters + ---------- + amount : int, optional + The amount of floats to read, by default 1 + """ + return self._read_number_from_istream(kind="d", amount=amount) + + def read_complex_from_istream(self, amount: int = 1) -> complex: + """ + Reads a complex number from the input stream. + + Parameters + ---------- + amount : int, optional + The amount of complex numbers to read, by default 1 + """ + return complex(*self._read_number_from_istream(kind="d", amount=2 * amount)) + + def read_boolean_from_istream(self) -> bool: + """Reads a boolean from the input stream.""" + return bool(self.read_int_from_istream()) + + def _read_legolas_version(self): + version_name = self.read_string_from_istream(length=len("legolas_version")) + if version_name == "legolas_version": + # formatted version is character of length 10 + version = self.read_string_from_istream(length=10) + elif version_name == "datfile_version": + # old numbering, single integer + version = f"0.{str(self.read_int_from_istream())}.0" + else: + # very old numbering + self.istream.seek(0) + version = "0.0.0" + return VersionHandler(version) diff --git a/post_processing/pylbo/utilities/datfiles/header_legacy.py b/post_processing/pylbo/utilities/datfiles/header_legacy.py new file mode 100644 index 00000000..0eb41bd5 --- /dev/null +++ b/post_processing/pylbo/utilities/datfiles/header_legacy.py @@ -0,0 +1,279 @@ +from __future__ import annotations + +from typing import Any + +import numpy as np +from pylbo.utilities.datfiles.file_manager import LegolasFileManager, requires_version + + +class LegolasLegacyHeader: + def __init__(self, filemanager: LegolasFileManager) -> None: + self.legolas_version = filemanager.legolas_version + self.manager = filemanager + self.data = {} + + self._str_len = filemanager.read_int_from_istream() + self._str_len_array = filemanager.read_int_from_istream() + self.read_header_data() + self.read_data_offsets() + + def __str__(self) -> str: + return "".join([f"{key}: 
{self.data.get(key)}\n" for key in self.data.keys()]) + + def __getitem__(self, key: str) -> Any: + return self.data[key] + + def get(self, key: str, default: Any = None) -> Any: + return self.data.get(key, default) + + def read_header_data(self) -> None: + data = {} + manager = self.manager + + data["geometry"] = manager.read_string_from_istream(length=self._str_len) + data["x_start"], data["x_end"] = manager.read_float_from_istream(amount=2) + + for key in ("", "gauss_", "matrix_", "ef_"): + data[f"{key}gridpoints"] = manager.read_int_from_istream() + + data["gamma"] = manager.read_float_from_istream() + data["eq_type"] = manager.read_string_from_istream(length=self._str_len) + + data["has_efs"] = manager.read_boolean_from_istream() + data["has_derived_efs"] = self._read_has_derived_efs() + data["has_matrices"] = manager.read_boolean_from_istream() + data["has_eigenvectors"] = self._read_has_eigenvectors() + data["has_residuals"] = self._read_has_residuals() + ( + data["ef_subset_used"], + data["ef_subset_center"], + data["ef_subset_radius"], + ) = self._read_ef_subset_properties() + data["parameters"] = self._read_parameters() + data["equilibrium_names"] = self._read_equilibrium_names() + + data["units"] = self._read_units() + data["nb_eigenvalues"] = ( + manager.read_int_from_istream() + if self.legolas_version >= "1.0.2" + else data["matrix_gridpoints"] + ) + data["offsets"] = {} + self.data.update(data) + + def read_data_offsets(self) -> None: + istream = self.manager.istream + + offsets = {} + # eigenvalue offset + offsets["eigenvalues"] = istream.tell() + bytesize = self.data["nb_eigenvalues"] * self.manager.SIZE_COMPLEX + istream.seek(istream.tell() + bytesize) + # grid offset + offsets["grid"] = istream.tell() + bytesize = self.data["gridpoints"] * self.manager.SIZE_DOUBLE + istream.seek(istream.tell() + bytesize) + # grid gauss offset + offsets["grid_gauss"] = istream.tell() + bytesize = self.data["gauss_gridpoints"] * self.manager.SIZE_DOUBLE + 
istream.seek(istream.tell() + bytesize)
+        # equilibrium arrays offset
+        offsets["equilibrium_arrays"] = istream.tell()
+        bytesize = (
+            self.data["gauss_gridpoints"]
+            * len(self.data["equilibrium_names"])
+            * self.manager.SIZE_DOUBLE
+        )
+        istream.seek(istream.tell() + bytesize)
+
+        offsets.update(self._get_eigenfunction_offsets())
+        offsets.update(self._get_derived_eigenfunction_offsets())
+        offsets.update(self._get_eigenvector_offsets())
+        offsets.update(self._get_residuals_offsets())
+        offsets.update(self._get_matrices_offsets())
+
+        self.data["offsets"].update(offsets)
+
+    @requires_version("1.1.3", default=False)
+    def _read_has_derived_efs(self) -> bool:
+        return self.manager.read_boolean_from_istream()
+
+    @requires_version("1.3.0", default=False)
+    def _read_has_eigenvectors(self) -> bool:
+        return self.manager.read_boolean_from_istream()
+
+    @requires_version("1.3.0", default=False)
+    def _read_has_residuals(self) -> bool:
+        return self.manager.read_boolean_from_istream()
+
+    @requires_version("1.1.4", default=(False, None, None))
+    def _read_ef_subset_properties(self) -> tuple[bool, complex, float]:
+        used = self.manager.read_boolean_from_istream()
+        center = self.manager.read_complex_from_istream()
+        radius = self.manager.read_float_from_istream()
+        return (used, center, radius)
+
+    def _read_parameters(self) -> dict:
+        nb_params = self.manager.read_int_from_istream()
+        len_param_name = (
+            self.manager.read_int_from_istream()
+            if self.legolas_version >= "1.0.2"
+            else self._str_len_array
+        )
+        parameter_names = self.manager.read_string_from_istream(
+            length=len_param_name, amount=nb_params
+        )
+        parameter_values = self.manager.read_float_from_istream(amount=nb_params)
+        return {
+            name: value
+            for name, value in zip(parameter_names, parameter_values)
+            if not np.isnan(value)
+        }
+
+    def _read_equilibrium_names(self) -> list[str]:
+        nb_names = self.manager.read_int_from_istream()
+        len_name = (
+            self.manager.read_int_from_istream()
+            if
self.legolas_version >= "1.0.2" + else self._str_len_array + ) + return self.manager.read_string_from_istream(length=len_name, amount=nb_names) + + def _read_units(self) -> dict: + units = {"cgs": self.manager.read_boolean_from_istream()} + if self.legolas_version >= "1.0.2": + nb_units, len_unit_name = self.manager.read_int_from_istream(amount=2) + unit_names = self.manager.read_string_from_istream( + length=len_unit_name, amount=nb_units + ) + else: + unit_names = [ + "unit_length", + "unit_time", + "unit_density", + "unit_velocity", + "unit_temperature", + "unit_pressure", + "unit_magneticfield", + "unit_numberdensity", + "unit_lambdaT", + "unit_conduction", + "unit_resistivity", + ] + nb_units = len(unit_names) + unit_values = self.manager.read_float_from_istream(amount=nb_units) + for name, value in zip(unit_names, unit_values): + units[name] = value + # mean molecular weight is added in 1.1.2, before this it defaults to 1 + units.setdefault("mean_molecular_weight", 1.0) + return units + + def _get_eigenfunction_offsets(self) -> dict: + if not self.data["has_efs"]: + return {} + # eigenfunction names + nb_efs = self.manager.read_int_from_istream() + self.data["ef_names"] = self.manager.read_string_from_istream( + length=self._str_len_array, amount=nb_efs + ) + # eigenfunction grid offset + offsets = {"ef_grid": self.manager.istream.tell()} + bytesize = self.data["ef_gridpoints"] * self.manager.SIZE_DOUBLE + self.manager.istream.seek(self.manager.istream.tell() + bytesize) + # ef written flags + self._set_ef_written_flags() + # eigenfunction offsets + offsets["ef_arrays"] = self.manager.istream.tell() + bytesize = ( + self.data["ef_gridpoints"] + * len(self.data["ef_written_idxs"]) + * nb_efs + * self.manager.SIZE_COMPLEX + ) + self.manager.istream.seek(self.manager.istream.tell() + bytesize) + return offsets + + def _set_ef_written_flags(self) -> None: + if self.legolas_version < "1.1.4": + self.data["ef_written_flags"] = np.asarray( + [True] * 
self.data["nb_eigenvalues"], dtype=bool + ) + self.data["ef_written_idxs"] = np.arange(0, self.data["nb_eigenvalues"]) + return + + ef_flags_size = self.manager.read_int_from_istream() + self.data["ef_written_flags"] = np.asarray( + self.manager.read_int_from_istream(amount=ef_flags_size), dtype=bool + ) + ef_idxs_size = self.manager.read_int_from_istream() + self.data["ef_written_idxs"] = ( + np.asarray( + self.manager.read_int_from_istream(amount=ef_idxs_size), dtype=int + ) + - 1 + ) # -1 to correct for Fortran 1-based indexing + # sanity check + assert all( + self.data["ef_written_idxs"] == np.where(self.data["ef_written_flags"])[0] + ) + + def _get_derived_eigenfunction_offsets(self) -> dict: + if not self.data["has_derived_efs"]: + return {} + nb_defs = self.manager.read_int_from_istream() + self.data["derived_ef_names"] = self.manager.read_string_from_istream( + length=self._str_len_array, amount=nb_defs + ) + offsets = {"derived_ef_arrays": self.manager.istream.tell()} + bytesize = ( + self.data["ef_gridpoints"] + * len(self.data["ef_written_idxs"]) + * nb_defs + * self.manager.SIZE_COMPLEX + ) + self.manager.istream.seek(self.manager.istream.tell() + bytesize) + return offsets + + def _get_eigenvector_offsets(self) -> dict: + if not self.data["has_eigenvectors"]: + return {} + len_eigvecs, nb_eigvecs = self.manager.read_int_from_istream(amount=2) + offsets = { + "eigenvectors": self.manager.istream.tell(), + "eigenvector_length": len_eigvecs, + "nb_eigenvectors": nb_eigvecs, + } + bytesize = len_eigvecs * nb_eigvecs * self.manager.SIZE_COMPLEX + self.manager.istream.seek(self.manager.istream.tell() + bytesize) + return offsets + + def _get_residuals_offsets(self) -> dict: + if not self.data["has_residuals"]: + return {} + nb_residuals = self.manager.read_int_from_istream() + offsets = { + "residuals": self.manager.istream.tell(), + "nb_residuals": nb_residuals, + } + bytesize = nb_residuals * self.manager.SIZE_DOUBLE + 
self.manager.istream.seek(self.manager.istream.tell() + bytesize) + return offsets + + def _get_matrices_offsets(self) -> dict: + if not self.data["has_matrices"]: + return {} + nonzero_B_elements = self.manager.read_int_from_istream() + nonzero_A_elements = self.manager.read_int_from_istream() + # B offsets, written as (row, column, value) + byte_size = ( + 2 * self.manager.SIZE_INT + self.manager.SIZE_DOUBLE + ) * nonzero_B_elements + offsets = { + "matrix_B": self.manager.istream.tell(), + "nonzero_B_elements": nonzero_B_elements, + } + self.manager.istream.seek(self.manager.istream.tell() + byte_size) + # A offsets + offsets["matrix_A"] = self.manager.istream.tell() + offsets["nonzero_A_elements"] = nonzero_A_elements + return offsets diff --git a/post_processing/pylbo/utilities/toolbox.py b/post_processing/pylbo/utilities/toolbox.py index f50fe787..0dc19892 100644 --- a/post_processing/pylbo/utilities/toolbox.py +++ b/post_processing/pylbo/utilities/toolbox.py @@ -46,6 +46,40 @@ def get_axis_geometry(ax): return axis_geometry +def get_values(array, which_values): + """ + Determines which values to retrieve from an array. + + Parameters + ---------- + array : numpy.ndarray + The array with values. + which_values : str + Can be one of the following: + + - "average": returns the average of the array + - "minimum": returns the minimum of the array + - "maximum": returns the maximum of the array + + If not supplied or equal to None, simply returns the array. + + Returns + ------- + array : numpy.ndarray + Numpy array with values depending on the argument provided. 
+ """ + if which_values is None: + return array + elif which_values == "average": + return np.average(array) + elif which_values == "minimum": + return np.min(array) + elif which_values == "maximum": + return np.max(array) + else: + raise ValueError(f"unknown argument which_values: {which_values}") + + def add_pickradius_to_item(item, pickradius): """ Makes a matplotlib artist pickable and adds a pickradius. diff --git a/post_processing/pylbo/visualisation/eigenfunctions/derived_eigfunc_handler.py b/post_processing/pylbo/visualisation/eigenfunctions/derived_eigfunc_handler.py index 604e27fc..136d61d8 100644 --- a/post_processing/pylbo/visualisation/eigenfunctions/derived_eigfunc_handler.py +++ b/post_processing/pylbo/visualisation/eigenfunctions/derived_eigfunc_handler.py @@ -19,7 +19,7 @@ def __init__(self, data, def_ax, spec_ax): ) def _check_data_is_present(self): - if not any(transform_to_numpy(self.data.derived_efs_written)): + if not any(transform_to_numpy(self.data.has_derived_efs)): raise EigenfunctionsNotPresent( "None of the given datfiles has derived eigenfunctions " "written to it." 
@@ -69,5 +69,5 @@ def _get_title(self, ef_name): ) def _mark_points_without_data_written(self): - self._condition_to_make_transparent = "derived_efs_written" + self._condition_to_make_transparent = "has_derived_efs" super()._mark_points_without_data_written() diff --git a/post_processing/pylbo/visualisation/eigenfunctions/eigfunc_handler.py b/post_processing/pylbo/visualisation/eigenfunctions/eigfunc_handler.py index 55c5aff1..8fa1354a 100644 --- a/post_processing/pylbo/visualisation/eigenfunctions/eigfunc_handler.py +++ b/post_processing/pylbo/visualisation/eigenfunctions/eigfunc_handler.py @@ -17,7 +17,7 @@ def __init__(self, data, ef_ax, spec_ax): self.spec_axis.set_title(f"{self.spec_axis.get_title()} -- eigenfunctions") def _check_data_is_present(self): - if not any(transform_to_numpy(self.data.efs_written)): + if not any(transform_to_numpy(self.data.has_efs)): raise EigenfunctionsNotPresent( "None of the given datfiles has eigenfunctions written to it." ) @@ -84,5 +84,5 @@ def _get_title(self, ef_name): return name def _mark_points_without_data_written(self): - self._condition_to_make_transparent = "efs_written" + self._condition_to_make_transparent = "has_efs" super()._mark_points_without_data_written() diff --git a/post_processing/pylbo/visualisation/modes/mode_data.py b/post_processing/pylbo/visualisation/modes/mode_data.py index edcd0d39..2815d5a2 100644 --- a/post_processing/pylbo/visualisation/modes/mode_data.py +++ b/post_processing/pylbo/visualisation/modes/mode_data.py @@ -103,7 +103,7 @@ def _get_all_efs(self, ds, omega): An array of dicts with all eigenfunctions for every eigenvalue. 
""" arr1 = ds.get_eigenfunctions(omega) - if not ds.derived_efs_written: + if not ds.has_derived_efs: return arr1 arr2 = ds.get_derived_eigenfunctions(omega) diff --git a/post_processing/pylbo/visualisation/utils.py b/post_processing/pylbo/visualisation/utils.py index 92e9baa4..f2dbbce6 100644 --- a/post_processing/pylbo/visualisation/utils.py +++ b/post_processing/pylbo/visualisation/utils.py @@ -83,7 +83,7 @@ def validate_ef_name(ds, ef_name: str) -> str: The validated eigenfunction name. """ names = ds.ef_names - if ds.derived_efs_written: + if ds.has_derived_efs: names += ds.derived_ef_names if ef_name not in names: raise ValueError( diff --git a/pylbo_wrapper.py b/pylbo_wrapper.py index 71ed9154..7938c9ec 100644 --- a/pylbo_wrapper.py +++ b/pylbo_wrapper.py @@ -43,10 +43,10 @@ def _main(): pylbo.plot_equilibrium(ds) p = pylbo.plot_spectrum(ds, use_residuals=ds.has_residuals) p.add_continua() - if ds.efs_written: + if ds.has_efs: p.add_eigenfunctions() - if ds.derived_efs_written: - p2 = pylbo.plot_spectrum(ds, use_residuals=ds.has_residuals) + if ds.has_derived_efs: + p2 = pylbo.plot_spectrum(ds) p2.add_continua() p2.add_derived_eigenfunctions() p2.draw() diff --git a/tests/pylbo_tests/test_dataseries.py b/tests/pylbo_tests/test_dataseries.py index 215d5335..7c5222ce 100644 --- a/tests/pylbo_tests/test_dataseries.py +++ b/tests/pylbo_tests/test_dataseries.py @@ -1,5 +1,5 @@ import numpy as np -from pylbo.data_containers import LegolasDataSet, LegolasDataSeries +from pylbo.data_containers import LegolasDataSeries, LegolasDataSet def test_series_iterable(series_v112): @@ -21,17 +21,17 @@ def test_series_getslice(series_v112): def test_series_efs_none(series_v100): - assert isinstance(series_v100.efs_written, np.ndarray) - assert not np.all(series_v100.efs_written) + assert isinstance(series_v100.has_efs, np.ndarray) + assert not np.all(series_v100.has_efs) -def test_series_efs_written(series_v112): - assert isinstance(series_v112.efs_written, np.ndarray) - 
assert np.all(series_v112.efs_written)
+def test_series_has_efs(series_v112):
+    assert isinstance(series_v112.has_efs, np.ndarray)
+    assert np.all(series_v112.has_efs)
 
 
 def test_series_ef_names_not_present(series_v100):
-    assert series_v100.ef_names is None
+    assert np.all([name is None for name in series_v100.ef_names])
 
 
 def test_series_ef_names(series_v112):
diff --git a/tests/pylbo_tests/test_dataset.py b/tests/pylbo_tests/test_dataset.py
index 30dea1cf..439bb812 100644
--- a/tests/pylbo_tests/test_dataset.py
+++ b/tests/pylbo_tests/test_dataset.py
@@ -1,6 +1,6 @@
 import numpy as np
-from pylbo.exceptions import MatricesNotPresent
 import pytest
+from pylbo.exceptions import MatricesNotPresent
 
 ds_v112_ev_guess = -0.14360602 + 0.00688731j
 ds_v112_ev_idx = 158
@@ -12,9 +12,9 @@ def test_ds_iterable(ds_v112):
     assert gen.pop() == ds_v112
 
 
-def test_ds_efs_written(ds_v112):
-    assert isinstance(ds_v112.efs_written, bool)
-    assert ds_v112.efs_written
+def test_ds_has_efs(ds_v112):
+    assert isinstance(ds_v112.has_efs, bool)
+    assert ds_v112.has_efs
 
 
 def test_ds_ef_names(ds_v112):
@@ -162,7 +162,7 @@ def test_ds_get_efs_idx(ds_v112):
 def test_ds_get_efs_idx_list(ds_v112):
     efs = ds_v112.get_eigenfunctions(ev_idxs=[ds_v112_ev_idx] * 2)
     assert isinstance(efs, np.ndarray)
-    for i, ef in enumerate(efs):
+    for ef in efs:
         assert np.isclose(ds_v112_ev_guess, ef.get("eigenvalue", np.NaN))
 
diff --git a/tests/pylbo_tests/test_eigenfunction_subset.py b/tests/pylbo_tests/test_eigenfunction_subset.py
index 35878336..6272fb49 100644
--- a/tests/pylbo_tests/test_eigenfunction_subset.py
+++ b/tests/pylbo_tests/test_eigenfunction_subset.py
@@ -2,12 +2,12 @@
 SUBSET_FLAGS_KEY = "ef_written_flags"
 SUBSET_IDXS_KEY = "ef_written_idxs"
-SUBSET_CENTER_KEY = "eigenfunction_subset_center"
-SUBSET_RADIUS_KEY = "eigenfunction_subset_radius"
+SUBSET_CENTER_KEY = "ef_subset_center"
+SUBSET_RADIUS_KEY = "ef_subset_radius"
 
 
 def test_subset_present(ds_v114_subset):
-    assert ds_v114_subset.ef_subset
+    assert
ds_v114_subset.has_ef_subset def test_subset_center(ds_v114_subset): @@ -65,7 +65,7 @@ def test_subset_eigenfunction_retrieval_outside(ds_v114_subset): def test_subset_derived_efs_present(ds_v114_subset_defs): - assert ds_v114_subset_defs.ef_subset + assert ds_v114_subset_defs.has_ef_subset assert ds_v114_subset_defs.derived_ef_names is not None