-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvectorial_model.py
133 lines (105 loc) · 4.04 KB
/
vectorial_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from math import log
import json
try:
from .text_transform import get_progressbar
except (ModuleNotFoundError, ImportError):
from text_transform import get_progressbar
class VectorialModel:
    """Vector-space retrieval model: tf-idf document vectors, cosine ranking.

    NOTE: ``invected_index`` (sic — inverted index) is kept misspelled because
    it is a public attribute and part of the serialized model format.
    """

    def __init__(self, alpha: float = 0.5, recover_len: int = 10) -> None:
        # alpha: smoothing constant for query-term weighting (standard
        # "augmented tf" scheme: alpha + (1 - alpha) * f / f_max).
        # recover_len: maximum number of document ids returned by get_ranking.
        self.alpha = alpha
        self.recover_len = recover_len

    def fit(self, corpus: dict) -> None:
        """Index *corpus*, a mapping of doc-id -> iterable of tokens.

        Builds:
          - self.invected_index: term -> {doc_id: raw term frequency}
          - self.max_freq_doc:   doc_id -> highest term frequency in that doc
          - self.doc2vec:        doc_id -> {term: tf-idf weight} (sparse)
          - self.doc_w2:         doc_id -> squared Euclidean norm of its vector
        """
        self.N = len(corpus)
        self.invected_index = {}
        self.max_freq_doc = {}
        bar = get_progressbar(self.N, ' term indexation ')
        bar.start()
        for i, text_hsh in enumerate(corpus):
            f_max = 0
            for token in corpus[text_hsh]:
                doc_dic = self.invected_index.setdefault(token, {})
                doc_dic[text_hsh] = doc_dic.get(text_hsh, 0) + 1
                if doc_dic[text_hsh] > f_max:
                    f_max = doc_dic[text_hsh]
            self.max_freq_doc[text_hsh] = f_max
            bar.update(i + 1)
        bar.finish()
        self.doc2vec = {text_hsh: {} for text_hsh in corpus}
        self.doc_w2 = {}
        self.term_to_index = sorted(self.invected_index)
        bar = get_progressbar(
            len(self.term_to_index), f' document vectorization {len(self.term_to_index)} ')
        bar.start()
        for i, term in enumerate(self.term_to_index):
            # idf is constant per term: compute it once, not per document.
            idf = log(self.N / len(self.invected_index[term]))
            for text_hsh, freq in self.invected_index[term].items():
                tf = freq / self.max_freq_doc[text_hsh]
                self.doc2vec[text_hsh][term] = tf * idf
            bar.update(i + 1)
        bar.finish()
        bar = get_progressbar(
            len(corpus), f' document quadratic computing {len(corpus)} ')
        bar.start()
        for i, text_hsh in enumerate(corpus):
            self.doc_w2[text_hsh] = sum(
                w * w for w in self.doc2vec[text_hsh].values())
            bar.update(i + 1)
        bar.finish()

    def get_ranking(self, query_tokens) -> list:
        """Return up to ``recover_len`` doc ids ranked by cosine similarity.

        Query terms absent from the index are ignored. If no term matches,
        every score is 0 and documents come back in index order (original
        fallback behavior, preserved).
        """
        fqi, max_fqi = {}, 0
        for word in query_tokens:
            if word not in self.invected_index:
                continue  # unknown terms contribute nothing
            fqi[word] = fqi.get(word, 0) + 1
            if fqi[word] > max_fqi:
                max_fqi = fqi[word]
        query_vector = []
        for word, freq in fqi.items():
            # Augmented query tf (alpha-smoothed) times idf.
            w = (self.alpha + (1 - self.alpha) * freq / max_fqi) \
                * log(self.N / len(self.invected_index[word]))
            query_vector.append((word, w))
        q2 = sum(w * w for _, w in query_vector)
        ranking = []
        for text_hsh, term_w in self.doc2vec.items():
            dot = 0.0
            for term, qw in query_vector:
                # Only terms present in the document contribute to the dot product.
                dot += term_w.get(term, 0) * qw
            ww = self.doc_w2[text_hsh]
            # Cosine similarity is dot / (|q| * |d|).  BUGFIX: the original
            # divided by q2 * ww (the *squared* norms), which skews the
            # cross-document ordering; divide by the square root instead.
            norm2 = q2 * ww
            ranking.append((dot / norm2 ** 0.5 if norm2 > 0 else 0, text_hsh))
        ranking.sort(key=lambda x: x[0], reverse=True)
        # BUGFIX: slice instead of indexing range(recover_len) so a corpus
        # smaller than recover_len no longer raises IndexError.
        return [text_hsh for _, text_hsh in ranking[:self.recover_len]]

    def dumps_path(self, path: str, key: str = "") -> str:
        """Location of the serialized model under *path*/*key*."""
        return f'{path}/{key}/data_from_vectorial_model.json'

    def save_model(self, path: str, key: str = "") -> None:
        """Persist the fitted state as JSON at ``dumps_path(path, key)``."""
        # Reuse dumps_path so save/load can never drift out of sync.
        with open(self.dumps_path(path, key), 'w+') as f:
            json.dump({
                'ii': self.invected_index,
                'd2v': self.doc2vec,
                'dw2': self.doc_w2,
                'N': self.N
            }, f)

    def load_model(self, path: str, key: str = '') -> None:
        """Restore the state written by save_model."""
        with open(self.dumps_path(path, key), 'r') as f:
            data = json.load(f)
        self.invected_index = data['ii']
        self.doc2vec = data['d2v']
        self.doc_w2 = data['dw2']
        self.N = data['N']