diff --git a/src/fuzz_introspector/frontends/datatypes.py b/src/fuzz_introspector/frontends/datatypes.py new file mode 100644 index 000000000..a6469ddb4 --- /dev/null +++ b/src/fuzz_introspector/frontends/datatypes.py @@ -0,0 +1,65 @@ +# Copyright 2025 Fuzz Introspector Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +################################################################################ + +from typing import Any, Optional + + +class Project(): + """Wrapper for doing analysis of a collection of source files.""" + + def __init__(self, source_code_files: list[Any]): + self.source_code_files = source_code_files + + def dump_module_logic(self, + report_name: str, + entry_function: str = '', + harness_name: str = '', + harness_source: str = '', + dump_output: bool = True): + """Dumps the data for the module in full.""" + # Dummy function for subclasses + pass + + def extract_calltree(self, + source_file: str = '', + source_code: Optional[Any] = None, + function: Optional[str] = None, + visited_functions: Optional[set[str]] = None, + depth: int = 0, + line_number: int = -1, + other_props: Optional[dict[str, Any]] = None) -> str: + """Extracts calltree string of a calltree so that FI core can use it.""" + # Dummy function for subclasses + return '' + + def get_reachable_functions( + self, + source_file: str = '', + source_code: Optional[Any] = None, + function: Optional[str] = None, + visited_functions: Optional[set[str]] = None) -> set[str]: + """Get a 
list of reachable functions for a provided function name.""" + # Dummy function for subclasses + return set() + + def get_source_codes_with_harnesses(self) -> list[Any]: + """Gets the source codes that holds libfuzzer harnesses.""" + harnesses = [] + for source_code in self.source_code_files: + if source_code.has_libfuzzer_harness(): + harnesses.append(source_code) + + return harnesses diff --git a/src/fuzz_introspector/frontends/frontend_c.py b/src/fuzz_introspector/frontends/frontend_c.py index f440a9669..98145f32e 100644 --- a/src/fuzz_introspector/frontends/frontend_c.py +++ b/src/fuzz_introspector/frontends/frontend_c.py @@ -25,6 +25,8 @@ from typing import Any, Optional, Set +from fuzz_introspector.frontends.datatypes import Project + logger = logging.getLogger(name=__name__) tree_sitter_languages = {'c': Language(tree_sitter_c.language())} @@ -32,16 +34,15 @@ language_parsers = {'c': Parser(Language(tree_sitter_c.language()))} -class Project(): +class CProject(Project): """Wrapper for doing analysis of a collection of source files.""" - def __init__(self, source_code_files): - self.source_code_files = source_code_files - def dump_module_logic(self, report_name, entry_function: str = '', - harness_source: str = ''): + harness_name: str = '', + harness_source: str = '', + dump_output: bool = True): """Dumps the data for the module in full.""" logger.info('Dumping project-wide logic.') report: dict[str, Any] = {'report': 'name'} @@ -117,16 +118,9 @@ def dump_module_logic(self, report['All functions']['Elements'] = function_list report['included-header-files'] = list(included_header_files) - with open(report_name, 'w', encoding='utf-8') as f: - f.write(yaml.dump(report)) - - def get_source_codes_with_harnesses(self): - """Gets the source codes that holds libfuzzer harnesses.""" - harnesses = [] - for source_code in self.source_code_files: - if source_code.has_libfuzzer_harness(): - harnesses.append(source_code) - return harnesses + if dump_output: + with 
open(report_name, 'w', encoding='utf-8') as f: + f.write(yaml.dump(report)) def get_source_code_with_target(self, target_func_name): for source_code in self.source_code_files: @@ -137,11 +131,13 @@ def get_source_code_with_target(self, target_func_name): return None def extract_calltree(self, - source_code=None, - function=None, - visited_functions=None, - depth=0, - line_number=-1): + source_file: str = '', + source_code: Optional[Any] = None, + function: Optional[str] = None, + visited_functions: Optional[set[str]] = None, + depth: int = 0, + line_number: int = -1, + other_props: Optional[dict[str, Any]] = None) -> str: """Extracts calltree string of a calltree so that FI core can use it.""" # Create calltree from a given function # Find the function in the source code @@ -185,7 +181,8 @@ def extract_calltree(self, def get_reachable_functions( self, - source_code: Optional['SourceCodeFile'] = None, + source_file: str = '', + source_code: Optional[Any] = None, function: Optional[str] = None, visited_functions: Optional[set[str]] = None) -> Set[str]: """Gets the reachable frunctions from a given function.""" diff --git a/src/fuzz_introspector/frontends/frontend_cpp.py b/src/fuzz_introspector/frontends/frontend_cpp.py index a17688391..97c3484a3 100644 --- a/src/fuzz_introspector/frontends/frontend_cpp.py +++ b/src/fuzz_introspector/frontends/frontend_cpp.py @@ -14,7 +14,7 @@ # ################################################################################ -from typing import Any, Optional, Set, List +from typing import Any, Optional import os import logging @@ -23,6 +23,8 @@ import tree_sitter_cpp import yaml +from fuzz_introspector.frontends.datatypes import Project + logger = logging.getLogger(name=__name__) LOG_FMT = '%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s' @@ -570,16 +572,18 @@ def extract_callsites(self, project): self.detailed_callsites.append({'Src': src_loc, 'Dst': dst}) -class Project(): +class CppProject(Project): 
"""Wrapper for doing analysis of a collection of source files.""" def __init__(self, source_code_files: list[SourceCodeFile]): - self.source_code_files = source_code_files - self.all_functions: List[FunctionDefinition] = [] + super().__init__(source_code_files) + self.all_functions: list[FunctionDefinition] = [] def dump_module_logic(self, report_name: str, - harness_name: Optional[str] = None, + entry_function: str = '', + harness_name: str = '', + harness_source: str = '', dump_output=True): """Dumps the data for the module in full.""" logger.info('Dumping project-wide logic.') @@ -652,14 +656,6 @@ def dump_module_logic(self, with open(report_name, 'w', encoding='utf-8') as f: f.write(yaml.dump(report)) - def get_source_codes_with_harnesses(self) -> list[SourceCodeFile]: - """Gets the source codes that holds libfuzzer harnesses.""" - harnesses = [] - for source_code in self.source_code_files: - if source_code.has_libfuzzer_harness(): - harnesses.append(source_code) - return harnesses - def get_function_from_name(self, function_name): for func in self.all_functions: if func.name == function_name: @@ -668,12 +664,13 @@ def get_function_from_name(self, function_name): return None def extract_calltree(self, - source_file: str, - source_code: Optional[SourceCodeFile] = None, + source_file: str = '', + source_code: Optional[Any] = None, function: Optional[str] = None, visited_functions: Optional[set[str]] = None, depth: int = 0, - line_number: int = -1) -> str: + line_number: int = -1, + other_props: Optional[dict[str, Any]] = None) -> str: """Extracts calltree string of a calltree so that FI core can use it.""" # Create calltree from a given function # Find the function in the source code @@ -744,11 +741,12 @@ def extract_calltree(self, logger.debug('Done') return line_to_print - def get_reachable_functions(self, - source_code: Optional[SourceCodeFile] = None, - function: Optional[str] = None, - visited_functions: Optional[set[str]] = None, - depth: int = 0) -> 
Set[str]: + def get_reachable_functions( + self, + source_file: str = '', + source_code: Optional[Any] = None, + function: Optional[str] = None, + visited_functions: Optional[set[str]] = None) -> set[str]: """Gets the reachable frunctions from a given function.""" # Create calltree from a given function # Find the function in the source code @@ -789,7 +787,6 @@ def get_reachable_functions(self, source_code=source_code, function=cs, visited_functions=visited_functions, - depth=depth + 1, ) return visited_functions @@ -923,7 +920,7 @@ def analyse_source_code(source_content: str) -> SourceCodeFile: def get_function_node(target_name: str, - function_list: List[FunctionDefinition], + function_list: list[FunctionDefinition], one_layer_only: bool = False, namespace: str = '') -> Optional[FunctionDefinition]: """Helper to retrieve the RustFunction object of a function.""" diff --git a/src/fuzz_introspector/frontends/frontend_go.py b/src/fuzz_introspector/frontends/frontend_go.py index 357308443..a5b609bc1 100644 --- a/src/fuzz_introspector/frontends/frontend_go.py +++ b/src/fuzz_introspector/frontends/frontend_go.py @@ -15,7 +15,7 @@ ################################################################################ """Fuzz Introspector Light frontend for Go""" -from typing import Optional +from typing import Any, Optional import logging @@ -23,7 +23,7 @@ import tree_sitter_go import yaml -from typing import Any +from fuzz_introspector.frontends.datatypes import Project logger = logging.getLogger(name=__name__) @@ -178,11 +178,12 @@ def get_entry_function_name(self) -> str: return '' -class Project(): +class GoProject(Project): """Wrapper for doing analysis of a collection of source files.""" def __init__(self, source_code_files: list[SourceCodeFile]): - self.source_code_files = source_code_files + super().__init__(source_code_files) + full_functions_methods = [ item for src in source_code_files for item in src.functions + src.methods @@ -195,7 +196,9 @@ def __init__(self, 
source_code_files: list[SourceCodeFile]): def dump_module_logic(self, report_name: str, entry_function: str = '', - harness_source: str = ''): + harness_name: str = '', + harness_source: str = '', + dump_output: bool = True): """Dumps the data for the module in full.""" logger.info('Dumping project-wide logic.') report: dict[str, Any] = {'report': 'name'} @@ -265,24 +268,18 @@ def dump_module_logic(self, report['All functions'] = {} report['All functions']['Elements'] = function_list - with open(report_name, 'w', encoding='utf-8') as f: - f.write(yaml.dump(report)) - - def get_source_codes_with_harnesses(self) -> list[SourceCodeFile]: - """Gets the source codes that holds libfuzzer harnesses.""" - harnesses = [] - for source_code in self.source_code_files: - if source_code.has_libfuzzer_harness(): - harnesses.append(source_code) - return harnesses + if dump_output: + with open(report_name, 'w', encoding='utf-8') as f: + f.write(yaml.dump(report)) def extract_calltree(self, - source_file: str, - source_code: Optional[SourceCodeFile] = None, + source_file: str = '', + source_code: Optional[Any] = None, function: Optional[str] = None, visited_functions: Optional[set[str]] = None, depth: int = 0, - line_number: int = -1) -> str: + line_number: int = -1, + other_props: Optional[dict[str, Any]] = None) -> str: """Extracts calltree string of a calltree so that FI core can use it.""" if not visited_functions: visited_functions = set() @@ -329,8 +326,8 @@ def extract_calltree(self, def get_reachable_functions( self, - source_file: str, - source_code: Optional[SourceCodeFile] = None, + source_file: str = '', + source_code: Optional[Any] = None, function: Optional[str] = None, visited_functions: Optional[set[str]] = None) -> set[str]: """Get a list of reachable functions for a provided function name.""" diff --git a/src/fuzz_introspector/frontends/frontend_jvm.py b/src/fuzz_introspector/frontends/frontend_jvm.py index 1bf77764e..36ad2e995 100644 --- 
a/src/fuzz_introspector/frontends/frontend_jvm.py +++ b/src/fuzz_introspector/frontends/frontend_jvm.py @@ -15,7 +15,7 @@ ################################################################################ """Fuzz Introspector Light frontend for Java""" -from typing import Optional +from typing import Any, Optional import logging @@ -23,7 +23,7 @@ import tree_sitter_java import yaml -from typing import Any +from fuzz_introspector.frontends.datatypes import Project logger = logging.getLogger(name=__name__) @@ -1008,19 +1008,21 @@ def has_method_definition( return False, None -class Project(): +class JvmProject(Project): """Wrapper for doing analysis of a collection of source files.""" def __init__(self, source_code_files: list[SourceCodeFile]): - self.source_code_files = source_code_files + super().__init__(source_code_files) self.all_classes = [] for source_code in self.source_code_files: self.all_classes.extend(source_code.classes) def dump_module_logic(self, report_name: str, - harness_name: Optional[str] = None, - harness_source: str = ''): + entry_function: str = '', + harness_name: str = '', + harness_source: str = '', + dump_output: bool = True): """Dumps the data for the module in full.""" logger.info('Dumping project-wide logic.') report: dict[str, Any] = {'report': 'name'} @@ -1043,9 +1045,9 @@ def dump_module_logic(self, # Log entry method if provided if harness_name and source_code.has_class(harness_name): - entry_method = source_code.get_entry_method_name(True) - if entry_method: - report['Fuzzing method'] = entry_method + entry_function = source_code.get_entry_method_name(True) + if entry_function: + report['Fuzzing method'] = entry_function # Retrieve full proejct methods and information methods = source_code.get_all_methods() @@ -1122,17 +1124,9 @@ def dump_module_logic(self, report['All functions'] = {} report['All functions']['Elements'] = method_list - with open(report_name, 'w', encoding='utf-8') as f: - f.write(yaml.dump(report)) - - def 
get_source_codes_with_harnesses(self) -> list[SourceCodeFile]: - """Gets the source codes that holds libfuzzer harnesses.""" - harnesses = [] - for source_code in self.source_code_files: - if source_code.has_libfuzzer_harness(): - harnesses.append(source_code) - - return harnesses + if dump_output: + with open(report_name, 'w', encoding='utf-8') as f: + f.write(yaml.dump(report)) def find_source_with_method(self, name: str) -> Optional[SourceCodeFile]: """Finds the source code with a given method name.""" @@ -1186,94 +1180,94 @@ def _recursive_method_depth(method: JavaMethod) -> int: return method_depth def extract_calltree(self, - source_file: str, - source_code: Optional[SourceCodeFile] = None, - method: Optional[str] = None, - visited_methods: Optional[set[str]] = None, + source_file: str = '', + source_code: Optional[Any] = None, + function: Optional[str] = None, + visited_functions: Optional[set[str]] = None, depth: int = 0, - line_number: int = -1) -> str: + line_number: int = -1, + other_props: Optional[dict[str, Any]] = None) -> str: """Extracts calltree string of a calltree so that FI core can use it.""" - if not visited_methods: - visited_methods = set() + if not visited_functions: + visited_functions = set() - if not source_code and method: - source_code = self.find_source_with_method(method) + if not source_code and function: + source_code = self.find_source_with_method(function) - if not method and source_code: - method = source_code.get_entry_method_name(True) + if not function and source_code: + function = source_code.get_entry_method_name(True) - if not method: + if not function: return '' line_to_print = ' ' * depth - line_to_print += method + line_to_print += function line_to_print += ' ' line_to_print += source_file - line_to_print += ' ' line_to_print += str(line_number) - line_to_print += '\n' + if not source_code: return line_to_print - method_node = source_code.get_method_node(method) - if not method_node: + function_node = 
source_code.get_method_node(function) + if not function_node: return line_to_print - callsites = method_node.base_callsites + callsites = function_node.base_callsites - if method in visited_methods: + if function in visited_functions: return line_to_print - visited_methods.add(method) + visited_functions.add(function) for cs, line_number in callsites: line_to_print += self.extract_calltree( source_code.source_file, - method=cs, - visited_methods=visited_methods, + function=cs, + visited_functions=visited_functions, depth=depth + 1, line_number=line_number) return line_to_print - def get_reachable_methods( + def get_reachable_functions( self, - source_file: str, - source_code: Optional[SourceCodeFile] = None, - method: Optional[str] = None, - visited_methods: Optional[set[str]] = None) -> set[str]: + source_file: str = '', + source_code: Optional[Any] = None, + function: Optional[str] = None, + visited_functions: Optional[set[str]] = None) -> set[str]: """Get a list of reachable functions for a provided function name.""" - if not visited_methods: - visited_methods = set() + if not visited_functions: + visited_functions = set() - if not source_code and method: - source_code = self.find_source_with_method(method) + if not source_code and function: + source_code = self.find_source_with_method(function) - if not method and source_code: - method = source_code.get_entry_method_name(True) + if not function and source_code: + function = source_code.get_entry_method_name(True) - if source_code and method: - method_node = source_code.get_method_node(method) - if not method_node: - visited_methods.add(method) - return visited_methods + if source_code and function: + function_node = source_code.get_method_node(function) + if not function_node: + visited_functions.add(function) + return visited_functions else: - if method: - visited_methods.add(method) - return visited_methods + if function: + visited_functions.add(function) + return visited_functions - 
visited_methods.add(method) - for cs, _ in method_node.base_callsites: - if cs in visited_methods: + visited_functions.add(function) + for cs, _ in function_node.base_callsites: + if cs in visited_functions: continue - visited_methods = self.get_reachable_methods( + visited_functions = self.get_reachable_functions( source_code.source_file, - method=cs, - visited_methods=visited_methods) + function=cs, + visited_functions=visited_functions) - return visited_methods + return visited_functions def load_treesitter_trees(source_files: list[str], diff --git a/src/fuzz_introspector/frontends/frontend_rust.py b/src/fuzz_introspector/frontends/frontend_rust.py index f5114122f..b6cacf767 100644 --- a/src/fuzz_introspector/frontends/frontend_rust.py +++ b/src/fuzz_introspector/frontends/frontend_rust.py @@ -23,6 +23,8 @@ import tree_sitter_rust import yaml +from fuzz_introspector.frontends.datatypes import Project + logger = logging.getLogger(name=__name__) LOG_FMT = '%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s' @@ -585,16 +587,15 @@ def _process_callsites(stmt: Node) -> list[tuple[str, int, int]]: self.detailed_callsites.append({'Src': src_loc, 'Dst': dst}) -class Project(): +class RustProject(Project): """Wrapper for doing analysis of a collection of source files.""" - def __init__(self, source_code_files: list[SourceCodeFile]): - self.source_code_files = source_code_files - def dump_module_logic(self, report_name: str, - harness_name: Optional[str] = None, - harness_source: str = ''): + entry_function: str = '', + harness_name: str = '', + harness_source: str = '', + dump_output: bool = True): """Dumps the data for the module in full.""" logger.info('Dumping project-wide logic.') report: dict[str, Any] = {'report': 'name'} @@ -671,8 +672,9 @@ def dump_module_logic(self, report['All functions'] = {} report['All functions']['Elements'] = func_list - with open(report_name, 'w', encoding='utf-8') as f: - f.write(yaml.dump(report)) + if 
dump_output: + with open(report_name, 'w', encoding='utf-8') as f: + f.write(yaml.dump(report)) def _find_source_with_function(self, name: str) -> Optional[SourceCodeFile]: @@ -732,38 +734,43 @@ def _recursive_function_depth(function: RustFunction) -> int: return func_depth def extract_calltree(self, - source_file: str, - source_code: Optional[SourceCodeFile] = None, - func: Optional[str] = None, - visited_funcs: Optional[set[str]] = None, + source_file: str = '', + source_code: Optional[Any] = None, + function: Optional[str] = None, + visited_functions: Optional[set[str]] = None, depth: int = 0, line_number: int = -1, - is_macro: bool = False) -> str: + other_props: Optional[dict[str, Any]] = None) -> str: """Extracts calltree string of a calltree so that FI core can use it.""" func_node = None - if not visited_funcs: - visited_funcs = set() + if other_props: + is_macro = other_props.get('is_macro', False) + else: + is_macro = False + + if not visited_functions: + visited_functions = set() - if not source_code and func: - source_code = self._find_source_with_function(func) + if not source_code and function: + source_code = self._find_source_with_function(function) - if not func and source_code: + if not function and source_code: func_node = source_code.get_entry_function() if func_node: - func = func_node.name + function = func_node.name - if func: + if function: if not func_node: - func_node = get_function_node(func, self.all_functions) + func_node = get_function_node(function, self.all_functions) if func_node and not is_macro: func_name = func_node.name - if func.count('::') > func_name.count('::'): - func_name = func + if function.count('::') > func_name.count('::'): + func_name = function else: func_node = None - func_name = func + func_name = function else: return '' @@ -771,27 +778,28 @@ def extract_calltree(self, line_to_print += func_name line_to_print += ' ' line_to_print += source_file - line_to_print += ' ' line_to_print += str(line_number) - 
line_to_print += '\n' - if func in visited_funcs or not func_node or not source_code or not func: + if function in visited_functions or not func_node or not source_code or not function: return line_to_print callsites = func_node.base_callsites - visited_funcs.add(func) + visited_functions.add(function) for cs, line_number in callsites: is_macro = bool(func_node and func_node.is_macro and func_node.name != 'fuzz_target') - line_to_print += self.extract_calltree(source_code.source_file, - func=cs, - visited_funcs=visited_funcs, - depth=depth + 1, - line_number=line_number, - is_macro=is_macro) + other_props = {} + other_props['is_macro'] = is_macro + line_to_print += self.extract_calltree( + source_code.source_file, + function=cs, + visited_functions=visited_functions, + depth=depth + 1, + line_number=line_number, + other_props=other_props) return line_to_print @@ -806,45 +814,47 @@ def get_source_codes_with_harnesses(self) -> list[SourceCodeFile]: def get_reachable_functions( self, - source_file: str, - source_code: Optional[SourceCodeFile] = None, - func: Optional[str] = None, - visited_funcs: Optional[set[str]] = None) -> set[str]: + source_file: str = '', + source_code: Optional[Any] = None, + function: Optional[str] = None, + visited_functions: Optional[set[str]] = None) -> set[str]: """Get a list of reachable functions for a provided function name.""" func_node = None - if not visited_funcs: - visited_funcs = set() + if not visited_functions: + visited_functions = set() - if not source_code and func: - source_code = self._find_source_with_function(func) + if not source_code and function: + source_code = self._find_source_with_function(function) - if not func and source_code: + if not function and source_code: func_node = source_code.get_entry_function() if func_node: - func = func_node.name + function = func_node.name - if source_code and func: + if source_code and function: if not func_node: - func_node = get_function_node(func, self.all_functions) + func_node 
= get_function_node(function, self.all_functions) if not func_node: - visited_funcs.add(func) - return visited_funcs + visited_functions.add(function) + return visited_functions else: - if func: - visited_funcs.add(func) - return visited_funcs + if function: + visited_functions.add(function) + return visited_functions - visited_funcs.add(func) + visited_functions.add(function) for cs, _ in func_node.base_callsites: - if cs in visited_funcs: + if cs in visited_functions: continue - visited_funcs = self.get_reachable_functions( - source_code.source_file, func=cs, visited_funcs=visited_funcs) + visited_functions = self.get_reachable_functions( + source_code.source_file, + function=cs, + visited_functions=visited_functions) - return visited_funcs + return visited_functions def load_treesitter_trees(source_files: list[str], diff --git a/src/fuzz_introspector/frontends/oss_fuzz.py b/src/fuzz_introspector/frontends/oss_fuzz.py index aa1098843..176484034 100644 --- a/src/fuzz_introspector/frontends/oss_fuzz.py +++ b/src/fuzz_introspector/frontends/oss_fuzz.py @@ -19,13 +19,12 @@ import pathlib import logging -from typing import Any - from fuzz_introspector.frontends import frontend_c from fuzz_introspector.frontends import frontend_cpp from fuzz_introspector.frontends import frontend_go from fuzz_introspector.frontends import frontend_jvm from fuzz_introspector.frontends import frontend_rust +from fuzz_introspector.frontends.datatypes import Project logger = logging.getLogger(name=__name__) LOG_FMT = '%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s' @@ -93,10 +92,8 @@ def process_c_project(target_dir: str, out: str, source_files: list[str], module_only: bool = False, - dump_output=True) -> frontend_c.Project: + dump_output: bool = True) -> Project: """Process a project in C language""" - calltrees = [] - # Default entrypoint if not entrypoint: entrypoint = 'LLVMFuzzerTestOneInput' @@ -107,13 +104,14 @@ def process_c_project(target_dir: str, 
source_codes = frontend_c.load_treesitter_trees(source_files) logger.info('Creating base project.') - project = frontend_c.Project(source_codes) + project = frontend_c.CProject(source_codes) # We may not need to do this, but will do it while refactoring into # the new frontends. if not project.get_source_codes_with_harnesses(): target = os.path.join(out, 'fuzzerLogFile-0.data.yaml') - project.dump_module_logic(target, 'no-harness-in-project', target_dir) + project.dump_module_logic(target, 'no-harness-in-project', '', + target_dir, dump_output) with open(os.path.join(out, 'fuzzerLogFile-0.data'), 'w') as f: f.write("Call tree\n") @@ -122,12 +120,13 @@ def process_c_project(target_dir: str, if module_only: idx = 1 target = os.path.join(out, 'report.yaml') - project.dump_module_logic(target, '', target_dir) + project.dump_module_logic(target, harness_source=target_dir) if entrypoint != 'LLVMFuzzerTestOneInput': calltree_source = project.get_source_code_with_target(entrypoint) if calltree_source: - calltree = project.extract_calltree(calltree_source, entrypoint) + calltree = project.extract_calltree(source_code=calltree_source, + function=entrypoint) with open(os.path.join(out, 'targetCalltree.txt'), 'w') as f: f.write("Call tree\n") f.write(calltree) @@ -137,12 +136,13 @@ def process_c_project(target_dir: str, project.get_source_codes_with_harnesses()): target = os.path.join(out, f'fuzzerLogFile-{idx}.data.yaml') - project.dump_module_logic(target, 'LLVMFuzzerTestOneInput', - harness.source_file) + project.dump_module_logic(target, 'LLVMFuzzerTestOneInput', '', + harness.source_file, dump_output) logger.info('Extracting calltree for %s', harness.source_file) calltree = project.extract_calltree(harness, entrypoint) - calltrees.append(calltree) + calltree = project.extract_calltree(source_code=harness, + function=entrypoint) with open(os.path.join(out, 'fuzzerLogFile-%d.data' % (idx)), 'w', encoding='utf-8') as f: @@ -156,10 +156,8 @@ def process_c_project(target_dir: 
str, def process_cpp_project(entrypoint: str, out: str, source_files: list[str], - dump_output=True) -> frontend_cpp.Project: + dump_output: bool = True) -> Project: """Process a project in CPP language""" - calltrees = [] - # Default entrypoint if not entrypoint: entrypoint = 'LLVMFuzzerTestOneInput' @@ -172,40 +170,15 @@ def process_cpp_project(entrypoint: str, # Create and dump project logger.info('Creating base project.') - project = frontend_cpp.Project(source_codes) - - # Process calltree and method data - for harness in project.get_source_codes_with_harnesses(): - harness_name = harness.source_file.split('/')[-1].split('.')[0] - - # Method data - logger.info(f'Dump methods for {harness_name}') - target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data.yaml') - project.dump_module_logic(target, - harness_name, - dump_output=dump_output) - - # Calltree - logger.info(f'Extracting calltree for {harness_name}') - calltree = project.extract_calltree(harness.source_file, harness, - entrypoint) - calltrees.append(calltree) - if dump_output: - project.dump_module_logic(target, harness_name) - target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data') - with open(target, 'w', encoding='utf-8') as f: - f.write(f'Call tree\n{calltree}') - - logger.info('Complete cpp frontend.') + project = frontend_cpp.CppProject(source_codes) + return project def process_go_project(out: str, source_files: list[str], - dump_output=True) -> frontend_go.Project: + dump_output: bool = True) -> Project: """Process a project in Go language""" - calltrees = [] - # Process tree sitter for go source files logger.info('Going Go route') logger.info('Found %d files to include in analysis', len(source_files)) @@ -214,22 +187,7 @@ def process_go_project(out: str, # Create and dump project logger.info('Creating base project.') - project = frontend_go.Project(source_codes) - - # Process calltree - for harness in project.get_source_codes_with_harnesses(): - harness_name = 
harness.source_file.split('/')[-1].split('.')[0] - logger.info(f'Dump functions/methods for {harness_name}') - target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data.yaml') - project.dump_module_logic(target, harness.get_entry_function_name(), - harness.source_file) - - logger.info(f'Extracting calltree for {harness_name}') - calltree = project.extract_calltree(harness.source_file, harness) - calltrees.append(calltree) - target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data') - with open(target, 'w', encoding='utf-8') as f: - f.write(f'Call tree\n{calltree}') + project = frontend_go.GoProject(source_codes) return project @@ -237,10 +195,8 @@ def process_go_project(out: str, def process_jvm_project(entrypoint: str, out: str, source_files: list[str], - dump_output=True) -> frontend_jvm.Project: + dump_output: bool = True) -> Project: """Process a project in JVM based language""" - calltrees = [] - # Default entrypoint if not entrypoint: entrypoint = 'fuzzerTestOneInput' @@ -253,34 +209,15 @@ def process_jvm_project(entrypoint: str, # Create and dump project logger.info('Creating base project.') - project = frontend_jvm.Project(source_codes) - - # Process calltree and method data - for harness in project.get_source_codes_with_harnesses(): - harness_name = harness.source_file.split('/')[-1].split('.')[0] - - # Method data - logger.info(f'Dump methods for {harness_name}') - target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data.yaml') - project.dump_module_logic(target, harness_name, harness.source_file) - - # Calltree - logger.info(f'Extracting calltree for {harness_name}') - calltree = project.extract_calltree(harness.source_file, harness) - calltrees.append(calltree) - target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data') - with open(target, 'w', encoding='utf-8') as f: - f.write(f'Call tree\n{calltree}') + project = frontend_jvm.JvmProject(source_codes) return project def process_rust_project(out: str, source_files: list[str], - 
def analyse_folder(language: str = '',
                   directory: str = '',
                   entrypoint: str = '',
                   out: str = '',
                   module_only: bool = False,
                   dump_output: bool = True) -> Project:
    """Runs a full frontend analysis on a given directory.

    Source files for ``language`` are collected from ``directory`` and handed
    to the matching ``process_*_project`` helper. For every language except C,
    each harness source reported by the project then has its function data
    dumped to ``fuzzerLogFile-<harness>.data.yaml`` and its calltree
    extracted; the calltree is written to ``fuzzerLogFile-<harness>.data``
    only when ``dump_output`` is true.

    Args:
        language: Target language. ``'c'``, ``'go'``, ``'jvm'`` and ``'rust'``
            are matched exactly; ``'cpp'``/``'c++'`` are matched
            case-insensitively.
        directory: Directory passed to ``capture_source_files_in_tree``.
        entrypoint: Entry function name forwarded to the C, C++ and JVM
            helpers and used as the entry function of every non-Go harness.
        out: Directory the ``fuzzerLogFile-*`` files are joined onto.
        module_only: Forwarded to the C helper only.
        dump_output: Forwarded to the language helper and to
            ``dump_module_logic``; also gates writing the calltree file.

    Returns:
        The project built by the language helper, or an empty ``Project``
        when ``language`` is not supported.
    """
    # Extract source files for target language
    source_files = capture_source_files_in_tree(directory, language)

    if language == 'c':
        # The C project is returned untouched: no per-harness dumping is done
        # here. NOTE(review): presumably process_c_project does its own
        # dumping - confirm against that helper.
        return process_c_project(directory,
                                 entrypoint,
                                 out,
                                 source_files,
                                 module_only,
                                 dump_output=dump_output)

    # Build the project for the remaining supported languages
    if language.lower() in ['cpp', 'c++']:
        project = process_cpp_project(entrypoint,
                                      out,
                                      source_files,
                                      dump_output=dump_output)
    elif language == 'go':
        project = process_go_project(out,
                                     source_files,
                                     dump_output=dump_output)
    elif language == 'jvm':
        project = process_jvm_project(entrypoint,
                                      out,
                                      source_files,
                                      dump_output=dump_output)
    elif language == 'rust':
        project = process_rust_project(out,
                                       source_files,
                                       dump_output=dump_output)
    else:
        logger.error('Unsupported language: %s', language)
        return Project([])

    # Process calltree and method data
    for harness in project.get_source_codes_with_harnesses():
        # Go harnesses name their own entry function; every other language
        # uses the entrypoint supplied by the caller.
        if language == 'go':
            entry_function = harness.get_entry_function_name()
        else:
            entry_function = entrypoint

        # File name without directory, cut at the first dot
        harness_name = harness.source_file.split('/')[-1].split('.')[0]

        # Functions/Methods data
        logger.info('Dump methods for %s', harness_name)
        target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data.yaml')
        project.dump_module_logic(target,
                                  entry_function=entry_function,
                                  harness_name=harness_name,
                                  harness_source=harness.source_file,
                                  dump_output=dump_output)

        # Calltree: always extracted, only written out on request
        logger.info('Extracting calltree for %s', harness_name)
        calltree = project.extract_calltree(harness.source_file, harness,
                                            entry_function)
        if dump_output:
            target = os.path.join(out, f'fuzzerLogFile-{harness_name}.data')
            with open(target, 'w', encoding='utf-8') as f:
                f.write(f'Call tree\n{calltree}')

    return project
b/src/test/test_frontends_jvm.py @@ -28,7 +28,7 @@ def test_tree_sitter_jvm_sample1(): harness = project.get_source_codes_with_harnesses() assert len(harness) == 1 - functions_reached = project.get_reachable_methods(harness[0].source_file, harness[0]) + functions_reached = project.get_reachable_functions(harness[0].source_file, harness[0]) # Callsite check assert '[simple.SimpleClass].(String)' in functions_reached @@ -49,7 +49,7 @@ def test_tree_sitter_jvm_sample2(): harness = project.get_source_codes_with_harnesses() assert len(harness) == 1 - functions_reached = project.get_reachable_methods(harness[0].source_file, harness[0]) + functions_reached = project.get_reachable_functions(harness[0].source_file, harness[0]) # Callsite check assert '[String].equals(String)' in functions_reached @@ -72,7 +72,7 @@ def test_tree_sitter_jvm_sample3(): harness = project.get_source_codes_with_harnesses() assert len(harness) == 1 - functions_reached = project.get_reachable_methods(harness[0].source_file, harness[0]) + functions_reached = project.get_reachable_functions(harness[0].source_file, harness[0]) # Callsite check assert ( @@ -97,7 +97,7 @@ def test_tree_sitter_jvm_sample4(): harness = project.get_source_codes_with_harnesses() assert len(harness) == 1 - functions_reached = project.get_reachable_methods(harness[0].source_file, harness[0]) + functions_reached = project.get_reachable_functions(harness[0].source_file, harness[0]) # Callsite check assert '[crosspackage.helper.HelperClass].helperMethod()' in functions_reached @@ -117,7 +117,7 @@ def test_tree_sitter_jvm_sample5(): harness = project.get_source_codes_with_harnesses() assert len(harness) == 1 - functions_reached = project.get_reachable_methods(harness[0].source_file, harness[0]) + functions_reached = project.get_reachable_functions(harness[0].source_file, harness[0]) # Callsite check assert '[complex.C].()' in functions_reached @@ -157,7 +157,7 @@ def test_tree_sitter_jvm_sample6(): harness = 
project.get_source_codes_with_harnesses() assert len(harness) == 1 - functions_reached = project.get_reachable_methods(harness[0].source_file, harness[0]) + functions_reached = project.get_reachable_functions(harness[0].source_file, harness[0]) # Callsite check assert '[inheritance.SubClass].()' in functions_reached @@ -179,7 +179,7 @@ def test_tree_sitter_jvm_sample7(): harness = project.get_source_codes_with_harnesses() assert len(harness) == 1 - functions_reached = project.get_reachable_methods(harness[0].source_file, harness[0]) + functions_reached = project.get_reachable_functions(harness[0].source_file, harness[0]) # Callsite check assert '[combined.ConcreteClass].chainMethod()' in functions_reached @@ -205,7 +205,7 @@ def test_tree_sitter_jvm_sample8(): harness = project.get_source_codes_with_harnesses() assert len(harness) == 1 - functions_reached = project.get_reachable_methods(harness[0].source_file, harness[0]) + functions_reached = project.get_reachable_functions(harness[0].source_file, harness[0]) # Callsite check assert '[variable.B].callInstanceMethod(variable.test.A)' in functions_reached @@ -225,8 +225,8 @@ def test_tree_sitter_jvm_sample9(): harness = project.get_source_codes_with_harnesses() assert len(harness) == 2 - result_one = project.get_reachable_methods(harness[0].source_file, harness[0]) - result_two = project.get_reachable_methods(harness[1].source_file, harness[1]) + result_one = project.get_reachable_functions(harness[0].source_file, harness[0]) + result_two = project.get_reachable_functions(harness[1].source_file, harness[1]) # Callsite check if 'FuzzerOne' in harness[0].source_file: