Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sine basis #5

Merged
merged 10 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
579 changes: 579 additions & 0 deletions notebooks/Aramis/basis_test.ipynb

Large diffs are not rendered by default.

326 changes: 326 additions & 0 deletions notebooks/Aramis/fitting_pv_data.ipynb

Large diffs are not rendered by default.

103 changes: 103 additions & 0 deletions notebooks/Aramis/regularisation_test.ipynb

Large diffs are not rendered by default.

86 changes: 86 additions & 0 deletions notebooks/Aramis/utils/dilatation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import utils.interpolation as interpolation
import numpy as np

def build_original_idx(signal_series, nvals_ori):
# Building a float index from 00:00 first day to last measure of last day (eg. 23:55)
# In addition, last bin end (00:00 next day)
# Original signal length + 1
signal_idx_ori = signal_series.index.astype(int).to_numpy()
signal_idx_ori = (signal_idx_ori - signal_idx_ori[0])
signal_idx_ori = signal_idx_ori / signal_idx_ori[1] * 24 / nvals_ori
dt = signal_idx_ori[-1] - signal_idx_ori[-2]
signal_idx_ori = np.append(signal_idx_ori, signal_idx_ori[-1] + dt) # Last bin end
return signal_idx_ori

def build_dilated_idx(sunrises, sunsets, signal_idx_ori, nvals_dil=101):
# Building a float index from 00:00 first day to last sunset of last day (eg. 18:37)
# In addition, last bin end (00:00 next day)
# Dilated signal length + 1
sunrise_idx_ori = sunrises + 24*np.arange(len(sunrises))
sunset_idx_ori = sunsets + 24*np.arange(len(sunsets))
signal_idx_dil = np.linspace(sunrise_idx_ori, sunset_idx_ori, nvals_dil).ravel(order='F')
signal_idx_dil = np.append(0, signal_idx_dil) # Adding first midnight
signal_idx_dil = np.append(signal_idx_dil, signal_idx_ori[-1]) # Last bin end
return signal_idx_dil

def add_night_values(signal_idx_dil, quantiles_dil, nvals_dil):
ndays = len(signal_idx_dil) // nvals_dil
matrix = np.zeros((nvals_dil + 1, ndays))
# Extrapolating the index
signal_idx_dil_night = np.zeros((nvals_dil + 1) * ndays + 2)
matrix[:-1] = signal_idx_dil.reshape((nvals_dil, ndays), order='F')
matrix[-1] = matrix[-2] + (matrix[-2] - matrix[-3]) # Adding night value every day
signal_idx_dil_night[1:-1] = matrix.ravel(order='F')
signal_idx_dil_night[-1] = signal_idx_dil_night[-2] + 24 # Closing the last day by adding a false night value
signal_idx_dil_night[0] = signal_idx_dil_night[1] - 24
# Extrapolating the signal
quantiles_dil_night = np.zeros(((nvals_dil + 1) * ndays + 2, quantiles_dil.shape[1]))
for i in range(quantiles_dil.shape[1]):
matrix[:-1] = quantiles_dil[:, i].reshape((nvals_dil, ndays), order='F')
matrix[-1] = 0
quantiles_dil_night[1:-1, i] = matrix.ravel(order='F')
quantiles_dil_night[0] = 0
quantiles_dil_night[-1] = 0
return signal_idx_dil_night, quantiles_dil_night

def dilate_signal(signal_idx_dil, signal_idx_ori, signal_ori):
# Dilated index length - 1
_signal_ori = np.append(signal_ori, signal_ori[-1]) # Adding last dummy value to interpolate
signal_dil = interpolation.interpolate(signal_idx_dil, signal_idx_ori, _signal_ori, alignment='left')
return signal_dil

def undilate_signal(signal_idx_ori, signal_idx_dil, signal_dil):
# Original index length - 1
_signal_dil = np.append(signal_dil, signal_dil[-1]) # Adding last dummy value to interpolate
signal_ori = interpolation.interpolate(signal_idx_ori, signal_idx_dil, _signal_dil, alignment='left')
return signal_ori

def extrapolate_signal_after_sunset(signal, nvals, ndays, method):
# Signal has length nvals * ndays + 2
matrix = np.zeros((nvals + 1, ndays))
matrix[:-1] = signal[1:-1].reshape((nvals, ndays), order='F')
if method == 'linear':
matrix[-1] = matrix[-2] + (matrix[-2] - matrix[-3])
elif method == 'zero_padding':
matrix[-1] = 0
else:
raise ValueError("Invalid value for method. Choose from: ['linear', 'zero_padding']")
new_signal = np.zeros((nvals + 1) * ndays + 2)
new_signal[0] = signal[0]
new_signal[1:-1] = matrix.ravel(order='F')
new_signal[-1] = signal[-1]
return new_signal

def undilate_quantiles(signal_idx_ori, signal_idx_dil, quantiles_dil, nvals_dil=101):
ndays = (len(signal_idx_dil) - 2) // nvals_dil
new_signal_idx_dil = extrapolate_signal_after_sunset(signal_idx_dil, nvals_dil, ndays, method='linear')
_quantile_dil = np.zeros(nvals_dil * ndays + 2)

quantiles_ori = np.zeros((signal_idx_ori.shape[0]-1, quantiles_dil.shape[1]))
for i in range(quantiles_dil.shape[1]):
_quantile_dil[:-1] = quantiles_dil[:,i]
new_quantile_dil = extrapolate_signal_after_sunset(_quantile_dil, nvals_dil, ndays, method='zero_padding')
quantiles_ori[:,i] = interpolation.interpolate(signal_idx_ori, new_signal_idx_dil, new_quantile_dil, alignment='left')

return quantiles_ori

73 changes: 73 additions & 0 deletions notebooks/Aramis/utils/interpolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import numpy as np

def check_sanity(tnew, t, x, alignment):
if (t.shape[0] < 2) | (tnew.shape[0] < 2):
raise ValueError("t and tnew must have at least two values.")
if not np.all(np.diff(t) >= 0):
raise ValueError("t values must be sorted.")
if not np.all(np.diff(tnew) >= 0):
raise ValueError("tnew values must be sorted.")
if (tnew[0] < t[0]) | (tnew[-1] > t[-1]):
raise ValueError("tnew values must be within the range of t values.")
if t.shape != x.shape:
raise ValueError("t must have the same shape as x.")
if alignment not in ['left', 'right']:
raise ValueError("Invalid value for alignment. Choose from: ['left', 'right']")

def initialize_arrays(tnew, t, x, alignment):
if alignment == 'left':
tnew_float = tnew.astype(float)
t_float = t.astype(float)
x_float = x.astype(float)
if alignment == 'right':
tnew_float = -np.flip(tnew).astype(float)
t_float = -np.flip(t).astype(float)
x_float = np.flip(x).astype(float)
return tnew_float, t_float, x_float

def compute_integral_interpolation(ttnew, xxnew, new_indices):
'''
Parameters:
ttnew (np.ndarray): t + tnew (length m+n).
xxnew (np.ndarray): x + xnew (length m+n).
new_indices (np.ndarray): Indices of the points in tnew (length n).

Returns:
y (np.ndarray): Interpolated signal (length n-1).
'''
piecewise_integrals = np.diff(ttnew) * xxnew[:-1]
cumulative_integrals = np.zeros(ttnew.shape[0])
# Add the initial zero as a baseline for the cumulative sum
cumulative_integrals[1:] = np.cumsum(piecewise_integrals)
y = np.diff(cumulative_integrals[new_indices])
return y

def interpolate(tnew, t, x, alignment='left'):
'''
Interpolate a signal for a new set of points while maintaining constant signal energy.

Parameters:
tnew (np.ndarray): New time points for interpolation (length n).
Last point is the end of the last time bin.
t (np.ndarray): Original time points (length m).
x (np.ndarray): Original signal values (length m).
alignment (str): Time bin label alignement, either 'left' or 'right'.

Returns:
y (np.ndarray): Interpolated signal for the new time points (length n-1).
'''

check_sanity(tnew, t, x, alignment)
# make sure everything is float, flip if alignment is right
tnew_float, t_float, x_float = initialize_arrays(tnew, t, x, alignment)
indices = np.searchsorted(t_float, tnew_float, side='right')
xnew = x_float[indices - 1]
ttnew = np.insert(t_float, indices, tnew_float)
xxnew = np.insert(x_float, indices, xnew)
new_indices = indices + np.arange(tnew.shape[0])
y = compute_integral_interpolation(ttnew, xxnew, new_indices)
dts = np.diff(ttnew[new_indices])
y = y / dts # normalize time bin length to keep the integral constant
if alignment == 'right':
y = np.flip(y)
return y
Loading