-
Notifications
You must be signed in to change notification settings - Fork 67
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #118 from OWASP/fix-api
chore: Fix api
- Loading branch information
Showing
7 changed files
with
264 additions
and
255 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,86 +1,97 @@ | ||
from os import uname, environ | ||
from textwrap import dedent | ||
|
||
from fastapi import status, Request, Response | ||
from offat.api.config import app, task_queue, task_timeout, auth_secret_key | ||
from offat.api.jobs import scan_api | ||
from offat.api.models import CreateScanModel | ||
from offat.api.schema import CreateScanSchema | ||
from offat.logger import logger | ||
|
||
# from os import uname, environ | ||
|
||
|
||
logger.info("Secret Key: %s", auth_secret_key) | ||
logger.info('Secret Key: %s', auth_secret_key) | ||
|
||
|
||
# if uname().sysname == 'Darwin' and environ.get('OBJC_DISABLE_INITIALIZE_FORK_SAFETY') != 'YES': | ||
# logger.warning('Mac Users might need to configure OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in env\nVisit StackOverFlow link for more info: https://stackoverflow.com/questions/50168647/multiprocessing-causes-python-to-crash-and-gives-an-error-may-have-been-in-progr') | ||
if ( | ||
uname().sysname == 'Darwin' | ||
and environ.get('OBJC_DISABLE_INITIALIZE_FORK_SAFETY') != 'YES' | ||
): | ||
logger.warning( | ||
dedent( | ||
'''Mac Users might need to configure OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in env | ||
Visit StackOverFlow link for more info: | ||
https://stackoverflow.com/questions/50168647/multiprocessing-causes-python-to-crash-and-gives-an-error-may-have-been-in-progr | ||
''' | ||
) | ||
) | ||
|
||
|
||
@app.get("/", status_code=status.HTTP_200_OK) | ||
@app.get('/', status_code=status.HTTP_200_OK) | ||
async def root(): | ||
return { | ||
"name": "OFFAT API", | ||
"project": "https://github.com/OWASP/offat", | ||
"license": "https://github.com/OWASP/offat/blob/main/LICENSE", | ||
'name': 'OFFAT API', | ||
'project': 'https://github.com/OWASP/offat', | ||
'license': 'https://github.com/OWASP/offat/blob/main/LICENSE', | ||
} | ||
|
||
|
||
@app.post("/api/v1/scan", status_code=status.HTTP_201_CREATED) | ||
@app.post('/api/v1/scan', status_code=status.HTTP_201_CREATED) | ||
async def add_scan_task( | ||
scan_data: CreateScanModel, request: Request, response: Response | ||
scan_data: CreateScanSchema, request: Request, response: Response | ||
): | ||
# for auth | ||
client_ip = request.client.host | ||
secret_key = request.headers.get("SECRET-KEY", None) | ||
secret_key = request.headers.get('SECRET-KEY', None) | ||
if secret_key != auth_secret_key: | ||
# return 404 for better endpoint security | ||
response.status_code = status.HTTP_401_UNAUTHORIZED | ||
logger.warning("INTRUSION: %s tried to create a new scan job", client_ip) | ||
return {"message": "Unauthorized"} | ||
logger.warning('INTRUSION: %s tried to create a new scan job', client_ip) | ||
return {'message': 'Unauthorized'} | ||
|
||
msg = {"msg": "Scan Task Created", "job_id": None} | ||
msg = {'msg': 'Scan Task Created', 'job_id': None} | ||
|
||
job = task_queue.enqueue(scan_api, scan_data, job_timeout=task_timeout) | ||
msg["job_id"] = job.id | ||
msg['job_id'] = job.id | ||
|
||
logger.info("SUCCESS: %s created new scan job - %s", client_ip, job.id) | ||
logger.info('SUCCESS: %s created new scan job - %s', client_ip, job.id) | ||
|
||
return msg | ||
|
||
|
||
@app.get("/api/v1/scan/{job_id}/result") | ||
@app.get('/api/v1/scan/{job_id}/result') | ||
async def get_scan_task_result(job_id: str, request: Request, response: Response): | ||
# for auth | ||
client_ip = request.client.host | ||
secret_key = request.headers.get("SECRET-KEY", None) | ||
secret_key = request.headers.get('SECRET-KEY', None) | ||
if secret_key != auth_secret_key: | ||
# return 404 for better endpoint security | ||
response.status_code = status.HTTP_401_UNAUTHORIZED | ||
logger.warning( | ||
"INTRUSION: %s tried to access %s job scan results", client_ip, job_id | ||
'INTRUSION: %s tried to access %s job scan results', client_ip, job_id | ||
) | ||
return {"message": "Unauthorized"} | ||
return {'message': 'Unauthorized'} | ||
|
||
scan_results_job = task_queue.fetch_job(job_id=job_id) | ||
|
||
logger.info("SUCCESS: %s accessed %s job scan results", client_ip, job_id) | ||
logger.info('SUCCESS: %s accessed %s job scan results', client_ip, job_id) | ||
|
||
msg = "Task Remaining or Invalid Job Id" | ||
msg = 'Task Remaining or Invalid Job Id' | ||
results = None | ||
response.status_code = status.HTTP_202_ACCEPTED | ||
|
||
if scan_results_job and scan_results_job.is_started: | ||
msg = "Job In Progress" | ||
msg = 'Job In Progress' | ||
|
||
elif scan_results_job and scan_results_job.is_finished: | ||
msg = "Task Completed" | ||
msg = 'Task Completed' | ||
results = scan_results_job.result | ||
response.status_code = status.HTTP_200_OK | ||
|
||
elif scan_results_job and scan_results_job.is_failed: | ||
msg = "Task Failed. Try Creating Task Again." | ||
msg = 'Task Failed. Try Creating Task Again.' | ||
response.status_code = status.HTTP_200_OK | ||
|
||
msg = { | ||
"msg": msg, | ||
"results": results, | ||
'msg': msg, | ||
'results': results, | ||
} | ||
return msg |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,30 @@ | ||
from sys import exc_info | ||
from offat.api.models import CreateScanModel | ||
from offat.utils import is_valid_url | ||
from offat.api.schema import CreateScanSchema | ||
from offat.tester.handler import generate_and_run_tests | ||
from offat.parsers import create_parser | ||
from offat.logger import logger | ||
|
||
|
||
def scan_api(body_data: CreateScanModel, ssl_verify: bool=True): | ||
def scan_api(body_data: CreateScanSchema, ssl_verify: bool = True): | ||
try: | ||
api_parser = create_parser(fpath_or_url=None, spec=body_data.openAPI, ssl_verify=ssl_verify) | ||
url = body_data.openapi if is_valid_url(body_data.openapi) else None | ||
spec = None if url else body_data.openapi | ||
|
||
api_parser = create_parser(fpath_or_url=url, spec=spec, ssl_verify=ssl_verify) | ||
|
||
results = generate_and_run_tests( | ||
api_parser=api_parser, | ||
regex_pattern=body_data.regex_pattern, | ||
req_headers=body_data.req_headers, | ||
rate_limit=body_data.rate_limit, | ||
test_data_config=body_data.test_data_config, | ||
proxies=body_data.proxies, | ||
capture_failed=body_data.capture_failed, | ||
remove_unused_data=body_data.remove_unused_data, | ||
) | ||
return results | ||
except Exception as e: | ||
logger.error('Error occurred while creating a job: %s', repr(e)) | ||
logger.debug('Debug Data:', exc_info=exc_info()) | ||
logger.error('Debug Data:', exc_info=exc_info()) | ||
return [{'error': str(e)}] |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from typing import Optional | ||
from pydantic import BaseModel | ||
|
||
|
||
class CreateScanSchema(BaseModel): | ||
openapi: str | ||
regex_pattern: Optional[str] = None | ||
req_headers: Optional[dict] = {'User-Agent': 'offat-api'} | ||
rate_limit: Optional[int] = 60 | ||
test_data_config: Optional[dict] = None | ||
proxies: Optional[list[str]] = None | ||
capture_failed: Optional[bool] = False | ||
remove_unused_data: Optional[bool] = True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.