This repository has been archived by the owner on Apr 4, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
yaraexporter.py
executable file
·415 lines (341 loc) · 17.4 KB
/
yaraexporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
#!/usr/bin/env python3
"""
Yaraexporter
------------
This little program exports attributes (regkey, regkey|value, pattern-in-file and mutex) from misp events and creates
yara files that are usable with `Thor <https://www.bsk-consulting.de/apt-scanner-thor/>`_. Only attributes that are not
proposed to delete and marked as 'export to ics' are exported.
This script can be used with different parameters from the cli:
.. code-block:: none
usage: yaraexporter.py [-h] -u URL [-k] [-s SSL] -a ATTRIBUTE [-f FILE] [-c]
[-d] [-i IGNORE]
Connects to an MISP instance and exports yara rules based on givenattribute
types.
optional arguments:
-h, --help show this help message and exit
-u URL, --url URL The url of the MISP instance.
-k, --key Prompts for the API key. If not given read MISP_KEY
from env.
-s SSL, --ssl SSL Path to certificate file for validation, if not
globally trusted.
-a ATTRIBUTE, --attribute ATTRIBUTE
Which attribute to export. (Currently supported:
regkey, pattern-in-file, mutex)
-f FILE, --file FILE Path to output file for the yara rules. If not given,
rules are printed to stdout.
-c, --compile Compile the rules and place *.yas next to FILE.
-d, --debug Turn on debug mode.
-i IGNORE, --ignore IGNORE
Comma separated list of events to ignore.
Thanks for using! CERT-Bund, 2017
The preferred way to call this script should be:
.. code-block:: none
MISP_KEY=Thisisyourmispapikey12345 ./yaraexporter.py [PARAMS]
But you're able to call it with the -k param and enter the api key in the cli:
.. code-block:: none
./yaraexporter.py -k -u https://your.misp-instance.com -a mutex
Enter API Key for https://your.misp-instance.com:
Attribute types
---------------
Regkey and Regkey|value
^^^^^^^^^^^^^^^^^^^^^^^
For the registry yara rules, all attributes containig registry relevant values are exported. While doing so, the hive
part (e.g. HKEY_LOCAL_MACHINE etc.) are cutted, because Thor loads them seperately. Also, the rules containing
'registry' in the name. Apart from that, the rules are similar to the normal pattern-in-file rules.
Mutex
^^^^^
Mutex rules include the parameter 'limit = "Mutex"' in the meta part of the rule to select them for mutex enumeration.
Pattern-in-file
^^^^^^^^^^^^^^^
Nothing special here. Remember not to use too generic values in misp.
Class functions
---------------
"""
import argparse
import getpass
import io
import os
import re
import progressbar
import pymisp
import yara
try:
from typing import Union
HAS_TYPING = True
except ImportError:
# No support for typing (https://docs.python.org/3/library/typing.html)
HAS_TYPING = False
_AVAILABLETYPES = ['regkey', 'regkey|value', 'pattern-in-file', 'mutex']
class YaraexporterError(Exception):
"""Parent class for Exceptions.
:param message: Error message"""
def __init__(self, message: str):
self.message = message
class NoApiKeyError(YaraexporterError):
"""This is raised if no API key was given.
:param message: Error message"""
def __init__(self, message: str):
YaraexporterError.__init__(self, message)
class AttributeNotSupportedError(YaraexporterError):
"""This is raised if an unsupported attribute type is queried.
:param message: Error message"""
def __init__(self, message: str):
YaraexporterError.__init__(self, message)
class Yaraexporter:
"""Creates a pymisp instance to fetch given attributes and create yara rules from it. This can also be imported to
other modules.
:param url: Url to MISP instance.
:param key: API key
:param ssl: True for validation, False to skip or path to self-signed cert.
:param debug: If set to true, it can be used for locating errors.
:param ignore: Comma separated list of MISP eventIds to ignore."""
if HAS_TYPING:
def __init__(self, url: str, key: str, ssl: Union[bool, str]=True, debug: bool=False,
ignore: Union[str, None]=None):
if ssl and not os.path.isfile(ssl):
ssl = True
self.debug = debug
self._debug('Connecting to {}.'.format(url))
self.misp = pymisp.PyMISP(url=url, key=key, ssl=ssl)
if ignore:
self.ignore = ignore.split(',')
else:
self.ignore = None
else:
def __init__(self, url: str, key: str, ssl=True, debug: bool=False, ignore=None):
# Python <= 3.5 doesn support typing.Union. This method will be removed
# when we stop supporting python 3.4
if ssl and not os.path.isfile(ssl):
ssl = True
self.debug = debug
self._debug('Connecting to {}.'.format(url))
self.misp = pymisp.PyMISP(url=url, key=key, ssl=ssl)
if ignore:
self.ignore = ignore.split(',')
else:
self.ignore = None
def __enter__(self):
"""Needed for 'with'"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Needed for 'with'"""
os.environ['MISP_KEY'] = 'removed.'
def _debug(self, string: str) -> None:
"""For debugging."""
if self.debug:
print('[DEBUG] {}'.format(string))
def _debug_and_stop(self, string: str) -> None:
"""For debugging."""
self._debug(string)
self.debug = False
def _search_for_type(self, type_attribute: str) -> list:
"""The actual request to misp. Skip attributes which are proposed for deletion.
:param type_attribute: MISP event attribute type to search for
:returns: List of values per event matching type_attribute"""
results = self.misp.search(type_attribute=type_attribute, deleted=False)
attribute_values = []
self._debug('Processing MISP results...')
bar = progressbar.ProgressBar(max_value=progressbar.UnknownLength)
for idx, events in enumerate(results.get('response', None)):
bar.update(idx)
event_info = events.get('Event').get('info')
event_id = events.get('Event').get('id')
if self.ignore and event_id in self.ignore:
continue
attribute_values.append({'info': event_info,
'id': event_id,
'values': []})
for values in events.get('Event').get('Attribute'):
# Skip attributed which are proposed to delete
shadow_attribute = values.get('ShadowAttribute', None)
if len(shadow_attribute) > 0 and shadow_attribute[0].get('proposal_to_delete', False):
continue
# Skip attributes which are not marked for ids export
if not values.get('to_ids', None):
continue
if type_attribute in values.get('type'):
attribute_values[idx]['values'].append(values.get('value'))
return attribute_values
def _create_regkey_rule(self, searchresults: list) -> str:
"""Create regkey rules. Delete symbols, that are not allowed for yara rules and format it according to thor
manual.
:param searchresults: Results from MISP search (pymisp.PyMisp().search()
:returns: yara rules as a string"""
rules = ''
bar = progressbar.ProgressBar(max_value=len(searchresults))
for p, event in enumerate(searchresults):
eventinfo = re.sub(r'[^\x30-\x7a]', r'', event.get('info')).replace(' ', '_').replace(':', '') \
.replace('[', '').replace('\\', '').replace(']', '').replace('^', '').replace('@', '').replace('?', '') \
.replace('>', '').replace('<', '')
rulename = 'Registry_MISPID_{}_{}'.format(event.get('id'), eventinfo)[0:127]
rule = 'rule {} {{\n\tmeta:\n\t\t' \
'description = "Created with yaraexporter, CERT-Bund 2017."\n\t\t' \
'author = "Nils Kuhnert"\n\t\t' \
'score = 70\n\t\t' \
'reference = {}\n\t' \
'strings:\n'.format(rulename, event.get('id'))
strings = ''
for idx, value in enumerate(event.get('values')):
# Remove hive path and apply Thor formatting
value = re.sub(r'^(HKEY_LOCAL_MACHINE\\|HKEY_CURRENT_USER\\|'
r'HKCC\\|HKCR\\|HKLM\\|HKCU\\|\\)(\.|)[A-Za-z0-9\-* ]*\\*', '', value)
# Remove non ascii chars (illegal characters everywhere...)
value = re.sub(r'[^\x20-\x7a|]', '', value)
# Split given value for | in case it is regkey|value
split_regkey_value = value.split('|')
# Split the regkey-path in order to allow correct Thor formatting
split_regkey_path = split_regkey_value[0].split('\\')
last = len(split_regkey_path) - 2
regstring = ''
# For every given string in the path, check if there must be a backslash or semicolon to split
for idy, path in enumerate(split_regkey_path):
path = path.replace('\\', '\\\\').replace('"', '\\"')
if idy < last:
regstring += '{}\\\\'.format(path)
elif idy <= last:
regstring += '{};'.format(path)
else:
regstring += path
if len(split_regkey_value) > 1:
regstring += ';{}'.format(split_regkey_value[1].replace('\\', '\\\\').replace('"', '\\"'))
if len(regstring) > 0 and ('\\\\' in regstring or ';' in regstring):
strings += '\t\t$a{} = "{}"\n'.format(idx, regstring)
rule += strings + '\tcondition:\n\t\t1 of them\n}\n\n'
# Skip adding rule to ruleset, if strings is empty (yara error otherwise)
if 'strings:\n\tcondition:' in rule:
continue
# Add rule to ruleset
rules += rule
# Update progressbar
bar.update(p + 1, True)
return rules
def _create_pattern_rule(self, searchresults: list) -> str:
"""Create simple pattern matching yara rule
:param searchresults: Results from MISP search (pymisp.PyMisp().search())
:returns: yara rules as a string"""
rules = ''
bar = progressbar.ProgressBar(max_value=len(searchresults))
for p, event in enumerate(searchresults):
eventinfo = re.sub(r'[^\x30-\x7a]', r'', event.get('info')).replace(' ', '_').replace(':', '') \
.replace('[', '').replace('\\', '').replace(']', '').replace('^', '').replace('@', '').replace('?', '') \
.replace('>', '').replace('<', '')
rulename = 'Pattern_MISP_{}_{}'.format(event.get('id'), eventinfo)[0:127]
rule = 'rule {} {{\n\tmeta:\n\t\t' \
'description = "Created with yaraexporter, CERT-Bund 2017."\n\t\t' \
'author = "Nils Kuhnert"\n\t\t' \
'score = 70\n\t\t' \
'reference = {}\n\t' \
'strings:\n'.format(rulename, event.get('id'))
strings = ''
for idx, value in enumerate(event.get('values')):
value = value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')
strings += '\t\t$a{} = "{}"\n'.format(idx, value)
rule += strings + '\tcondition:\n\t\t1 of them\n}\n\n'
if 'strings:\n\tcondition:' in rule:
continue
rules += rule
bar.update(p + 1, True)
return rules
def _create_mutex_rule(self, searchresults: list) -> str:
"""Create mutex rule. Double the strings for ascii and wide search.
:param searchresults: Results from MISP search (pymisp.PyMisp().search())
:returns: yara rules as a string"""
rules = ''
bar = progressbar.ProgressBar(max_value=len(searchresults))
for p, event in enumerate(searchresults):
eventinfo = re.sub(r'[^\x30-\x7a]', r'', event.get('info')).replace(' ', '_').replace(':', '') \
.replace('[', '').replace('\\', '').replace(']', '').replace('^', '').replace('@', '').replace('?', '') \
.replace('>', '').replace('<', '')
rulename = 'Mutex_MISP_{}_{}'.format(event.get('id'), eventinfo)[0:127]
rule = 'rule {} {{\n\tmeta:\n\t\t' \
'description = "Created with yaraexporter, CERT-Bund 2017."\n\t\t' \
'author = "Yaraexporter, Nils Kuhnert"\n\t\t' \
'score = 70\n\t\t' \
'reference = {}\n\t\t' \
'limit = "Mutex"\n\t' \
'strings:\n'.format(rulename, event.get('id'))
strings = ''
for idx, value in enumerate(event.get('values')):
value = re.sub(r'^.*\\', r'', value)
value = value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')
strings += '\t\t$a{} = "{}" ascii\n'.format(idx, value)
strings += '\t\t$aw{} = "{}" wide\n'.format(idx, value)
rule += strings + '\tcondition:\n\t\t1 of them\n}\n\n'
if 'strings:\n\tcondition:' in rule:
continue
rules += rule
bar.update(p + 1, True)
return rules
def get_rules_for_type(self, type_attribute: str) -> str:
"""Sends the request to misp using pymisp api and call the specific rule creation function"""
rule = ''
if type_attribute in _AVAILABLETYPES:
self._debug('Downloading attributes...')
searchresult = self._search_for_type(type_attribute=type_attribute)
self._debug('Start processing search results...')
if type_attribute == 'regkey' or type_attribute == 'regkey|value':
rule = self._create_regkey_rule(searchresults=searchresult)
elif type_attribute == 'pattern-in-file':
rule = self._create_pattern_rule(searchresults=searchresult)
elif type_attribute == 'mutex':
rule = self._create_mutex_rule(searchresults=searchresult)
else:
raise AttributeNotSupportedError('Attribute {} not supported, yet.'.format(type_attribute))
return rule
def run() -> None:
"""This is just the basic runner which parses args and delegates to functions"""
parser = argparse.ArgumentParser(description='Connects to an MISP instance and exports yara rules based on given'
'attribute types.',
epilog='Thanks for using!\nCERT-Bund, 2017')
parser.add_argument('-u', '--url', type=str, required=True, help='The url of the MISP instance.')
parser.add_argument('-k', '--key', dest='prompt', action='store_true',
help='Prompts for the API key. If not given read MISP_KEY from env.')
parser.add_argument('-s', '--ssl', type=str,
help='Path to certificate file for validation, if not globally trusted.')
parser.add_argument('-a', '--attribute', type=str, required=True,
help='Which attribute to export. (Currently supported: regkey, pattern-in-file, mutex)')
parser.add_argument('-f', '--file', type=str,
help='Path to output file for the yara rules. If not given, rules are printed to stdout.')
parser.add_argument('-c', '--compile', dest='compileyara', action='store_true',
help='Compile the rules and place *.yas next to FILE.')
parser.add_argument('-d', '--debug', dest='debug', action='store_true',
help='Turn on debug mode.')
parser.add_argument('-i', '--ignore', type=str, help='Comma separated list of events to ignore.')
parser.set_defaults(prompt=False, compileyara=False, debug=False)
args = parser.parse_args()
# Prompt for api key or get it from env
if args.prompt:
key = getpass.getpass(prompt='Enter API Key for {0}: '.format(args.url))
else:
key = os.environ.get('MISP_KEY')
if not key:
raise NoApiKeyError('No API key given. Can not connect to MISP this way.')
# Check if certificate is a file, or set validation to true
if args.ssl and os.path.isfile(args.ssl):
ssl = args.ssl
else:
ssl = True
# Create misp session and do things
with Yaraexporter(url=args.url, key=key, ssl=ssl, debug=args.debug) as con:
if args.debug:
print('[DEBUG] Connected to {}.'.format(args.url))
ruleset = con.get_rules_for_type(type_attribute=args.attribute)
if args.file:
if args.debug:
print('[DEBUG] Writing rules to file {}.'.format(args.file))
with io.open(args.file, mode='w') as file:
file.write(ruleset)
else:
print(ruleset)
# Compiling the rule
if args.compileyara and args.file:
cfile = args.file.split('.')[0]
if args.debug:
print('[DEBUG] Compiling rules to {}.'.format(cfile))
rules = yara.compile(args.file)
rules.save('{}.yas'.format(cfile))
if __name__ == '__main__':
try:
run()
except YaraexporterError as e:
print(e.message)