Skip to content

Commit

Permalink
Resolve #8: static type checking enforced and passing
Browse files Browse the repository at this point in the history
- all algorithm declarations are:
 - marked as implemented or not
 - annotated with static typing
- added exhaustive list of random processes from chapter 3.
- mypy'ed, locally passing all files
- ct update: verbose pytest, mypy, and ruff
- first clean set of stub files, manually added, see #9
- partially resolves #9, but direct generation from code is still needed
  • Loading branch information
mahdi committed Nov 23, 2023
1 parent e4d418d commit 4e21a11
Show file tree
Hide file tree
Showing 22 changed files with 393 additions and 165 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ContinuousTesting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ jobs:
- name: Static Lint
run: |
ruff check --output-format=github pyssp --verbose
ruff . --verbose --output-format=github
- name: Static Type Check
run: |
mypy pyssp --explicit-package-bases
mypy --verbose
- name: Test
run: |
pytest
pytest --verbose
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ __pycache__
*cache
*.egg-info
env
.env
.pyenv*
.coverage
.vscode

27 changes: 25 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,31 @@ addopts = [
]

[tool.ruff]
include = ["pyssp/**"]

line-length = 100
indent-width = 4
target-version = "py312"
extend-exclude = [".pyenv*"]

[tool.ruff.lint]
select = ["E", "F", "B", "I", "SIM", "W", "D"]
select = ["E",
"F",
"B",
"I",
"SIM",
"W",
"D",
"PL",
"NPY",
"PERF",
"C90",
"RUF"]
ignore = ["D213", "D401", "D211"]

[tool.ruff.lint.pydocstyle]
convention = "google"

[tool.ruff.format]
quote-style = "double"
indent-style = "space"
Expand All @@ -32,4 +50,9 @@ line-ending = "auto"
[tool.mypy]
mypy_path = "pyssp"
ignore_missing_imports = true
#plugins = "numpy.typing.mypy_plugin"
disallow_incomplete_defs = true
disallow_untyped_calls = true
disallow_untyped_defs = true
disallow_untyped_decorators = true
plugins = "numpy.typing.mypy_plugin"
files = ["pyssp/*.py"]
23 changes: 15 additions & 8 deletions pyssp/adaptive.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Chapter 9 algorithm implementations."""

import numpy as np
from numpy.typing import ArrayLike

from .state import convm


def lms(x, d, mu, nord, a0=None):
def lms(x: ArrayLike, d: ArrayLike, mu: int, nord: int,
a0: None | ArrayLike) -> tuple[ArrayLike, ArrayLike]:
"""LMS Adaptive Filter.
Refernce Page 506, Figure 9.7
Expand All @@ -21,17 +23,19 @@ def lms(x, d, mu, nord, a0=None):
E = np.zeros((M - nord + 1))

_a0 = np.array(a0)
E[0] = d[0] - _a0 * X[0]
_d = np.array(d)
E[0] = _d[0] - _a0 * X[0]
A[0] = a0 + mu * E[0] * np.conjugate(X[0])

for k in (1, M - nord + 1):
E[k] = d[k] - A[k - 1] * X[k]
E[k] = _d[k] - A[k - 1] * X[k]
A[k] = A[k - 1] + mu * E[k] * np.conjugate(X[k])

return A, E


def nlms(x, d, beta, nord, a0):
def nlms(x: ArrayLike, d: ArrayLike, beta: int, nord: int,
a0: ArrayLike | None = None) -> tuple[ArrayLike, ArrayLike]:
"""Normalized LMS Adaptive Filter.
Refernce Page 515, Figure 9.12
Expand All @@ -47,27 +51,30 @@ def nlms(x, d, beta, nord, a0):
E = np.zeros((M - nord + 1))

_a0 = np.array(a0)
_d = np.array(d)
DEN = X[0] * X[0] + delta
E[0] = d[0] - _a0 * X[0]
E[0] = _d[0] - _a0 * X[0]
A[0] = a0 + beta / DEN * E[0] * np.conjugate(X[0])

# no further iterations if M == 1
for k in (1, M - nord + 1):
E[k] = d[k] - A[k - 1] * X[k]
E[k] = _d[k] - A[k - 1] * X[k]
DEN = X[k] * X[k] + delta
A[k] = A[k - 1] + beta / DEN * E[k] * np.conjugate(X[k])

return A, E


def rls(x, d, nord, lamda=1, delta=0.001):
def rls(x: ArrayLike, d: ArrayLike, nord: int, lamda: float=1,
delta: float=0.001) -> tuple[ArrayLike, ArrayLike]:
"""Recursive Least Squares.
Reference 545, Table 9.6
For special case of lambda == 1, this is known as growing window RLS algorithm,
For all other values of lambda <1 and >0, this is the exponentially weighted RLS algorithm.
"""
_d = np.array(d)
X = convm(x, nord)
M, N = X.shape
P = np.eye(N) / delta
Expand All @@ -76,7 +83,7 @@ def rls(x, d, nord, lamda=1, delta=0.001):
for k in range(1, M - nord + 1):
z = P * X[k]
g = z / (lamda + X[k] * z)
alpha = d[k] - X[k] @ np.transpose(W[k])
alpha = _d[k] - X[k] @ np.transpose(W[k])
W[k] = W[k - 1] + alpha * g
P = (P - g * z) / lamda

Expand Down
40 changes: 21 additions & 19 deletions pyssp/lattice.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
"""Implementation of algorithm from Chapter 6."""

from typing import NoReturn

import numpy as np
import scipy as sp
from numpy.typing import ArrayLike


def fcov(x, p):
def fcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]:
"""Figure 6.15, Page 310.
Using the forward covariance method the reflection co-efficients of the lattice filter
are found by sequentially minimizing the sum of the squares of the forward prediction error.
"""
if p >= len(x):
_x = np.array(x)
if p >= len(_x):
raise ValueError("Model order must be less than length of signal")

_x = np.array(x).reshape(-1, 1)
N = len(x)
_x = np.array(_x).reshape(-1, 1)
N = len(_x)
eplus = _x[1:N]
eminus = _x[:N - 1]

Expand All @@ -40,16 +43,16 @@ def fcov(x, p):
return gamma, err


def burg(x, p):
def burg(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]:
"""Sequentially minimizes the sum of the forward and backward covariance errors.
Guaranteed to be stable. All reflection coefficients will be <|1|
"""
if p > len(x):
_x = np.array(x).reshape(-1, 1)
N = len(_x)
if p > N:
raise ValueError("Model order must be less than length of signal")

_x = np.array(x).reshape(-1, 1)
N = len(x)
eplus = _x[1:N]
eminus = _x[:N - 1]

Expand All @@ -74,25 +77,24 @@ def burg(x, p):
return gamma, err


def bcov():
"""Sequentially minimizes the backward covariance error.
Arguements: (x, p)
"""
def bcov(x: ArrayLike, p: int) -> NoReturn:
"""Sequentially minimizes the backward covariance error."""
raise NotImplementedError()


def mcov(x, p):
"""Modified covariance method. Unlike the forward/backward algorithms.
def mcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]:
"""Modified covariance method.
Unlike the forward/backward algorithms.
It *does not* minimize an error term sequentially.
"""
_x = np.array(x).reshape(-1, 1)
N = len(x)
N = len(_x)

if p >= len(x):
if p >= len(_x):
raise ValueError("Model order must be less than length of signal")

X = sp.linalg.toeplitz(_x[p:N], np.flipud(x[:p + 1]))
X = sp.linalg.toeplitz(_x[p:N], np.flipud(_x[:p + 1]))
R = np.transpose(X) @ X
R1 = np.array(R[1:p + 1, 1: p + 1])
R2 = np.array(np.flipud(np.fliplr(R[:p, :p])))
Expand All @@ -103,7 +105,7 @@ def mcov(x, p):
b = b1 + b2
a = sp.linalg.solve_toeplitz(Rx[:, 1], b)
a = np.concatenate(([1], a))
print(a.shape)
# print(a.shape)
err = np.dot(R[0], a) + np.dot(np.flip(R[p]), a)

return a, err
51 changes: 28 additions & 23 deletions pyssp/levinson.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
"""Chapter 5 algorithm implementations."""


import numpy as np
from numpy.typing import ArrayLike


def rtoa(r) -> tuple[np.ndarray, np.ndarray]:
def rtoa(r: ArrayLike) -> tuple[np.ndarray, float]:
"""Recursively map a set of autocorrelations to a set of model parameters.
The Levison-Durbin recursion.
"""
_r = np.array(r)
a = np.ones((1, 1))
epsilon = r[0]
p = len(r) - 1
r = r.reshape(-1, 1)
epsilon = _r[0]
p = len(_r) - 1
_r = _r.reshape(-1, 1)

for j in range(1, p + 1):
gamma = -np.transpose(r[1:1 + j,]) @ np.flipud(a) / epsilon
gamma = -np.transpose(_r[1:1 + j,]) @ np.flipud(a) / epsilon
an = np.concatenate((a, np.zeros((1, 1)))).reshape(-1, 1)
anT = np.conjugate(np.flipud(a))
a = an + gamma * np.concatenate((np.zeros(1), anT.ravel())).reshape(-1, 1)
Expand All @@ -23,7 +26,7 @@ def rtoa(r) -> tuple[np.ndarray, np.ndarray]:
return a, epsilon


def gtoa(gamma):
def gtoa(gamma: ArrayLike) -> ArrayLike:
"""Recursively map parameters to reflection coeffs.
Reference Page 233, Table 5.2, Figure 5.6.
Expand All @@ -35,17 +38,18 @@ def gtoa(gamma):
Cumulant generating function in statistics.
"""
a = np.ones((1, 1))
p = len(gamma)
_g = np.array(gamma)
p = len(_g)
for j in range(1, p):
a = np.concatenate((a, np.zeros((1, 1))))
_a = a.copy()
af = np.conjugate(np.flipud(_a))
a = a + gamma[j] * af
a = a + _g[j] * af

return a


def atog(a):
def atog(a: ArrayLike) -> ArrayLike:
"""Recursively map from reflection coeffs to filter coeffs.
The step-down recursion.
Expand Down Expand Up @@ -80,20 +84,21 @@ def atog(a):
return gamma


def gtor(gamma, epsilon=None):
def gtor(gamma: ArrayLike, epsilon: None | float = None) -> ArrayLike:
"""Find the autocorrelation sequence from the reflection coefficients and the modeling error.
Page 241, Figure 5.9.
"""
p = len(gamma)
aa = np.array([[gamma[0]]]).reshape(-1, 1)
r = np.array(([1, -gamma[0]])).reshape(1, -1)
_g = np.array(gamma)
p = len(_g)
aa = np.array([[_g[0]]]).reshape(-1, 1)
r = np.array(([1, -_g[0]])).reshape(1, -1)

for j in range(1, p):
aa1 = np.concatenate((np.ones((1, 1)), aa)).reshape(-1, 1)
aa0 = np.concatenate((aa, np.zeros((1, 1)))).reshape(-1, 1)
aaf = np.conjugate(np.flipud(aa1))
aa = aa0 + gamma[j] * aaf
aa = aa0 + _g[j] * aaf
print(aa)
rf = -np.fliplr(r) @ aa
print(rf)
Expand All @@ -109,16 +114,16 @@ def gtor(gamma, epsilon=None):
return r


def ator(a, b):
def ator(a: ArrayLike, b: float) -> ArrayLike:
"""Page 241, Figure 5.9."""
gamma = atog(a)
r = gtor(gamma.ravel())
r = gtor(gamma)
r = r * np.sqrt(b) / np.prod(1 - np.abs(gamma)**2)

return r


def rtog(r):
def rtog(r: ArrayLike) -> ArrayLike:
"""Recurse from the model params to filter coeffs.
The Shur Recursion: Table 5.5.
Expand All @@ -130,17 +135,17 @@ def rtog(r):
return gamma


def glev(r, b):
def glev(r: ArrayLike, b: ArrayLike) -> ArrayLike:
"""General Levinson Recursion, solves any Hermitian Toeplitz matrix.
Can solve the Wiener-Hopf system of equations for Optimal MSE Filter design.
"""
_r = np.array(r).reshape(-1, 1)
# _b = np.array([b]).reshape(-1, 1)
p = len(b)
_b = np.array([b]).reshape(-1, 1)
p = len(_b)
a = np.array([[1]]).reshape(-1, 1)
x = np.array([b[0] / r[0]]).reshape(-1, 1)
epsilon = r[0]
x = np.array([_b[0] / _r[0]]).reshape(-1, 1)
epsilon = _r[0]
for j in range(1, p):
print(j)
print(f"{_r=}, {_r.shape=}")
Expand All @@ -160,7 +165,7 @@ def glev(r, b):
epsilon = epsilon * (1 - np.abs(gamma)**2)
print(f"{epsilon=}")
delta = _r1 @ np.flipud(x)
q = (b[j] - delta[0, 0]) / epsilon
q = (_b[j] - delta[0, 0]) / epsilon
x = np.concatenate([x, [[0]]])
x = x + q * np.conjugate(np.flipud(a))
print()
Expand Down
Loading

0 comments on commit 4e21a11

Please sign in to comment.