Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(custom_header): not correctly parsing parameters #172

Merged
merged 7 commits into from
Sep 18, 2024
116 changes: 113 additions & 3 deletions web/reNgine/common_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import random
import shutil
import traceback
import shlex
import subprocess
from time import sleep

import humanize
Expand Down Expand Up @@ -1043,16 +1045,20 @@
return match.group(1).strip()
return ""

import re

def parse_custom_header(custom_header):
"""
Parse the custom_header input to ensure it is a dictionary.
Parse the custom_header input to ensure it is a dictionary with valid header values.

Args:
custom_header (dict or str): Dictionary or string containing the custom headers.

Returns:
dict: Parsed dictionary of custom headers.
"""
def is_valid_header_value(value):
return bool(re.match(r'^[\w\-\s.,;:@()/+*=\'\[\]{}]+$', value))

if isinstance(custom_header, str):
header_dict = {}
Expand All @@ -1061,11 +1067,19 @@
parts = header.split(':', 1)
if len(parts) == 2:
key, value = parts
header_dict[key.strip()] = value.strip()
key = key.strip()
value = value.strip()
if is_valid_header_value(value):
header_dict[key] = value
else:
raise ValueError(f"Invalid header value: '{value}'")
else:
raise ValueError(f"Invalid header format: '{header}'")
return header_dict
elif isinstance(custom_header, dict):
for key, value in custom_header.items():
if not is_valid_header_value(value):
raise ValueError(f"Invalid header value: '{value}'")
return custom_header
else:
raise ValueError("custom_header must be a dictionary or a string")
Expand All @@ -1081,6 +1095,9 @@
Returns:
str: Command-line parameter for the specified tool.
"""
logger.debug(f"Generating header parameters for tool: {tool_name}")
psyray marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(f"Input custom_header: {custom_header}")

# Ensure the custom_header is a dictionary
custom_header = parse_custom_header(custom_header)

Expand All @@ -1097,8 +1114,12 @@
'gospider': generate_gospider_params(custom_header),
}

# Get the appropriate format based on the tool name
result = format_mapping.get(tool_name, format_mapping.get('common'))
logger.debug(f"Selected format for {tool_name}: {result}")

# Return the corresponding parameter for the specified tool or default to common_headers format
return format_mapping.get(tool_name, format_mapping.get('common'))
return result

def generate_gospider_params(custom_header):
"""
Expand Down Expand Up @@ -1168,6 +1189,95 @@
domain.save()
return scan.id

def prepare_command(cmd, shell):
"""
Prepare the command for execution.

Args:
cmd (str): The command to prepare.
shell (bool): Whether to use shell execution.

Returns:
str or list: The prepared command, either as a string (for shell execution) or a list (for non-shell execution).
"""
return cmd if shell else shlex.split(cmd)

def create_command_object(cmd, scan_id, activity_id):
"""
Create a Command object in the database.

Args:
cmd (str): The command to be executed.
scan_id (int): ID of the associated scan.
activity_id (int): ID of the associated activity.

Returns:
Command: The created Command object.
"""
return Command.objects.create(
command=cmd,
time=timezone.now(),
scan_history_id=scan_id,
activity_id=activity_id
)

def process_line(line, trunc_char=None):
"""
Process a line of output from the command.

Args:
line (str): The line to process.
trunc_char (str, optional): Character to truncate the line. Defaults to None.

Returns:
str or dict: The processed line, either as a string or a JSON object if the line is valid JSON.
"""
line = line.strip()
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
line = ansi_escape.sub('', line)
line = line.replace('\\x0d\\x0a', '\n')
if trunc_char and line.endswith(trunc_char):
line = line[:-1]
try:
return json.loads(line)
except json.JSONDecodeError:
return line

def write_history(history_file, cmd, return_code, output):
"""
Write command execution history to a file.

Args:
history_file (str): Path to the history file.
cmd (str): The executed command.
return_code (int): The return code of the command.
output (str): The output of the command.
"""
mode = 'a' if os.path.exists(history_file) else 'w'
with open(history_file, mode) as f:
f.write(f'\n{cmd}\n{return_code}\n{output}\n------------------\n')

def execute_command(command, shell, cwd):
"""
Execute a command using subprocess.

Args:
command (str or list): The command to execute.
shell (bool): Whether to use shell execution.
cwd (str): The working directory for the command.

Returns:
subprocess.Popen: The Popen object for the executed command.
"""
return subprocess.Popen(
command,
AnonymousWP marked this conversation as resolved.
Show resolved Hide resolved
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
shell=shell,
cwd=cwd
)

def get_data_from_post_request(request, field):
"""
Get data from a POST request.
Expand Down
161 changes: 60 additions & 101 deletions web/reNgine/tasks.py
AnonymousWP marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -1805,9 +1805,11 @@ def fetch_url(self, urls=[], ctx={}, description=None):

# Initialize the URLs
if urls and is_iterable(urls):
logger.debug(f'URLs provided by user')
with open(input_path, 'w') as f:
f.write('\n'.join(urls))
else:
logger.debug(f'URLs gathered from database')
urls = get_http_urls(
is_alive=enable_http_crawl,
write_filepath=input_path,
Expand All @@ -1816,6 +1818,11 @@ def fetch_url(self, urls=[], ctx={}, description=None):
ctx=ctx
)

# check if urls is empty
if not urls:
logger.warning("No URLs found. Exiting fetch_url.")
return

# Log initial URLs
logger.debug(f'Initial URLs: {urls}')

Expand Down Expand Up @@ -4177,139 +4184,91 @@ def remove_duplicate_endpoints(


@app.task(name='run_command', bind=False, queue='run_command_queue')
psyray marked this conversation as resolved.
Show resolved Hide resolved
def run_command(
cmd,
cwd=None,
shell=False,
history_file=None,
scan_id=None,
activity_id=None,
remove_ansi_sequence=False
):
"""Run a given command using subprocess module.
def run_command(cmd, cwd=None, shell=False, history_file=None, scan_id=None, activity_id=None, remove_ansi_sequence=False):
"""
Execute a command and return its output.

Args:
cmd (str): Command to run.
cwd (str): Current working directory.
echo (bool): Log command.
shell (bool): Run within separate shell if True.
history_file (str): Write command + output to history file.
remove_ansi_sequence (bool): Used to remove ANSI escape sequences from output such as color coding
cmd (str): The command to execute.
cwd (str, optional): The working directory for the command. Defaults to None.
shell (bool, optional): Whether to use shell execution. Defaults to False.
history_file (str, optional): File to write command history. Defaults to None.
scan_id (int, optional): ID of the associated scan. Defaults to None.
activity_id (int, optional): ID of the associated activity. Defaults to None.
remove_ansi_sequence (bool, optional): Whether to remove ANSI escape sequences from output. Defaults to False.

Returns:
tuple: Tuple with return_code, output.
tuple: A tuple containing the return code and output of the command.
"""
logger.info(cmd)
logger.warning(activity_id)

# Create a command record in the database
command_obj = Command.objects.create(
command=cmd,
time=timezone.now(),
scan_history_id=scan_id,
activity_id=activity_id)
logger.info(f"Executing command: {cmd}")
command_obj = create_command_object(cmd, scan_id, activity_id)
command = prepare_command(cmd, shell)
logger.debug(f"Prepared run command: {command}")

# Run the command using subprocess
popen = subprocess.Popen(
cmd if shell else cmd.split(),
shell=shell,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=cwd,
universal_newlines=True)
process = execute_command(command, shell, cwd)
output = ''
for stdout_line in iter(popen.stdout.readline, ""):
for stdout_line in iter(process.stdout.readline, ""):
item = stdout_line.strip()
output += '\n' + item
logger.debug(item)
popen.stdout.close()
popen.wait()
return_code = popen.returncode

process.stdout.close()
process.wait()
return_code = process.returncode
command_obj.output = output
command_obj.return_code = return_code
command_obj.save()

if history_file:
mode = 'a'
if not os.path.exists(history_file):
mode = 'w'
with open(history_file, mode) as f:
f.write(f'\n{cmd}\n{return_code}\n{output}\n------------------\n')
write_history(history_file, cmd, return_code, output)

if remove_ansi_sequence:
output = remove_ansi_escape_sequences(output)

return return_code, output


#-------------#
# Other utils #
#-------------#

def stream_command(cmd, cwd=None, shell=False, history_file=None, encoding='utf-8', scan_id=None, activity_id=None, trunc_char=None):
# Log cmd
logger.info(cmd)
# logger.warning(activity_id)

# Create a command record in the database
command_obj = Command.objects.create(
command=cmd,
time=timezone.now(),
scan_history_id=scan_id,
activity_id=activity_id)

# Sanitize the cmd
command = cmd if shell else cmd.split()

# Run the command using subprocess
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
shell=shell)
"""
Execute a command and yield its output line by line.

# Log the output in real-time to the database
Args:
cmd (str): The command to execute.
cwd (str, optional): The working directory for the command. Defaults to None.
shell (bool, optional): Whether to use shell execution. Defaults to False.
history_file (str, optional): File to write command history. Defaults to None.
encoding (str, optional): Encoding for the command output. Defaults to 'utf-8'.
scan_id (int, optional): ID of the associated scan. Defaults to None.
activity_id (int, optional): ID of the associated activity. Defaults to None.
trunc_char (str, optional): Character to truncate lines. Defaults to None.

Yields:
str: Each line of the command output.
"""
logger.info(f"Starting execution of command: {cmd}")
command_obj = create_command_object(cmd, scan_id, activity_id)
command = prepare_command(cmd, shell)
logger.debug(f"Prepared stream command: {command}")

process = execute_command(command, shell, cwd)
output = ""

# Process the output
for line in iter(lambda: process.stdout.readline(), b''):
for line in iter(process.stdout.readline, b''):
if not line:
break
line = line.strip()
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
line = ansi_escape.sub('', line)
line = line.replace('\\x0d\\x0a', '\n')
if trunc_char and line.endswith(trunc_char):
line = line[:-1]
item = line

# Try to parse the line as JSON
try:
item = json.loads(line)
except json.JSONDecodeError:
pass

# Yield the line
#logger.debug(item)
item = process_line(line, trunc_char)
yield item

# Add the log line to the output
output += line + "\n"

# Update the command record in the database
output += line
command_obj.output = output
command_obj.save()

# Retrieve the return code and output
process.wait()
return_code = process.returncode

# Update the return code and final output in the database
command_obj.return_code = return_code
command_obj.save()
logger.info(f'Command returned exit code: {return_code}')

# Append the command, return code and output to the history file
if history_file is not None:
with open(history_file, "a") as f:
f.write(f"{cmd}\n{return_code}\n{output}\n")

if history_file:
write_history(history_file, cmd, return_code, output)

def process_httpx_response(line):
"""TODO: implement this"""
Expand Down
Loading
Loading