-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathengine.py
205 lines (189 loc) · 8.12 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import pandas as pd
from collections import defaultdict
import numpy as np
from statics import *
from utils import *
from sklearn.decomposition import PCA
import numpy.polynomial as npbasis
from plots import *
from operator import itemgetter
class Basis:
'''
Class to handle basis functions
'''
def __init__(self,balen):
'''
:param balen: Max number of basis functions to use in the sequence
'''
if np.sqrt(balen) - round(np.sqrt(balen),0) > 0:
raise ValueError("Basis Truncation Length Must be a Perfect Square")
self.balen = balen
self.badim = np.sqrt(balen)
def get_one_bas(self,n):
'''
Get the nth basis function
:param n: index for basis function
:return: nth basis function
'''
if n > self.balen:
raise ValueError("Function index is larger than maximum")
bacof = np.zeros(self.balen)
bacof[n-1] = 1
bacof = bacof.reshape((self.badim,self.badim))
return lambda x,y : npbasis.laguerre.lagval2d(x,y,bacof)
class VolSurf:
'''
Class to store and perform calculation on volatility surfaces
'''
def __init__(self,vpath,h1=1,h2=1):
'''
:param vpath: Path to the volatility file
:param h1: Paramterter for the Nadaraya-Watson interpolation
:param h2: Paramterter for the Nadaraya-Watson interpolation
'''
self.myvols = defaultdict(list)
self.histvols = defaultdict(list)
self.diff_dates = defaultdict(list)
self.dates = []
self.contracts = []
self.maxcont = {}
self.mincont = {}
self.load_vols(vpath)
self.h1 = h1
self.h2 = h2
self.pca = None
self.d1_mask = None
def nwInterp(self,strike,mat,dat,diff=False):
'''
Computes the Nadaraya-Watson interpolation for a point on the vol surface
:param strike: Moneyness of the interp point
:param mat: Maturity in years of the interp point
:param dat: Date for which to reference for interpolation
:param diff: If true, use vol % differences instead of volatility quote
:return: Implied vol for a strike and maturity
'''
interp_vol = self.diff_dates if diff else self.myvols
#Compute difference to reference
matdiff = interp_vol[(dat,'matyears')] - mat
strdiff = interp_vol[(dat, 'moneyness')] - strike
denom = sum(gfunc(matdiff,strdiff,self.h1,self.h2))
num = np.inner(interp_vol[(dat, 'normalizediv')],gfunc(matdiff,strdiff,self.h1,self.h2))
return num/denom
def load_vols(self,vpath):
'''
Read vols from a csv file
:param vpath: Path to the csv file
:return: Nothing, sets an instance variable
'''
raw = pd.read_csv(vpath).sort_values(by=['date'], ascending=True)
tempmin = raw.min(); tempmax = raw.max()
for s in suf_map:
self.mincont[s] = tempmin[s]
self.maxcont[s] = tempmax[s]
mask = (raw['call/put'] != 'C') | (raw['moneyness'] != 1)
self.dates = list(raw['date'].unique())
raw = raw[mask].to_dict('records')
full_dates = defaultdict(list)
histo = defaultdict(list)
diff_dates = defaultdict(list)
for ele in raw:
for field in suf_map:
full_dates[(ele['date'],field)].append(ele[field])
histo[(ele['moneyness'],ele['matyears'])].append(ele['normalizediv'])
self.contracts = list(histo.keys())
mysurface = np.empty((len(self.dates),))
for ele in histo:
mysurface = np.vstack([mysurface,histo[ele]])
temp_dates = mysurface[1:] #Format changes in surface
temp_dates = np.diff(temp_dates, axis=1) / temp_dates[:, :-1]
for date in range(len(self.dates)-1):
for cont in range(len(self.contracts)):
diff_dates[(self.dates[date], 'moneyness')].append(self.contracts[cont][0])
diff_dates[(self.dates[date],'matyears')].append(self.contracts[cont][1])
diff_dates[(self.dates[date],'normalizediv')].append(temp_dates[cont][date])
#Change format to numpy array
for ele in full_dates:
full_dates[ele] = np.array(full_dates[ele])
diff_dates[ele] = np.array(diff_dates[ele])
self.myvols = full_dates
self.histvols = mysurface[1:]
self.diff_dates = diff_dates
def get_pca(self,n : int, cov=True, slice=None):
'''
Compute an sklean pca object for the historical vol surface
:param n: (int) Number of factors in PCA
:param cov: (bool) If True, apply covatiance matric
:param slice: (tuple) Filter the data for 1D pca
format slice = ('moneyness',1.0) or ('maturity',0.328767123)
:return: None
'''
pca = PCA(n_components=n)
if slice is not None:
axis = 0 if slice[0] == 'moneyness' else 1
self.d1_mask = [i for i in range(len(self.contracts)) if abs(self.contracts[i][axis] - slice[1]) < 0.01]
tempvols = self.histvols[self.d1_mask,:]
else:
tempvols = self.histvols
tempvols = np.diff(tempvols,axis=1)/tempvols[:,:-1]
tempvols = (tempvols-np.mean(tempvols,axis=0))/np.std(tempvols,axis=0)
if cov:
tempvols = np.cov(tempvols)
pca.fit(np.transpose(tempvols))
self.pca = pca
return None
def map_pca(self):
'''
Maps an existing PCA scenario to the myvols instance variable
:return: None
'''
if self.pca is None:
print('Error: Must Run get_pca() before map_pca()')
exit(1)
for n in range(self.pca.n_components_):
for i in range(len(self.pca.components_[n])):
self.myvols[('pca' + str(n),'moneyness')].append(self.contracts[i][0])
self.myvols[('pca' + str(n), 'matyears')].append(self.contracts[i][1])
self.myvols[('pca' + str(n), 'normalizediv')].append(self.pca.components_[n][i])
for n in range(self.pca.n_components_):
for field in suf_map:
self.myvols[('pca' + str(n), field)] \
= np.array(self.myvols[('pca' + str(n), field)])
def proj_fact(self,diff=False):
'''
Project PCA factors onto a time series of implied vols
:param diff: Wheather to work with differences in vols or not
:return: An nxt array where n is the number of PCA factors and t
is the number of time series points
'''
if self.pca is None:
print('Error: Must Run get_pca() before proj_fact()')
exit(1)
if diff:
return np.matmul(self.pca.components_, np.diff(self.histvols,axis=1))
return np.matmul(self.pca.components_,self.histvols)
def pca_num_factors(self):
'''
Returns the number of factors contained in the PCA
:return: (int) Number of factors in mySurf pca
'''
return self.pca.n_components_
if __name__ == '__main__':
print('Welcome to the Volatility Surfice PCA Demo!')
print('You will see surface plots of the most significant volatility factors in 2019.\n'
' These will appear in Sepereate Windows.\n')
input("Press Enter to continue...")
print('Computing...')
mySurf = VolSurf('data_download_semi_clean.csv',h1=0.1,h2=0.1)
pca_factors = 4; graph_scn = ['pca' + str(i) for i in range(pca_factors)]
mySurf.get_pca(pca_factors)
mySurf.map_pca()
print('Rendering Graphs...')
title = ['PCA Factor ' + str(i) for i in range(1,pca_factors+1)]
colors = ['blue','orange','green','red', 'purple', 'grey']
plt_surf_mult(mySurf,graph_scn,title=title,color=colors)
for i in range(pca_factors): #Plot surfaces of volatility factors
graph_scn = 'pca' + str(i)
plt_surf(mySurf,graph_scn,diff=False,save=False,title='Volatility Plot for Factor ' + str(i+1))
plt_importance(mySurf,4) #Plot importance of each factor
plt_proj_time(mySurf,pca_factors,save=True,title='projected_factors.png') #Plot time series of factor magnitude
print("Done!")