From 6c33ed8e584a7b68b727969810c52610c9d7fed2 Mon Sep 17 00:00:00 2001 From: Dhrumil Mistry <56185972+dmdhrumilmistry@users.noreply.github.com> Date: Tue, 26 Mar 2024 19:38:49 +0530 Subject: [PATCH 1/2] add TODO --- src/offat/config_data_handler.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/offat/config_data_handler.py b/src/offat/config_data_handler.py index 1cabcba..f64f312 100644 --- a/src/offat/config_data_handler.py +++ b/src/offat/config_data_handler.py @@ -11,7 +11,9 @@ def validate_config_file_data(test_config_data: dict): logger.warning('Error Occurred While reading file: %s', test_config_data) return False - if not test_config_data.get('actors', ): + if not test_config_data.get( + 'actors', + ): logger.warning('actors are required') return False @@ -36,7 +38,7 @@ def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]): request_headers[header.get('name')] = header.get('value') for test in tests: - # replace key and value instead of appending + # TODO: replace key and value instead of appending test['body_params'] += body_params test['query_params'] += query_params test['path_params'] += path_params @@ -44,7 +46,8 @@ def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]): test['test_actor_name'] = actor_name if test.get('kwargs', {}).get('headers', {}).items(): test['kwargs']['headers'] = dict( - test['kwargs']['headers'], **request_headers) + test['kwargs']['headers'], **request_headers + ) else: test['kwargs']['headers'] = request_headers From 9bce1f94e81313e55c64795cb95f9d0086b6a22f Mon Sep 17 00:00:00 2001 From: Dhrumil Mistry <56185972+dmdhrumilmistry@users.noreply.github.com> Date: Tue, 26 Mar 2024 21:47:04 +0530 Subject: [PATCH 2/2] replace params from list[dict] while using user inputs from config file --- src/offat/config_data_handler.py | 44 +++++++++- src/offat/tester/post_test_processor.py | 36 ++++---- src/offat/tester/tester_utils.py | 106 ++++++++++++++++-------- 3 files changed, 130 insertions(+), 56 deletions(-) diff --git a/src/offat/config_data_handler.py b/src/offat/config_data_handler.py index f64f312..8fec71a 100644 --- a/src/offat/config_data_handler.py +++ b/src/offat/config_data_handler.py @@ -2,6 +2,37 @@ from .logger import logger +def overwrite_user_params(list1: list[dict], list2: list[dict]) -> list[dict]: + """ + Update values in list1 based on the corresponding "name" values in list2. + + Args: + list1 (list of dict): The list of dictionaries to be updated. + list2 (list of dict): The list of dictionaries containing values to update from. + + Returns: + list of dict: The updated list1 with values from list2. + + Example: + ```python + list1 = [{'name': 'id', 'value': 67}, {'name': 'email', 'value': 'old@example.com'}] + list2 = [{'name': 'id', 'value': 10}, {'name': 'email', 'value': 'new@example.com'}] + updated_list = update_values(list1, list2) + print(updated_list) + # Output: [{'name': 'id', 'value': 10}, {'name': 'email', 'value': 'new@example.com'}] + ``` + """ + # Create a dictionary for faster lookup + lookup_dict = {item['name']: item['value'] for item in list2} + + # Update values in list1 using index lookup + for item in list1: + if item['name'] in lookup_dict: + item['value'] = lookup_dict[item['name']] + + return list1 + + def validate_config_file_data(test_config_data: dict): if not isinstance(test_config_data, dict): logger.warning('Invalid data format') @@ -38,10 +69,15 @@ def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]): request_headers[header.get('name')] = header.get('value') for test in tests: - # TODO: replace key and value instead of appending - test['body_params'] += body_params - test['query_params'] += query_params - test['path_params'] += path_params + test['body_params'] = overwrite_user_params( + deepcopy(test['body_params']), body_params + ) + test['query_params'] = overwrite_user_params( + deepcopy(test['query_params']), query_params + ) + test['path_params'] += overwrite_user_params( + deepcopy(test['path_params']), path_params + ) # for post test processing tests such as broken authentication test['test_actor_name'] = actor_name if test.get('kwargs', {}).get('headers', {}).items(): diff --git a/src/offat/tester/post_test_processor.py b/src/offat/tester/post_test_processor.py index 611d300..d793ba8 100644 --- a/src/offat/tester/post_test_processor.py +++ b/src/offat/tester/post_test_processor.py @@ -13,21 +13,25 @@ class PostTestFiltersEnum(Enum): class PostRunTests: '''class Includes tests that should be ran after running all the active test''' + @staticmethod - def run_broken_access_control_tests(results: list[dict], test_data_config: dict) -> list[dict]: + def run_broken_access_control_tests( + results: list[dict], test_data_config: dict + ) -> list[dict]: ''' Runs tests for broken access control Args: results (list[dict]): list of dict for tests results ran - test_data_config (dict): user based config for running tests + test_data_config (dict): user based config for running tests Returns: - list[dict]: list of results + list[dict]: list of results Raises: Any Exception occurred during the test. ''' + def re_match(patterns: list[str], endpoint: str) -> bool: '''Matches endpoint for specified patterns @@ -52,8 +56,7 @@ def re_match(patterns: list[str], endpoint: str) -> bool: actor_names = [] for actor in actors: actor_name = list(actor.keys())[-1] - unauth_endpoint_regex = actor[actor_name].get( - 'unauthorized_endpoints', []) + unauth_endpoint_regex = actor[actor_name].get('unauthorized_endpoints', []) for result in results: if result.get('test_actor_name') != actor_name: @@ -69,18 +72,16 @@ def re_match(patterns: list[str], endpoint: str) -> bool: actor_test_result['test_name'] = 'Broken Access Control' actor_test_result['result_details'] = { True: 'Endpoint might not vulnerable to BAC', # passed - # failed - False: f'BAC: Endpoint is accessible to {actor_name}', + False: f'BAC: Endpoint is accessible to {actor_name}', # failed } - actor_based_tests.append( - PostRunTests.filter_status_code_based_results(actor_test_result)) + actor_based_tests.append(actor_test_result) - return actor_based_tests + return PostRunTests.filter_status_code_based_results(actor_based_tests) @staticmethod def detect_data_exposure(results: list[dict]) -> list[dict]: - '''Detects data exposure against sensitive data regex - patterns and returns dict of matched results + '''Detects data exposure against sensitive data regex + patterns and returns dict of matched results Args: data (str): data to be analyzed for exposure @@ -88,6 +89,7 @@ def detect_data_exposure(results: list[dict]) -> list[dict]: Returns: dict: dictionary with tag as dict key and matched pattern as dict value ''' + def detect_exposure(data: str) -> dict: # Dictionary to store detected data exposures detected_exposures = {} @@ -112,6 +114,7 @@ def detect_exposure(data: str) -> dict: # take a list and filter all at once def filter_status_code_based_results(results: list[dict]) -> list[dict]: new_results = [] + for result in results: new_result = deepcopy(result) response_status_code = result.get('response_status_code') @@ -142,7 +145,8 @@ def update_result_details(results: list[dict]): for result in results: new_result = deepcopy(result) new_result['result_details'] = result['result_details'].get( - result['result']) + result['result'] + ) new_results.append(new_result) @@ -154,13 +158,13 @@ def matcher(results: list[dict]): Args: results (list[dict]): list of dict for tests results ran - match_location (ResponseMatchLocation): Search for match at - specified location (`ResponseMatchLocation.BODY`, + match_location (ResponseMatchLocation): Search for match at + specified location (`ResponseMatchLocation.BODY`, `ResponseMatchLocation.HEADER`,`ResponseMatchLocation.STATUS_CODE`). match_regex (str): regex to match as string Returns: - list[dict]: list of results + list[dict]: list of results Raises: Any Exception occurred during the test. diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index 0c10317..0b06c88 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -2,8 +2,8 @@ OWASP OFFAT Tester Utils Module """ from http import client as http_client +from sys import exc_info from typing import Optional -from sys import exc_info, exit from asyncio import run from asyncio.exceptions import CancelledError from re import search as regex_search @@ -21,18 +21,18 @@ def is_host_up(openapi_parser: SwaggerParser | OpenAPIv3Parser) -> bool: - '''checks whether the host from openapi doc is available or not. + '''checks whether the host from openapi doc is available or not. Returns True is host is available else returns False''' - tokens = openapi_parser.host.split(":") + tokens = openapi_parser.host.split(':') match len(tokens): case 1: host = tokens[0] - port = 443 if openapi_parser.http_scheme == "https" else 80 + port = 443 if openapi_parser.http_scheme == 'https' else 80 case 2: host = tokens[0] port = tokens[1] case _: - logger.warning("Invalid host: %s", openapi_parser.host) + logger.warning('Invalid host: %s', openapi_parser.host) return False host = host.split('/')[0] @@ -43,29 +43,35 @@ def is_host_up(openapi_parser: SwaggerParser | OpenAPIv3Parser) -> bool: case _: proto = http_client.HTTPConnection - logger.info("Checking whether host %s:%s is available", host, port) + logger.info('Checking whether host %s:%s is available', host, port) try: conn = proto(host=host, port=port, timeout=5) - conn.request("GET", "/") + conn.request('GET', '/') res = conn.getresponse() - logger.info("Host returned status code: %d", res.status) + logger.info('Host returned status code: %d', res.status) return res.status in range(200, 499) except Exception as e: - logger.error("Unable to connect to host %s:%d due to error: %s", host, port, repr(e)) + logger.error( + 'Unable to connect to host %s:%d due to error: %s', host, port, repr(e) + ) return False -def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional[str] = None, skip_test_run: Optional[bool] = False, post_run_matcher_test: Optional[bool] = False, description: Optional[str] = None) -> list: +def run_test( + test_runner: TestRunner, + tests: list[dict], + regex_pattern: Optional[str] = None, + skip_test_run: Optional[bool] = False, + post_run_matcher_test: Optional[bool] = False, + description: Optional[str] = None, +) -> list: '''Run tests and print result on console''' logger.info('Tests Generated: %d', len(tests)) # filter data if regex is passed if regex_pattern: tests = list( - filter( - lambda x: regex_search(regex_pattern, x.get('endpoint', '')), - tests - ) + filter(lambda x: regex_search(regex_pattern, x.get('endpoint', '')), tests) ) try: @@ -74,12 +80,17 @@ def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional else: test_results = run(test_runner.run_tests(tests, description)) - except (KeyboardInterrupt, CancelledError,): - logger.error("[!] User Interruption Detected!") + except ( + KeyboardInterrupt, + CancelledError, + ): + logger.error('[!] User Interruption Detected!') exit(-1) except Exception as e: - logger.error("[*] Exception occurred while running tests: %s", e, exc_info=exc_info()) + logger.error( + '[*] Exception occurred while running tests: %s', e, exc_info=exc_info() + ) return [] if post_run_matcher_test: @@ -98,14 +109,26 @@ def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional # Note: redirects are allowed by default making it easier for pentesters/researchers -def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pattern: Optional[str] = None, output_file: Optional[str] = None, output_file_format: Optional[str] = None, rate_limit: Optional[int] = None, req_headers: Optional[dict] = None, proxy: Optional[str] = None, test_data_config: Optional[dict] = None): - global test_table_generator, logger - +def generate_and_run_tests( + api_parser: SwaggerParser | OpenAPIv3Parser, + regex_pattern: Optional[str] = None, + output_file: Optional[str] = None, + output_file_format: Optional[str] = None, + rate_limit: Optional[int] = None, + req_headers: Optional[dict] = None, + proxy: Optional[str] = None, + test_data_config: Optional[dict] = None, +): + ''' + Generates and runs tests for provied OAS/Swagger file. + ''' if not is_host_up(openapi_parser=api_parser): - logger.error("Stopping tests due to unavailibility of host: %s", api_parser.host) + logger.error( + 'Stopping tests due to unavailibility of host: %s', api_parser.host + ) return - logger.info("Host %s is up", api_parser.host) + logger.info('Host %s is up', api_parser.host) test_runner = TestRunner( rate_limit=rate_limit, @@ -119,13 +142,14 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for Unsupported HTTP Methods/Verbs:' logger.info(test_name) unsupported_http_endpoint_tests = test_generator.check_unsupported_http_methods( - api_parser) + api_parser + ) results += run_test( test_runner=test_runner, tests=unsupported_http_endpoint_tests, regex_pattern=regex_pattern, - description='(FUZZED) ' + test_name + description='(FUZZED) ' + test_name, ) # sqli fuzz test @@ -153,7 +177,8 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for OS Command Injection Vulnerability with fuzzed params and checking response body:' logger.info(test_name) os_command_injection_tests = test_generator.os_command_injection_fuzz_params_test( - api_parser) + api_parser + ) results += run_test( test_runner=test_runner, tests=os_command_injection_tests, @@ -166,7 +191,8 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for XSS/HTML Injection Vulnerability with fuzzed params and checking response body:' logger.info(test_name) os_command_injection_tests = test_generator.xss_html_injection_fuzz_params_test( - api_parser) + api_parser + ) results += run_test( test_runner=test_runner, tests=os_command_injection_tests, @@ -179,31 +205,36 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for BOLA in PATH using fuzzed params:' logger.info(test_name) bola_fuzzed_path_tests = test_generator.bola_fuzz_path_test( - api_parser, success_codes=[200, 201, 301]) + api_parser, success_codes=[200, 201, 301] + ) results += run_test( test_runner=test_runner, tests=bola_fuzzed_path_tests, regex_pattern=regex_pattern, - description='(FUZZED) Checking for BOLA in PATH:' + description='(FUZZED) Checking for BOLA in PATH:', ) # BOLA path test with fuzzed data + trailing slash - test_name = 'Checking for BOLA in PATH with trailing slash and id using fuzzed params:' + test_name = ( + 'Checking for BOLA in PATH with trailing slash and id using fuzzed params:' + ) logger.info(test_name) bola_trailing_slash_path_tests = test_generator.bola_fuzz_trailing_slash_path_test( - api_parser, success_codes=[200, 201, 301]) + api_parser, success_codes=[200, 201, 301] + ) results += run_test( test_runner=test_runner, tests=bola_trailing_slash_path_tests, regex_pattern=regex_pattern, - description='(FUZZED) Checking for BOLA in PATH with trailing slash:' + description='(FUZZED) Checking for BOLA in PATH with trailing slash:', ) # Mass Assignment / BOPLA test_name = 'Checking for Mass Assignment Vulnerability with fuzzed params and checking response status codes:' logger.info(test_name) bopla_tests = test_generator.bopla_fuzz_test( - api_parser, success_codes=[200, 201, 301]) + api_parser, success_codes=[200, 201, 301] + ) results += run_test( test_runner=test_runner, tests=bopla_tests, @@ -213,10 +244,12 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa # Tests with User provided Data if bool(test_data_config): - logger.info('[bold]Testing with user provided data[/bold]') + logger.info('[bold] Testing with user provided data [/bold]') - # BOLA path tests with fuzzed + user provided data - test_name = 'Checking for BOLA in PATH using fuzzed and user provided params:', + # # BOLA path tests with fuzzed + user provided data + test_name = ( + 'Checking for BOLA in PATH using fuzzed and user provided params:', + ) logger.info(test_name) bola_fuzzed_user_data_tests = test_generator.test_with_user_data( test_data_config, @@ -283,7 +316,8 @@ def generate_and_run_tests(api_parser: SwaggerParser | OpenAPIv3Parser, regex_pa test_name = 'Checking for Broken Access Control:' logger.info(test_name) bac_results = PostRunTests.run_broken_access_control_tests( - results, test_data_config) + results, test_data_config + ) results += run_test( test_runner=test_runner, tests=bac_results,