defense_isoforest.py
"""
Copyright (c) 2021, FireEye, Inc.
Copyright (c) 2021 Giorgio Severi
In order to run any mitigation experiment, first run the desired attack for 1 iteration setting the save
parameter of the configuration file to a valid path in the system, and "defense": true.
The attack script will save there a set of artifacts such as the watermarked training and test sets,
and the backdoor trigger details.
"""
import os
import time

import numpy as np
from sklearn.ensemble import IsolationForest

from mw_backdoor import common_utils


def isolation_forest_analysis(xtrain, is_clean):
    # Train the Isolation Forest on the full (clean + poisoned) training set
    starttime = time.time()
    isof = IsolationForest(max_samples='auto', contamination='auto', random_state=42, n_jobs=-1)
    isof_pred = isof.fit_predict(xtrain)
    print('Training the Isolation Forest took {:.2f} seconds'.format(time.time() - starttime))

    starttime = time.time()

    # fit_predict returns -1 for points flagged as outliers and 1 for inliers.
    suspect = 0
    poison_found = 0
    false_positives_poison = 0
    for pred, clean in zip(isof_pred, is_clean):
        if pred == -1:
            suspect += 1
            if clean == 0:
                # Poisoned point correctly flagged as an outlier
                poison_found += 1
            else:
                # Clean point incorrectly flagged as an outlier
                false_positives_poison += 1

    print(
        'Results:'
        '\n- {} suspect data points;'
        '\n- {} correctly identified poisoned points;'
        '\n- {} false positives;'.format(
            suspect,
            poison_found,
            false_positives_poison
        )
    )
    print('Evaluation took {:.2f} seconds'.format(time.time() - starttime))

    return isof_pred, suspect, poison_found, false_positives_poison, isof
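

# A minimal sketch (not part of the original pipeline) of how the Isolation
# Forest predictions could be used to sanitize the training set: drop every
# point flagged as an outlier and retrain on the remainder.
def filter_suspects(x, y, isof_pred):
    """Hypothetical helper: keep only the points predicted as inliers (+1)."""
    mask = isof_pred == 1
    return x[mask], y[mask]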


def isoforest_def():
    # ## Defense parameters
    # Set these parameters according to the specific attack for which you
    # would like to test the isolation forest.
    # dataset = 'drebin'
    # model_id = 'linearsvm'

    # This path should be the one where the attack script created the attack artifacts
    atk_dir = '/net/data/malware-backdoor/mwbdr/defense_files/drebin__linearsvm__combined_additive_shap__combined_additive_shap__feasible'
    config = 'configs/drebin_fig5.json'
    cfg = common_utils.read_config(config, atk_def=True)
    print(cfg)

    # Load the attack artifacts. The .npy files store pickled objects, so
    # allow_pickle=True is required, and .item() unwraps the stored object.
    watermarked_X = np.load(os.path.join(atk_dir, 'watermarked_X.npy'), allow_pickle=True).item()
    # watermarked_X_test = np.load(os.path.join(atk_dir, 'watermarked_X_test.npy'), allow_pickle=True)
    watermarked_y = np.load(os.path.join(atk_dir, 'watermarked_y.npy'), allow_pickle=True)
    wm_config = np.load(os.path.join(atk_dir, 'wm_config.npy'), allow_pickle=True).item()

    # The poisoned points are appended at the end of the training set
    watermarked_X_wmgw = watermarked_X[-cfg['poison_size'][0]:]
    print(watermarked_X_wmgw.shape)
    watermarked_y_wmgw = watermarked_y[-cfg['poison_size'][0]:]
    print(watermarked_y_wmgw.shape)
    print(watermarked_y_wmgw.sum())
    # Sanity check: all poisoned points carry the same trigger values in the
    # watermarked feature dimensions, so the variance along those columns
    # should be exactly 0.
    print(
        'Variance of the watermarked features, should be all 0s:',
        np.var(
            watermarked_X_wmgw[:, wm_config['wm_feat_ids']].toarray(),
            axis=0,
            dtype=np.float64
        )
    )
    # ## Analysis
    # Ground-truth labels for the defense: 1 for clean points, 0 for the
    # poisoned points appended at the end of the training set.
    is_clean = np.ones(watermarked_X.shape[0])
    is_clean[-cfg['poison_size'][0]:] = 0
    print(is_clean.shape)
    print(is_clean.sum())

    # noinspection PyUnusedLocal
    isof_pred, suspect, poison_found, false_positives_poison, isof = isolation_forest_analysis(
        xtrain=watermarked_X,
        is_clean=is_clean
    )
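
    # A natural follow-up (an assumption, not in the original script) would be
    # to summarize the defense with precision and recall over flagged points:
    # precision = poison_found / max(suspect, 1)
    # recall = poison_found / cfg['poison_size'][0]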


if __name__ == '__main__':
    isoforest_def()