diff --git a/slither/tools/mutator/__main__.py b/slither/tools/mutator/__main__.py index 5c13d7aea..8a7ce3e1a 100644 --- a/slither/tools/mutator/__main__.py +++ b/slither/tools/mutator/__main__.py @@ -1,14 +1,17 @@ import argparse import inspect import logging -import sys import os import shutil +import sys +import time +from pathlib import Path from typing import Type, List, Any, Optional from crytic_compile import cryticparser from slither import Slither +from slither.tools.mutator.utils.testing_generated_mutant import run_test_cmd from slither.tools.mutator.mutators import all_mutators -from slither.utils.colors import yellow, magenta +from slither.utils.colors import blue, green, magenta, red from .mutators.abstract_mutator import AbstractMutator from .utils.command_line import output_mutators from .utils.file_handling import ( @@ -67,8 +70,18 @@ def parse_args() -> argparse.Namespace: # to print just all the mutants parser.add_argument( + "-v", "--verbose", - help="output all mutants generated", + help="log mutants that are caught as well as those that are uncaught", + action="store_true", + default=False, + ) + + # to print just all the mutants + parser.add_argument( + "-vv", + "--very-verbose", + help="log mutants that are caught, uncaught, and fail to compile. And more!", action="store_true", default=False, ) @@ -87,8 +100,8 @@ def parse_args() -> argparse.Namespace: # flag to run full mutation based revert mutator output parser.add_argument( - "--quick", - help="to stop full mutation if revert mutator passes", + "--comprehensive", + help="continue testing minor mutations if severe mutants are uncaught", action="store_true", default=False, ) @@ -135,7 +148,7 @@ def __call__( ################################################################################### -def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,too-many-locals +def main() -> None: # pylint: disable=too-many-statements,too-many-branches,too-many-locals args = parse_args() # arguments @@ -146,31 +159,32 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t timeout: Optional[int] = args.timeout solc_remappings: Optional[str] = args.solc_remaps verbose: Optional[bool] = args.verbose + very_verbose: Optional[bool] = args.very_verbose mutators_to_run: Optional[List[str]] = args.mutators_to_run - contract_names: Optional[List[str]] = args.contract_names - quick_flag: Optional[bool] = args.quick + comprehensive_flag: Optional[bool] = args.comprehensive - logger.info(magenta(f"Starting Mutation Campaign in '{args.codebase} \n")) + logger.info(blue(f"Starting mutation campaign in {args.codebase}")) if paths_to_ignore: paths_to_ignore_list = paths_to_ignore.strip("][").split(",") - logger.info(magenta(f"Ignored paths - {', '.join(paths_to_ignore_list)} \n")) + logger.info(blue(f"Ignored paths - {', '.join(paths_to_ignore_list)}")) else: paths_to_ignore_list = [] + contract_names: List[str] = [] + if args.contract_names: + contract_names = args.contract_names.split(",") + # get all the contracts as a list from given codebase - sol_file_list: List[str] = get_sol_file_list(args.codebase, paths_to_ignore_list) + sol_file_list: List[str] = get_sol_file_list(Path(args.codebase), paths_to_ignore_list) - # folder where backup files and valid mutants created + # folder where backup files and uncaught mutants are saved if output_dir is None: - output_dir = "/mutation_campaign" - output_folder = os.getcwd() + output_dir - if os.path.exists(output_folder): - shutil.rmtree(output_folder) + output_dir = "./mutation_campaign" - # set default timeout - if timeout is None: - timeout = 30 + output_folder = Path(output_dir).resolve() + if output_folder.is_dir(): + shutil.rmtree(output_folder) # setting RR mutator as first mutator mutators_list = _get_mutators(mutators_to_run) @@ -187,51 +201,138 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t CR_RR_list.insert(1, M) mutators_list = CR_RR_list + mutators_list + logger.info(blue("Timing tests..")) + + # run and time tests, abort if they're broken + start_time = time.time() + # no timeout or target_file during the first run, but be verbose on failure + if not run_test_cmd(test_command, None, None, True): + logger.error(red("Test suite fails with mutations, aborting")) + return + elapsed_time = round(time.time() - start_time) + + # set default timeout + # default to twice as long as it usually takes to run the test suite + if timeout is None: + timeout = int(elapsed_time * 2) + else: + timeout = int(timeout) + if timeout < elapsed_time: + logger.info( + red( + f"Provided timeout {timeout} is too short for tests that run in {elapsed_time} seconds" + ) + ) + return + + logger.info( + green( + f"Test suite passes in {elapsed_time} seconds, commencing mutation campaign with a timeout of {timeout} seconds\n" + ) + ) + + # Keep a list of all already mutated contracts so we don't mutate them twice + mutated_contracts: List[str] = [] + for filename in sol_file_list: # pylint: disable=too-many-nested-blocks - contract_name = os.path.split(filename)[1].split(".sol")[0] + file_name = os.path.split(filename)[1].split(".sol")[0] # slither object sl = Slither(filename, **vars(args)) # create a backup files files_dict = backup_source_file(sl.source_code, output_folder) - # total count of mutants - total_count = 0 - # count of valid mutants - v_count = 0 + # total revert/comment/tweak mutants that were generated and compiled + total_mutant_counts = [0, 0, 0] + # total uncaught revert/comment/tweak mutants + uncaught_mutant_counts = [0, 0, 0] # lines those need not be mutated (taken from RR and CR) dont_mutate_lines = [] # mutation + target_contract = "SLITHER_SKIP_MUTATIONS" if contract_names else "" try: for compilation_unit_of_main_file in sl.compilation_units: - contract_instance = "" for contract in compilation_unit_of_main_file.contracts: - if contract_names is not None and contract.name in contract_names: - contract_instance = contract - elif str(contract.name).lower() == contract_name.lower(): - contract_instance = contract - if contract_instance == "": - logger.error("Can't find the contract") - else: - for M in mutators_list: - m = M( - compilation_unit_of_main_file, - int(timeout), - test_command, - test_directory, - contract_instance, - solc_remappings, - verbose, - output_folder, - dont_mutate_lines, - ) - (count_valid, count_invalid, lines_list) = m.mutate() - v_count += count_valid - total_count += count_valid + count_invalid - dont_mutate_lines = lines_list - if not quick_flag: - dont_mutate_lines = [] + if contract.name in contract_names and contract.name not in mutated_contracts: + target_contract = contract + break + if not contract_names and contract.name.lower() == file_name.lower(): + target_contract = contract + break + + if target_contract == "": + logger.info( + f"Cannot find contracts in file {filename}, try specifying them with --contract-names" + ) + continue + + if target_contract == "SLITHER_SKIP_MUTATIONS": + logger.debug(f"Skipping mutations in {filename}") + continue + + # TODO: find a more specific way to omit interfaces + # Ideally, we wouldn't depend on naming conventions + if target_contract.name.startswith("I"): + logger.debug(f"Skipping mutations on interface {filename}") + continue + + # Add our target to the mutation list + mutated_contracts.append(target_contract.name) + logger.info(blue(f"Mutating contract {target_contract}")) + for M in mutators_list: + m = M( + compilation_unit_of_main_file, + int(timeout), + test_command, + test_directory, + target_contract, + solc_remappings, + verbose, + very_verbose, + output_folder, + dont_mutate_lines, + ) + (total_counts, uncaught_counts, lines_list) = m.mutate() + + if m.NAME == "RR": + total_mutant_counts[0] += total_counts[0] + uncaught_mutant_counts[0] += uncaught_counts[0] + if verbose: + logger.info( + magenta( + f"Mutator {m.NAME} found {uncaught_counts[0]} uncaught revert mutants (out of {total_counts[0]} that compile)" + ) + ) + elif m.NAME == "CR": + total_mutant_counts[1] += total_counts[1] + uncaught_mutant_counts[1] += uncaught_counts[1] + if verbose: + logger.info( + magenta( + f"Mutator {m.NAME} found {uncaught_counts[1]} uncaught comment mutants (out of {total_counts[1]} that compile)" + ) + ) + else: + total_mutant_counts[2] += total_counts[2] + uncaught_mutant_counts[2] += uncaught_counts[2] + if verbose: + logger.info( + magenta( + f"Mutator {m.NAME} found {uncaught_counts[2]} uncaught tweak mutants (out of {total_counts[2]} that compile)" + ) + ) + logger.info( + magenta( + f"Running total: found {uncaught_mutant_counts[2]} uncaught tweak mutants (out of {total_mutant_counts[2]} that compile)" + ) + ) + + dont_mutate_lines = lines_list + if comprehensive_flag: + dont_mutate_lines = [] + except Exception as e: # pylint: disable=broad-except logger.error(e) + transfer_and_delete(files_dict) except KeyboardInterrupt: # transfer and delete the backup files if interrupted @@ -241,14 +342,59 @@ def main() -> (None): # pylint: disable=too-many-statements,too-many-branches,t # transfer and delete the backup files transfer_and_delete(files_dict) - # output - logger.info( - yellow( - f"Done mutating, '{filename}'. Valid mutant count: '{v_count}' and Total mutant count '{total_count}'.\n" + # log results for this file + logger.info(blue(f"Done mutating {target_contract}.")) + if total_mutant_counts[0] > 0: + logger.info( + magenta( + f"Revert mutants: {uncaught_mutant_counts[0]} uncaught of {total_mutant_counts[0]} ({100 * uncaught_mutant_counts[0]/total_mutant_counts[0]}%)" + ) ) - ) + else: + logger.info(magenta("Zero Revert mutants analyzed")) + + if total_mutant_counts[1] > 0: + logger.info( + magenta( + f"Comment mutants: {uncaught_mutant_counts[1]} uncaught of {total_mutant_counts[1]} ({100 * uncaught_mutant_counts[1]/total_mutant_counts[1]}%)" + ) + ) + else: + logger.info(magenta("Zero Comment mutants analyzed")) + + if total_mutant_counts[2] > 0: + logger.info( + magenta( + f"Tweak mutants: {uncaught_mutant_counts[2]} uncaught of {total_mutant_counts[2]} ({100 * uncaught_mutant_counts[2]/total_mutant_counts[2]}%)\n" + ) + ) + else: + logger.info(magenta("Zero Tweak mutants analyzed\n")) + + # Reset mutant counts before moving on to the next file + if very_verbose: + logger.info(blue("Reseting mutant counts to zero")) + total_mutant_counts[0] = 0 + total_mutant_counts[1] = 0 + total_mutant_counts[2] = 0 + uncaught_mutant_counts[0] = 0 + uncaught_mutant_counts[1] = 0 + uncaught_mutant_counts[2] = 0 + + # Print the total time elapsed in a human-readable time format + elapsed_time = round(time.time() - start_time) + hours, remainder = divmod(elapsed_time, 3600) + minutes, seconds = divmod(remainder, 60) + if hours > 0: + elapsed_string = f"{hours} {'hour' if hours == 1 else 'hours'}" + elif minutes > 0: + elapsed_string = f"{minutes} {'minute' if minutes == 1 else 'minutes'}" + else: + elapsed_string = f"{seconds} {'second' if seconds == 1 else 'seconds'}" - logger.info(magenta(f"Finished Mutation Campaign in '{args.codebase}' \n")) + logger.info( + blue(f"Finished mutation testing assessment of '{args.codebase}' in {elapsed_string}\n") + ) # endregion diff --git a/slither/tools/mutator/mutators/RR.py b/slither/tools/mutator/mutators/RR.py index e285d7a3f..ba76d657f 100644 --- a/slither/tools/mutator/mutators/RR.py +++ b/slither/tools/mutator/mutators/RR.py @@ -24,7 +24,7 @@ def _mutate(self) -> Dict: old_str = self.in_file_str[start:stop] line_no = node.source_mapping.lines if not line_no[0] in self.dont_mutate_line: - if old_str != "revert()": + if not old_str.lstrip().startswith("revert"): new_str = "revert()" create_patch_with_line( result, diff --git a/slither/tools/mutator/mutators/abstract_mutator.py b/slither/tools/mutator/mutators/abstract_mutator.py index 375af1e6f..69c77a4ca 100644 --- a/slither/tools/mutator/mutators/abstract_mutator.py +++ b/slither/tools/mutator/mutators/abstract_mutator.py @@ -1,10 +1,10 @@ import abc import logging +from pathlib import Path from typing import Optional, Dict, Tuple, List from slither.core.compilation_unit import SlitherCompilationUnit from slither.formatters.utils.patches import apply_patch, create_diff from slither.tools.mutator.utils.testing_generated_mutant import test_patch -from slither.utils.colors import yellow from slither.core.declarations import Contract logger = logging.getLogger("Slither-Mutate") @@ -19,8 +19,6 @@ class AbstractMutator( ): # pylint: disable=too-few-public-methods,too-many-instance-attributes NAME = "" HELP = "" - VALID_MUTANTS_COUNT = 0 - INVALID_MUTANTS_COUNT = 0 def __init__( # pylint: disable=too-many-arguments self, @@ -31,7 +29,8 @@ def __init__( # pylint: disable=too-many-arguments contract_instance: Contract, solc_remappings: str | None, verbose: bool, - output_folder: str, + very_verbose: bool, + output_folder: Path, dont_mutate_line: List[int], rate: int = 10, seed: Optional[int] = None, @@ -45,11 +44,16 @@ def __init__( # pylint: disable=too-many-arguments self.timeout = timeout self.solc_remappings = solc_remappings self.verbose = verbose + self.very_verbose = very_verbose self.output_folder = output_folder self.contract = contract_instance self.in_file = self.contract.source_mapping.filename.absolute self.in_file_str = self.contract.compilation_unit.core.source_code[self.in_file] self.dont_mutate_line = dont_mutate_line + # total revert/comment/tweak mutants that were generated and compiled + self.total_mutant_counts = [0, 0, 0] + # total uncaught revert/comment/tweak mutants + self.uncaught_mutant_counts = [0, 0, 0] if not self.NAME: raise IncorrectMutatorInitialization( @@ -71,50 +75,75 @@ def _mutate(self) -> Dict: """TODO Documentation""" return {} - def mutate(self) -> Tuple[int, int, List[int]]: + # pylint: disable=too-many-branches + def mutate(self) -> Tuple[List[int], List[int], List[int]]: # call _mutate function from different mutators (all_patches) = self._mutate() if "patches" not in all_patches: logger.debug("No patches found by %s", self.NAME) - return (0, 0, self.dont_mutate_line) + return ([0, 0, 0], [0, 0, 0], self.dont_mutate_line) - for file in all_patches["patches"]: + for file in all_patches["patches"]: # Note: This should only loop over a single file original_txt = self.slither.source_code[file].encode("utf8") patches = all_patches["patches"][file] patches.sort(key=lambda x: x["start"]) - logger.info(yellow(f"Mutating {file} with {self.NAME} \n")) for patch in patches: # test the patch - flag = test_patch( + patchWasCaught = test_patch( + self.output_folder, file, patch, self.test_command, - self.VALID_MUTANTS_COUNT, self.NAME, self.timeout, self.solc_remappings, self.verbose, + self.very_verbose, ) - # if RR or CR and valid mutant, add line no. - if self.NAME in ("RR", "CR") and flag: - self.dont_mutate_line.append(patch["line_number"]) - # count the valid and invalid mutants - if not flag: - self.INVALID_MUTANTS_COUNT += 1 - continue - self.VALID_MUTANTS_COUNT += 1 - patched_txt, _ = apply_patch(original_txt, patch, 0) - diff = create_diff(self.compilation_unit, original_txt, patched_txt, file) - if not diff: - logger.info(f"Impossible to generate patch; empty {patches}") - - # add valid mutant patches to a output file - with open( - self.output_folder + "/patches_file.txt", "a", encoding="utf8" - ) as patches_file: - patches_file.write(diff + "\n") - return ( - self.VALID_MUTANTS_COUNT, - self.INVALID_MUTANTS_COUNT, - self.dont_mutate_line, - ) + + # count the uncaught mutants, flag RR/CR mutants to skip further mutations + if patchWasCaught == 0: + if self.NAME == "RR": + self.uncaught_mutant_counts[0] += 1 + self.dont_mutate_line.append(patch["line_number"]) + elif self.NAME == "CR": + self.uncaught_mutant_counts[1] += 1 + self.dont_mutate_line.append(patch["line_number"]) + else: + self.uncaught_mutant_counts[2] += 1 + + patched_txt, _ = apply_patch(original_txt, patch, 0) + diff = create_diff(self.compilation_unit, original_txt, patched_txt, file) + if not diff: + logger.info(f"Impossible to generate patch; empty {patches}") + + # add uncaught mutant patches to a output file + with (self.output_folder / "patches_files.txt").open( + "a", encoding="utf8" + ) as patches_file: + patches_file.write(diff + "\n") + + # count the total number of mutants that we were able to compile + if patchWasCaught != 2: + if self.NAME == "RR": + self.total_mutant_counts[0] += 1 + elif self.NAME == "CR": + self.total_mutant_counts[1] += 1 + else: + self.total_mutant_counts[2] += 1 + + if self.very_verbose: + if self.NAME == "RR": + logger.info( + f"Found {self.uncaught_mutant_counts[0]} uncaught revert mutants so far (out of {self.total_mutant_counts[0]} that compile)" + ) + elif self.NAME == "CR": + logger.info( + f"Found {self.uncaught_mutant_counts[1]} uncaught comment mutants so far (out of {self.total_mutant_counts[1]} that compile)" + ) + else: + logger.info( + f"Found {self.uncaught_mutant_counts[2]} uncaught tweak mutants so far (out of {self.total_mutant_counts[2]} that compile)" + ) + + return (self.total_mutant_counts, self.uncaught_mutant_counts, self.dont_mutate_line) diff --git a/slither/tools/mutator/utils/file_handling.py b/slither/tools/mutator/utils/file_handling.py index ddb3efb50..7c02ce099 100644 --- a/slither/tools/mutator/utils/file_handling.py +++ b/slither/tools/mutator/utils/file_handling.py @@ -1,106 +1,101 @@ -import os -from typing import Dict, List +import traceback +from typing import Dict, List, Union import logging +from pathlib import Path +import hashlib logger = logging.getLogger("Slither-Mutate") -duplicated_files = {} +HashedPath = str +backuped_files: Dict[str, HashedPath] = {} -def backup_source_file(source_code: Dict, output_folder: str) -> Dict: +def backup_source_file(source_code: Dict, output_folder: Path) -> Dict[str, HashedPath]: """ function to backup the source file returns: dictionary of duplicated files """ - os.makedirs(output_folder, exist_ok=True) - + output_folder.mkdir(exist_ok=True, parents=True) for file_path, content in source_code.items(): - directory, filename = os.path.split(file_path) - new_filename = f"{output_folder}/backup_{filename}" - new_file_path = os.path.join(directory, new_filename) + path_hash = hashlib.md5(bytes(file_path, "utf8")).hexdigest() + (output_folder / path_hash).write_text(content, encoding="utf8") - with open(new_file_path, "w", encoding="utf8") as new_file: - new_file.write(content) - duplicated_files[file_path] = new_file_path + backuped_files[file_path] = (output_folder / path_hash).as_posix() - return duplicated_files + return backuped_files -def transfer_and_delete(files_dict: Dict) -> None: +def transfer_and_delete(files_dict: Dict[str, HashedPath]) -> None: """function to transfer the original content to the sol file after campaign""" try: files_dict_copy = files_dict.copy() - for item, value in files_dict_copy.items(): - with open(value, "r", encoding="utf8") as duplicated_file: + for original_path, hashed_path in files_dict_copy.items(): + with open(hashed_path, "r", encoding="utf8") as duplicated_file: content = duplicated_file.read() - with open(item, "w", encoding="utf8") as original_file: + with open(original_path, "w", encoding="utf8") as original_file: original_file.write(content) - os.remove(value) + Path(hashed_path).unlink() # delete elements from the global dict - del duplicated_files[item] + del backuped_files[original_path] - except Exception as e: # pylint: disable=broad-except - logger.error(f"Error transferring content: {e}") + except FileNotFoundError as e: # pylint: disable=broad-except + logger.error("Error transferring content: %s", e) + + +global_counter = {} -def create_mutant_file(file: str, count: int, rule: str) -> None: +def create_mutant_file(output_folder: Path, file: str, rule: str) -> None: """function to create new mutant file""" try: - _, filename = os.path.split(file) + if rule not in global_counter: + global_counter[rule] = 0 + + file_path = Path(file) # Read content from the duplicated file - with open(file, "r", encoding="utf8") as source_file: - content = source_file.read() + content = file_path.read_text(encoding="utf8") # Write content to the original file - mutant_name = filename.split(".")[0] - + mutant_name = file_path.stem # create folder for each contract - os.makedirs("mutation_campaign/" + mutant_name, exist_ok=True) - with open( - "mutation_campaign/" - + mutant_name - + "/" - + mutant_name - + "_" - + rule - + "_" - + str(count) - + ".sol", - "w", - encoding="utf8", - ) as mutant_file: + mutation_dir = output_folder / mutant_name + mutation_dir.mkdir(parents=True, exist_ok=True) + + mutation_filename = f"{mutant_name}_{rule}_{global_counter[rule]}.sol" + with (mutation_dir / mutation_filename).open("w", encoding="utf8") as mutant_file: mutant_file.write(content) + global_counter[rule] += 1 # reset the file - with open(duplicated_files[file], "r", encoding="utf8") as duplicated_file: - duplicate_content = duplicated_file.read() + duplicate_content = Path(backuped_files[file]).read_text("utf8") with open(file, "w", encoding="utf8") as source_file: source_file.write(duplicate_content) except Exception as e: # pylint: disable=broad-except logger.error(f"Error creating mutant: {e}") + traceback_str = traceback.format_exc() + logger.error(traceback_str) # Log the stack trace def reset_file(file: str) -> None: """function to reset the file""" try: - # directory, filename = os.path.split(file) # reset the file - with open(duplicated_files[file], "r", encoding="utf8") as duplicated_file: + with open(backuped_files[file], "r", encoding="utf8") as duplicated_file: duplicate_content = duplicated_file.read() with open(file, "w", encoding="utf8") as source_file: source_file.write(duplicate_content) except Exception as e: # pylint: disable=broad-except - logger.error(f"Error resetting file: {e}") + logger.error("Error resetting file: %s", e) -def get_sol_file_list(codebase: str, ignore_paths: List[str] | None) -> List[str]: +def get_sol_file_list(codebase: Path, ignore_paths: Union[List[str], None]) -> List[str]: """ function to get the contracts list returns: list of .sol files @@ -110,21 +105,13 @@ def get_sol_file_list(codebase: str, ignore_paths: List[str] | None) -> List[str ignore_paths = [] # if input is contract file - if os.path.isfile(codebase): - return [codebase] + if codebase.is_file(): + return [codebase.as_posix()] # if input is folder - if os.path.isdir(codebase): - directory = os.path.abspath(codebase) - for file in os.listdir(directory): - filename = os.path.join(directory, file) - if os.path.isfile(filename): - sol_file_list.append(filename) - elif os.path.isdir(filename): - _, dirname = os.path.split(filename) - if dirname in ignore_paths: - continue - for i in get_sol_file_list(filename, ignore_paths): - sol_file_list.append(i) + if codebase.is_dir(): + for file_name in codebase.rglob("*.sol"): + if not any(part in ignore_paths for part in file_name.parts): + sol_file_list.append(file_name.as_posix()) return sol_file_list diff --git a/slither/tools/mutator/utils/testing_generated_mutant.py b/slither/tools/mutator/utils/testing_generated_mutant.py index 4c51b7e5a..39e7d39de 100644 --- a/slither/tools/mutator/utils/testing_generated_mutant.py +++ b/slither/tools/mutator/utils/testing_generated_mutant.py @@ -1,12 +1,11 @@ -import subprocess -import os import logging -import time -import signal -from typing import Dict +import sys +import subprocess +from pathlib import Path +from typing import Dict, Union import crytic_compile from slither.tools.mutator.utils.file_handling import create_mutant_file, reset_file -from slither.utils.colors import green, red +from slither.utils.colors import green, red, yellow logger = logging.getLogger("Slither-Mutate") @@ -23,13 +22,12 @@ def compile_generated_mutant(file_path: str, mappings: str) -> bool: return False -def run_test_cmd(cmd: str, test_dir: str, timeout: int) -> bool: +def run_test_cmd(cmd: str, timeout: int | None, target_file: str | None, verbose: bool) -> bool: """ function to run codebase tests returns: boolean whether the tests passed or not """ - # future purpose - _ = test_dir + # add --fail-fast for foundry tests, to exit after first failure if "forge test" in cmd and "--fail-fast" not in cmd: cmd += " --fail-fast" @@ -37,41 +35,62 @@ def run_test_cmd(cmd: str, test_dir: str, timeout: int) -> bool: elif "hardhat test" in cmd or "truffle test" in cmd and "--bail" not in cmd: cmd += " --bail" - start = time.time() + if timeout is None and "hardhat" not in cmd: # hardhat doesn't support --force flag on tests + # if no timeout, ensure all contracts are recompiled w/out using any cache + cmd += " --force" + + try: + result = subprocess.run( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=timeout, + check=False, # True: Raises a CalledProcessError if the return code is non-zero + ) + + except subprocess.TimeoutExpired: + # Timeout, treat this as a test failure + logger.info("Tests took too long, consider increasing the timeout") + result = None # or set result to a default value + + except KeyboardInterrupt: + logger.info(yellow("Ctrl-C received")) + if target_file is not None: + logger.info("Restoring original files") + reset_file(target_file) + logger.info("Exiting") + sys.exit(1) + + # if result is 0 then it is an uncaught mutant because tests didn't fail + if result: + code = result.returncode + if code == 0: + return True - # starting new process - with subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as P: - try: - # checking whether the process is completed or not within 30 seconds(default) - while P.poll() is None and (time.time() - start) < timeout: - time.sleep(0.05) - finally: - if P.poll() is None: - logger.error("HAD TO TERMINATE ANALYSIS (TIMEOUT OR EXCEPTION)") - # sends a SIGTERM signal to process group - bascially killing the process - os.killpg(os.getpgid(P.pid), signal.SIGTERM) - # Avoid any weird race conditions from grabbing the return code - time.sleep(0.05) - # indicates whether the command executed sucessfully or not - r = P.returncode + # If tests fail in verbose-mode, print both stdout and stderr for easier debugging + if verbose: + logger.info(yellow(result.stdout.decode("utf-8"))) + logger.info(red(result.stderr.decode("utf-8"))) - # if r is 0 then it is valid mutant because tests didn't fail - return r == 0 + return False +# return 0 if uncaught, 1 if caught, and 2 if compilation fails def test_patch( # pylint: disable=too-many-arguments + output_folder: Path, file: str, patch: Dict, command: str, - index: int, generator_name: str, timeout: int, - mappings: str | None, + mappings: Union[str, None], verbose: bool, -) -> bool: + very_verbose: bool, +) -> int: """ - function to verify the validity of each patch - returns: valid or invalid patch + function to verify whether each patch is caught by tests + returns: 0 (uncaught), 1 (caught), or 2 (compilation failure) """ with open(file, "r", encoding="utf-8") as filepath: content = filepath.read() @@ -80,21 +99,36 @@ def test_patch( # pylint: disable=too-many-arguments # Write the modified content back to the file with open(file, "w", encoding="utf-8") as filepath: filepath.write(replaced_content) + if compile_generated_mutant(file, mappings): - if run_test_cmd(command, file, timeout): - create_mutant_file(file, index, generator_name) - print( - green( - f"String '{patch['old_string']}' replaced with '{patch['new_string']}' at line no. '{patch['line_number']}' ---> VALID\n" + if run_test_cmd(command, timeout, file, False): + + create_mutant_file(output_folder, file, generator_name) + logger.info( + red( + f"[{generator_name}] Line {patch['line_number']}: '{patch['old_string']}' ==> '{patch['new_string']}' --> UNCAUGHT" ) ) - return True + reset_file(file) + + return 0 # uncaught + else: + if very_verbose: + logger.info( + yellow( + f"[{generator_name}] Line {patch['line_number']}: '{patch['old_string']}' ==> '{patch['new_string']}' --> COMPILATION FAILURE" + ) + ) + + reset_file(file) + return 2 # compile failure - reset_file(file) if verbose: - print( - red( - f"String '{patch['old_string']}' replaced with '{patch['new_string']}' at line no. '{patch['line_number']}' ---> INVALID\n" + logger.info( + green( + f"[{generator_name}] Line {patch['line_number']}: '{patch['old_string']}' ==> '{patch['new_string']}' --> CAUGHT" ) ) - return False + + reset_file(file) + return 1 # caught