-
Notifications
You must be signed in to change notification settings - Fork 2
/
kalman_filter.py
85 lines (70 loc) · 3.13 KB
/
kalman_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!\bin\python2.7
import numpy as np
from numpy import dot, zeros, eye
from numpy.linalg import inv
import config
class KalmanFilter:
'''
Kalman filtering, also known as linear quadratic estimation (LQE), is an algorithm that uses a series of measurements
observed over time, containing statistical noise and other inaccuracies,
and produces estimates of unknown variables that tend to be more accurate than those based on a single measurement
alone, by estimating a joint probability distribution over the variables for each time frame.
'''
def __init__(self, dim_x, dim_z):
#self.debug_flag = config.DEBUG_FLAG
self.debug_flag = False
if self.debug_flag:
print("KalmanFilter::Init")
print("dim_x: {}, dim_z: {}".format(dim_x, dim_z))
self.dim_x = dim_x
self.dim_z = dim_z
self.x = zeros((dim_x, 1))
self.P = eye(dim_x)
self.Q = eye(dim_x)
self.F = eye(dim_x)
self.H = zeros((dim_z, dim_x))
self.R = eye(dim_z)
self.M = zeros((dim_z, dim_z))
self._I = eye(dim_x) # This helps the I matrix to always be compatible to the state vector's dim
self.x_prior = np.copy(self.x)
self.P_prior = np.copy(self.P)
if self.debug_flag:
print("KalmanFilter::Init, x_prior: {}, P_prior: {}".format(self.x_prior, self.P_prior))
def predict(self):
'''
Predict next state (prior) using the Kalman filter state propagation
equations.
'''
if self.debug_flag:
print("KalmanFilter::predict")
self.x = dot(self.F, self.x) # x = Fx
self.P = dot(self.F, dot(self.P, self.F.T)) + self.Q # P = FPF' + Q
self.x_prior = np.copy(self.x)
self.P_prior = np.copy(self.P)
if self.debug_flag:
print("KalmanFilter::predict, x_prior: {}, P_prior: {}".format(self.x_prior, self.P_prior))
def update(self, z):
'''
At the time step k, this update step computes the posterior mean x and covariance P
of the system state given a new measurement z.
'''
if self.debug_flag:
print("KalmanFilter::update, z: {}".format(z))
# y = z - Hx (Residual between measurement and prediction)
y = z - np.dot(self.H, self.x)
PHT = dot(self.P, self.H.T)
# S = HPH' + R (Project system uncertainty into measurement space)
S = dot(self.H, PHT) + self.R
# K = PH'S^-1 (map system uncertainty into Kalman gain)
K = dot(PHT, inv(S))
if self.debug_flag:
print("KalmanFilter::update, K.shape: {}, K: {}".format(K.shape, K))
# x = x + Ky (predict new x with residual scaled by the Kalman gain)
self.x = self.x + dot(K, y)
if self.debug_flag:
print("KalmanFilter::update, x.shape: {}, x :{}".format(self.x.shape, self.x))
# P = (I-KH)P
I_KH = self._I - dot(K, self.H)
self.P = dot(I_KH, self.P)
if self.debug_flag:
print("KalmanFilter::update, P.shape: {}, P :{}".format(self.P.shape, self.P))