-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_csv_data.py
163 lines (132 loc) · 5.43 KB
/
generate_csv_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import apktool.extractor as extractor
import generator.generator as generator
import vtapi.vtapi as vtapi
import sqlite3
import hashlib
import os
import multiprocessing
import tqdm
def app_processing_task(app_data):
    """Extract the features of one app and write them to its own CSV file.

    Intended to be run in a worker process; everything the task needs is
    packed into a single dict so it can be handed to a process pool.

    Args:
        app_data: dict with the keys
            "app_path"      - path of the apk file,
            "out_path"      - folder the apk is (or will be) extracted into,
            "vt_data"       - VirusTotal data for the app, or None,
            "sha256"        - sha256 hex digest of the apk,
            "csv_file_path" - CSV file to write the app's line to.

    Returns:
        None. If the CSV file already exists the task returns immediately
        without touching the app.
    """
    out_path = app_data["out_path"]
    app_path = app_data["app_path"]
    vt_data = app_data["vt_data"]
    sha256 = app_data["sha256"]
    csv_file_path = app_data["csv_file_path"]

    # Don't run for existing apps. This is checked *before* extraction so an
    # already-processed app is never unpacked again just to be skipped.
    if os.path.exists(csv_file_path):
        return

    # Extract the apk file (only needed to run once per app)
    if not os.path.exists(out_path):
        extractor.extract_app(app_path, out_path)

    # Load the app manifest and apktool file
    manifest = extractor.load_manifest(out_path)
    apktoolfile = extractor.load_apktoolfile(out_path)

    # Search for permissions
    app_permissions = extractor.get_permissions(manifest)

    # Get the number of activities defined
    number_of_activities = extractor.get_number_of_activities(manifest)

    # Search for ipv4 addresses
    ipv4s = extractor.search_ip_in_files(out_path)

    # Search for domains
    domains = extractor.search_domains_in_files(out_path)

    # Get number of files
    num_files = extractor.count_files(out_path)

    # Get cert country code
    country_dial = extractor.get_cert_country(out_path)

    # Get sdk version minimum and target
    sdk_version_minimum, sdk_version_target = extractor.get_sdk_versions(apktoolfile)

    # Get file size in bytes
    size_bytes = os.path.getsize(app_path)

    # Apps without VirusTotal data are labeled "clean".
    # TODO: If there's no data here for a malware app, we should probably throw an exception
    top_label = "clean"
    if vt_data is not None:
        top_label = vtapi.get_labels(vt_data)["top"]

    # Write a line to a csv file.
    # NOTE(review): the meaning of the trailing positional True is defined by
    # generator.add_csv_line - confirm there before changing it.
    generator.add_csv_line(csv_file_path,
                           app_permissions,
                           number_of_activities,
                           len(ipv4s),
                           len(domains),
                           num_files,
                           country_dial,
                           sdk_version_minimum,
                           sdk_version_target,
                           size_bytes,
                           sha256,
                           top_label,
                           True)
def process_apps(app_dir, app_extracted_dir, json_dir, json_combined_filename, csvs_dir, combined_csv_file_path):
    """Run the whole feature-extraction pipeline for one folder of apps.

    Steps:
        1. Combine the VirusTotal json files and run avclass2 on the result.
        2. Hash every apk in ``app_dir`` and look up its VirusTotal data.
        3. Process all apps in a multiprocessing pool, one CSV file per app.
        4. Concatenate the per-app CSV files (after a header) into one file.

    Args:
        app_dir: folder containing the apk files.
        app_extracted_dir: folder the extracted apps are placed in.
        json_dir: folder with the VirusTotal json files.
        json_combined_filename: file the combined VirusTotal json is written to.
        csvs_dir: folder for the per-app CSV files (created if missing).
        combined_csv_file_path: path of the final, combined CSV file.
    """
    # #############################################################
    # AVClass
    # Generate a AVClass2 compatible file with json data
    # from all apps to be analyzed.
    print("Running avclass...")
    vtapi.combine_vt_files(json_dir, json_combined_filename)

    # Run avclass2 on VT json data
    av_labels = vtapi.run_avclass2(json_combined_filename)

    # #############################################################
    # Preprocessing
    print()
    print("Preprocessing app data for labeling...")

    # This array will contain app data from apktool and VT, for use in the multiprocessing part of the code
    app_data_array = []

    # Sorted so the processing order does not depend on the file system.
    for package_name in tqdm.tqdm(sorted(os.listdir(app_dir))):
        # We need to get the vt data to pack with each app before going multi-process
        # This is via the sha256 hash through vtapi.get_app_by_sha256
        app_path = os.path.join(app_dir, package_name)  # Where to find the apk
        out_path = os.path.join(app_extracted_dir, package_name)  # Where to put the extracted folder

        # Only regular files can be hashed; anything else is not an apk.
        if not os.path.isfile(app_path):
            print("Warn: not a file - " + package_name)
            continue

        # Get file sha256 hash
        sha256_string = _sha256_of_file(app_path)

        # Get labels from VT
        vt_data = vtapi.get_app_by_sha256(sha256_string, json_dir, av_labels)

        app_data_array.append({
            "sha256": sha256_string,
            "app_path": app_path,
            "out_path": out_path,
            "vt_data": vt_data,
            "csv_file_path": os.path.join(csvs_dir, sha256_string + ".csv"),
        })

    # #############################################################
    # Create app csvs with all data
    print()
    print("Processing apps....")

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(csvs_dir, exist_ok=True)

    with multiprocessing.Pool() as pool_label:
        finished_tasks = pool_label.imap_unordered(app_processing_task, app_data_array)
        # The results are not needed; iterating only drives the progress bar.
        for _ in tqdm.tqdm(finished_tasks, total=len(app_data_array)):
            pass

    # #############################################################
    # Combine csvs
    print()
    print("Combining csvs....")
    with open(combined_csv_file_path, "w", newline="") as full_file:
        # Create header
        generator.create_csv_header(
            generator.create_csv_writer(full_file))

        # Add all files into the one (sorted for a reproducible row order)
        for csv_filename in tqdm.tqdm(sorted(os.listdir(csvs_dir))):
            part_path = os.path.join(csvs_dir, csv_filename)
            if os.path.getsize(part_path) == 0:
                print("Warn: empty file - " + csv_filename)
                continue
            with open(part_path) as part_file:
                full_file.writelines(part_file)

    print("Done!")


def _sha256_of_file(file_path, chunk_size=65536):
    """Return the sha256 hex digest of a file, reading it in fixed-size chunks."""
    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        for byte_block in iter(lambda: f.read(chunk_size), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()
# TODO: This could probably just have "clean" and "malware" as parameters...
if __name__ == "__main__":
    # The guard is required because process_apps() starts a
    # multiprocessing.Pool: with the "spawn" start method every worker
    # re-imports this module, and without the guard each worker would try to
    # run the whole pipeline again. It also makes the module importable
    # without side effects.
    process_apps(
        'app_data/malware_apps/',
        'app_data/malware_apps_extracted/',
        'app_data/malware_jsons/',
        'app_data/vt_combined_malware.json',
        'app_data/malware_csvs/',
        'app_data/apps_complete_malware.csv')

    process_apps(
        'app_data/clean_apps/',
        'app_data/clean_apps_extracted/',
        'app_data/clean_jsons/',
        'app_data/vt_combined_clean.json',
        'app_data/clean_csvs/',
        'app_data/apps_complete_clean.csv')