Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev RELEASE: v0.15.5 #66

Merged
merged 11 commits into from
Mar 20, 2024
5 changes: 4 additions & 1 deletion src/offat/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
load_dotenv()

app = FastAPI(
title='OFFAT - API'
title='OFFAT - API',
servers=[{
'url':'http://localhost:8000',
}],
)

auth_secret_key = environ.get(
Expand Down
51 changes: 37 additions & 14 deletions src/offat/parsers/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class OpenAPIv3Parser(BaseParser):
'''OpenAPI v3 Spec File Parser'''
# while adding new method to this class, make sure same method is present in SwaggerParser class


def __init__(self, file_or_url: str, spec: dict | None = None) -> None:
super().__init__(file_or_url=file_or_url, spec=spec) # noqa
if not self.is_v3:
Expand All @@ -25,6 +26,7 @@ def __init__(self, file_or_url: str, spec: dict | None = None) -> None:

self.request_response_params = self._get_request_response_params()


def _populate_hosts(self):
servers = self.specification.get('servers', [])
hosts = []
Expand All @@ -41,6 +43,7 @@ def _populate_hosts(self):
self.hosts = hosts
self.host = self.hosts[0]


def _get_scheme(self):
servers = self.specification.get('servers', [])
schemes = []
Expand All @@ -50,6 +53,21 @@ def _get_scheme(self):
scheme = 'https' if 'https' in schemes else 'http'
return scheme


def _fetch_schema_from_spec(self, param_schema_ref:str) -> dict:
schema_spec_path = param_schema_ref.split('/')[1:]

if len(schema_spec_path) > 3:
logger.error('Schema spec $ref path should not be greater than 3 (excluding #)')
return {}

schema_data:dict = self.specification
for child_ele in schema_spec_path:
schema_data:dict = schema_data.get(child_ele, {})

return schema_data


def _get_param_definition_schema(self, param: dict):
'''Returns Model defined schema for the passed param'''
param_schema = param.get('schema')
Expand All @@ -58,9 +76,7 @@ def _get_param_definition_schema(self, param: dict):
if param_schema:
param_schema_ref = param_schema.get('$ref')
if param_schema_ref:
model_slug = param_schema_ref.split('/')[-1]
param_schema = self.specification.get(
'components', {}).get('schemas', {}).get(model_slug, {}) # model schema
param_schema = self._fetch_schema_from_spec(param_schema_ref)

return param_schema

Expand All @@ -75,20 +91,27 @@ def _get_response_definition_schema(self, responses: dict):
'''
for status_code in responses.keys():
# below line could return: ["application/json", "application/xml"]
status_code_content_type_response = responses[status_code]['content'].keys()

for status_code_content_type in status_code_content_type_response:
status_code_content = responses[status_code]['content'][status_code_content_type].keys(
)
if 'parameters' in status_code_content:
# done
responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters']
elif 'schema' in status_code_content:
responses[status_code]['schema'] = self._get_param_definition_schema(
responses[status_code]['content'][status_code_content_type])
content = responses[status_code].get('content', None)

if content:
status_code_content_type_responses = content.keys()
for status_code_content_type in status_code_content_type_responses:
status_code_content = responses[status_code]['content'][status_code_content_type].keys()
if 'parameters' in status_code_content:
responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters']
elif 'schema' in status_code_content:
responses[status_code]['schema'] = self._get_param_definition_schema(
responses[status_code]['content'][status_code_content_type])

else:
# Fetch $ref schema directly
ref = responses[status_code].get('$ref', None)
if ref:
responses[status_code]['schema'] = self._fetch_schema_from_spec(ref)

return responses


def _get_request_response_params(self):
'''Returns Schema of requests and response params

Expand Down
2 changes: 1 addition & 1 deletion src/offat/parsers/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class InvalidSpecVersion(Exception):
class BaseParser:
def __init__(self, file_or_url: str, spec: dict = None) -> None:
if spec:
self.specification = spec
self.specification:dict = spec
base_uri = ""
else:
self.specification, base_uri = read_from_filename(file_or_url)
Expand Down
71 changes: 51 additions & 20 deletions src/offat/report/generator.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
"""
Report Generator utils
"""
from copy import deepcopy
from html import escape
from json import dumps as json_dumps
from offat.report import templates
from os.path import dirname, join as path_join
from os import makedirs
from rich.table import Table
from yaml import dump as yaml_dump

from .templates.table import TestResultTable
from ..logger import logger, console
from ..report import templates


class ReportGenerator:
"""can generate report in json,yaml,table and html formats"""

@staticmethod
def generate_html_report(results: list[dict]):
"""generates html report from OFFAT results"""
html_report_template_file_name = 'report.html'
html_report_file_path = path_join(
dirname(templates.__file__), html_report_template_file_name)
dirname(templates.__file__), html_report_template_file_name
)

with open(html_report_file_path, 'r') as f:
with open(html_report_file_path, 'r', encoding='utf-8') as f:
report_file_content = f.read()

# TODO: validate report data to avoid HTML injection attacks.
Expand All @@ -27,7 +34,7 @@ def generate_html_report(results: list[dict]):

# HTML escape data
escaped_results = []
escape_keys = ["response_body"]
escape_keys = ['response_body']
for result_dict in results:
escaped_result_dict = {}
for key, value in result_dict.items():
Expand All @@ -40,64 +47,88 @@ def generate_html_report(results: list[dict]):
escaped_results.append(escaped_result_dict)

report_file_content = report_file_content.replace(
'{ results }', json_dumps(escaped_results))
'{ results }', json_dumps(escaped_results)
)

return report_file_content

@staticmethod
def handle_report_format(results: list[dict], report_format: str | None) -> str | Table:
def handle_report_format(
results: list[dict], report_format: str | None
) -> str | Table:
"""returns report in specified format"""
result = None

match report_format:
case 'html':
logger.warning('HTML output format displays only basic data.')
result = ReportGenerator.generate_html_report(results=results)
case 'yaml':
logger.warning('YAML output format needs to be sanitized before using it further.')
result = yaml_dump({
'results': results,
})
logger.warning(
'YAML output format needs to be sanitized before using it further.'
)
result = yaml_dump(
{
'results': results,
}
)
case 'json':
report_format = 'json'
result = json_dumps({
'results': results,
})
result = json_dumps(
{
'results': results,
}
)
case _: # default: CLI table
# TODO: filter failed requests first and then create new table for failed requests
report_format = 'table'
results_table = TestResultTable().generate_result_table(
deepcopy(results))
deepcopy(results)
)
result = results_table

logger.info('Generated %s format report.', report_format.upper())
return result

@staticmethod
def save_report(report_path: str | None, report_file_content: str | Table | None):
"""saves/prints report to console"""
if report_path != '/' and report_path:
dir_name = dirname(report_path)
if dir_name != '' and report_path:
makedirs(dir_name, exist_ok=True)

# print to cli if report path and file content as absent else write to file location.
if report_path and report_file_content and not isinstance(report_file_content, Table):
with open(report_path, 'w') as f:
if (
report_path
and report_file_content
and not isinstance(report_file_content, Table)
):
with open(report_path, 'w', encoding='utf-8') as f:
logger.info('Writing report to file: %s', report_path)
f.write(report_file_content)
else:
if isinstance(report_file_content, Table) and report_file_content.columns:
TestResultTable().print_table(report_file_content)
elif isinstance(report_file_content, Table) and not report_file_content.columns:
elif (
isinstance(report_file_content, Table)
and not report_file_content.columns
):
logger.warning('No Columns found in Table.')
else:
console.print(report_file_content)

@staticmethod
def generate_report(results: list[dict], report_format: str | None, report_path: str | None):
def generate_report(
results: list[dict], report_format: str | None, report_path: str | None
):
"""main function used to generate report"""
if report_path:
report_format = report_path.split('.')[-1]

formatted_results = ReportGenerator.handle_report_format(
results=results, report_format=report_format)
results=results, report_format=report_format
)
ReportGenerator.save_report(
report_path=report_path, report_file_content=formatted_results)
report_path=report_path, report_file_content=formatted_results
)
2 changes: 1 addition & 1 deletion src/offat/tester/tester_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def is_host_up(openapi_parser: SwaggerParser | OpenAPIv3Parser) -> bool:
case _:
proto = http_client.HTTPConnection

logger.info("Checking whether host %s:%d is available", host, port)
logger.info("Checking whether host %s:%s is available", host, port)
try:
conn = proto(host=host, port=port, timeout=5)
conn.request("GET", "/")
Expand Down
2 changes: 1 addition & 1 deletion src/offat/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,6 @@ def is_valid_url(url: str) -> bool:
Any exception occurred during operation
'''
url_regex = re_compile(
r'https?://(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'
r'https?:\/\/[a-z.-]+(:\d+)?.*'
)
return bool(match(url_regex, url))
28 changes: 14 additions & 14 deletions src/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "offat"
version = "0.15.4"
version = "0.15.5"
description = "Offensive API tester tool automates checks for common API vulnerabilities"
authors = ["Dhrumil Mistry <dhrumil.mistry@owasp.org>"]
license = "MIT"
Expand Down
Loading