forked from asagar60/Instacart-Market-Basket-Analysis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathf1optimization_faron.py
97 lines (76 loc) · 3.23 KB
/
f1optimization_faron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# -*- coding: utf-8 -*-
"""
@author: Faron
"""
import numpy as np
from operator import itemgetter
'''
This kernel implements the O(n²) F1-Score expectation maximization algorithm presented in
"Ye, N., Chai, K., Lee, W., and Chieu, H. Optimizing F-measures: A Tale of Two Approaches. In ICML, 2012."
It solves argmax_(0 <= k <= n,[[None]]) E[F1(P,k,[[None]])]
with [[None]] being the indicator for predicting label "None"
given posteriors P = [p_1, p_2, ... , p_n], where p_1 > p_2 > ... > p_n
under label independence assumption by means of dynamic programming in O(n²).
'''
class F1Optimizer():
    """Expected-F1 maximisation over a ranked list of label posteriors.

    Implements the O(n^2) dynamic programme of Ye et al. (ICML 2012),
    extended with an extra pseudo-label "None" that stands for "no item is
    relevant". Labels are treated as independent. All methods are static.
    """

    def __init__(self):
        """No state is kept; every method is static."""

    @staticmethod
    def get_expectations(P, pNone=None):
        """Return the expected F1 for every cut-off k, with and without "None".

        Args:
            P: 1-D sequence of posterior probabilities, in any order. It is
                sorted in descending order internally.
            pNone: probability that the true label set is exactly "None".
                When omitted it is computed as ``prod(1 - p for p in P)``.

        Returns:
            ``numpy.ndarray`` of shape ``(2, n + 1)`` where ``n = len(P)``.
            Column ``k`` corresponds to predicting the ``k`` most probable
            items; row 0 additionally predicts "None", row 1 does not.
        """
        P = np.sort(P)[::-1]
        n = P.shape[0]
        if pNone is None:
            pNone = (1.0 - P).prod()

        # count_prob[i, j] = probability that exactly i of the j
        # highest-ranked labels are true.
        count_prob = np.zeros((n + 1, n + 1))
        count_prob[0, 0] = 1.0
        for j in range(1, n + 1):
            count_prob[0, j] = (1.0 - P[j - 1]) * count_prob[0, j - 1]
        for i in range(1, n + 1):
            count_prob[i, i] = count_prob[i - 1, i - 1] * P[i - 1]
            for j in range(i + 1, n + 1):
                count_prob[i, j] = (P[j - 1] * count_prob[i - 1, j - 1]
                                    + (1.0 - P[j - 1]) * count_prob[i, j - 1])

        # While cut-off k is being evaluated, tail[a] holds E[1 / (a + S)]
        # with S the number of true labels ranked below k; tail_none is the
        # same quantity with one extra predicted label ("None") in the
        # denominator. Both start at k = n, where S is always 0.
        sizes = np.arange(1, 2 * n + 1, dtype=float)
        tail = np.zeros(2 * n + 1)
        tail_none = np.zeros(2 * n + 1)
        tail[1:] = 1.0 / sizes
        tail_none[1:] = 1.0 / (sizes + 1.0)

        expectations = []
        for k in range(n, -1, -1):
            f1 = 0.0
            f1None = 0.0
            # Terms with k1 == 0 or k1 > k are exactly zero, so only
            # 1..k contribute to the sum.
            for k1 in range(1, k + 1):
                weight = 2 * k1 * count_prob[k1, k]
                f1 += weight * tail[k + k1]
                f1None += weight * tail_none[k + k1]

            # Fold label k into the tail so the arrays are valid for k - 1.
            # The right-hand sides are fully evaluated before assignment, so
            # every element is computed from the not-yet-updated values.
            if k > 1:
                p = P[k - 1]
                stop = 2 * k - 1
                tail[1:stop] = (1 - p) * tail[1:stop] + p * tail[2:stop + 1]
                tail_none[1:stop] = ((1 - p) * tail_none[1:stop]
                                     + p * tail_none[2:stop + 1])

            # 2 / (k + 2) is the F1 of predicting "None" plus k items when
            # the truth is "None" alone.
            expectations.append([f1None + 2 * pNone / (2 + k), f1])

        # Collected for k = n .. 0; flip to k = 0 .. n and put the two
        # variants on the first axis.
        return np.array(expectations[::-1]).T

    @staticmethod
    def maximize_expectation(P, pNone=None):
        """Pick the cut-off and "None" decision with the highest expected F1.

        Args:
            P: 1-D sequence of posterior probabilities (any order).
            pNone: optional probability of the "None" outcome; see
                ``get_expectations``.

        Returns:
            Tuple ``(best_k, predNone, max_f1)``: number of top-ranked items
            to predict, whether "None" should be predicted as well, and the
            expected F1 of that choice. On ties the first entry in row-major
            order of the expectation table wins (``argmax`` semantics).
        """
        expectations = F1Optimizer.get_expectations(P, pNone)

        ix_max = np.unravel_index(expectations.argmax(), expectations.shape)
        max_f1 = expectations[ix_max]

        # Row 0 of the table is the "predict None as well" variant.
        predNone = bool(ix_max[0] == 0)
        best_k = ix_max[1]

        return best_k, predNone, max_f1

    @staticmethod
    def _F1(tp, fp, fn):
        """Return F1 from true-positive, false-positive and false-negative counts.

        With plain Python numbers this raises ``ZeroDivisionError`` when all
        three counts are zero.
        """
        return 2 * tp / (2 * tp + fp + fn)

    @staticmethod
    def _Fbeta(tp, fp, fn, beta=1.0):
        """Return F-beta from confusion counts; ``beta=1.0`` gives F1.

        With plain Python numbers this raises ``ZeroDivisionError`` when the
        denominator is zero (e.g. all counts zero).
        """
        beta_squared = beta ** 2
        return (1.0 + beta_squared) * tp / ((1.0 + beta_squared) * tp + fp + beta_squared * fn)
def get_best_prediction(items = None, preds = None, pNone=None, showThreshold = False):
    """Return the item set that maximises expected F1 as a space-joined string.

    Args:
        items: iterable of item identifiers.
        preds: iterable of posterior probabilities, aligned with ``items``.
            Pairs are formed with ``zip``, so the longer input is silently
            truncated to the shorter one.
        pNone: probability that no item is relevant. When omitted it is
            computed as ``prod(1 - p)`` over ``preds``.
        showThreshold: when true, print the lowest selected probability and
            the maximum expected F1.

    Returns:
        The selected labels converted with ``str`` and joined by single
        spaces. ``'None'`` comes first when the optimiser decides to predict
        it, followed by the chosen items in descending probability order.

    Raises:
        TypeError: if ``items`` or ``preds`` is left as ``None`` (raised by
            ``zip``).
    """
    # sorted() is stable, so items with equal probability keep input order.
    items_preds = sorted(zip(items, preds), key=itemgetter(1), reverse=True)
    P = [p for _, p in items_preds]
    L = [i for i, _ in items_preds]

    if pNone is None:
        pNone = (1.0 - np.array(P)).prod()

    best_k, pred_none, max_f1 = F1Optimizer.maximize_expectation(P, pNone)
    best_prediction = ['None'] if pred_none else []
    best_prediction += L[:best_k]

    # NOTE(review): both diagnostic prints are assumed to belong under
    # showThreshold - confirm against the intended nesting.
    if showThreshold:
        if best_k > 0:
            print("Threshold : P(X) > {:.4f}".format(P[best_k - 1]))
        else:
            # No item made the cut, so there is no probability to report;
            # indexing the empty selection would raise IndexError.
            print("Threshold : none (no item selected)")
        print("Maximum F1 : {:.4f}".format(max_f1))

    return ' '.join(map(str, best_prediction))