-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
424 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,3 +99,6 @@ ENV/ | |
|
||
# mypy | ||
.mypy_cache/ | ||
|
||
.idea | ||
kittylogs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,57 @@ | ||
# APIFuzzer | ||
Fuzz your application using you Swagger or API Blueprint definition without coding | ||
# APIFuzzer — HTTP API Testing Framework | ||
|
||
APIFuzzer reads your API description and step by step fuzzes the fields to validate | ||
if you application can cope with the fuzzed parameters. Does not require coding. | ||
|
||
### Supported API Description Formats | ||
|
||
- [Swagger][] | ||
|
||
### Work in progress | ||
- [API Blueprint][] | ||
|
||
## Installation | ||
|
||
Fetch the most recent code from GitHub | ||
``` | ||
$ git clone https://github.com/KissPeter/APIFuzzer.git | ||
``` | ||
Install requirements. If you don't have pip installed, then sudo apt-get install python-pip -y | ||
``` | ||
$ pip2.7 install -r APIFuzzer/requirements.txt | ||
``` | ||
|
||
## Quick Start | ||
Check the help (some of them are not implemented yet): | ||
``` | ||
$ python2.7 fuzzer.py -h | ||
usage: fuzzer.py [-h] -s SRC_FILE [-r REPORT_DIR] [-l LEVEL] | ||
[-u ALTERNATE_URL] [-t TEST_RESULT_DST] | ||
API fuzzer configuration | ||
optional arguments: | ||
-h, --help show this help message and exit | ||
-s SRC_FILE, --src_file SRC_FILE | ||
API definition file path | ||
-r REPORT_DIR, --report_dir REPORT_DIR | ||
Directory where error reports will be saved, default: | ||
/tmp/ | ||
-l LEVEL, --level LEVEL | ||
Test deepness: [1,2], higher is the deeper !!!Not | ||
implemented!!! | ||
-u ALTERNATE_URL, --url ALTERNATE_URL | ||
Use CLI defined url instead compile the url from the API | ||
definition. Useful for testing | ||
-t TEST_RESULT_DST, --test_report TEST_RESULT_DST | ||
JUnit test result xml save path !!!Not implemented!!! | ||
``` | ||
Start fuzzing: | ||
|
||
``` | ||
$ python2.7 fuzzer.py -s your_swagger_definition.json -u http://localhost:8080/ | ||
``` | ||
|
||
[API Blueprint]: https://apiblueprint.org/ | ||
[Swagger]: http://swagger.io/ |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from __future__ import print_function | ||
from kitty.model import Static, Template, Container | ||
from utils import get_field_type_by_method | ||
|
||
|
||
class BaseTemplate(object): | ||
url = None | ||
method = None | ||
fuzz_params = None | ||
|
||
def compile_template(self): | ||
_url = Static(name='url', value=self.url.encode()) | ||
_method = Static(name='method', value=self.method.encode()) | ||
template = Template(name='{}_{}'.format(self.url.replace('/', '_'), self.method), fields=[_url, _method]) | ||
self.fuzz_place = get_field_type_by_method(self.method) | ||
template.append_fields([Container(name='{}'.format(self.fuzz_place), fields=self.fuzz_params)]) | ||
return template |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from kitty.model import RandomBits | ||
|
||
|
||
class RandomBitsField(RandomBits): | ||
"""Creates a fields which compatible field with String and Delimiter""" | ||
|
||
def not_implemented(self, func_name): | ||
pass | ||
|
||
def __init__(self, value, name, fuzzable=True): | ||
super(RandomBitsField, self).__init__(name=name, value=value, min_length=20, max_length=100, fuzzable=fuzzable, num_mutations=80) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
import logging | ||
from logging import handlers | ||
import json | ||
from time import time | ||
|
||
import requests | ||
from urllib import urlencode | ||
from kitty.data.report import Report | ||
from kitty.targets.server import ServerTarget | ||
from requests.exceptions import RequestException | ||
|
||
|
||
class FuzzerTarget(ServerTarget): | ||
def not_implemented(self, func_name): | ||
pass | ||
|
||
def __init__(self, name, base_url, report_dir, logger=None): | ||
super(FuzzerTarget, self).__init__(name, logger) | ||
self.base_url = base_url | ||
formtter = logging.Formatter(' [%(levelname)s] %(name)s: %(message)s') | ||
self.logger = logging.getLogger('HTTPFuzzer') | ||
handler = logging.handlers.SysLogHandler(address='/dev/log', | ||
facility=logging.handlers.SysLogHandler.LOG_LOCAL2) | ||
handler.setFormatter(formtter) | ||
self.logger.addHandler(handler) | ||
self.logger.setLevel(logging.WARNING) | ||
self._last_sent_request = None | ||
self.accepted_status_codes = list(range(200, 300)) + list(range(400, 500)) | ||
self.report_dir = report_dir | ||
|
||
def error_report(self, msg, req): | ||
if hasattr(req, 'request'): | ||
self.report.add('request method', req.request.method) | ||
self.report.add('request body', req.request.body) | ||
self.report.add('response', req.text) | ||
else: | ||
for k, v in req.items(): | ||
self.report.add(k, v) | ||
self.report.set_status(Report.ERROR) | ||
self.report.error(msg) | ||
|
||
def save_report_to_disc(self): | ||
try: | ||
with open('{}/{}_{}.json'.format(self.report_dir, self.test_number, time()), 'wb') as report_dump_file: | ||
report_dump_file.write(json.dumps(self.report.to_dict())) | ||
except Exception as e: | ||
self.logger.error( | ||
'Failed to save report "{}" to {} because: {}' | ||
.format(self.report.to_dict(), self.report_dir, e) | ||
) | ||
|
||
def transmit(self, **kwargs): | ||
try: | ||
_req_url = list() | ||
for url_part in self.base_url, kwargs['url']: | ||
_req_url.append(url_part.strip('/')) | ||
#kwargs['url'] = urlencode('/'.join(_req_url)) | ||
kwargs['url'] = '/'.join(_req_url) | ||
_return = requests.request(**kwargs) | ||
status_code = _return.status_code | ||
if status_code: | ||
if status_code not in self.accepted_status_codes: | ||
self.report.add('parsed status_code', status_code) | ||
self.report.add('request method', _return.request.method) | ||
self.report.add('request body', _return.request.body) | ||
self.report.add('response', _return.text) | ||
self.report.set_status(Report.FAILED) | ||
self.report.failed('return code {} is not in the expected list'.format(status_code)) | ||
else: | ||
self.error_report('Failed to parse http response code', _return) | ||
return _return | ||
except (RequestException, UnicodeDecodeError) as e: # request failure such as InvalidHeader | ||
self.error_report('Failed to parse http response code, exception: {}'.format(e), kwargs) | ||
pass | ||
|
||
def post_test(self, test_num): | ||
"""Called after a test is completed, perform cleanup etc.""" | ||
super(FuzzerTarget, self).post_test(test_num) | ||
if self.report.get('status') != Report.PASSED: | ||
self.save_report_to_disc() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
from __future__ import print_function | ||
import json | ||
from kitty.fuzzers import ServerFuzzer | ||
from kitty.model import Container | ||
from utils import get_field_type_by_method | ||
from kitty.data.report import Report | ||
|
||
|
||
def _flatten_dict_entry(orig_key, v): | ||
entries = [] | ||
if isinstance(v, list): | ||
for i in range(len(v)): | ||
entries.extend(_flatten_dict_entry('%s[%s]' % (orig_key, i), v[i])) | ||
elif isinstance(v, dict): | ||
for k in v: | ||
entries.extend(_flatten_dict_entry('%s/%s' % (orig_key, k), v[k])) | ||
else: | ||
entries.append((orig_key, v)) | ||
return entries | ||
|
||
|
||
class OpenApiServerFuzzer(ServerFuzzer): | ||
"""Extends the ServerFuzzer with exit after the end message.""" | ||
|
||
def not_implemented(self, func_name): | ||
pass | ||
|
||
def __init__(self, logger=None): | ||
super(OpenApiServerFuzzer, self).__init__(logger=logger) | ||
|
||
def _end_message(self): | ||
super(OpenApiServerFuzzer, self)._end_message() | ||
# Sometimes Kitty has stopped the fuzzer before it has finished the work. We can't continue, but can log | ||
self.logger.info('Stop fuzzing session_info: {}'.format(self.session_info.as_dict())) | ||
test_list_str_end = self.session_info.as_dict().get('test_list_str', '0-0').split('-', 1)[1].strip() | ||
if self.session_info.as_dict().get('end_index') != int(test_list_str_end): | ||
self.logger.error('Fuzzer want to exit before the end of the tests') | ||
self._exit_now(None, None) | ||
|
||
def _transmit(self, node): | ||
payload = {} | ||
for key in ('url', 'method'): | ||
payload[key] = node.get_field_by_name(key).render().tobytes() | ||
fuzz_place = get_field_type_by_method(node.get_field_by_name('method')._default_value) | ||
payload[fuzz_place] = self._recurse_params(node.get_field_by_name(fuzz_place)) | ||
self._last_payload = payload | ||
try: | ||
return self.target.transmit(**payload) | ||
except Exception as e: | ||
self.logger.error('Error in transmit: %s', e) | ||
raise | ||
|
||
@staticmethod | ||
def _recurse_params(param): | ||
_return = dict() | ||
if type(param) != Container: | ||
_return = param.render().tobytes() | ||
else: | ||
for field in param._fields: | ||
_return[field.get_name()] = OpenApiServerFuzzer._recurse_params(field) | ||
return _return | ||
|
||
def _store_report(self, report): | ||
self.logger.debug('<in>') | ||
report.add('test_number', self.model.current_index()) | ||
report.add('fuzz_path', self.model.get_sequence_str()) | ||
test_info = self.model.get_test_info() | ||
data_model_report = Report(name='Data Model') | ||
for k, v in test_info.items(): | ||
new_entries = _flatten_dict_entry(k, v) | ||
for (k_, v_) in new_entries: | ||
data_model_report.add(k_, v_) | ||
report.add(data_model_report.get_name(), data_model_report) | ||
payload = self._last_payload | ||
if payload is not None: | ||
data_report = Report('payload') | ||
data_report.add('raw', payload) | ||
data_report.add('hex', json.dumps(payload).encode('hex')) | ||
data_report.add('length', len(payload)) | ||
report.add('payload', data_report) | ||
else: | ||
report.add('payload', None) | ||
|
||
self.dataman.store_report(report, self.model.current_index()) | ||
self.dataman.get_report_by_id(self.model.current_index()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from base_template import BaseTemplate | ||
from utils import get_sample_data_by_type, get_fuzz_type_by_param_type | ||
from template_generator_base import TemplateGenerator | ||
|
||
|
||
class SwaggerTemplateGenerator(TemplateGenerator): | ||
def __init__(self, api_resources): | ||
self.api_resources = api_resources | ||
self.templates = list() | ||
|
||
def process_api_resources(self): | ||
for resource in self.api_resources['paths'].keys(): | ||
print(resource) | ||
for method in self.api_resources['paths'][resource].keys(): | ||
print(method) | ||
template = BaseTemplate() | ||
template.url = resource | ||
template.method = method.upper() | ||
params = list() | ||
for param in self.api_resources['paths'][resource][method].get('parameters',[]): | ||
print(param['name']) | ||
fuzz_type = get_fuzz_type_by_param_type(param.get('type')) | ||
params.append( | ||
fuzz_type( | ||
name=param['name'], | ||
value=get_sample_data_by_type(param.get('type')) | ||
) | ||
) | ||
template.fuzz_params = params | ||
self.templates.append(template) | ||
|
||
def compile_base_url(self): | ||
if 'http' in self.api_resources['schemes']: | ||
_protocol = 'http' | ||
else: | ||
_protocol = self.api_resources['schemes'][0] | ||
_base_url = '{}://{}{}'.format( | ||
_protocol, | ||
self.api_resources['host'], | ||
self.api_resources['basePath'] | ||
) | ||
return _base_url |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
class TemplateGenerator(object): | ||
|
||
def process_api_resources(self): | ||
pass | ||
|
||
def compile_base_url(self): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from kitty.model import * | ||
from custom_fuzzers import RandomBitsField | ||
|
||
|
||
def get_field_type_by_method(http_method): | ||
fields = { | ||
'GET': 'params', | ||
'POST': 'data', | ||
'PUT': 'data' | ||
} | ||
return fields.get(http_method, 'data') | ||
|
||
|
||
def get_fuzz_type_by_param_type(fuzz_type): | ||
# TODO we should have a shallow and a deep scan. This could be the difference | ||
return RandomBitsField | ||
|
||
|
||
def get_sample_data_by_type(param_type): | ||
types = { | ||
'name': 012, | ||
'string': 'asd' | ||
} | ||
return types.get(param_type, 'asd') |
Oops, something went wrong.