Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use user provided test data #67

Merged
merged 2 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions src/offat/config_data_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,37 @@
from .logger import logger


def overwrite_user_params(list1: list[dict], list2: list[dict]) -> list[dict]:
"""
Update values in list1 based on the corresponding "name" values in list2.

Args:
list1 (list of dict): The list of dictionaries to be updated.
list2 (list of dict): The list of dictionaries containing values to update from.

Returns:
list of dict: The updated list1 with values from list2.

Example:
```python
list1 = [{'name': 'id', 'value': 67}, {'name': 'email', 'value': 'old@example.com'}]
list2 = [{'name': 'id', 'value': 10}, {'name': 'email', 'value': 'new@example.com'}]
updated_list = update_values(list1, list2)
print(updated_list)
# Output: [{'name': 'id', 'value': 10}, {'name': 'email', 'value': 'new@example.com'}]
```
"""
# Create a dictionary for faster lookup
lookup_dict = {item['name']: item['value'] for item in list2}

# Update values in list1 using index lookup
for item in list1:
if item['name'] in lookup_dict:
item['value'] = lookup_dict[item['name']]

return list1


def validate_config_file_data(test_config_data: dict):
if not isinstance(test_config_data, dict):
logger.warning('Invalid data format')
Expand All @@ -11,7 +42,9 @@ def validate_config_file_data(test_config_data: dict):
logger.warning('Error Occurred While reading file: %s', test_config_data)
return False

if not test_config_data.get('actors', ):
if not test_config_data.get(
'actors',
):
logger.warning('actors are required')
return False

Expand All @@ -36,15 +69,21 @@ def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]):
request_headers[header.get('name')] = header.get('value')

for test in tests:
# replace key and value instead of appending
test['body_params'] += body_params
test['query_params'] += query_params
test['path_params'] += path_params
test['body_params'] = overwrite_user_params(
deepcopy(test['body_params']), body_params
)
test['query_params'] = overwrite_user_params(
deepcopy(test['query_params']), query_params
)
test['path_params'] += overwrite_user_params(
deepcopy(test['path_params']), path_params
)
# for post test processing tests such as broken authentication
test['test_actor_name'] = actor_name
if test.get('kwargs', {}).get('headers', {}).items():
test['kwargs']['headers'] = dict(
test['kwargs']['headers'], **request_headers)
test['kwargs']['headers'], **request_headers
)
else:
test['kwargs']['headers'] = request_headers

Expand Down
36 changes: 20 additions & 16 deletions src/offat/tester/post_test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@ class PostTestFiltersEnum(Enum):

class PostRunTests:
'''class Includes tests that should be ran after running all the active test'''

@staticmethod
def run_broken_access_control_tests(results: list[dict], test_data_config: dict) -> list[dict]:
def run_broken_access_control_tests(
results: list[dict], test_data_config: dict
) -> list[dict]:
'''
Runs tests for broken access control

Args:
results (list[dict]): list of dict for tests results ran
test_data_config (dict): user based config for running tests
test_data_config (dict): user based config for running tests

Returns:
list[dict]: list of results
list[dict]: list of results

Raises:
Any Exception occurred during the test.
'''

def re_match(patterns: list[str], endpoint: str) -> bool:
'''Matches endpoint for specified patterns

Expand All @@ -52,8 +56,7 @@ def re_match(patterns: list[str], endpoint: str) -> bool:
actor_names = []
for actor in actors:
actor_name = list(actor.keys())[-1]
unauth_endpoint_regex = actor[actor_name].get(
'unauthorized_endpoints', [])
unauth_endpoint_regex = actor[actor_name].get('unauthorized_endpoints', [])

for result in results:
if result.get('test_actor_name') != actor_name:
Expand All @@ -69,25 +72,24 @@ def re_match(patterns: list[str], endpoint: str) -> bool:
actor_test_result['test_name'] = 'Broken Access Control'
actor_test_result['result_details'] = {
True: 'Endpoint might not vulnerable to BAC', # passed
# failed
False: f'BAC: Endpoint is accessible to {actor_name}',
False: f'BAC: Endpoint is accessible to {actor_name}', # failed
}
actor_based_tests.append(
PostRunTests.filter_status_code_based_results(actor_test_result))
actor_based_tests.append(actor_test_result)

return actor_based_tests
return PostRunTests.filter_status_code_based_results(actor_based_tests)

@staticmethod
def detect_data_exposure(results: list[dict]) -> list[dict]:
'''Detects data exposure against sensitive data regex
patterns and returns dict of matched results
'''Detects data exposure against sensitive data regex
patterns and returns dict of matched results

Args:
data (str): data to be analyzed for exposure

Returns:
dict: dictionary with tag as dict key and matched pattern as dict value
'''

def detect_exposure(data: str) -> dict:
# Dictionary to store detected data exposures
detected_exposures = {}
Expand All @@ -112,6 +114,7 @@ def detect_exposure(data: str) -> dict:
# take a list and filter all at once
def filter_status_code_based_results(results: list[dict]) -> list[dict]:
new_results = []

for result in results:
new_result = deepcopy(result)
response_status_code = result.get('response_status_code')
Expand Down Expand Up @@ -142,7 +145,8 @@ def update_result_details(results: list[dict]):
for result in results:
new_result = deepcopy(result)
new_result['result_details'] = result['result_details'].get(
result['result'])
result['result']
)

new_results.append(new_result)

Expand All @@ -154,13 +158,13 @@ def matcher(results: list[dict]):

Args:
results (list[dict]): list of dict for tests results ran
match_location (ResponseMatchLocation): Search for match at
specified location (`ResponseMatchLocation.BODY`,
match_location (ResponseMatchLocation): Search for match at
specified location (`ResponseMatchLocation.BODY`,
`ResponseMatchLocation.HEADER`,`ResponseMatchLocation.STATUS_CODE`).
match_regex (str): regex to match as string

Returns:
list[dict]: list of results
list[dict]: list of results

Raises:
Any Exception occurred during the test.
Expand Down
Loading