Skip to content

Commit

Permalink
latest code for testing noise subspace spectrum analyzers
Browse files Browse the repository at this point in the history
  • Loading branch information
mahdiolfat committed Mar 6, 2024
1 parent bef9f0c commit 512a8f0
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 12 deletions.
172 changes: 168 additions & 4 deletions examples/signal_eigendecomposition.ipynb

Large diffs are not rendered by default.

55 changes: 48 additions & 7 deletions ssp/spectrum.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Power spectrum and Frequency Estimators. Chapter 8."""

from typing import NoReturn
import logging

import numpy as np
from numpy.typing import ArrayLike
Expand All @@ -9,7 +10,10 @@
from .state import covar


def periodogram(x: ArrayLike, n1: int=0, n2: None | int=None, nfft: None | int=1024) -> np.ndarray:
logger = logging.getLogger(__name__)

Check failure on line 13 in ssp/spectrum.py

View workflow job for this annotation

GitHub Actions / build

Ruff (I001)

ssp/spectrum.py:3:1: I001 Import block is un-sorted or un-formatted


def periodogram(x: ArrayLike, p=4, M=64, n1: int=0, n2: None | int=None, nfft: None | int=1024) -> np.ndarray:

Check failure on line 16 in ssp/spectrum.py

View workflow job for this annotation

GitHub Actions / build

Ruff (PLR0913)

ssp/spectrum.py:16:5: PLR0913 Too many arguments in function definition (6 > 5)

Check failure on line 16 in ssp/spectrum.py

View workflow job for this annotation

GitHub Actions / build

Ruff (E501)

ssp/spectrum.py:16:101: E501 Line too long (110 > 100)
"""Periodogram, non-paramteric spectrum estimator.
Ergodic in signal length converging to the true power spectrum.
Expand Down Expand Up @@ -158,7 +162,7 @@ def modal(x: ArrayLike, p: int, q: int) -> NoReturn:
raise NotImplementedError()


def phd(x: ArrayLike, p: int) -> tuple[np.ndarray, float]:
def phd(x: ArrayLike, p: int, M: int) -> tuple[np.ndarray, float]:
"""Pisarenko Harmonic Decomposition frequency estimator.
A noise subspace method.
Expand All @@ -173,6 +177,7 @@ def phd(x: ArrayLike, p: int) -> tuple[np.ndarray, float]:
sigma = ddiag[index]
vmin = v[:, index]

logger.warning(f'{v.shape=}')
return vmin, sigma


Expand All @@ -192,13 +197,14 @@ def music(x: ArrayLike, p: int, M: int) -> ArrayLike:
ddiag = np.diag(d)
i = np.argsort(ddiag)
# y = ddiag[i]
Px = np.zeros(1, dtype=float)

nfft = max(len(_x) + 1, 1024)
Px = np.zeros(nfft, dtype=complex)

for j in range(M - p):
Px = Px + np.abs(np.fft.fft(v[:, i[j]], nfft))

Px = -20 * np.log10(Px)
logger.debug(f'{Px.shape=}')

return Px

Expand All @@ -213,7 +219,7 @@ def ev(x: ArrayLike, p: int, M: int) -> np.ndarray:
if p + 1 > M:
raise ValueError('Specified signal size is too small')

_x = np.array(x)
_x = np.array(x, dtype=complex)
R = covar(_x, M)
d, v = np.linalg.eig(R)
# ddiag = np.diag(d)
Expand Down Expand Up @@ -292,10 +298,45 @@ def ar_pc(x: ArrayLike, p: int, M: int) -> NoReturn:
raise NotImplementedError()


def overlay(N: int, omega: int, A: ArrayLike, sigma: int, num: int) -> NoReturn:
ALG_MAPPING = {
"periodogram": periodogram,
"phd": phd,
"music": music,
"min_norm": min_norm,
"eigenvector": ev,
}


def overlay(N: int, omega: ArrayLike, A: ArrayLike, sigma: float, num: int, alg: str = "periodogram") -> np.ndarray:

Check failure on line 310 in ssp/spectrum.py

View workflow job for this annotation

GitHub Actions / build

Ruff (PLR0913)

ssp/spectrum.py:310:5: PLR0913 Too many arguments in function definition (6 > 5)

Check failure on line 310 in ssp/spectrum.py

View workflow job for this annotation

GitHub Actions / build

Ruff (E501)

ssp/spectrum.py:310:101: E501 Line too long (116 > 100)
"""Periodogram overlays: using an ensemble of realizations.
Summary of this function.
Arguements: (N, omega, A, sigma, num)
"""

Check failure on line 315 in ssp/spectrum.py

View workflow job for this annotation

GitHub Actions / build

Ruff (D202)

ssp/spectrum.py:311:5: D202 No blank lines allowed after function docstring (found 1)
raise NotImplementedError()

nfft = 1024
# random signal process
rng1 = np.random.default_rng()
# noise process
rng2 = np.random.default_rng()

_omega = np.array(omega, ndmin=1, dtype=np.complex128)
_A = np.array(A, ndmin=1, dtype=np.complex128)
Px = np.zeros((nfft, N), dtype=np.complex128)
jj = np.array(omega, ndmin=1, dtype=np.complex128).size
n = np.arange(N)

jj = len(_omega)
for i in range(num):
x = sigma * rng1.normal()
for j in range(jj):
phi = 2 * np.pi * rng2.uniform()
x = x + _A[j] * np.sin(_omega[j] * n + phi)

#Px[:, i] = ALG_MAPPING[alg](x, p=4, M=64)
logger.warning(f'{x.shape=}')
res = periodogram(x, p=4, M=64)
logger.warning(f'{res.shape=}')
Px[:, i] = res[1, :]

return Px
2 changes: 1 addition & 1 deletion ssp/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def covar(x: ArrayLike, p: int) -> np.ndarray:
p x p hermitian toeplitz matrix of sample covariances
"""
_x = np.array(x)
_x = np.array(x, dtype=complex)
m = len(_x)
# remove the mean
x0 = _x - np.mean(_x)
Expand Down
11 changes: 11 additions & 0 deletions tests/test_spectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
Frequency Estimators: Example 8.6.5. Page 468.
"""

import logging

from typing import NoReturn
from ssp import spectrum


logger = logging.getLogger(__name__)


def test_phd() -> NoReturn:
Expand All @@ -28,3 +34,8 @@ def test_mv_pc() -> NoReturn:

def test_ar_pc() -> NoReturn:
raise NotImplementedError

def test_overlay() -> None:
res = spectrum.overlay(64, [0.2, 0.9], [1, 2], 0.5, 10)

assert(res.shape == (1024, 64))

0 comments on commit 512a8f0

Please sign in to comment.