-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathtime_frequency.py
293 lines (240 loc) · 11.6 KB
/
time_frequency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
'''
Time Frequency Pricing
This module implements time-frequency asset pricing models,
including Fourier-transform-based and wavelet-based methods.
'''
import numpy as np
from numpy import ndarray
import pywt
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
from prettytable import PrettyTable
class wavelet():
    '''
    This class is designed for decomposing a one-dimensional series using the wavelet method,
    based on the package pywt, and for rebuilding one component series per decomposition level.
    '''

    def __init__(self, series: ndarray):
        '''
        input:
            series (array) : the series to be decomposed. It is reshaped to (len(series),),
                             so an (n, 1) column is accepted as well as a flat array.
        '''
        self.series = np.reshape(series, len(series))

    def _decompose(self, wave: str, mode: str, level: int = None):
        '''
        This function is designed for decomposing the series using given wavelet family, mode and level.
        input :
            wave (str) : The chosen wavelet family, which is the same in pywt.
            mode (str) : Choose multiscale or single scale. 'multi' denotes multiscale (pywt.wavedec).
                         'single' denotes single scale (pywt.dwt).
            level (int) : the level of multiscale, passed to pywt.wavedec. Ignored in 'single' mode.
        output :
            wave_dec (list): The decomposed details including (CA, CN, CN-1,..., C1).
        raises :
            ValueError : if mode is neither 'multi' nor 'single'.
        '''
        if mode == 'multi':
            wave_dec = pywt.wavedec(self.series, wavelet=wave, level=level)
        elif mode == 'single':
            wave_dec = pywt.dwt(self.series, wavelet=wave)
        else:
            # Fail early with a clear message instead of an UnboundLocalError below.
            raise ValueError("mode must be 'multi' or 'single', got {!r}".format(mode))
        self.mode = mode
        self.wave = wave
        # Derive the level from what was actually produced: this stays correct when
        # level is None and in 'single' mode, where two coefficient arrays are returned.
        self._level = len(wave_dec) - 1
        self.wave_dec = wave_dec
        return wave_dec

    def _pick_details(self):
        '''
        This function is designed for picking the details of decomposed series at different level.
        output:
            pick_series (list): one coefficient list per level. In the i-th list only the i-th
                                coefficient array is kept; all the others are replaced by zeros
                                of the same shape.
        '''
        n_parts = self._level + 1
        pick_series = []
        for keep in range(n_parts):
            pick_series.append([
                self.wave_dec[j] if j == keep else np.zeros_like(self.wave_dec[j])
                for j in range(n_parts)
            ])
        return pick_series

    def _rebuild(self, mode: str = 'constant'):
        '''
        This function is designed for rebuilding the detail series from the picked series at each level.
        input:
            mode (str): The signal extension mode handed to pywt.waverec.
        output:
            wave_rec (list): The recomposed series from the details at each level,
                             each one cut to the length of the original series.
        '''
        n_obs = len(self.series)
        wave_rec = []
        for coeffs in self._pick_details():
            component = pywt.waverec(coeffs, self.wave, mode=mode)
            # The reconstruction can be one sample longer than an odd-length input
            # (see the pywt documentation); trim so every component lines up with the series.
            wave_rec.append(component[:n_obs])
        return wave_rec

    def fit(self, wave: str, mode: str, level: int, output: bool = False):
        '''
        This function is designed for fitting the model.
        input :
            wave (str) : The chosen wavelet family, which is the same in pywt.
            mode (str) : Choose multiscale or single scale. 'multi' denotes multiscale. 'single' denotes single scale.
            level (int) : The level of multiscale. Used in 'multi' mode only.
            output (boolean): whether output the result. The **DEFAULT** is False.
        output :
            wave_rec (list): The recomposed series from the details at each level, if output is True.
                             The same list is always stored in self.wave_rec.
        '''
        self._decompose(wave, mode, level)
        # Keep the result on the instance so it is not lost when output is False.
        self.wave_rec = self._rebuild()
        if output:
            return self.wave_rec
        return None
'''
Wavelet pricing
'''
class wavelet_pricing():
    '''
    This module is designed for wavelet pricing model: the returns and the factors are split
    into per-level wavelet components and the return component is regressed on the factor
    components level by level.
    '''

    def __init__(self, rets: ndarray, factors: ndarray):
        '''
        input :
            rets (ndarray/Series/DataFrame): The dependent variables of the return.
            factors (ndarray/Series/DataFrame): The independent variables of factors.
        Any other array-like input (e.g. a list) is converted with np.array as well.
        '''
        self.rets = self._to_array(rets)
        self.factors = self._to_array(factors)

    @staticmethod
    def _to_array(data):
        '''Return an ndarray unchanged and convert any other input with np.array.'''
        if isinstance(data, ndarray):
            return data
        return np.array(data)

    def wavelet_dec_rec(self, **kwargs):
        '''
        This function is designed for wavelet decomposing and recomposing.
        input :
            kwargs : the kwargs include wave family ('wave'), 'mode', and 'level' of wavelet.
        output :
            rets_dec_s (list): The recomposed detail series of returns (rets).
            factors_dec_s (list): The recomposed detail series of factors (factors),
                                  one list of per-level series for each factor.
        '''
        wave, mode, level = kwargs['wave'], kwargs['mode'], kwargs['level']
        factors = self.factors
        # Two-dimensional factors hold one factor per column; anything else is
        # treated as a single factor series.
        if np.ndim(factors) == 2:
            r, c = np.shape(factors)
        else:
            r, c = len(factors), 1
        if c == 1:
            columns = [factors]
        else:
            columns = [factors[:, i] for i in range(c)]
        rets_dec_s = wavelet(self.rets).fit(wave=wave, mode=mode, level=level, output=True)
        factors_dec_s = [
            wavelet(column).fit(wave=wave, mode=mode, level=level, output=True)
            for column in columns
        ]
        self._rets_dec_s = rets_dec_s
        self._factors_dec_s = factors_dec_s
        # Number of component series minus one; equals the requested level whenever
        # an explicit level was given.
        self._level = len(rets_dec_s) - 1
        self._factor_num = c
        self._series_len = r
        return rets_dec_s, factors_dec_s

    def wavelet_regression(self, **kwargs):
        '''
        This function is designed for OLS regression of detail series between return and factors at each level.
        input :
            kwargs : 'win' (rolling window, None for a full-sample OLS; default None),
                     'robust' (use the 'HC0' covariance; default False),
                     'constant' (whether the regression includes the constant; default True),
                     'params_only' (passed to RollingOLS.fit when 'win' is set; default True).
        output :
            regress (list): the regression results of OLS / RollingOLS in package **statsmodels**,
                            one per level, in the order of the decomposed series.
        '''
        win = kwargs.get('win')
        robust = kwargs.get('robust', False)
        constant = kwargs.get('constant', True)

        fit_kwargs = {}
        if win is not None:
            fit_kwargs['method'] = 'pinv'
            fit_kwargs['params_only'] = kwargs.get('params_only', True)
        if robust:
            fit_kwargs['cov_type'] = 'HC0'

        n_obs = self._series_len
        regress = []
        for i in range(self._level + 1):
            # Cut every component to the sample length: a reconstructed series can be
            # one sample longer than an odd-length input.
            endog = self._rets_dec_s[i][:n_obs]
            exog = np.zeros((n_obs, self._factor_num))
            for j in range(self._factor_num):
                exog[:, j] = self._factors_dec_s[j][i][:n_obs]
            if constant:
                exog = sm.add_constant(exog)
            if win is None:
                model = sm.OLS(endog, exog)
            else:
                model = RollingOLS(endog, exog, window=win)
            regress.append(model.fit(**fit_kwargs))
        self._constant = constant
        return regress

    def fit(self, wave: str, mode: str, level: int, win: int = None, robust: bool = False, constant: bool = True, params_only: bool = True):
        '''
        This function is designed for fitting the model.
        input :
            wave (str) : The chosen wavelet family, which is the same in pywt.
            mode (str) : choose multiscale or single scale. 'multi' denotes multiscale. 'single' denotes single scale.
            level (int) : the level of multiscale. If 'multi' is chosen, it must be set.
            win (int): The rolling window if rolling regression is used.
            robust (boolean): whether use the robust covariance matrix.
            constant (boolean): whether includes the constant in regression. The DEFAULT is True.
            params_only (boolean): passed to RollingOLS.fit when win is set. The DEFAULT is True.
        output :
            result (list): the per-level regression results, also stored in self.result.
        '''
        self.wavelet_dec_rec(wave=wave, mode=mode, level=level)
        self.result = self.wavelet_regression(win=win, robust=robust, constant=constant, params_only=params_only)
        return self.result

    def _summary_arrays(self):
        '''
        Collect the rounded statistics of self.result.
        output :
            coef (ndarray): coefficients rounded to 3 decimals, one column per regression.
            t_value (ndarray): t-values rounded to 3 decimals, one column per regression.
            r_square (list): R-squared rounded to 3 decimals, one entry per regression.
        raises :
            ValueError : if a result does not carry one-dimensional params (rolling regression).
        '''
        coef_cols, t_cols, r_square = [], [], []
        for res in self.result:
            if np.ndim(res.params) != 1:
                raise ValueError('summary() is only available for full-sample regressions (win=None).')
            coef_cols.append(np.around(res.params, decimals=3))
            t_cols.append(np.around(res.tvalues, decimals=3))
            r_square.append(np.around(res.rsquared, decimals=3))
        # The row count follows the fitted parameters, so it is right with and
        # without the constant.
        return np.column_stack(coef_cols), np.column_stack(t_cols), r_square

    def summary(self, export: bool = False):
        '''
        Summarize the result
        This function is designed for printing the summary.
        input :
            export (boolean): whether export the summary table. The **DEFAULT** is False.
        output :
            df (DataFrame): if export is True, then return the summary table.
        '''
        coef, t_value, r_square = self._summary_arrays()
        n_scales = len(self.result)

        # print table
        table = PrettyTable()
        header = ['scale']
        if self._constant:
            header.append('alpha')
        header += [str(i + 1) for i in range(self._factor_num)]
        # Every row carries an R2 cell, so the column is needed with or without the constant.
        header.append('R2')
        table.field_names = header

        for j in range(n_scales):
            # Rows run from the last regression back to the first one; the first
            # regression (built from the first decomposed series) is printed last as 'residue'.
            col = n_scales - 1 - j
            label = 'residue' if j == n_scales - 1 else 'scale' + str(j + 1)
            table.add_row([label] + list(coef[:, col]) + [r_square[col]])
            table.add_row([' '] + list(t_value[:, col]) + [' '])
        print(table)

        if export:
            import pandas as pd
            from io import StringIO
            with StringIO(table.get_csv_string()) as f:
                return pd.read_csv(f)
        return None