diff --git a/import_test.py b/import_test.py new file mode 100644 index 0000000..b639dff --- /dev/null +++ b/import_test.py @@ -0,0 +1,2 @@ +import pyPTE.core.delay_estimations as de +print(dir(de)) # Check if the functions are visible here diff --git a/pyPTE/core/delay_estimations.py b/pyPTE/core/delay_estimations.py new file mode 100644 index 0000000..8892c6f --- /dev/null +++ b/pyPTE/core/delay_estimations.py @@ -0,0 +1,47 @@ +import numpy as np +import numpy.typing as npt + + +def delay_by_hillebrand(phase: npt.NDArray) -> npt.NDArray: + """ + Original method to compute the overall delay for all given channels. + + Parameters: + ---------- + phase : numpy.ndarray + m x n ndarray : m: number of channels, n: number of samples. + + Returns: + ------- + delay : int + The computed delay. + """ + m, n = phase.shape + c1 = n * m + r_phase = np.roll(phase, 1, axis=0) + phase_product = np.multiply(phase, r_phase) + c2 = (phase_product < 0).sum() + delay = int(np.round(c1 / c2)) + delay_matrix = np.full((m, m), delay, dtype=int) + return delay_matrix + +def delay_by_crosscorrelation(phase_matrix: npt.NDArray) -> npt.NDArray: + m, _ = phase_matrix.shape + delay_matrix = np.zeros((m, m), dtype=int) + + for i in range(m): + for j in range(i+1, m): + unwrapped_phase_i = np.unwrap(phase_matrix[i]) + unwrapped_phase_j = np.unwrap(phase_matrix[j]) + + cross_corr = np.correlate(unwrapped_phase_i - np.mean(unwrapped_phase_i), + unwrapped_phase_j - np.mean(unwrapped_phase_j), + mode='full') + + delay = np.argmax(cross_corr) - (len(unwrapped_phase_i) - 1) + delay_matrix[i, j] = delay + + delay_matrix = delay_matrix - delay_matrix.T + + return delay_matrix + diff --git a/pyPTE/core/pyPTE.py b/pyPTE/core/pyPTE.py index 4d7c053..ed0cb2e 100644 --- a/pyPTE/core/pyPTE.py +++ b/pyPTE/core/pyPTE.py @@ -4,29 +4,31 @@ import numpy.typing as npt from scipy.signal import hilbert +from .delay_estimations import delay_by_crosscorrelation, delay_by_hillebrand -def get_delay(phase: npt.NDArray) -> int: + +def get_delay(phase: npt.NDArray, method: str = 'hillebrand') -> npt.NDArray: """ - Computes the overall delay for a all given channels + Computes the delay using specified method. - Parameters + Parameters: ---------- phase : numpy.ndarray - m x n ndarray : m: number of channels, n: number of samples + m x n ndarray : m: number of channels, n: number of samples. + method : str + The method to use for delay calculation ('original', 'cross_correlation'). - Returns + Returns: ------- delay : int + The computed delay. """ - phase = phase - m, n = phase.shape - c1 = n * m - r_phase = np.roll(phase, 1, axis=0) - phase_product = np.multiply(phase, r_phase) - c2 = (phase_product < 0).sum() - delay = int(np.round(c1 / c2)) - - return delay + if method == 'hillebrand': + return delay_by_hillebrand(phase) + elif method == 'cross_correlation': + return delay_by_crosscorrelation(phase) + else: + raise ValueError(f"Unknown method: {method}") def get_phase(time_series: npt.ArrayLike) -> npt.NDArray: @@ -110,7 +112,7 @@ def get_bincount(binsize: float) -> int: return bincount -def compute_PTE(phase: npt.NDArray, delay: int) -> npt.NDArray: +def compute_PTE(phase: npt.NDArray, delay_matrix: npt.NDArray) -> npt.NDArray: """ For each channel pair (x, y) containing the individual discretized phase, which is obtained by pyPTE.pyPTE.get_discretized_phase, @@ -137,10 +139,29 @@ def compute_PTE(phase: npt.NDArray, delay: int) -> npt.NDArray: for i in range(0, m): for j in range(0, m): - - ypr = phase[delay:, j] - y = phase[:-delay, j] - x = phase[:-delay, i] + if i == j: + continue # Skip self-comparison + + delay = delay_matrix[i, j] + delay = int(max(min(delay, n-1), -n+1)) + print(delay_matrix) + print(delay) + + if delay > 0: + x = phase[i, :-delay] + y = phase[j, delay:] + # Adjust y for prediction, shifting by one more sample + ypr = phase[j, delay+1:] + elif delay < 0: + x = phase[i, -delay:-1] + # Adjust x to match the prediction shift when delay is negative + y = phase[j, :delay] + # Keep ypr aligned with y, considering the negative delay + ypr = phase[j, :delay+1] + else: + x = phase[i, :-1] + y = phase[j, :-1] + ypr = phase[j, 1:] P_y = np.zeros([y.max() + 1]) np.add.at(P_y, [y], 1) @@ -170,7 +191,7 @@ def compute_PTE(phase: npt.NDArray, delay: int) -> npt.NDArray: def compute_dPTE_rawPTE( - phase: npt.NDArray, delay: int + phase: npt.NDArray, delay_matrix: npt.NDArray ) -> Tuple[npt.NDArray, npt.NDArray]: """ This function calls pyPTE.pyPTE.compute_PTE to obtain a PTE matrix and performs a @@ -195,7 +216,7 @@ def compute_dPTE_rawPTE( dPTE : normalized PTE matrix, raw_PTE: original PTE values """ - raw_PTE = compute_PTE(phase, delay) + raw_PTE = compute_PTE(phase, delay_matrix) tmp = np.triu(raw_PTE) + np.tril(raw_PTE).T with np.errstate(divide="ignore", invalid="ignore"): @@ -227,9 +248,9 @@ def PTE(time_series: npt.ArrayLike) -> Tuple[npt.NDArray, npt.NDArray]: """ phase = get_phase(time_series) - delay = get_delay(phase) + delay_matrix = get_delay(phase) phase_inc = phase + np.pi binsize = get_binsize(phase_inc) d_phase = get_discretized_phase(phase_inc, binsize) - return compute_dPTE_rawPTE(d_phase, delay) + return compute_dPTE_rawPTE(d_phase, delay_matrix) diff --git a/tests/test_pyPTE.py b/tests/test_pyPTE.py index 3190862..d4690d8 100644 --- a/tests/test_pyPTE.py +++ b/tests/test_pyPTE.py @@ -1,4 +1,5 @@ import numpy as np +import pytest from pyPTE.core.pyPTE import ( PTE, @@ -47,7 +48,8 @@ def test_PTE_with_independent_signals(): binsize = get_binsize(phase_inc) d_phase = get_discretized_phase(phase_inc, binsize) - pte_matrix = compute_PTE(d_phase, 1) + delay_matrix = np.ones((2, 2)) + pte_matrix = compute_PTE(d_phase, delay_matrix) # Check off-diagonal elements for significant PTE # (s1 -> s2 and s2 -> s1 should be low)