Commit 5047968

improved typing of numpy arrays

ClaasRostock committed Oct 24, 2024
1 parent 9dd1f04 commit 5047968
Showing 3 changed files with 45 additions and 39 deletions.
6 changes: 3 additions & 3 deletions src/farn/core/case.py
@@ -363,16 +363,16 @@ def to_pandas(

return df_X

- def to_numpy(self) -> ndarray[Any, Any]:
+ def to_numpy(self) -> ndarray[tuple[int, int], np.dtype[np.float64]]:
"""Return parameter values of all cases as a 2-dimensional numpy array.
Returns
-------
- ndarray[Any, Any]
+ ndarray[tuple[int, int], np.dtype[np.float64 | np.int32]]
2-dimensional numpy array with case specific parameter values of all cases.
"""
df_X: DataFrame = self.to_pandas(parameters_only=True)
- array: ndarray[Any, Any] = df_X.to_numpy()
+ array: ndarray[tuple[int, int], np.dtype[np.float64]] = df_X.to_numpy().astype(np.float64)
return array

def filter(
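The pattern in this hunk recurs throughout the commit, so a short illustration may help. The sketch below is not part of the repository: it only demonstrates how numpy's ndarray is generic over a shape type and a dtype, so that ndarray[tuple[int, int], np.dtype[np.float64]] reads as "2-dimensional array of float64", and why the added .astype(np.float64) call justifies the narrowed annotation (np.array() and DataFrame.to_numpy() are themselves typed loosely). The helper name to_float64_matrix is hypothetical.

import numpy as np
from numpy import ndarray


def to_float64_matrix(rows: list[list[float]]) -> ndarray[tuple[int, int], np.dtype[np.float64]]:
    # np.array(rows) alone is typed as ndarray[Any, ...]; the explicit
    # astype(np.float64) pins the dtype, mirroring the change to
    # Cases.to_numpy() above.
    array: ndarray[tuple[int, int], np.dtype[np.float64]] = np.array(rows).astype(np.float64)
    return array


matrix = to_float64_matrix([[11.1, 12.2], [21.1, 22.2]])
assert matrix.shape == (2, 2)
assert matrix.dtype == np.float64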
70 changes: 38 additions & 32 deletions src/farn/sampling/sampling.py
@@ -276,7 +276,7 @@ def _generate_samples_using_linspace_sampling(self) -> dict[str, list[Any]]:
def _generate_samples_using_uniform_lhs_sampling(self) -> dict[str, list[Any]]:
_ = self._check_length_matches_number_of_names("_ranges")
samples: dict[str, list[Any]] = self._generate_samples_dict()
- values: ndarray[Any, Any] = self._generate_values_using_uniform_lhs_sampling()
+ values: ndarray[tuple[int,], np.dtype[np.float64]] = self._generate_values_using_uniform_lhs_sampling()
self._write_values_into_samples_dict(values, samples)

return samples
@@ -298,7 +298,7 @@ def _generate_samples_using_normal_lhs_sampling(self) -> dict[str, list[Any]]:
self.mean = self.sampling_parameters["_mu"]
self.std = self.sampling_parameters["_sigma"]

- values: ndarray[Any, Any] = self._generate_values_using_normal_lhs_sampling()
+ values: ndarray[tuple[int,], np.dtype[np.float64]] = self._generate_values_using_normal_lhs_sampling()

# Clipping
# (optional. Clipping will only be performed if sampling parameter "_ranges" is defined.)
@@ -311,8 +311,8 @@ def _generate_samples_using_normal_lhs_sampling(self) -> dict[str, list[Any]]:
# Hence the somewhat simpler approach for now, where exceeding values
# simply get reset to the range bounderies.
if self.ranges:
- range_lower_bounds: ndarray[Any, Any] = np.array([r[0] for r in self.ranges])
- range_upper_bounds: ndarray[Any, Any] = np.array([r[1] for r in self.ranges])
+ range_lower_bounds: ndarray[tuple[int], np.dtype[np.float64]] = np.array([r[0] for r in self.ranges])
+ range_upper_bounds: ndarray[tuple[int], np.dtype[np.float64]] = np.array([r[1] for r in self.ranges])
values = np.clip(values, range_lower_bounds, range_upper_bounds)

self._write_values_into_samples_dict(values, samples)
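For illustration only (not repository code): the clipping performed in this hunk broadcasts two 1-dimensional bound vectors against the 2-dimensional sample matrix, so each field's column is clipped against its own range. A minimal sketch, assuming hypothetical ranges and sample values:

import numpy as np

ranges = [(0.0, 1.0), (-5.0, 5.0)]  # hypothetical per-field (lower, upper) bounds
rng = np.random.default_rng(seed=42)
values = rng.normal(loc=0.5, scale=2.0, size=(4, 2))  # 4 samples x 2 fields

range_lower_bounds = np.array([r[0] for r in ranges])
range_upper_bounds = np.array([r[1] for r in ranges])

# Out-of-range draws are reset to the range boundaries, column by column.
values = np.clip(values, range_lower_bounds, range_upper_bounds)
assert (values >= range_lower_bounds).all() and (values <= range_upper_bounds).all()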
@@ -324,7 +324,7 @@ def _generate_samples_using_sobol_sampling(self) -> dict[str, list[Any]]:
self.onset = int(self.sampling_parameters["_onset"])

samples: dict[str, list[Any]] = self._generate_samples_dict()
- values: ndarray[Any, Any] = self._generate_values_using_sobol_sampling()
+ values: ndarray[tuple[int,], np.dtype[np.float64]] = self._generate_values_using_sobol_sampling()
self._write_values_into_samples_dict(values, samples)

return samples
@@ -372,7 +372,7 @@ def _generate_samples_using_hilbert_sampling(self) -> dict[str, list[Any]]:
self.minIterationDepth = 3
self.maxIterationDepth = 15

- values: ndarray[Any, Any] = self._generate_values_using_hilbert_sampling()
+ values: ndarray[tuple[int,], np.dtype[np.float64]] = self._generate_values_using_hilbert_sampling()
self._write_values_into_samples_dict(values, samples)

return samples
@@ -383,33 +383,33 @@ def _generate_samples_dict(self) -> dict[str, list[Any]]:
self._generate_case_names(samples_dict)
return samples_dict

- def _generate_values_using_uniform_lhs_sampling(self) -> ndarray[Any, Any]:
+ def _generate_values_using_uniform_lhs_sampling(self) -> ndarray[tuple[int,], np.dtype[np.float64]]:
"""Uniform LHS."""
from pyDOE3 import lhs
from scipy.stats import uniform

- lhs_distribution: ndarray[Any, Any] | None = lhs(
+ lhs_distribution: ndarray[tuple[int,], np.dtype[np.float64]] | None = lhs(
n=self.number_of_fields,
samples=self.number_of_samples - self.number_of_bb_samples,
criterion="corr",
random_state=self.seed,
)

- _range_lower_bounds: ndarray[Any, Any] = np.array([r[0] for r in self.ranges])
- _range_upper_bounds: ndarray[Any, Any] = np.array([r[1] for r in self.ranges])
- loc: ndarray[Any, Any] = _range_lower_bounds
- scale: ndarray[Any, Any] = _range_upper_bounds - _range_lower_bounds
+ _range_lower_bounds: ndarray[tuple[int], np.dtype[np.float64]] = np.array([r[0] for r in self.ranges])
+ _range_upper_bounds: ndarray[tuple[int], np.dtype[np.float64]] = np.array([r[1] for r in self.ranges])
+ loc: ndarray[tuple[int], np.dtype[np.float64]] = _range_lower_bounds
+ scale: ndarray[tuple[int], np.dtype[np.float64]] = _range_upper_bounds - _range_lower_bounds

- sample_set: ndarray[Any, Any] = uniform(loc=loc, scale=scale).ppf(lhs_distribution) # pyright: ignore[reportUnknownMemberType]
+ sample_set: ndarray[tuple[int,], np.dtype[np.float64]] = uniform(loc=loc, scale=scale).ppf(lhs_distribution) # pyright: ignore[reportUnknownMemberType]

return sample_set
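A self-contained sketch of the uniform-LHS pattern in this function, under assumed parameter values (the ranges, counts, and seed below are illustrative, not the repository's): lhs() draws a Latin hypercube in the unit hypercube, and uniform(...).ppf() maps each coordinate onto its field's range.

import numpy as np
from pyDOE3 import lhs
from scipy.stats import uniform

ranges = [(0.0, 10.0), (100.0, 200.0)]  # hypothetical per-field bounds
n_fields, n_samples = len(ranges), 5

# Latin hypercube sample in [0, 1)^n_fields, one column per field.
lhs_distribution = lhs(n=n_fields, samples=n_samples, criterion="corr", random_state=42)

loc = np.array([r[0] for r in ranges])           # lower bounds
scale = np.array([r[1] - r[0] for r in ranges])  # range widths

# Inverse CDF of U(loc, loc + scale) maps unit coordinates onto the bounds.
sample_set = uniform(loc=loc, scale=scale).ppf(lhs_distribution)
assert sample_set.shape == (n_samples, n_fields)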

- def _generate_values_using_normal_lhs_sampling(self) -> ndarray[Any, Any]:
+ def _generate_values_using_normal_lhs_sampling(self) -> ndarray[tuple[int,], np.dtype[np.float64]]:
"""Gaussnormal LHS."""
from pyDOE3 import lhs
from scipy.stats import norm

- lhs_distribution: ndarray[Any, Any] | None = lhs(
+ lhs_distribution: ndarray[tuple[int,], np.dtype[np.float64]] | None = lhs(
n=self.number_of_fields,
samples=self.number_of_samples - self.number_of_bb_samples,
criterion="corr",
@@ -424,13 +424,13 @@ def _generate_values_using_normal_lhs_sampling(self) -> ndarray[Any, Any]:
# - correlation|corr: minimize the maximum correlation coefficient

# std of type scalar (scale) or vector (stretch, scale), no rotation
- _std: ndarray[Any, Any] = np.array(self.std)
+ _std: ndarray[tuple[int], np.dtype[np.float64]] = np.array(self.std)

- sample_set: ndarray[Any, Any] = norm(loc=self.mean, scale=_std).ppf(lhs_distribution) # pyright: ignore[reportUnknownMemberType]
+ sample_set: ndarray[tuple[int,], np.dtype[np.float64]] = norm(loc=self.mean, scale=_std).ppf(lhs_distribution) # pyright: ignore[reportUnknownMemberType]

return sample_set
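Companion sketch for the normal-LHS variant, again with hypothetical numbers: the same unit-hypercube sample is pushed through the Gaussian inverse CDF instead of the uniform one, with per-field mean and standard deviation.

import numpy as np
from pyDOE3 import lhs
from scipy.stats import norm

lhs_distribution = lhs(n=2, samples=5, criterion="corr", random_state=42)

mean = [0.0, 10.0]          # hypothetical per-field "_mu"
std = np.array([1.0, 2.0])  # hypothetical per-field "_sigma"

# norm(...).ppf() broadcasts mean and std across the sample columns.
sample_set = norm(loc=mean, scale=std).ppf(lhs_distribution)
assert sample_set.shape == (5, 2)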

- def _generate_values_using_sobol_sampling(self) -> ndarray[Any, Any]:
+ def _generate_values_using_sobol_sampling(self) -> ndarray[tuple[int,], np.dtype[np.float64]]:
from scipy.stats import qmc
from scipy.stats.qmc import Sobol

@@ -443,22 +443,22 @@ def _generate_values_using_sobol_sampling(self) -> ndarray[Any, Any]:
if self.onset > 0:
_ = sobol_engine.fast_forward(n=self.onset) # pyright: ignore[reportUnknownMemberType]

- points: ndarray[Any, Any] = sobol_engine.random( # pyright: ignore[reportUnknownMemberType]
+ points: ndarray[tuple[int,], np.dtype[np.float64]] = sobol_engine.random( # pyright: ignore[reportUnknownMemberType]
n=self.number_of_samples - self.number_of_bb_samples,
)

# Upscale points from unit hypercube to bounds
- range_lower_bounds: ndarray[Any, Any] = np.array([r[0] for r in self.ranges])
- range_upper_bounds: ndarray[Any, Any] = np.array([r[1] for r in self.ranges])
- sample_set: ndarray[Any, Any] = qmc.scale( # pyright: ignore[reportUnknownMemberType]
+ range_lower_bounds: ndarray[tuple[int], np.dtype[np.float64]] = np.array([r[0] for r in self.ranges])
+ range_upper_bounds: ndarray[tuple[int], np.dtype[np.float64]] = np.array([r[1] for r in self.ranges])
+ sample_set: ndarray[tuple[int,], np.dtype[np.float64]] = qmc.scale( # pyright: ignore[reportUnknownMemberType]
points,
range_lower_bounds,
range_upper_bounds,
)

return sample_set
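A hedged sketch of the Sobol pattern above (parameter values are illustrative, not the repository's): draw quasi-random points in the unit hypercube, optionally fast-forward past an onset, then rescale to the field bounds with qmc.scale().

import numpy as np
from scipy.stats import qmc
from scipy.stats.qmc import Sobol

ranges = [(0.0, 10.0), (100.0, 200.0)]  # hypothetical per-field bounds

sobol_engine = Sobol(d=len(ranges), scramble=False, seed=42)
_ = sobol_engine.fast_forward(n=4)  # skip the first 4 points (the "_onset")

points = sobol_engine.random(n=8)  # 8 points in [0, 1)^2

# Upscale from the unit hypercube to the per-field bounds.
sample_set = qmc.scale(
    points,
    np.array([r[0] for r in ranges]),
    np.array([r[1] for r in ranges]),
)
assert sample_set.shape == (8, 2)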

- def _generate_values_using_hilbert_sampling(self) -> ndarray[Any, Any]:
+ def _generate_values_using_hilbert_sampling(self) -> ndarray[tuple[int,], np.dtype[np.float64]]:
"""Source hilbertcurve pypi pkg or numpy
it showed that hilbertcurve is a better choice and more precise with a higher iteration depth (<=15)
pypi pkg Decimals is required for proper function up to (<=15)
@@ -523,7 +523,7 @@ def _generate_values_using_hilbert_sampling(self) -> ndarray[Any, Any]:
'a number of hilbert points of about 10-times higher than "_numberOfSamples".'
)

- distribution = np.array(
+ distribution: ndarray[tuple[int], np.dtype[np.float64]] = np.array(
[
Decimal(x)
for x in np.linspace(
Expand All @@ -533,9 +533,9 @@ def _generate_values_using_hilbert_sampling(self) -> ndarray[Any, Any]:
)
]
)
- int_distribution = np.trunc(distribution)
+ int_distribution: ndarray[tuple[int], np.dtype[np.int32]] = np.trunc(distribution)

- hilbert_points = hc.points_from_distances(int_distribution)
+ hilbert_points: Iterable[Iterable[int]] = hc.points_from_distances(int_distribution)

_points: list[Iterable[float]] = []
interpolation_hits = 0
@@ -565,7 +565,7 @@ def _generate_values_using_hilbert_sampling(self) -> ndarray[Any, Any]:
point.append(float(i))

_points.append(point)
- points: ndarray[Any, Any] = np.array(_points)
+ points: ndarray[tuple[int,], np.dtype[np.float64]] = np.array(_points).astype(np.float64)

# Downscale points from hilbert space to unit hypercube [0,1)*d
points = qmc.scale( # pyright: ignore[reportUnknownMemberType]
@@ -576,9 +576,9 @@ def _generate_values_using_hilbert_sampling(self) -> ndarray[Any, Any]:
)

# Upscale points from unit hypercube to bounds
- range_lower_bounds: ndarray[Any, Any] = np.array([r[0] for r in self.ranges])
- range_upper_bounds: ndarray[Any, Any] = np.array([r[1] for r in self.ranges])
- sample_set: ndarray[Any, Any] = qmc.scale( # pyright: ignore[reportUnknownMemberType]
+ range_lower_bounds: ndarray[tuple[int], np.dtype[np.float64]] = np.array([r[0] for r in self.ranges])
+ range_upper_bounds: ndarray[tuple[int], np.dtype[np.float64]] = np.array([r[1] for r in self.ranges])
+ sample_set: ndarray[tuple[int,], np.dtype[np.float64]] = qmc.scale( # pyright: ignore[reportUnknownMemberType]
points,
range_lower_bounds,
range_upper_bounds,
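Since the Hilbert hunks are shown only in fragments here, the following hedged, self-contained sketch outlines the overall pattern (not repository code; the depth, dimensionality, and point count are made up): integer distances along a Hilbert curve are converted to d-dimensional points with the hilbertcurve package, then normalized toward the unit hypercube before the final upscaling shown above.

import numpy as np
from hilbertcurve.hilbertcurve import HilbertCurve

iteration_depth, n_dims = 5, 2  # hypothetical depth and dimensionality
hc = HilbertCurve(p=iteration_depth, n=n_dims)

# Evenly spaced, truncated-to-integer distances along the curve stand in for
# the Decimal-based distribution of the real implementation.
max_distance = 2 ** (iteration_depth * n_dims) - 1
int_distribution = [int(d) for d in np.trunc(np.linspace(0, max_distance, num=8))]

hilbert_points = hc.points_from_distances(int_distribution)
points = np.array(hilbert_points, dtype=np.float64)

# Downscale from Hilbert integer space to the unit hypercube [0, 1)^d:
# each coordinate lies in [0, 2**p), so dividing by 2**p normalizes it.
points = points / float(2**iteration_depth)
assert points.shape == (8, n_dims)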
@@ -651,12 +651,18 @@ def _create_bounding_box(self) -> None:

def _write_values_into_samples_dict(
self,
- values: ndarray[Any, Any],
+ values: ndarray[tuple[int,], np.dtype[np.float64]],
samples: dict[str, list[Any]],
) -> None:
if self.include_bounding_box is True:
self._create_bounding_box()
- values = np.concatenate((np.array(self.bounding_box), values), axis=0)
+ values = np.concatenate(
+     (
+         np.array(self.bounding_box),
+         values,
+     ),
+     axis=0,
+ )
for index, _ in enumerate(self.fields):
samples[self.fields[index]] = values.T[index].tolist()
return
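To make the rewritten concatenation concrete, a hedged sketch (values and field names are hypothetical, not the repository's) of how bounding-box corner points are stacked in front of the samples and each column is then written back per field:

import numpy as np

bounding_box = [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]  # hypothetical corners
values = np.array([[0.2, 0.4], [0.6, 0.8]])  # 2 samples x 2 fields

# Corner points first, sampled values after, stacked along the sample axis.
values = np.concatenate((np.array(bounding_box), values), axis=0)

fields = ["param1", "param2"]  # hypothetical field names
samples: dict[str, list[float]] = {}
for index, field in enumerate(fields):
    samples[field] = values.T[index].tolist()  # one column per field
assert samples["param1"] == [0.0, 0.0, 1.0, 1.0, 0.2, 0.6]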
8 changes: 4 additions & 4 deletions tests/test_cases.py
@@ -119,9 +119,9 @@ def test_to_numpy() -> None:
# Prepare
case_1, case_2, case_3 = _create_cases()
cases: Cases = Cases([case_1, case_2, case_3])
- array_assert: ndarray[Any, Any] = _create_ndarray()
+ array_assert: ndarray[tuple[int,], np.dtype[np.float64]] = _create_ndarray()
# Execute
- array: ndarray[Any, Any] = cases.to_numpy()
+ array: ndarray[tuple[int, int], np.dtype[np.float64]] = cases.to_numpy()
# Assert
assert array.shape == array_assert.shape
assert array.shape == (3, 3)
@@ -169,8 +169,8 @@ def _create_dataframe(
return data


- def _create_ndarray() -> ndarray[Any, Any]:
- array: ndarray[Any, Any] = np.array(
+ def _create_ndarray() -> ndarray[tuple[int,], np.dtype[np.float64]]:
+ array: ndarray[tuple[int,], np.dtype[np.float64]] = np.array(
[
[11.1, np.nan, np.nan],
[21.1, 22.2, np.nan],
