diff --git a/src/fuzz_introspector/analyses/sinks_analyser.py b/src/fuzz_introspector/analyses/sinks_analyser.py index 6322fbd6a..9fe98be59 100644 --- a/src/fuzz_introspector/analyses/sinks_analyser.py +++ b/src/fuzz_introspector/analyses/sinks_analyser.py @@ -127,14 +127,15 @@ def _retrieve_data_list( self, proj_profile: project_profile.MergedProjectProfile, profiles: List[fuzzer_profile.FuzzerProfile] ) -> Tuple[List[cfg_load.CalltreeCallsite], - List[function_profile.FunctionProfile]]: + List[function_profile.FunctionProfile], List[str]]: """ - Retrieve and return full list of call sites and functions - from all fuzzers profile for this project + Retrieve and return full list of call sites, functions + and fuzzer names from all fuzzers profile for this project """ - callsite_list = [] - function_list = [] + callsite_list: List[cfg_load.CalltreeCallsite] = [] + function_list: List[function_profile.FunctionProfile] = [] function_name_list: List[str] = [] + fuzzer_name_list: List[str] = [] for (key, function) in proj_profile.all_functions.items(): if key not in function_name_list: @@ -142,16 +143,30 @@ def _retrieve_data_list( function_name_list.append(function.function_name) for profile in profiles: + # Retrieve plain fuzzer name + fuzzer_name = profile.fuzzer_source_file + if "/" in fuzzer_name: + fuzzer_name = fuzzer_name.rsplit("/", 1)[1] + fuzzer_name_list.append(fuzzer_name) + + # Retrieve all call sites if profile.fuzzer_callsite_calltree is not None: callsite_list.extend( cfg_load.extract_all_callsites( profile.fuzzer_callsite_calltree)) + + # Retrieve all functions for (key, function) in profile.all_class_functions.items(): if key not in function_name_list: function_list.append(function) function_name_list.append(function.function_name) - return (callsite_list, function_list) + # Make the list unique + callsite_list = list(set(callsite_list)) + function_list = list(set(function_list)) + fuzzer_name_list = list(set(fuzzer_name_list)) + + return (callsite_list, function_list, fuzzer_name_list) def _handle_function_name(self, callsite: cfg_load.CalltreeCallsite) -> str: @@ -282,8 +297,9 @@ def _retrieve_function_link( return ("#", linenumber) def _determine_branch_blocker( - self, callpath_list: List[List[function_profile.FunctionProfile]], - proj_profile: project_profile.MergedProjectProfile + self, callpath_list: List[List[function_profile.FunctionProfile]], + proj_profile: project_profile.MergedProjectProfile, + fuzzer_name_list: List[str] ) -> List[function_profile.FunctionProfile]: """ Determine the branch blocker list that affect the runtime @@ -310,6 +326,11 @@ def _determine_branch_blocker( parent_fd = callpath[0] result_list.append(parent_fd) + + # Filter out invalid blockers from fuzzers + result_list = self._filter_fuzzer_blockers(result_list, + fuzzer_name_list) + return result_list def _generate_callpath_page( @@ -394,13 +415,61 @@ def _filter_inaccessible_callpath( else: return callpath_list + def _filter_fuzzer_functions( + self, callpath_list: List[List[function_profile.FunctionProfile]], + fuzzer_name_list: List[str] + ) -> List[List[function_profile.FunctionProfile]]: + """ + Filter invalid call paths that are initiated + from any fuzzer source files + """ + result = [] + + for callpath in callpath_list: + is_valid = True + for func in callpath: + for fuzzer_name in fuzzer_name_list: + if fuzzer_name in func.function_source_file: + is_valid = False + break + if not is_valid: + break + if is_valid: + result.append(callpath) + + return result + + def _filter_fuzzer_blockers( + self, blocker_functions: List[function_profile.FunctionProfile], + fuzzer_name_list: List[str] + ) -> List[function_profile.FunctionProfile]: + """ + Filter invalid blocker functions that are located + in any fuzzer source files + """ + result = [] + + for func in blocker_functions: + if "$lambda" in func.function_source_file: + continue + + is_valid = True + for fuzzer_name in fuzzer_name_list: + if fuzzer_name in func.function_source_file: + is_valid = False + break + if is_valid: + result.append(func) + + return result + def _handle_callpath_dict( self, callpath_dict: Dict[function_profile.FunctionProfile, List[List[function_profile.FunctionProfile]]], proj_profile: project_profile.MergedProjectProfile, - target_func: function_profile.FunctionProfile, - target_lang: str) -> Optional[str]: + target_func: function_profile.FunctionProfile, target_lang: str, + fuzzer_name_list: List[str]) -> Optional[str]: """ Pretty print index of callpath and generate also generate separate html page for displaying @@ -418,11 +487,13 @@ def _handle_callpath_dict( parent_func, proj_profile, target_func.function_name) callpath_list = callpath_dict[parent_func] - # Filter inaccessible callpaths and sort them - # by their depth, assuming shallowest depth is - # the function call closest to the target function + # Filter inaccessible and invalid callpaths and sort + # them by their depth, assuming shallowest depth is + # is the function call closest to the target function callpath_list = self._filter_inaccessible_callpath( callpath_list, target_lang) + callpath_list = self._filter_fuzzer_functions( + callpath_list, fuzzer_name_list) callpath_list.sort(key=len) for callpath in callpath_list: @@ -462,7 +533,7 @@ def _print_blocker_list( html += "Constants touched" html += "" for blocker in blocker_list: - if "$lambda" in blocker.function_name or blocker.function_name in handled: + if blocker.function_name in handled: # Skip repeat blockers continue handled.append(blocker.function_name) @@ -481,8 +552,8 @@ def _retrieve_content_rows( self, functions: List[function_profile.FunctionProfile], proj_profile: project_profile.MergedProjectProfile, target_lang: str, func_callsites: Dict[str, List[str]], - coverage: code_coverage.CoverageProfile, - cwe: str) -> Tuple[str, str]: + coverage: code_coverage.CoverageProfile, cwe: str, + fuzzer_name_list: List[str]) -> Tuple[str, str]: """ Retrieve the content for this analyser for a specific cwe in two formats. One in normal html table rows string and the @@ -502,7 +573,8 @@ def _retrieve_content_rows( if len(fd.reached_by_fuzzers) == 0: fuzzer_callpath = self._handle_callpath_dict( - callpath_dict, proj_profile, fd, target_lang) + callpath_dict, proj_profile, fd, target_lang, + fuzzer_name_list) if not fuzzer_callpath: # No reachable call path found for this sink @@ -518,7 +590,7 @@ def _retrieve_content_rows( # If not, determine blockers of the sink functions if self._retrieve_fuzzer_hitcount(fd, coverage) == 0: blocker_list = self._determine_branch_blocker( - callpath_list, proj_profile) + callpath_list, proj_profile, fuzzer_name_list) blocker = self._print_blocker_list(blocker_list, proj_profile) else: @@ -610,10 +682,12 @@ def analysis_func(self, """ logger.info(f" - Running analysis {self.get_name()}") - # Get full function / callsite list for all fuzzer's profiles - callsite_list, function_list = self._retrieve_data_list( + # Get full function/ callsite/fuzzer filename list for all fuzzer's profiles + callsite_list, function_list, fuzzer_name_list = self._retrieve_data_list( proj_profile, profiles) + logger.info(fuzzer_name_list) + # Map callsites to each function function_callsite_dict = self._map_function_callsite( function_list, callsite_list) @@ -634,7 +708,8 @@ def analysis_func(self, # Retrieve table content rows html_rows, json_row = self._retrieve_content_rows( function_list, proj_profile, profiles[0].target_lang, - function_callsite_dict, proj_profile.runtime_coverage, cwe) + function_callsite_dict, proj_profile.runtime_coverage, cwe, + fuzzer_name_list) self.set_json_string_result(json_row)