Skip to content

Commit

Permalink
power_spectrum: update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
JoepVanlier committed Sep 11, 2024
1 parent 0301170 commit 3bbb4fb
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 47 deletions.
2 changes: 1 addition & 1 deletion lumicks/pylake/force_calibration/detail/driving_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def estimate_driving_input_parameters(
class DrivenPower:
def __init__(self, psd_data, sample_rate, driving_frequency, num_windows, freq_window=50.0):
"""This class is used to determine power in the driven peak."""
self.ps = PowerSpectrum(
self.ps = PowerSpectrum.from_data(
psd_data, sample_rate, window_seconds=num_windows / driving_frequency
).in_range(max(1.0, driving_frequency - freq_window), driving_frequency + freq_window)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def calculate_power_spectrum(
if not isinstance(data, np.ndarray) or (data.ndim != 1):
raise TypeError('Argument "data" must be a numpy vector')

power_spectrum = PowerSpectrum(data, sample_rate)
power_spectrum = PowerSpectrum.from_data(data, sample_rate)
power_spectrum = power_spectrum.in_range(*fit_range)._exclude_range(excluded_ranges)
power_spectrum = power_spectrum.downsampled_by(num_points_per_block)

Expand Down Expand Up @@ -314,9 +314,7 @@ def fit_power_spectrum(
backing = scipy.stats.chi2.sf(chi_squared, n_degrees_of_freedom) * 100

# Fitted power spectrum values.
ps_model = power_spectrum.with_spectrum(
model(power_spectrum.frequency, *solution_params), power_spectrum.num_points_per_block
)
ps_model = power_spectrum.with_spectrum(model(power_spectrum.frequency, *solution_params))

# When using theoretical weights for fitting, ref [5] mentions that the found value for D will
# be biased by a factor (n+1)/n. Multiplying by n/(n+1) compensates for this.
Expand Down
13 changes: 8 additions & 5 deletions lumicks/pylake/force_calibration/tests/test_power_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ def test_fit_analytic(


def test_analytic_corner_case():
ps = PowerSpectrum([1, 2, 3], 78125)
ps.frequency = np.array([178.63690424, 335.92231409, 493.20772395, 650.4931338, 807.77854365])
ps.power = np.array([0.00029835, 0.00027251, 0.00027432, 0.00028302, 0.00029127])
ps = PowerSpectrum(
np.array([178.63690424, 335.92231409, 493.20772395, 650.4931338, 807.77854365]),
np.array([0.00029835, 0.00027251, 0.00027432, 0.00028302, 0.00029127]),
sample_rate=78125,
total_duration=1.0,
)
fit = fit_analytical_lorentzian(ps)
assert fit.D > 0

Expand All @@ -98,13 +101,13 @@ def test_analytic_low_frequency(reference_models):
data = power_model_to_time_series(
78125, 78125, lambda f: reference_models.lorentzian(f, 35, 0.33)
)
ps = PowerSpectrum(data, sample_rate=78125).in_range(1e2, 1e4)
ps = PowerSpectrum.from_data(data, sample_rate=78125).in_range(1e2, 1e4)
fit = fit_analytical_lorentzian(ps)
np.testing.assert_allclose(fit.fc, 0.5 * ps.frequency[0])


def test_fit_analytic_curve():
ps = PowerSpectrum([3, 3, 4, 5, 1, 3, 2, 4, 5, 2], 100)
ps = PowerSpectrum.from_data([3, 3, 4, 5, 1, 3, 2, 4, 5, 2], 100)
ref = [0.079276, 0.077842, 0.073833, 0.067997, 0.061221, 0.054269]
fit = fit_analytical_lorentzian(ps)
np.testing.assert_allclose(fit.ps_fit.frequency, np.arange(0, 60, 10))
Expand Down
73 changes: 43 additions & 30 deletions lumicks/pylake/force_calibration/tests/test_power_spectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ def test_power_spectrum_bad_shape():
ValueError,
match=r"Only 1D arrays of data are supported. You provided a 2D array of shape \(10, 2\)",
):
_ = PowerSpectrum(np.ones((10, 2)), 1, window_seconds=1)
_ = PowerSpectrum.from_data(np.ones((10, 2)), 1, window_seconds=1)


def test_power_spectrum_variance():
"""Testing the variance of Welch PSD estimates"""
sigma, sample_rate = 40, 100

np.random.seed(90083773)
ps = PowerSpectrum(sigma * np.random.normal(size=(1000000)), sample_rate, window_seconds=1)
ps = PowerSpectrum.from_data(
sigma * np.random.normal(size=(1000000)), sample_rate, window_seconds=1
)
avg_variance = np.mean(ps._variance[1:-1]) # First and last bin have twice the variance

# scaling_factor = (2.0 / sample_rate)
Expand All @@ -34,14 +36,14 @@ def test_power_spectrum_variance():
np.testing.assert_equal(ps._exclude_range([[10, 25]])._variance, ps._variance[mask])

# A single window won't give us a variance
ps = PowerSpectrum(
ps = PowerSpectrum.from_data(
sigma * np.random.normal(size=(2 * sample_rate - 1)), sample_rate, window_seconds=1
)
assert ps._variance is None
assert ps.in_range(10, 15)._variance is None

# Two windows will (though it will likely not be a great estimate)
ps = PowerSpectrum(
ps = PowerSpectrum.from_data(
sigma * np.random.normal(size=(2 * sample_rate)), sample_rate, window_seconds=1
)
assert np.all(ps._variance)
Expand All @@ -59,7 +61,7 @@ def test_power_spectrum_variance():
def test_power_spectrum_attrs(frequency, num_data, sample_rate):
"""Testing the attributes of power spectra"""
data = np.sin(2.0 * np.pi * frequency / sample_rate * np.arange(num_data))
power_spectrum = PowerSpectrum(data, sample_rate)
power_spectrum = PowerSpectrum.from_data(data, sample_rate)

df = sample_rate / num_data
frequency_axis = np.arange(0, 0.5 * sample_rate + 1, df)
Expand Down Expand Up @@ -99,7 +101,7 @@ def test_power_spectrum_blocking(
):
"""Functional test whether the results of blocking the power spectrum are correct"""
data = np.sin(2.0 * np.pi * frequency / sample_rate * np.arange(num_data)) / np.sqrt(2)
power_spectrum = PowerSpectrum(data, sample_rate)
power_spectrum = PowerSpectrum.from_data(data, sample_rate)

downsampling_factor = len(power_spectrum.frequency) // num_blocks
blocked = power_spectrum.downsampled_by(downsampling_factor)
Expand Down Expand Up @@ -141,7 +143,9 @@ def test_windowing_sine_wave(amp, frequency, data_duration, sample_rate, multipl
)

window_duration = multiple / frequency
power_spectrum_windowed = PowerSpectrum(data, sample_rate, window_seconds=window_duration)
power_spectrum_windowed = PowerSpectrum.from_data(
data, sample_rate, window_seconds=window_duration
)

# Windowing with the correct window size makes the full power end up in a single bin (this is
# the property we're after).
Expand All @@ -161,7 +165,7 @@ def test_windowing_sine_wave(amp, frequency, data_duration, sample_rate, multipl

# When we don't use windowing, we leak when the driving frequency is not an exact multiple of
# the sampling rate.
power_spectrum = PowerSpectrum(data, sample_rate)
power_spectrum = PowerSpectrum.from_data(data, sample_rate)
low_power = np.max(power_spectrum.power) * (
power_spectrum.frequency[1] - power_spectrum.frequency[0]
)
Expand All @@ -175,10 +179,12 @@ def test_windowing_sine_wave(amp, frequency, data_duration, sample_rate, multipl
def test_windowing_too_long_duration():
sample_rate = 78125
data = np.sin(2.0 * np.pi * 36.33 / sample_rate * np.arange(2 * sample_rate))
ref_power_spectrum = PowerSpectrum(data, sample_rate)
ref_power_spectrum = PowerSpectrum.from_data(data, sample_rate)

with pytest.warns(RuntimeWarning):
power_spectrum_windowed = PowerSpectrum(data, sample_rate, window_seconds=50000000)
power_spectrum_windowed = PowerSpectrum.from_data(
data, sample_rate, window_seconds=50000000
)

assert power_spectrum_windowed.num_points_per_block == 1
np.testing.assert_allclose(ref_power_spectrum.frequency, power_spectrum_windowed.frequency)
Expand All @@ -191,7 +197,7 @@ def test_windowing_too_long_duration():

def test_windowing_negative_duration():
with pytest.raises(ValueError):
PowerSpectrum([], 78125, window_seconds=-1)
PowerSpectrum.from_data([], 78125, window_seconds=-1)


@pytest.mark.parametrize(
Expand All @@ -207,8 +213,8 @@ def test_windowing_negative_duration():
)
def test_in_range(frequency, num_data, sample_rate, num_blocks, f_min, f_max):
data = np.sin(2.0 * np.pi * frequency / sample_rate * np.arange(num_data))
power_spectrum = PowerSpectrum(data, sample_rate)
assert power_spectrum._fit_range == (0, sample_rate / 2)
power_spectrum = PowerSpectrum.from_data(data, sample_rate)
np.testing.assert_allclose(power_spectrum._fit_range, (0, sample_rate / 2), atol=1e-12)

power_subset = power_spectrum.in_range(f_min, f_max)
assert id(power_subset) != id(power_spectrum)
Expand All @@ -221,15 +227,17 @@ def test_in_range(frequency, num_data, sample_rate, num_blocks, f_min, f_max):
np.testing.assert_allclose(power_subset.power, power_spectrum.power[mask], atol=1e-16)
np.testing.assert_allclose(power_spectrum.total_duration, power_subset.total_duration)
np.testing.assert_allclose(power_spectrum.sample_rate, power_subset.sample_rate)
assert power_subset._fit_range == (f_min, min(sample_rate / 2, f_max))
np.testing.assert_allclose(power_subset._fit_range, (f_min, min(sample_rate / 2, f_max)))


def test_in_range_twice():
sample_rate, num_data = 2000, 1000
data = np.sin(2.0 * np.pi * 10 / sample_rate * np.arange(num_data))
power_spectrum = PowerSpectrum(data, sample_rate)
assert power_spectrum._fit_range == (0, sample_rate / 2)
assert power_spectrum.frequency[-1] == power_spectrum._fit_range[-1]
power_spectrum = PowerSpectrum.from_data(data, sample_rate)
np.testing.assert_allclose(power_spectrum._fit_range, (0, sample_rate / 2), atol=1e-12)
np.testing.assert_allclose(
power_spectrum.frequency[-1], power_spectrum._fit_range[-1], atol=1e-12
)
subset = power_spectrum.in_range(100, 500)
assert subset._fit_range == (100, 500)
subset2 = subset.in_range(50, 800)
Expand All @@ -239,21 +247,21 @@ def test_in_range_twice():


def test_plot():
ps = PowerSpectrum(np.sin(2.0 * np.pi * 100 / 78125 * np.arange(100)), 78125)
ps = PowerSpectrum.from_data(np.sin(2.0 * np.pi * 100 / 78125 * np.arange(100)), 78125)
ps.plot()
ps = ps.downsampled_by(2)
ps.plot()


def test_replace_spectrum():
power_spectrum = PowerSpectrum(np.arange(10), 5)
power_spectrum = PowerSpectrum.from_data(np.arange(10), 5)
replaced = power_spectrum.with_spectrum(np.arange(6))

np.testing.assert_allclose(power_spectrum.frequency, replaced.frequency)
np.testing.assert_allclose(replaced.power, np.arange(6), atol=1e-16)
np.testing.assert_allclose(replaced.num_points_per_block, 1)
np.testing.assert_allclose(replaced.sample_rate, power_spectrum.sample_rate)
np.testing.assert_allclose(replaced.total_duration, power_spectrum.total_duration)
assert replaced.total_duration == 2

with pytest.raises(ValueError, match="New power spectral density vector has incorrect length"):
power_spectrum.with_spectrum(np.arange(7))
Expand All @@ -272,10 +280,15 @@ def test_replace_spectrum():
],
)
def test_exclusions(exclusion_ranges, result_frequency, result_power):
ps = PowerSpectrum([1], 78125, unit="V")
ps.frequency = np.arange(1, 12, 2)
ps.power = np.arange(10, 120, 20)
ps._variance = None
ps = PowerSpectrum(
np.arange(1, 12, 2),
np.arange(10, 120, 20),
sample_rate=78125,
unit="V",
total_duration=1000,
total_samples_used=1000,
)
ps._raw_variance = None
excluded_range = ps._exclude_range(exclusion_ranges)
np.testing.assert_allclose(excluded_range.frequency, result_frequency)
np.testing.assert_allclose(excluded_range.power, result_power)
Expand All @@ -289,9 +302,11 @@ def test_exclusions(exclusion_ranges, result_frequency, result_power):


def test_identify_peaks_args():
power_spectrum = PowerSpectrum([1], 5)
power_spectrum.frequency = np.arange(10)
power_spectrum.power = np.ones_like(power_spectrum.frequency)
frequency = np.arange(10)
power_spectrum = PowerSpectrum(
frequency, np.ones_like(frequency), sample_rate=78125, total_duration=1
)

with pytest.raises(ValueError, match="baseline cannot be negative"):
power_spectrum.identify_peaks(lambda f: np.ones_like(f), baseline=-1.0)

Expand Down Expand Up @@ -324,8 +339,6 @@ def test_identify_peaks_args():
],
)
def test_identify_peaks(power, result_frequency):
power_spectrum = PowerSpectrum([1], 5)
power_spectrum.frequency = np.arange(10)
power_spectrum.power = power
power_spectrum = PowerSpectrum(np.arange(10), power, sample_rate=78125, total_duration=1.0)

assert power_spectrum.identify_peaks(lambda f: np.ones_like(f)) == result_frequency
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ def test_no_data_in_range():
model = PassiveCalibrationModel(1, temperature=20, viscosity=0.0004)

# Here the range slices off all the data and we are left with an empty spectrum
power_spectrum = psc.PowerSpectrum(np.arange(100), sample_rate=100).in_range(47, 100)
power_spectrum = psc.PowerSpectrum.from_data(np.arange(100), sample_rate=100).in_range(47, 100)

with pytest.raises(RuntimeError):
psc.fit_power_spectrum(power_spectrum, model=model)

# Check whether a failure to get a sufficient number of points in the analytical fit is
# detected.
power_spectrum = psc.PowerSpectrum(np.arange(100), sample_rate=1e-3)
power_spectrum = psc.PowerSpectrum.from_data(np.arange(100), sample_rate=1e-3)

with pytest.raises(RuntimeError):
psc.fit_power_spectrum(power_spectrum, model=model)
Expand All @@ -214,9 +214,7 @@ def test_bad_fit(reference_calibration_result):
ps_calibration, model, reference_spectrum = reference_calibration_result
bad_spectrum = reference_spectrum.power.copy()
bad_spectrum[30:31] = reference_spectrum.power[10] # Chop!
bad_spectrum = reference_spectrum.with_spectrum(
bad_spectrum, num_points_per_block=reference_spectrum.num_points_per_block
)
bad_spectrum = reference_spectrum.with_spectrum(bad_spectrum)
bad_calibration = psc.fit_power_spectrum(
power_spectrum=bad_spectrum, model=model, loss_function="gaussian"
)
Expand Down Expand Up @@ -258,8 +256,8 @@ def test_actual_spectrum(reference_calibration_result):
np.testing.assert_allclose(ps_calibration[name].value, **expected_result)
np.testing.assert_allclose(ps_calibration.params[name].value, **expected_result)

# Test whether the model contains the number of points per block that were used to fit it
np.testing.assert_allclose(ps_calibration.ps_model.num_points_per_block, 100)
# The model was not averaged
np.testing.assert_allclose(ps_calibration.ps_model.num_points_per_block, 1)
np.testing.assert_allclose(ps_calibration.ps_data.num_points_per_block, 100)


Expand Down

0 comments on commit 3bbb4fb

Please sign in to comment.