-
Notifications
You must be signed in to change notification settings - Fork 9
/
evaluation.py
208 lines (173 loc) · 8.77 KB
/
evaluation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
Evaluation module for point-wise and interval-wise prediction algorithms in terms of mean average precision.
"""
import numpy as np
def label_anomaly_windows(labels):
    """
    Converts the point wise anomaly labels to windows for intersection over union calculation.
    A window is a maximal run of consecutive points labelled 1; both of its bounds are inclusive indices.
    Windows touching the first or the last point of the series are reported as well, including a
    single anomalous point located at the very end.
    :param labels (type: 1D array or list): true labels 1/0 for each point
    :return (type: list): anomaly intervals as (start index, end index) tuples, in order of appearance
    """
    is_anomalous = (np.asarray(labels).ravel() == 1).astype(np.int8)
    # Padding with one normal point on each side turns a window that touches either edge of the
    # series into an ordinary rise/fall pair, so no special casing of the boundaries is needed.
    padded = np.concatenate(([0], is_anomalous, [0]))
    transitions = np.diff(padded)
    starts = np.flatnonzero(transitions == 1)
    # A fall is observed one position after the last anomalous point of a window.
    ends = np.flatnonzero(transitions == -1) - 1
    # Plain Python ints keep the result comparable with hand-written (start, end) tuples.
    return [(int(start), int(end)) for start, end in zip(starts, ends)]
def calculate_IOU(anomalies, label_window):
    """
    Computes the Intersection over Union (IoU) of every detected anomaly window against the true windows.
    A detected window may cover several true windows; in that case the IoU values of all the true
    windows it touches are summed, and the indices of those true windows are recorded so that recall
    can be derived later on.
    :param anomalies (type: list): detected windows, each item holds the start index at position 0 and the end index at position 1
    :param label_window (type: list): true anomalous windows, each item is a (start index, end index) tuple
    :return iou (type: list): summed IoU per detected window, 0 when it overlaps no true window
    :return anomaly_region (type: list): per detected window, the indices of the true windows it overlaps
    """
    iou = []
    anomaly_region = []
    for detected in anomalies:
        det_start, det_end = detected[0], detected[1]
        total_iou = 0
        touched = []
        for idx, truth in enumerate(label_window):
            true_start, true_end = truth[0], truth[1]
            if det_start <= true_start and det_end >= true_start:
                # the detection begins before (or at) the true window and reaches into it
                overlap = 1 + min(true_end, det_end) - true_start
                union = 1 + max(true_end, det_end) - det_start
            elif det_start >= true_start and det_end <= true_end:
                # the detection lies entirely inside the true window
                overlap = 1 + (det_end - det_start)
                union = 1 + (true_end - true_start)
            elif det_start <= true_end and det_end >= true_end:
                # the detection begins inside the true window and runs past its end
                overlap = 1 + (true_end - det_start)
                union = 1 + (det_end - true_start)
            else:
                # no overlap with this true window
                continue
            total_iou += float(overlap) / union
            touched.append(idx)
        anomaly_region.append(touched)
        iou.append(total_iou)
    return iou, anomaly_region
def average_precision(y_true, y_scores, iou_threshold):
    """
    Calculates average precision which summarises the shape of the precision/recall curve, and is defined as the mean precision at a set of
    eleven equally spaced recall levels. The precision at each recall level r is interpolated by taking the maximum precision measured
    over all operating points whose recall is at least r (0 when no operating point reaches r).
    Every distinct anomaly score is used once as a decision threshold: points scoring at least that value are predicted anomalous,
    the predictions are grouped into windows and the windows are matched against the true windows by intersection over union.
    Thresholds that produce no predicted window contribute no operating point.
    Reference: http://homepages.inf.ed.ac.uk/ckiw/postscript/ijcv_voc09.pdf
    :param y_true: true labels 1/0 for each point
    :param y_scores: anomaly scores output for each point by the algorithm
    :param iou_threshold: a predicted window whose intersection over union value is greater than or equal to this threshold
                          is counted as true positive
    :return: average precision, 0.0 when y_true contains no anomalous window
    """
    scores = np.asarray(y_scores)
    # The true windows do not depend on the score threshold, so they are extracted only once.
    true_windows = label_anomaly_windows(np.asarray(y_true))
    if not true_windows:
        # Recall is undefined without any true anomaly; report the worst score instead of dividing by zero.
        return 0.0
    n_true = float(len(true_windows))

    precision = []
    recall = []
    # Repeated score values lead to identical predictions, so each distinct value is evaluated once.
    for score in np.unique(scores):
        predicted = np.where(scores >= score, 1, 0)
        predicted_windows = label_anomaly_windows(predicted)
        if not predicted_windows:
            # Nothing was detected at this threshold: precision is undefined, so no point is added to the curve.
            continue
        iou, labels_in_each_window = calculate_IOU(predicted_windows, true_windows)
        tp = 0
        matched_labels = set()
        for window_iou, window_labels in zip(iou, labels_in_each_window):
            if window_iou >= iou_threshold:
                tp += 1
                # a single predicted window may cover several true windows
                matched_labels.update(window_labels)
        # every predicted window that is not a true positive is a false positive
        precision.append(float(tp) / len(iou))
        recall.append(len(matched_labels) / n_true)

    # Recall values for calculating average precision
    recall_interp = [1.0, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0.0]
    precision_interp = []
    for val in recall_interp:
        # Recall is not guaranteed to grow monotonically as the threshold is lowered (neighbouring
        # windows can merge), hence all operating points are inspected rather than a prefix of them.
        reachable = [p for p, r in zip(precision, recall) if r >= val]
        precision_interp.append(max(reachable) if reachable else 0)
    return np.mean(precision_interp)
def map_pointwise_predictions(y_true, y_scores, iou_thresholds=(0.05, 0.10, 0.15, 0.20, 0.25)):
    """
    Computes mean average precision for point wise predictions.
    Mean Average Precision score is calculated by taking the mean of average precision over all IoU (intersection over union) thresholds.
    Averaging over multiple IoU thresholds rather than only considering one generous threshold of IoU tends to reward models that are
    better at precise localization.
    :param y_true: true labels 1/0 for each point
    :param y_scores: anomaly scores output for each point by the algorithm
    :param iou_thresholds: intersection over union thresholds to average over, defaults to 0.05, 0.10, 0.15, 0.20 and 0.25
    :return: mean average precision
    :raises ValueError: if iou_thresholds is empty
    """
    thresholds = list(iou_thresholds)
    if not thresholds:
        raise ValueError("iou_thresholds must contain at least one threshold")
    total = 0.0
    for iou_threshold in thresholds:
        total += average_precision(y_true, y_scores, iou_threshold)
    return total / len(thresholds)
def map_intervalwise_predictions(labels, anomalies, iou_thresholds=(0.05, 0.10, 0.15, 0.20, 0.25)):
    """
    Computes mean average precision for interval wise predictions.
    Mean Average Precision score is calculated by taking the mean of average precision over all IoU (intersection over union) thresholds.
    Averaging over multiple IoU thresholds rather than only considering one generous threshold of IoU tends to reward models that are
    better at precise localization.
    The intervals are ranked by decreasing anomaly score; for every k the top k intervals are taken as the positive predictions,
    which gives one precision/recall point per k. The caller's list of anomalies is left untouched.
    :param labels (type: list or 1D array): input labels 1/0 for each point
    :param anomalies (type: list): list with interval (start, end) and anomaly score, [start, end, anomaly_score]
    :param iou_thresholds: intersection over union thresholds to average over, defaults to 0.05, 0.10, 0.15, 0.20 and 0.25
    :return (type: float): mean average precision score over given intersection over union (IoU) thresholds,
                           0.0 when labels contain no anomalous window
    :raises ValueError: if iou_thresholds is empty
    """
    thresholds = list(iou_thresholds)
    if not thresholds:
        raise ValueError("iou_thresholds must contain at least one threshold")

    # sort a copy of the anomalies by their scores in descending order
    ranked = sorted(anomalies, key=lambda item: item[2], reverse=True)
    labeled_data = label_anomaly_windows(np.asarray(labels))
    if not labeled_data:
        # Recall is undefined without any true anomaly; report the worst score instead of dividing by zero.
        return 0.0
    n_true = float(len(labeled_data))
    iou, anomaly_region = calculate_IOU(ranked, labeled_data)

    # Recall values for calculating average precision
    recall_interp = [1.0, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0.0]
    mean = 0
    for iou_threshold in thresholds:
        precision = []
        recall = []
        tp = 0
        matched_windows = set()
        # The counts for the top k intervals extend those for the top k-1, so they are updated
        # incrementally instead of being recomputed from scratch for every k.
        for rank, (interval_iou, windows) in enumerate(zip(iou, anomaly_region), start=1):
            if interval_iou > iou_threshold:
                tp += 1
                # a single interval may cover several true windows
                matched_windows.update(windows)
            # every selected interval that is not a true positive is a false positive
            precision.append(float(tp) / rank)
            recall.append(len(matched_windows) / n_true)

        precision_interp = []
        for val in recall_interp:
            # interpolated precision: best precision among the points reaching at least this recall
            reachable = [p for p, r in zip(precision, recall) if r >= val]
            precision_interp.append(max(reachable) if reachable else 0)
        mean += np.mean(precision_interp)
    return float(mean) / len(thresholds)