# covariance_transformer.py
from abc import ABC, abstractmethod
import numpy as np
from sklearn.neighbors import KernelDensity
from scipy.optimize import minimize
import pandas as pd
class AbstractCovarianceTransformer(ABC):
    """Interface for objects that transform a covariance matrix."""

    @abstractmethod
    def transform(self, cov: np.array, n_observations: int) -> np.array:
        """
        Apply the transformation to a covariance matrix.

        :param cov: covariance matrix to transform
        :param n_observations: number of observations used to estimate ``cov``
        :return: the transformed covariance matrix
        """
class DeNoiserCovarianceTransformer(AbstractCovarianceTransformer):
    """
    De-noises a covariance matrix by shrinking the eigenvalues of its
    correlation matrix that are attributable to noise, per section 4.2 of
    "A Robust Estimator of the Efficient Frontier".
    """

    def __init__(self, bandwidth: float = .25):
        """
        :param bandwidth: bandwidth hyper-parameter for KernelDensity
        """
        self.bandwidth = bandwidth

    def transform(self, cov: np.array, n_observations: int) -> np.array:
        """
        De-noise a covariance matrix.

        Converts ``cov`` to a correlation matrix, eigen-decomposes it, fits
        the Marcenko-Pastur distribution to locate the largest eigenvalue
        explainable by noise, replaces all noise eigenvalues with their mean,
        and rebuilds a covariance matrix from the de-noised correlations.
        Only noise eigenvalues are shrunk; signal eigenvalues are untouched.

        :param cov: the covariance matrix to de-noise
        :param n_observations: number of observations used to create ``cov``
        :return: de-noised covariance matrix
        """
        # q = T/N: sample length over number of variables
        q = n_observations / cov.shape[1]
        corr = cov_to_corr(cov)
        e_vals, e_vecs = self._get_PCA(corr)
        # Largest eigenvalue attributable to random noise
        noise_cutoff = self._find_max_eigenvalue(np.diag(e_vals), q)
        # Count of eigenvalues above the cutoff (signal factors); the diagonal
        # is sorted descending, so reverse it for searchsorted.
        n_signal = e_vals.shape[0] - np.diag(e_vals)[::-1].searchsorted(noise_cutoff)
        denoised_corr = self._de_noised_corr(e_vals, e_vecs, n_signal)
        return self._corr_to_cov(denoised_corr, np.diag(cov) ** .5)

    def _get_PCA(self, matrix: np.array) -> (np.array, np.array):
        """
        Eigen-decompose a Hermitian matrix, sorted by descending eigenvalue.

        :param matrix: a Hermitian matrix
        :return: (diagonal matrix of eigenvalues, matrix of eigenvectors)
        """
        vals, vecs = np.linalg.eigh(matrix)
        order = np.argsort(vals)[::-1]  # descending eigenvalue order
        return np.diagflat(vals[order]), vecs[:, order]

    def _find_max_eigenvalue(self, eigenvalues: np.array, q: float) -> float:
        """
        Fit the Marcenko-Pastur distribution to the empirical eigenvalue
        distribution (via KDE) and return the largest eigenvalue consistent
        with pure noise.

        :param eigenvalues: array of eigenvalues
        :param q: q = T/N, sample length over number of variables
        :return: the maximum random (noise) eigenvalue
        """
        result = minimize(
            self._err_PDFs,
            .5,
            args=(eigenvalues, q),
            bounds=((1E-5, 1 - 1E-5),),
        )
        # Fall back to unit variance if the optimizer did not converge
        var = result['x'][0] if result['success'] else 1
        return var * (1 + (1. / q) ** .5) ** 2

    def _err_PDFs(self, var: float, eigenvalues: pd.Series, q: float, pts: int = 1000) -> float:
        """
        Sum of squared differences between the theoretical Marcenko-Pastur
        PDF and the empirical (KDE-fitted) PDF evaluated on the same grid.

        :param var: variance (sigma^2) of the theoretical distribution
        :param eigenvalues: array of eigenvalues
        :param q: q = T/N, sample length over number of variables
        :param pts: number of points in the distribution grid
        :return: sum of squared PDF differences
        """
        theoretical = self._mp_PDF(var, q, pts)
        empirical = self._fit_KDE(eigenvalues, x=theoretical.index.values)
        return np.sum((empirical - theoretical) ** 2)

    def _mp_PDF(self, var: float, q: float, pts: int) -> pd.Series:
        """
        Theoretical Marcenko-Pastur probability density function.

        :param var: variance (sigma^2)
        :param q: q = T/N, sample length over number of variables
        :param pts: number of grid points
        :return: the PDF as a Series indexed by eigenvalue
        """
        lo = var * (1 - (1. / q) ** .5) ** 2
        hi = var * (1 + (1. / q) ** .5) ** 2
        grid = np.linspace(lo, hi, pts).flatten()
        density = q / (2 * np.pi * var * grid) * \
            ((hi - grid) * (grid - lo)) ** .5
        return pd.Series(density.flatten(), index=grid)

    def _fit_KDE(
            self,
            obs: np.array,
            kernel: str = 'gaussian',
            x: np.array = None
    ) -> pd.Series:
        """
        Fit a kernel density estimate to observations and evaluate it on x.

        :param obs: the series of observations
        :param kernel: kernel hyper-parameter for KernelDensity
        :param x: evaluation grid; defaults to the unique observations
        :return: empirical PDF as a Series indexed by the evaluation points
        """
        samples = obs.reshape(-1, 1) if len(obs.shape) == 1 else obs
        kde = KernelDensity(kernel=kernel, bandwidth=self.bandwidth).fit(samples)
        if x is None:
            x = np.unique(samples).reshape(-1, 1)
        grid = x.reshape(-1, 1) if len(x.shape) == 1 else x
        log_density = kde.score_samples(grid)  # log(density)
        return pd.Series(np.exp(log_density), index=grid.flatten())

    def _de_noised_corr(self, eigenvalues: np.array, eigenvectors: np.array, n_facts: int) -> np.array:
        """
        Replace the noise eigenvalues (all past the first ``n_facts``) with
        their mean and rebuild a correlation matrix.

        :param eigenvalues: diagonal matrix of eigenvalues (descending)
        :param eigenvectors: matrix of eigenvectors
        :param n_facts: number of leading (signal) eigenvalues to keep as-is
        :return: de-noised correlation matrix
        """
        adjusted = np.diag(eigenvalues).copy()
        # Shrink every noise eigenvalue to the mean of the noise block
        adjusted[n_facts:] = adjusted[n_facts:].sum() / float(adjusted.shape[0] - n_facts)
        rebuilt = eigenvectors @ np.diag(adjusted) @ eigenvectors.T
        # Rescale so the diagonal is exactly ones again
        return cov_to_corr(rebuilt)

    def _corr_to_cov(self, corr: np.array, std: np.array) -> np.array:
        """
        Recover a covariance matrix from a correlation matrix.

        :param corr: correlation matrix
        :param std: per-variable standard deviations
        :return: the recovered covariance matrix
        """
        return corr * np.outer(std, std)
def cov_to_corr(cov: np.array) -> np.array:
    """
    Convert a covariance matrix into the corresponding correlation matrix.

    :param cov: covariance matrix
    :return: correlation matrix, entries clipped to [-1, 1]
    """
    sigma = np.sqrt(np.diag(cov))
    corr = cov / np.outer(sigma, sigma)
    # Clip tiny numerical overshoots outside the valid correlation range
    return np.clip(corr, -1, 1)