From 4e21a118b88086ff04c8031087966a66da9c8e5d Mon Sep 17 00:00:00 2001 From: mahdi Date: Tue, 21 Nov 2023 17:38:43 -0500 Subject: [PATCH] Resolve #8: static type checking enforced and passing - all algorithm declarations are: - marked as implemented or not - annotated with static typing - added exhaustive list of random processes from chapter 3. - mypy'ed, locally passing all files - ct update: verbose pytest, mypy, and ruff - first clean set of stub files, manually added, see #9 - partially resolves #9, but direct generation from code is still needed --- .github/workflows/ContinuousTesting.yml | 6 +- .gitignore | 2 +- pyproject.toml | 27 +++++++- pyssp/adaptive.py | 23 ++++--- pyssp/lattice.py | 40 +++++------ pyssp/levinson.py | 51 +++++++------- pyssp/modeling.py | 40 ++++++----- pyssp/optimal.py | 11 +-- pyssp/routines.py | 11 +-- pyssp/spectrum.py | 89 +++++++++++++------------ pyssp/state.py | 67 +++++++++---------- pyssp/system.py | 52 +++++++++++++-- stubs/__init__.pyi | 10 +++ stubs/adaptive.pyi | 6 ++ stubs/lattice.pyi | 9 +++ stubs/levinson.pyi | 12 ++++ stubs/modeling.pyi | 12 ++++ stubs/optimal.pyi | 13 ++++ stubs/routines.pyi | 5 ++ stubs/spectrum.pyi | 24 +++++++ stubs/state.pyi | 30 +++++++++ stubs/system.pyi | 18 +++++ 22 files changed, 393 insertions(+), 165 deletions(-) create mode 100644 stubs/__init__.pyi create mode 100644 stubs/adaptive.pyi create mode 100644 stubs/lattice.pyi create mode 100644 stubs/levinson.pyi create mode 100644 stubs/modeling.pyi create mode 100644 stubs/optimal.pyi create mode 100644 stubs/routines.pyi create mode 100644 stubs/spectrum.pyi create mode 100644 stubs/state.pyi create mode 100644 stubs/system.pyi diff --git a/.github/workflows/ContinuousTesting.yml b/.github/workflows/ContinuousTesting.yml index 611223c..2ff46f0 100644 --- a/.github/workflows/ContinuousTesting.yml +++ b/.github/workflows/ContinuousTesting.yml @@ -21,12 +21,12 @@ jobs: - name: Static Lint run: | - ruff check --output-format=github pyssp --verbose + ruff . --verbose --output-format=github - name: Static Type Check run: | - mypy pyssp --explicit-package-bases + mypy --verbose - name: Test run: | - pytest + pytest --verbose diff --git a/.gitignore b/.gitignore index fceecc2..e72a55a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ __pycache__ *cache *.egg-info env -.env +.pyenv* .coverage .vscode diff --git a/pyproject.toml b/pyproject.toml index 7df3c8a..71803bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,13 +16,31 @@ addopts = [ ] [tool.ruff] +include = ["pyssp/**"] + line-length = 100 indent-width = 4 +target-version = "py312" +extend-exclude = [".pyenv*"] [tool.ruff.lint] -select = ["E", "F", "B", "I", "SIM", "W", "D"] +select = ["E", + "F", + "B", + "I", + "SIM", + "W", + "D", + "PL", + "NPY", + "PERF", + "C90", + "RUF"] ignore = ["D213", "D401", "D211"] +[tool.ruff.lint.pydocstyle] +convention = "google" + [tool.ruff.format] quote-style = "double" indent-style = "space" @@ -32,4 +50,9 @@ line-ending = "auto" [tool.mypy] mypy_path = "pyssp" ignore_missing_imports = true -#plugins = "numpy.typing.mypy_plugin" +disallow_incomplete_defs = true +disallow_untyped_calls = true +disallow_untyped_defs = true +disallow_untyped_decorators = true +plugins = "numpy.typing.mypy_plugin" +files = ["pyssp/*.py"] diff --git a/pyssp/adaptive.py b/pyssp/adaptive.py index d453089..c2a0625 100644 --- a/pyssp/adaptive.py +++ b/pyssp/adaptive.py @@ -1,11 +1,13 @@ """Chapter 9 algorithm implementations.""" import numpy as np +from numpy.typing import ArrayLike from .state import convm -def lms(x, d, mu, nord, a0=None): +def lms(x: ArrayLike, d: ArrayLike, mu: int, nord: int, + a0: None | ArrayLike) -> tuple[ArrayLike, ArrayLike]: """LMS Adaptive Filter. Refernce Page 506, Figure 9.7 @@ -21,17 +23,19 @@ def lms(x, d, mu, nord, a0=None): E = np.zeros((M - nord + 1)) _a0 = np.array(a0) - E[0] = d[0] - _a0 * X[0] + _d = np.array(d) + E[0] = _d[0] - _a0 * X[0] A[0] = a0 + mu * E[0] * np.conjugate(X[0]) for k in (1, M - nord + 1): - E[k] = d[k] - A[k - 1] * X[k] + E[k] = _d[k] - A[k - 1] * X[k] A[k] = A[k - 1] + mu * E[k] * np.conjugate(X[k]) return A, E -def nlms(x, d, beta, nord, a0): +def nlms(x: ArrayLike, d: ArrayLike, beta: int, nord: int, + a0: ArrayLike | None = None) -> tuple[ArrayLike, ArrayLike]: """Normalized LMS Adaptive Filter. Refernce Page 515, Figure 9.12 @@ -47,20 +51,22 @@ def nlms(x, d, beta, nord, a0): E = np.zeros((M - nord + 1)) _a0 = np.array(a0) + _d = np.array(d) DEN = X[0] * X[0] + delta - E[0] = d[0] - _a0 * X[0] + E[0] = _d[0] - _a0 * X[0] A[0] = a0 + beta / DEN * E[0] * np.conjugate(X[0]) # no further iterations if M == 1 for k in (1, M - nord + 1): - E[k] = d[k] - A[k - 1] * X[k] + E[k] = _d[k] - A[k - 1] * X[k] DEN = X[k] * X[k] + delta A[k] = A[k - 1] + beta / DEN * E[k] * np.conjugate(X[k]) return A, E -def rls(x, d, nord, lamda=1, delta=0.001): +def rls(x: ArrayLike, d: ArrayLike, nord: int, lamda: float=1, + delta: float=0.001) -> tuple[ArrayLike, ArrayLike]: """Recursive Least Squares. Reference 545, Table 9.6 @@ -68,6 +74,7 @@ def rls(x, d, nord, lamda=1, delta=0.001): For special case of lambda == 1, this is known as growing window RLS algorithm, For all other values of lambda <1 and >0, this is the exponentially weighted RLS algorithm. """ + _d = np.array(d) X = convm(x, nord) M, N = X.shape P = np.eye(N) / delta @@ -76,7 +83,7 @@ def rls(x, d, nord, lamda=1, delta=0.001): for k in range(1, M - nord + 1): z = P * X[k] g = z / (lamda + X[k] * z) - alpha = d[k] - X[k] @ np.transpose(W[k]) + alpha = _d[k] - X[k] @ np.transpose(W[k]) W[k] = W[k - 1] + alpha * g P = (P - g * z) / lamda diff --git a/pyssp/lattice.py b/pyssp/lattice.py index 6436166..25c270b 100644 --- a/pyssp/lattice.py +++ b/pyssp/lattice.py @@ -1,21 +1,24 @@ """Implementation of algorithm from Chapter 6.""" +from typing import NoReturn import numpy as np import scipy as sp +from numpy.typing import ArrayLike -def fcov(x, p): +def fcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]: """Figure 6.15, Page 310. Using the forward covariance method the reflection co-efficients of the lattice filter are found by sequentially minimizing the sum of the squares of the forward prediction error. """ - if p >= len(x): + _x = np.array(x) + if p >= len(_x): raise ValueError("Model order must be less than length of signal") - _x = np.array(x).reshape(-1, 1) - N = len(x) + _x = np.array(_x).reshape(-1, 1) + N = len(_x) eplus = _x[1:N] eminus = _x[:N - 1] @@ -40,16 +43,16 @@ def fcov(x, p): return gamma, err -def burg(x, p): +def burg(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]: """Sequentially minimizes the sum of the forward and backward covariance errors. Guaranteed to be stable. All reflection coefficients will be <|1| """ - if p > len(x): + _x = np.array(x).reshape(-1, 1) + N = len(_x) + if p > N: raise ValueError("Model order must be less than length of signal") - _x = np.array(x).reshape(-1, 1) - N = len(x) eplus = _x[1:N] eminus = _x[:N - 1] @@ -74,25 +77,24 @@ def burg(x, p): return gamma, err -def bcov(): - """Sequentially minimizes the backward covariance error. - - Arguements: (x, p) - """ +def bcov(x: ArrayLike, p: int) -> NoReturn: + """Sequentially minimizes the backward covariance error.""" + raise NotImplementedError() -def mcov(x, p): - """Modified covariance method. Unlike the forward/backward algorithms. +def mcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]: + """Modified covariance method. + Unlike the forward/backward algorithms. It *does not* minimize an error term sequentially. """ _x = np.array(x).reshape(-1, 1) - N = len(x) + N = len(_x) - if p >= len(x): + if p >= len(_x): raise ValueError("Model order must be less than length of signal") - X = sp.linalg.toeplitz(_x[p:N], np.flipud(x[:p + 1])) + X = sp.linalg.toeplitz(_x[p:N], np.flipud(_x[:p + 1])) R = np.transpose(X) @ X R1 = np.array(R[1:p + 1, 1: p + 1]) R2 = np.array(np.flipud(np.fliplr(R[:p, :p]))) @@ -103,7 +105,7 @@ def mcov(x, p): b = b1 + b2 a = sp.linalg.solve_toeplitz(Rx[:, 1], b) a = np.concatenate(([1], a)) - print(a.shape) + # print(a.shape) err = np.dot(R[0], a) + np.dot(np.flip(R[p]), a) return a, err diff --git a/pyssp/levinson.py b/pyssp/levinson.py index 41400c6..4f1324a 100644 --- a/pyssp/levinson.py +++ b/pyssp/levinson.py @@ -1,20 +1,23 @@ """Chapter 5 algorithm implementations.""" + import numpy as np +from numpy.typing import ArrayLike -def rtoa(r) -> tuple[np.ndarray, np.ndarray]: +def rtoa(r: ArrayLike) -> tuple[np.ndarray, float]: """Recursively map a set of autocorrelations to a set of model parameters. The Levison-Durbin recursion. """ + _r = np.array(r) a = np.ones((1, 1)) - epsilon = r[0] - p = len(r) - 1 - r = r.reshape(-1, 1) + epsilon = _r[0] + p = len(_r) - 1 + _r = _r.reshape(-1, 1) for j in range(1, p + 1): - gamma = -np.transpose(r[1:1 + j,]) @ np.flipud(a) / epsilon + gamma = -np.transpose(_r[1:1 + j,]) @ np.flipud(a) / epsilon an = np.concatenate((a, np.zeros((1, 1)))).reshape(-1, 1) anT = np.conjugate(np.flipud(a)) a = an + gamma * np.concatenate((np.zeros(1), anT.ravel())).reshape(-1, 1) @@ -23,7 +26,7 @@ def rtoa(r) -> tuple[np.ndarray, np.ndarray]: return a, epsilon -def gtoa(gamma): +def gtoa(gamma: ArrayLike) -> ArrayLike: """Recursively map parameters to reflection coeffs. Reference Page 233, Table 5.2, Figure 5.6. @@ -35,17 +38,18 @@ def gtoa(gamma): Cumulant generating function in statistics. """ a = np.ones((1, 1)) - p = len(gamma) + _g = np.array(gamma) + p = len(_g) for j in range(1, p): a = np.concatenate((a, np.zeros((1, 1)))) _a = a.copy() af = np.conjugate(np.flipud(_a)) - a = a + gamma[j] * af + a = a + _g[j] * af return a -def atog(a): +def atog(a: ArrayLike) -> ArrayLike: """Recursively map from reflection coeffs to filter coeffs. The step-down recursion. @@ -80,20 +84,21 @@ def atog(a): return gamma -def gtor(gamma, epsilon=None): +def gtor(gamma: ArrayLike, epsilon: None | float = None) -> ArrayLike: """Find the autocorrelation sequence from the reflection coefficients and the modeling error. Page 241, Figure 5.9. """ - p = len(gamma) - aa = np.array([[gamma[0]]]).reshape(-1, 1) - r = np.array(([1, -gamma[0]])).reshape(1, -1) + _g = np.array(gamma) + p = len(_g) + aa = np.array([[_g[0]]]).reshape(-1, 1) + r = np.array(([1, -_g[0]])).reshape(1, -1) for j in range(1, p): aa1 = np.concatenate((np.ones((1, 1)), aa)).reshape(-1, 1) aa0 = np.concatenate((aa, np.zeros((1, 1)))).reshape(-1, 1) aaf = np.conjugate(np.flipud(aa1)) - aa = aa0 + gamma[j] * aaf + aa = aa0 + _g[j] * aaf print(aa) rf = -np.fliplr(r) @ aa print(rf) @@ -109,16 +114,16 @@ def gtor(gamma, epsilon=None): return r -def ator(a, b): +def ator(a: ArrayLike, b: float) -> ArrayLike: """Page 241, Figure 5.9.""" gamma = atog(a) - r = gtor(gamma.ravel()) + r = gtor(gamma) r = r * np.sqrt(b) / np.prod(1 - np.abs(gamma)**2) return r -def rtog(r): +def rtog(r: ArrayLike) -> ArrayLike: """Recurse from the model params to filter coeffs. The Shur Recursion: Table 5.5. @@ -130,17 +135,17 @@ def rtog(r): return gamma -def glev(r, b): +def glev(r: ArrayLike, b: ArrayLike) -> ArrayLike: """General Levinson Recursion, solves any Hermitian Toeplitz matrix. Can solve the Wiener-Hopf system of equations for Optimal MSE Filter design. """ _r = np.array(r).reshape(-1, 1) - # _b = np.array([b]).reshape(-1, 1) - p = len(b) + _b = np.array([b]).reshape(-1, 1) + p = len(_b) a = np.array([[1]]).reshape(-1, 1) - x = np.array([b[0] / r[0]]).reshape(-1, 1) - epsilon = r[0] + x = np.array([_b[0] / _r[0]]).reshape(-1, 1) + epsilon = _r[0] for j in range(1, p): print(j) print(f"{_r=}, {_r.shape=}") @@ -160,7 +165,7 @@ def glev(r, b): epsilon = epsilon * (1 - np.abs(gamma)**2) print(f"{epsilon=}") delta = _r1 @ np.flipud(x) - q = (b[j] - delta[0, 0]) / epsilon + q = (_b[j] - delta[0, 0]) / epsilon x = np.concatenate([x, [[0]]]) x = x + q * np.conjugate(np.flipud(a)) print() diff --git a/pyssp/modeling.py b/pyssp/modeling.py index b71206b..f406868 100644 --- a/pyssp/modeling.py +++ b/pyssp/modeling.py @@ -1,22 +1,25 @@ """Chapter 4 modeling algorithm implementations.""" +from typing import NoReturn import numpy as np import scipy as sp +from numpy.typing import ArrayLike from .state import convm -def pade(x, p, q): +def pade(x: ArrayLike, p: int, q: int) -> tuple[ArrayLike, ArrayLike]: """Reference Page 138, Table 4.1. The Pade approximation models a signal as the unis sample response of linear shift invariant system have p poles and q zeros. """ - if p + q > len(x): + _x = np.array(x) + if p + q > len(_x): raise ValueError(f"Model order {p + q} is too large.") - X = convm(x, p + 1) + X = convm(_x, p + 1) # Linear difference matrix spanning the number of zeros Xq = X[q + 1:q + p + 1, 1:p + 1].copy() @@ -29,7 +32,7 @@ def pade(x, p, q): return (a, b) -def prony(x, p, q): +def prony(x: ArrayLike, p: int, q: int) -> tuple[ArrayLike, ArrayLike, ArrayLike]: """Least square minimization of poles to get denominator coefficients. Solves directly (Pade method) to get numerator coefficients. @@ -37,6 +40,7 @@ def prony(x, p, q): Condition to energy_match is on page 575 """ + x = np.array(x) if p + q > len(x): raise ValueError(f"Model order {p + q} is too large.") @@ -74,13 +78,15 @@ def prony(x, p, q): return a, b, err -def shanks(x, p, q): +def shanks(x: ArrayLike, p: int, q: int) -> tuple[ArrayLike, ArrayLike, float]: """Shank's method.""" + x = np.array(x) N = len(x) if p + q >= N: raise ValueError(f"Model order {p + q} is too large.") a, _, _ = prony(x, p, q) + a = np.array(a) print(f"{a.transpose().ravel()=}") u = np.concatenate((np.ones(1), np.zeros(N - 1))) res = sp.signal.tf2zpk([1], a.ravel()) @@ -103,9 +109,9 @@ def shanks(x, p, q): return a, b, err -def spike(g, n0, n): +def spike(g: ArrayLike, n0: int, n: int) -> ArrayLike: """Leaset Squares Inverse Filter.""" - g = g.reshape(-1, 1) + g = np.array(g).reshape(-1, 1) m = len(g) if m + n - 1 <= n0: @@ -124,16 +130,14 @@ def spike(g, n0, n): return h -def ipf(): - """Iterative Pre-Filtering. - - Arguements: (x, p, q, n=10, a=None) - """ +def ipf(x: ArrayLike, p: int, q: int, n: None, a: ArrayLike) -> NoReturn: + """Iterative Pre-Filtering.""" + raise NotImplementedError() -def acm(x, p) -> tuple[np.ndarray, np.ndarray]: +def acm(x: ArrayLike, p: int) -> tuple[np.ndarray, np.ndarray]: """The auto-correlation method.""" - x0 = x.copy().ravel().reshape(-1, 1) + x0 = np.array(x).ravel().reshape(-1, 1) N = len(x0) if p >= len(x0): raise ValueError("p (all-pole model) too large") @@ -150,9 +154,9 @@ def acm(x, p) -> tuple[np.ndarray, np.ndarray]: return a, err -def covm(x, p): +def covm(x: ArrayLike, p: int)-> tuple[ArrayLike, float]: """Solve the complete Prony normal equations.""" - x0 = x.copy().ravel().reshape(-1, 1) + x0 = np.array(x).ravel().reshape(-1, 1) N = len(x0) if p >= len(x0): raise ValueError(f"{p=} all-pole model too large") @@ -169,9 +173,9 @@ def covm(x, p): return a, err -def durbin(x, p, q): +def durbin(x: ArrayLike, p: int, q: int) -> tuple[ArrayLike, ArrayLike]: """The durbin method.""" - x0 = x.copy().ravel().reshape(-1, 1) + x0 = np.array(x).ravel().reshape(-1, 1) # N = len(x0) if p >= len(x0): raise ValueError("p (all-pole model) too large") diff --git a/pyssp/optimal.py b/pyssp/optimal.py index 1176c29..580eaff 100644 --- a/pyssp/optimal.py +++ b/pyssp/optimal.py @@ -2,10 +2,11 @@ import numpy as np +import numpy.typing as npt -# pylint: disable=R0914 -def kalman(y, A, C, sigmaw, sigmav): +def kalman(y: list[float], A: npt.ArrayLike, C: npt.ArrayLike, sigmaw: list[float], + sigmav: list[float]) -> tuple[npt.NDArray, ...]: """Kalman Filter. y: vector of observations N x q, n time steps, q sensors @@ -64,9 +65,11 @@ def kalman(y, A, C, sigmaw, sigmav): return P0, P1, K, xhat0, xhat1 -def wiener_denoise(): +def wiener_denoise() -> None: """Denoising based on IIR wiener filters.""" + raise NotImplementedError() -def wiener_systemid(): +def wiener_systemid() -> None: """Systemid based on FIR wiener filters.""" + raise NotImplementedError() diff --git a/pyssp/routines.py b/pyssp/routines.py index b0d2635..c867555 100644 --- a/pyssp/routines.py +++ b/pyssp/routines.py @@ -1,8 +1,9 @@ -"""Some helper routines.""" +"""Some helper routines. +Note: remove this file and fine the right place for the functions. +""" -def back_substitution(): - """Convinient recursion for an all-pole model. - Arguements: (x, p) - """ +def back_substitution(x: list[float], p: int) -> None: + """Convinient recursion for an all-pole model.""" + raise NotImplementedError() diff --git a/pyssp/spectrum.py b/pyssp/spectrum.py index 2490a39..6b2e55f 100644 --- a/pyssp/spectrum.py +++ b/pyssp/spectrum.py @@ -1,21 +1,15 @@ """Power spectrum and Frequency Estimators.""" +from typing import NoReturn import numpy as np +from numpy.typing import ArrayLike from .modeling import acm from .state import covar -def overlay(): - """Periodogram overlays: using an ensemble of realizations. - - Summary of this function. - Arguements: (N, omega, A, sigma, num) - """ - - -def periodogram(x, n1=0, n2=None, nfft=1024): +def periodogram(x: ArrayLike, n1: int=0, n2: None | int=None, nfft: None | int=1024) -> np.ndarray: """Periodogram, non-paramteric spectrum estimator. Ergodic in signal length converging to the true power spectrum. @@ -27,6 +21,9 @@ def periodogram(x, n1=0, n2=None, nfft=1024): if n2 is None: n2 = len(_x) + if nfft is None: + nfft = max(1024, n2 + 1) + nfft = max(nfft, len(_x)) Px = np.abs(np.fft.fft(_x[n1:n2], nfft)) ** 2 / (n2 - n1) @@ -36,7 +33,7 @@ def periodogram(x, n1=0, n2=None, nfft=1024): return Px -def mper(x, n1, n2, win=None): +def mper(x: ArrayLike, n1: int, n2: int, win: ArrayLike | None) -> np.ndarray: """Estimates non-paramteric spectrum estimator. Same as the periodogram but the signal is windowed first. @@ -56,14 +53,14 @@ def mper(x, n1, n2, win=None): return Px -def bart(x, nsect): +def bart(x: ArrayLike, nsect: int) -> np.ndarray: """Bartlett's method non-paramteric spectrum estimation. Reference Page 414, Figure 8.13. Averages periodograms of non-overlapping sections. """ - _x = np.array(x) - L = len(x) // nsect + _x = np.array(x).ravel().reshape(-1, 1) + L = len(_x) // nsect Px = np.zeros(1) n1 = 0 @@ -74,7 +71,7 @@ def bart(x, nsect): return Px -def welch(x, L, over, win=None): +def welch(x: ArrayLike, L: int, over: int, win: ArrayLike | None) -> np.ndarray: """Welch's method: non-paramteric spectrum estimator. Reference Page 418, Figure 8.16 @@ -83,35 +80,39 @@ def welch(x, L, over, win=None): if over >= 1 or over < 0: raise ValueError(f"Overlap {over=} is invalid") + if win is None: + win = np.ones(len(_x)) + n1 = 0 n0 = (1 - over) * L nsect = 1 + (len(_x) - L) // n0 Px = np.zeros(1) for _ in range(nsect): - Px = Px + mper(_x, win, n1, n1 + L - 1) + Px = Px + mper(_x, n1, n1 + L - 1, win) n1 = n1 + n0 return Px -def per_smooth(x, M, n1=0, n2=None, win=None): +def per_smooth(x: ArrayLike, M: int, n1: int=0, n2: None=None, win: None=None) -> np.ndarray: """Blackman-Tukey Spectrum Estimator: non-parametric estimation. Page 421, Figure 8.18 """ _x = np.array(x) + _n2: int if n2 is None: - n2 = len(_x) + _n2 = len(_x) - R = covar(x[n1:n2], M) - r = np.concatenate((np.flip(R[0, 1:M]), - [R[0, 0]], R[0, 1:M])) + R = np.array(covar(_x[n1:n2], M)) + r = np.concatenate(np.array((np.flip(R[0, 1:M]), [R[0, 0]], R[0, 1:M]))) + _win: ArrayLike if win is None: - win = np.ones(M) - r = r * win + _win = np.ones(M) + r = r * _win nfft = max(1024, M) Px = np.abs(np.fft.fft(r, nfft)) Px[0] = Px[1] @@ -119,7 +120,7 @@ def per_smooth(x, M, n1=0, n2=None, win=None): return Px -def minvar(x, p): +def minvar(x: ArrayLike, p: int) -> np.ndarray: """Minimum Variance spectrum estimator: parametric estimation. Page 430, Figure 8.19 @@ -135,7 +136,7 @@ def minvar(x, p): return Px -def mem(x, p): +def mem(x: ArrayLike, p: int) -> np.ndarray: """Maximum Entropy spectrum estimator: parametric estimation. Page 437, Figure 8.19 @@ -147,16 +148,17 @@ def mem(x, p): return Px -def modal(): +def modal(x: ArrayLike, p: int, q: int) -> NoReturn: """Mode-based spectrum estimator: parametric estimation. Page 440. Arguements: (x, p, q) """ + raise NotImplementedError() -def phd(x, p): +def phd(x: ArrayLike, p: int) -> tuple[np.ndarray, float]: """Pisarenko Harmonic Decomposition frequency estimator. A noise subspace method. @@ -174,15 +176,15 @@ def phd(x, p): return vmin, sigma -def music(x, p, M): +def music(x: ArrayLike, p: int, M: int) -> ArrayLike: """MUSIC, Frequency estimator. A noise subspace method. Page 430, Figure 8.19 """ - _x = np.array(x) - if p + 1 > M or len(x) < M: + _x = np.array(x).ravel().reshape(-1, 1) + if p + 1 > M or len(_x) < M: raise ValueError("Size of signal covariance matrix is inappropriate.") R = covar(x, M) @@ -190,7 +192,7 @@ def music(x, p, M): ddiag = np.diag(d) i = np.argsort(ddiag) # y = ddiag[i] - Px = np.zeros(1) + Px = np.zeros(1, dtype=float) nfft = max(len(_x) + 1, 1024) for j in range(M - p): @@ -201,7 +203,7 @@ def music(x, p, M): return Px -def ev(x, p, M): +def ev(x: ArrayLike, p: int, M: int) -> np.ndarray: """Eigenvector spectrum estimator: noise subspace method, parametric. A noise subspace method. @@ -217,7 +219,7 @@ def ev(x, p, M): # ddiag = np.diag(d) yi = np.argsort(np.diag(d)) # y = ddiag[yi] - Px = np.zeros(0) + Px = np.zeros(1, dtype=float) nfft = max(1024, M + p + 1) for j in range(M - p): Px = Px + np.abs(np.fft.fft(v[:, yi[j]], nfft)) @@ -227,7 +229,7 @@ def ev(x, p, M): return Px -def min_norm(x, p, M): +def min_norm(x: ArrayLike, p: int, M: int) -> np.ndarray: """Minimum Norm spectrum estimator: noise subspace method, parametric. Frequency estimator. @@ -253,7 +255,7 @@ def min_norm(x, p, M): return Px -def bt_pc(x, p, M): +def bt_pc(x: ArrayLike, p: int, M: int) -> np.ndarray: """Blackman-Tukey Principle-Component Frequeny Estimator. program for estimating the frequencies of pcomplex exponentials in white noise using @@ -280,15 +282,20 @@ def bt_pc(x, p, M): return Px -def mv_pc(): - """Minimum Variance, Principle Component. +def mv_pc(x: ArrayLike, p: int, M: int) -> NoReturn: + """Minimum Variance, Principle Component.""" + raise NotImplementedError() - Arguements: (x, p, M) - """ +def ar_pc(x: ArrayLike, p: int, M: int) -> NoReturn: + """Autoregressive, Principle Component.""" + raise NotImplementedError() -def ar_pc(): - """Autoregressive, Principle Component. - Arguements: (x, p, M) +def overlay(N: int, omega: int, A: ArrayLike, sigma: int, num: int) -> NoReturn: + """Periodogram overlays: using an ensemble of realizations. + + Summary of this function. + Arguements: (N, omega, A, sigma, num) """ + raise NotImplementedError() diff --git a/pyssp/state.py b/pyssp/state.py index f667d0f..b7fd990 100644 --- a/pyssp/state.py +++ b/pyssp/state.py @@ -1,78 +1,75 @@ """Systems of state matrix representation from the Appendix.""" +from typing import NoReturn + import numpy as np +from numpy.typing import ArrayLike -def convm(x, p): - """Convolution Matrix. +def convm(x: ArrayLike, p: int) -> np.ndarray: + """Construct the convolution matrix of the signal x with p number of parameters. (N + p - 1) by p non-symmetric Toeplitz matrix """ + _x = np.array(x) if p < 1: raise ValueError(f"{p=} must be greater or equal to 1.") - N = len(x) + 2 * p - 2 + N = len(_x) + 2 * p - 2 # the signal centered over its support # needed for the signal information-preserving frequency spectrum - xcol = (x.copy()).reshape(-1, 1) + xcol = (_x.copy()).reshape(-1, 1) xpad = np.concatenate((np.zeros((p - 1, 1)), xcol, np.zeros((p - 1, 1)))) - X = np.empty([len(x) + p - 1, p]) + X = np.empty([len(_x) + p - 1, p]) for i in range(p): X[:, i] = xpad[p - i - 1:N - i, 0] return X -def covar(x, p): +def covar(x: ArrayLike, p: int) -> np.ndarray: """Covariance Matrix. p x p hermitian toeplitz matrix of sample covariances """ - m = len(x) + _x = np.array(x) + m = len(_x) # remove the mean - x0 = x.copy() - np.mean(x) + x0 = _x - np.mean(_x) R = np.transpose((convm(x0, p + 1).conjugate())) @ (convm(x0, p + 1) / (m - 1)) return R -def normalprony(): - """Yuler-Walker Systems of Equations. - - Arguements: (x, p, q) - """ - +def normalprony(x: ArrayLike, p: int, q: int) -> NoReturn: + """Normalized prony Systems of Equations.""" + raise NotImplementedError() -def ywe(): - """Yuler-Walker Systems of Equations. - Arguements: (x, p, q) - """ +def ywe(x: ArrayLike, p: int, q: int) -> NoReturn: + """Yuler-Walker Systems of Equations.""" + raise NotImplementedError() -def nywe(): - """Normalized Yuler-Walker Systems of Equations. +def nywe(x: ArrayLike, p: int, q: int) -> NoReturn: + """Normalized Yuler-Walker Systems of Equations.""" + raise NotImplementedError() - Arguements: (x, p, q) - """ +def mywe(x: ArrayLike, p: int, q: int) -> NoReturn: + """Modified Yuler-Walker Systems of Equations.""" + raise NotImplementedError() -def mywe(): - """Modified Yuler-Walker Systems of Equations. - Arguements: (x, p, q) - """ - - -def eywe(): - """Extended Yuler-Walker Systems of Equations. - - Arguements: (x, p, q) - """ +def eywe(x: ArrayLike, p: int, q: int) -> NoReturn: + """Extended Yuler-Walker Systems of Equations.""" + raise NotImplementedError() -def normaldeterministic(): +def normaldeterministic(x: ArrayLike, p: int, q: int) -> NoReturn: """Normal Determenistic Systems of Equations.""" + raise NotImplementedError() -def wienerhopf(): +def wienerhopf(x: ArrayLike, p: int, q: int) -> NoReturn: """Wiener-Hopf Systems of Equations.""" + raise NotImplementedError() diff --git a/pyssp/system.py b/pyssp/system.py index 0e410d6..a266470 100644 --- a/pyssp/system.py +++ b/pyssp/system.py @@ -1,17 +1,57 @@ """Chapter 3, stochastic systems.""" -def arma(): +import random +from collections.abc import Generator +from typing import NoReturn + + +class RandomProcess(random.Random): + """Random process base class.""" + + +def arma(p: int, q: int, N: int, process: None | Generator = None) -> NoReturn: """Auto-regressive Moving-Average.""" + raise NotImplementedError() -def ar(): +def ar(p: int, N: int, process: None | Generator = None) -> NoReturn: """Auto-regressive.""" + raise NotImplementedError() + + +def ma(q: int, N: int, process: None | Generator = None) -> NoReturn: + """Moving Average Random (stochastic) process.""" + raise NotImplementedError() + + +def harmonic(A: int, process: None | Generator = None) -> NoReturn: + """The harmonic random process.""" + raise NotImplementedError() + + +def white_noise(variance: float) -> NoReturn: + """The harmonic random process. + + Page 93. + + """ + raise NotImplementedError() + + +def white_gaussian_noise() -> NoReturn: + """A random process, a sequence of uncorrelated real-valued Gaussian random. + + Page 94. + + """ + raise NotImplementedError() -def ma(): - """Moving Average.""" +def bernoulli() -> NoReturn: + """The Bernoulli process consists of a sequence of uncorrelated Bernoulli variables (-1, 1). + Page 94. -def wiener(): - """Optimal Wiener.""" + """ + raise NotImplementedError() diff --git a/stubs/__init__.pyi b/stubs/__init__.pyi new file mode 100644 index 0000000..760a1cb --- /dev/null +++ b/stubs/__init__.pyi @@ -0,0 +1,10 @@ +"""The import root for the book sections.""" + +# import system as Chapter3 +# import modeling as Chapter4 +# import levinson as Chapter5 +# import lattice as Chapter6 +# import optimal as Chapter7 +# import spectrum as Chapter8 +# import adaptive as Chapter9 +# import state as Appendix diff --git a/stubs/adaptive.pyi b/stubs/adaptive.pyi new file mode 100644 index 0000000..b4a5a80 --- /dev/null +++ b/stubs/adaptive.pyi @@ -0,0 +1,6 @@ +"""Chapter 9 algorithm implementations.""" + + +def lms(x, d, mu, nord, a0=None): ... +def nlms(x, d, beta, nord, a0): ... +def rls(x, d, nord, lamda=1, delta=0.001): ... diff --git a/stubs/lattice.pyi b/stubs/lattice.pyi new file mode 100644 index 0000000..bd14adc --- /dev/null +++ b/stubs/lattice.pyi @@ -0,0 +1,9 @@ +"""Implementation of algorithm from Chapter 6.""" + + +def fcov(x, p): ... +def burg(x, p): ... +def mcov(x, p): ... + +def bcov(x, p): + raise NotImplementedError() diff --git a/stubs/levinson.pyi b/stubs/levinson.pyi new file mode 100644 index 0000000..398401c --- /dev/null +++ b/stubs/levinson.pyi @@ -0,0 +1,12 @@ +"""Chapter 5 algorithm implementations.""" + +import numpy as np + + +def rtoa(r) -> tuple[np.ndarray, np.ndarray]: ... +def gtoa(gamma): ... +def atog(a): ... +def gtor(gamma, epsilon=None): ... +def ator(a, b): ... +def rtog(r): ... +def glev(r, b): ... diff --git a/stubs/modeling.pyi b/stubs/modeling.pyi new file mode 100644 index 0000000..4cfdf85 --- /dev/null +++ b/stubs/modeling.pyi @@ -0,0 +1,12 @@ +"""Chapter 4 modeling algorithm implementations.""" + +from numpy import ndarray + +def pade(x, p, q): ... +def prony(x, p, q): ... +def shanks(x, p, q): ... +def spike(g, n0, n): ... +def ipf(): ... +def acm(x, p) -> tuple[ndarray, ndarray]: ... +def covm(x, p): ... +def durbin(x, p, q): ... diff --git a/stubs/optimal.pyi b/stubs/optimal.pyi new file mode 100644 index 0000000..accca06 --- /dev/null +++ b/stubs/optimal.pyi @@ -0,0 +1,13 @@ +"""Optimal Filters: optimal wiener filters optimal kalman estimator.""" + + +"""Optimal kalman estimator""" +def kalman(y, A, C, sigmaw, sigmav): ... + +"""Denoising based on IIR wiener filters.""" +def wiener_denoise(): + raise NotImplementedError() + +"""Systemid based on FIR wiener filters.""" +def wiener_systemid(): + raise NotImplementedError() diff --git a/stubs/routines.pyi b/stubs/routines.pyi new file mode 100644 index 0000000..9d0b4a5 --- /dev/null +++ b/stubs/routines.pyi @@ -0,0 +1,5 @@ +"""Some helper routines.""" + + +def back_substitution(): + raise NotImplementedError() diff --git a/stubs/spectrum.pyi b/stubs/spectrum.pyi new file mode 100644 index 0000000..9022052 --- /dev/null +++ b/stubs/spectrum.pyi @@ -0,0 +1,24 @@ +"""Power spectrum and Frequency Estimators.""" + + +def periodogram(x, n1=0, n2=None, nfft=1024): ... +def mper(x, n1, n2, win=None): ... +def bart(x, nsect): ... +def welch(x, L, over, win=None): ... +def per_smooth(x, M, n1=0, n2=None, win=None): ... +def minvar(x, p): ... +def mem(x, p): ... +def phd(x, p): ... +def music(x, p, M): ... +def ev(x, p, M): ... +def min_norm(x, p, M): ... +def bt_pc(x, p, M): ... + +def overlay(): + raise NotImplementedError() +def modal(x, p, q): + raise NotImplementedError() +def mv_pc(x, p, M): + raise NotImplementedError() +def ar_pc(x, p, M): + raise NotImplementedError() diff --git a/stubs/state.pyi b/stubs/state.pyi new file mode 100644 index 0000000..03139f2 --- /dev/null +++ b/stubs/state.pyi @@ -0,0 +1,30 @@ +"""Systems of state matrix representation from the Appendix.""" + + +import numpy.typing as npt + + +def convm(x: npt.ArrayLike, p: int = ...): ... + +def covar(x: npt.ArrayLike, p: int = ...): ... + +def normalprony(x: npt.ArrayLike, p: int = ..., q: int = ...): + raise NotImplementedError() + +def ywe(x: npt.ArrayLike, p: int = ..., q: int = ...): + raise NotImplementedError() + +def nywe(x: npt.ArrayLike, p: int = ..., q: int = ...): + raise NotImplementedError() + +def mywe(x: npt.ArrayLike, p: int = ..., q: int = ...): + raise NotImplementedError() + +def eywe(x: npt.ArrayLike, p: int = ..., q: int = ...): + raise NotImplementedError() + +def normaldeterministic(x: npt.ArrayLike, p: int = ..., q: int = ...): + raise NotImplementedError() + +def wienerhopf(x: npt.ArrayLike, p: int = ..., q: int = ...): + raise NotImplementedError() diff --git a/stubs/system.pyi b/stubs/system.pyi new file mode 100644 index 0000000..b33217e --- /dev/null +++ b/stubs/system.pyi @@ -0,0 +1,18 @@ +"""Chapter 3, stochastic systems.""" + + +"""Auto-regressive Moving-Average.""" +def arma(): + raise NotImplementedError() + +"""Auto-regressive.""" +def ar(): + raise NotImplementedError() + +"""Moving Average.""" +def ma(): + raise NotImplementedError() + +"""Optimal Wiener.""" +def wiener(): + raise NotImplementedError()