Skip to content

Commit

Permalink
Merge pull request #26 from OWASP/add-progress-bar
Browse files Browse the repository at this point in the history
add progress bars
  • Loading branch information
dmdhrumilmistry authored Nov 17, 2023
2 parents 7c4bfb9 + 465fabd commit b80874d
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 198 deletions.
3 changes: 1 addition & 2 deletions src/offat/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
from offat.api.config import app, task_queue, task_timeout, auth_secret_key
from offat.api.jobs import scan_api
from offat.api.models import CreateScanModel
from offat.logger import create_logger
from offat.logger import logger
from os import uname, environ


logger = create_logger(__name__)
logger.info(f'Secret Key: {auth_secret_key}')


Expand Down
5 changes: 1 addition & 4 deletions src/offat/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
from offat.api.models import CreateScanModel
from offat.tester.tester_utils import generate_and_run_tests
from offat.openapi import OpenAPIParser
from offat.logger import create_logger


logger = create_logger(__name__)
from offat.logger import logger


def scan_api(body_data: CreateScanModel):
Expand Down
41 changes: 20 additions & 21 deletions src/offat/config_data_handler.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
from copy import deepcopy
from pprint import pprint
from .logger import create_logger
from .logger import logger


logger = create_logger(__name__)


def validate_config_file_data(test_config_data:dict):
def validate_config_file_data(test_config_data: dict):
if not isinstance(test_config_data, dict):
logger.warning('Invalid data format')
return False

if test_config_data.get('error', False):
logger.warning(f'Error Occurred While reading file: {test_config_data}')
logger.warning(
f'Error Occurred While reading file: {test_config_data}')
return False

if not test_config_data.get('actors', ):
logger.warning('actors are required')
return False
if not test_config_data.get('actors', [])[0].get('actor1',None):

if not test_config_data.get('actors', [])[0].get('actor1', None):
logger.warning('actor1 is required')
return False

logger.info('User provided data will be used for generating test cases')
return test_config_data


def populate_user_data(actor_data:dict, actor_name:str,tests:list[dict]):
def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]):
tests = deepcopy(tests)
headers = actor_data.get('request_headers',[])
body_params = actor_data.get('body',[])
query_params = actor_data.get('query',[])
path_params = actor_data.get('path',[])
headers = actor_data.get('request_headers', [])
body_params = actor_data.get('body', [])
query_params = actor_data.get('query', [])
path_params = actor_data.get('path', [])

# create HTTP request headers
request_headers = {}
Expand All @@ -44,10 +41,12 @@ def populate_user_data(actor_data:dict, actor_name:str,tests:list[dict]):
test['body_params'] += body_params
test['query_params'] += query_params
test['path_params'] += path_params
test['test_actor_name'] = actor_name # for post test processing tests such as broken authentication
if test.get('kwargs',{}).get('headers',{}).items():
test['kwargs']['headers'] = dict(test['kwargs']['headers'], **request_headers)
# for post test processing tests such as broken authentication
test['test_actor_name'] = actor_name
if test.get('kwargs', {}).get('headers', {}).items():
test['kwargs']['headers'] = dict(
test['kwargs']['headers'], **request_headers)
else:
test['kwargs']['headers'] = request_headers

return tests
return tests
53 changes: 13 additions & 40 deletions src/offat/logger.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,17 @@
from colorama import Fore, Style, init
import logging


init(autoreset=True)


class ColoredLogger(logging.Formatter):
grey = Fore.WHITE
yellow = Fore.YELLOW + Style.BRIGHT
red = Fore.RED
bold_red = Fore.RED + Style.BRIGHT
reset = "\x1b[0m"
format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
from rich.console import Console
from rich.logging import RichHandler

FORMATS = {
logging.DEBUG: grey + format,
logging.INFO: grey + format,
logging.WARNING: yellow + format,
logging.ERROR: red + format,
logging.CRITICAL: bold_red + format
}

def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)


def create_logger(logger_name:str, logging_level=logging.DEBUG):
# create logger
logger = logging.getLogger(logger_name)
logger.setLevel(logging_level)

# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
import logging

ch.setFormatter(ColoredLogger())

logger.addHandler(ch)
console = Console()

return logger

# create logger
logging.basicConfig(
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(
console=console, rich_tracebacks=True, tracebacks_show_locals=True)],
)
logger = logging.getLogger("OWASP-OFFAT")
logger.setLevel(logging.DEBUG)
71 changes: 36 additions & 35 deletions src/offat/openapi.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,48 @@
from prance import ResolvingParser
from .logger import create_logger


logger = create_logger(__name__)
from .logger import logger


class OpenAPIParser:
''''''
def __init__(self, fpath_or_url:str, spec:dict=None) -> None:
self._parser = ResolvingParser(fpath_or_url, backend = 'openapi-spec-validator', spec_string=spec)

def __init__(self, fpath_or_url: str, spec: dict = None) -> None:
self._parser = ResolvingParser(
fpath_or_url, backend='openapi-spec-validator', spec_string=spec)

if self._parser.valid:
logger.info('Specification file is valid')
else:
logger.error('Specification file is invalid!')

self._spec = self._parser.specification

self.hosts = []
self._populate_hosts()
self.host = self.hosts[0]

self.http_scheme = 'https' if 'https' in self._spec.get('schemes',[]) else 'http'
self.http_scheme = 'https' if 'https' in self._spec.get(
'schemes', []) else 'http'
self.base_url = f"{self.http_scheme}://{self.host}{self._spec.get('basePath','')}"
self.request_response_params = self._get_request_response_params()

def _populate_hosts(self):
if self._spec.get('openapi'): # for openapi v3
servers = self._spec.get('servers',[])
if self._spec.get('openapi'): # for openapi v3
servers = self._spec.get('servers', [])
hosts = []
for server in servers:
host = server.get('url','').removeprefix('http://').removeprefix('http://').removesuffix('/')
host = server.get('url', '').removeprefix(
'http://').removeprefix('http://').removesuffix('/')
host = None if host == '' else host
hosts.append(host)
else:
host = self._spec.get('host') # for swagger files
host = self._spec.get('host') # for swagger files
if not host:
logger.error('Invalid Host: Host is missing')
raise ValueError('Host Not Found in spec file')
hosts = [host]

self.hosts = hosts


def _get_endpoints(self):
'''Returns list of endpoint paths along with HTTP methods allowed'''
endpoints = []
Expand All @@ -57,25 +57,25 @@ def _get_endpoints(self):

def _get_endpoint_details_for_fuzz_test(self):
return self._spec.get('paths')
def _get_param_definition_schema(self, param:dict):

def _get_param_definition_schema(self, param: dict):
'''Returns Model defined schema for the passed param'''
param_schema = param.get('schema')

# replace schema $ref with model params
if param_schema:
param_schema_ref = param_schema.get('$ref')

if param_schema_ref:
model_slug = param_schema_ref.split('/')[-1]
param_schema = self._spec.get('definitions',{}).get(model_slug)
param_schema = self._spec.get(
'definitions', {}).get(model_slug)

return param_schema


def _get_response_definition_schema(self, responses:dict):
def _get_response_definition_schema(self, responses: dict):
'''returns schema of API response
Args:
responses (dict): responses from path http method json data
Expand All @@ -87,13 +87,13 @@ def _get_response_definition_schema(self, responses:dict):
if 'parameters' in status_code_response:
responses[status_code]['schema'] = responses[status_code]['parameters']
elif 'schema' in status_code_response:
responses[status_code]['schema'] = self._get_param_definition_schema(responses[status_code])
else:
responses[status_code]['schema'] = self._get_param_definition_schema(
responses[status_code])
else:
continue

return responses


def _get_request_response_params(self):
'''Returns Schema of requests and response params
Expand All @@ -104,31 +104,32 @@ def _get_request_response_params(self):
list:
'''
requests = []
paths = self._spec.get('paths',{})
paths = self._spec.get('paths', {})

# extract endpoints and supported params
for path in paths.keys():
path_params = paths[path].get('parameters',[])
path_params = paths[path].get('parameters', [])

for http_method in paths.get(path,{}).keys():
for http_method in paths.get(path, {}).keys():
# consider only http methods
if http_method not in ['get', 'put', 'post', 'delete', 'options']:
continue


body_parameters = paths[path][http_method].get('parameters',[])
response_params = self._get_response_definition_schema(paths[path][http_method].get('responses',{}))
body_parameters = paths[path][http_method].get(
'parameters', [])
response_params = self._get_response_definition_schema(
paths[path][http_method].get('responses', {}))

# create list of parameters
for param in body_parameters:
param['schema'] = self._get_param_definition_schema(param)

requests.append({
'http_method':http_method,
'path':path,
'request_params':body_parameters,
'response_params':response_params,
'path_params':path_params,
'http_method': http_method,
'path': path,
'request_params': body_parameters,
'response_params': response_params,
'path_params': path_params,
})

return requests
5 changes: 1 addition & 4 deletions src/offat/report/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
from os import makedirs
from yaml import dump as yaml_dump

from ..logger import create_logger


logger = create_logger(__name__)
from ..logger import logger


class ReportGenerator:
Expand Down
8 changes: 4 additions & 4 deletions src/offat/report/table.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
from rich.console import Console
from rich.table import Table, Column
from ..logger import console


class TestResultTable:
def __init__(self, table_width_percentage: float = 98, ) -> None:
self.console = Console()
self.console = console
self.table_width_percentage = table_width_percentage

def print_table(self, table: Table):
terminal_width = self.console.width
terminal_width = console.width
table_width = int(terminal_width * (self.table_width_percentage / 100))
table.width = table_width

self.console.print(table, width=table_width, overflow='fold')
self.console.print(table)
self.console.rule()

def extract_result_table_cols(self, results: list[dict]) -> list[str]:
Expand Down
1 change: 0 additions & 1 deletion src/offat/tester/test_generator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from copy import deepcopy
from pprint import pprint
from .fuzzer import fill_params
from .test_runner import TestRunnerFiltersEnum
from .fuzzer import generate_random_int
Expand Down
Loading

0 comments on commit b80874d

Please sign in to comment.