From 59cf5df812747a5191646669566dc220e07a1d7a Mon Sep 17 00:00:00 2001 From: Arthur Chan Date: Thu, 18 Jan 2024 08:59:00 +0000 Subject: [PATCH 1/2] Sink Analyser: filter invalid call paths and block functions Signed-off-by: Arthur Chan --- .../analyses/sinks_analyser.py | 109 +++++++++++++++--- 1 file changed, 91 insertions(+), 18 deletions(-) diff --git a/src/fuzz_introspector/analyses/sinks_analyser.py b/src/fuzz_introspector/analyses/sinks_analyser.py index 6322fbd6a..351f8ac65 100644 --- a/src/fuzz_introspector/analyses/sinks_analyser.py +++ b/src/fuzz_introspector/analyses/sinks_analyser.py @@ -127,14 +127,16 @@ def _retrieve_data_list( self, proj_profile: project_profile.MergedProjectProfile, profiles: List[fuzzer_profile.FuzzerProfile] ) -> Tuple[List[cfg_load.CalltreeCallsite], - List[function_profile.FunctionProfile]]: + List[function_profile.FunctionProfile], + List[str]]: """ - Retrieve and return full list of call sites and functions - from all fuzzers profile for this project + Retrieve and return full list of call sites, functions + and fuzzer names from all fuzzers profile for this project """ - callsite_list = [] - function_list = [] + callsite_list: List[cfg_load.CalltreeCallsite] = [] + function_list: List[function_profile.FunctionProfile] = [] function_name_list: List[str] = [] + fuzzer_name_list: List[str] = [] for (key, function) in proj_profile.all_functions.items(): if key not in function_name_list: @@ -142,16 +144,30 @@ def _retrieve_data_list( function_name_list.append(function.function_name) for profile in profiles: + # Retrieve plain fuzzer name + fuzzer_name = profile.fuzzer_source_file + if "/" in fuzzer_name: + fuzzer_name = fuzzer_name.rsplit("/", 1)[1] + fuzzer_name_list.append(fuzzer_name) + + # Retrieve all call sites if profile.fuzzer_callsite_calltree is not None: callsite_list.extend( cfg_load.extract_all_callsites( profile.fuzzer_callsite_calltree)) + + # Retrieve all functions for (key, function) in 
profile.all_class_functions.items(): if key not in function_name_list: function_list.append(function) function_name_list.append(function.function_name) - return (callsite_list, function_list) + # Make the list unique + callsite_list = list(set(callsite_list)) + function_list = list(set(function_list)) + fuzzer_name_list = list(set(fuzzer_name_list)) + + return (callsite_list, function_list, fuzzer_name_list) def _handle_function_name(self, callsite: cfg_load.CalltreeCallsite) -> str: @@ -283,7 +299,8 @@ def _retrieve_function_link( def _determine_branch_blocker( self, callpath_list: List[List[function_profile.FunctionProfile]], - proj_profile: project_profile.MergedProjectProfile + proj_profile: project_profile.MergedProjectProfile, + fuzzer_name_list: List[str] ) -> List[function_profile.FunctionProfile]: """ Determine the branch blocker list that affect the runtime @@ -310,6 +327,10 @@ def _determine_branch_blocker( parent_fd = callpath[0] result_list.append(parent_fd) + + # Filter out invalid blockers from fuzzers + result_list = self._filter_fuzzer_blockers(result_list, fuzzer_name_list) + return result_list def _generate_callpath_page( @@ -394,13 +415,59 @@ def _filter_inaccessible_callpath( else: return callpath_list + def _filter_fuzzer_functions( + self, callpath_list: List[List[function_profile.FunctionProfile]], + fuzzer_name_list: List[str]) -> List[List[function_profile.FunctionProfile]]: + """ + Filter invalid call paths that are initiated + from any fuzzer source files + """ + result = [] + + for callpath in callpath_list: + is_valid = True + for func in callpath: + for fuzzer_name in fuzzer_name_list: + if fuzzer_name in func.function_source_file : + is_valid = False + break + if not is_valid: + break + if is_valid: + result.append(callpath) + + return result + + def _filter_fuzzer_blockers( + self, blocker_functions: List[function_profile.FunctionProfile], + fuzzer_name_list: List[str]) -> List[function_profile.FunctionProfile]: + """ + Filter 
invalid blocker functions that are located + in any fuzzer source files + """ + result = [] + + for func in blocker_functions: + if "$lambda" in func.function_source_file: + continue + + is_valid = True + for fuzzer_name in fuzzer_name_list: + if fuzzer_name in func.function_source_file : + is_valid = False + break + if is_valid: + result.append(func) + + return result + def _handle_callpath_dict( self, callpath_dict: Dict[function_profile.FunctionProfile, List[List[function_profile.FunctionProfile]]], proj_profile: project_profile.MergedProjectProfile, target_func: function_profile.FunctionProfile, - target_lang: str) -> Optional[str]: + target_lang: str, fuzzer_name_list: List[str]) -> Optional[str]: """ Pretty print index of callpath and generate also generate separate html page for displaying @@ -418,11 +485,13 @@ def _handle_callpath_dict( parent_func, proj_profile, target_func.function_name) callpath_list = callpath_dict[parent_func] - # Filter inaccessible callpaths and sort them - # by their depth, assuming shallowest depth is - # the function call closest to the target function + # Filter inaccessible and invalid callpaths and sort + # them by their depth, assuming shallowest depth + # is the function call closest to the target function callpath_list = self._filter_inaccessible_callpath( callpath_list, target_lang) + callpath_list = self._filter_fuzzer_functions( + callpath_list, fuzzer_name_list) callpath_list.sort(key=len) for callpath in callpath_list: @@ -462,7 +531,7 @@ def _print_blocker_list( html += "Constants touched" html += "" for blocker in blocker_list: - if "$lambda" in blocker.function_name or blocker.function_name in handled: + if blocker.function_name in handled: # Skip repeat blockers continue handled.append(blocker.function_name) @@ -482,7 +551,7 @@ def _retrieve_content_rows( proj_profile: project_profile.MergedProjectProfile, target_lang: str, func_callsites: Dict[str, List[str]], coverage: code_coverage.CoverageProfile, - cwe: str)
-> Tuple[str, str]: + cwe: str, fuzzer_name_list: List[str]) -> Tuple[str, str]: """ Retrieve the content for this analyser for a specific cwe in two formats. One in normal html table rows string and the @@ -502,7 +571,7 @@ def _retrieve_content_rows( if len(fd.reached_by_fuzzers) == 0: fuzzer_callpath = self._handle_callpath_dict( - callpath_dict, proj_profile, fd, target_lang) + callpath_dict, proj_profile, fd, target_lang, fuzzer_name_list) if not fuzzer_callpath: # No reachable call path found for this sink @@ -518,7 +587,7 @@ def _retrieve_content_rows( # If not, determine blockers of the sink functions if self._retrieve_fuzzer_hitcount(fd, coverage) == 0: blocker_list = self._determine_branch_blocker( - callpath_list, proj_profile) + callpath_list, proj_profile, fuzzer_name_list) blocker = self._print_blocker_list(blocker_list, proj_profile) else: @@ -610,14 +679,17 @@ def analysis_func(self, """ logger.info(f" - Running analysis {self.get_name()}") - # Get full function / callsite list for all fuzzer's profiles - callsite_list, function_list = self._retrieve_data_list( + # Get full function/ callsite/fuzzer filename list for all fuzzer's profiles + callsite_list, function_list, fuzzer_name_list = self._retrieve_data_list( proj_profile, profiles) + logger.info(fuzzer_name_list) + # Map callsites to each function function_callsite_dict = self._map_function_callsite( function_list, callsite_list) + # Generate html section header for sink analyser html_string = "
" @@ -634,7 +706,8 @@ def analysis_func(self, # Retrieve table content rows html_rows, json_row = self._retrieve_content_rows( function_list, proj_profile, profiles[0].target_lang, - function_callsite_dict, proj_profile.runtime_coverage, cwe) + function_callsite_dict, proj_profile.runtime_coverage, + cwe, fuzzer_name_list) self.set_json_string_result(json_row) From 76234427979cbc1a04e77c8d5b2a68b891ab4ac2 Mon Sep 17 00:00:00 2001 From: Arthur Chan Date: Thu, 18 Jan 2024 09:13:12 +0000 Subject: [PATCH 2/2] Fix formatting Signed-off-by: Arthur Chan --- .../analyses/sinks_analyser.py | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/fuzz_introspector/analyses/sinks_analyser.py b/src/fuzz_introspector/analyses/sinks_analyser.py index 351f8ac65..9fe98be59 100644 --- a/src/fuzz_introspector/analyses/sinks_analyser.py +++ b/src/fuzz_introspector/analyses/sinks_analyser.py @@ -127,8 +127,7 @@ def _retrieve_data_list( self, proj_profile: project_profile.MergedProjectProfile, profiles: List[fuzzer_profile.FuzzerProfile] ) -> Tuple[List[cfg_load.CalltreeCallsite], - List[function_profile.FunctionProfile], - List[str]]: + List[function_profile.FunctionProfile], List[str]]: """ Retrieve and return full list of call sites, functions and fuzzer names from all fuzzers profile for this project @@ -298,9 +297,9 @@ def _retrieve_function_link( return ("#", linenumber) def _determine_branch_blocker( - self, callpath_list: List[List[function_profile.FunctionProfile]], - proj_profile: project_profile.MergedProjectProfile, - fuzzer_name_list: List[str] + self, callpath_list: List[List[function_profile.FunctionProfile]], + proj_profile: project_profile.MergedProjectProfile, + fuzzer_name_list: List[str] ) -> List[function_profile.FunctionProfile]: """ Determine the branch blocker list that affect the runtime @@ -329,7 +328,8 @@ def _determine_branch_blocker( result_list.append(parent_fd) # Filter out invalid blockers from fuzzers - result_list = 
self._filter_fuzzer_blockers(result_list, fuzzer_name_list) + result_list = self._filter_fuzzer_blockers(result_list, + fuzzer_name_list) return result_list @@ -416,8 +416,9 @@ def _filter_inaccessible_callpath( return callpath_list def _filter_fuzzer_functions( - self, callpath_list: List[List[function_profile.FunctionProfile]], - fuzzer_name_list: List[str]) -> List[List[function_profile.FunctionProfile]]: + self, callpath_list: List[List[function_profile.FunctionProfile]], + fuzzer_name_list: List[str] + ) -> List[List[function_profile.FunctionProfile]]: """ Filter invalid call paths that are initiated from any fuzzer source files @@ -428,7 +429,7 @@ def _filter_fuzzer_functions( is_valid = True for func in callpath: for fuzzer_name in fuzzer_name_list: - if fuzzer_name in func.function_source_file : + if fuzzer_name in func.function_source_file: is_valid = False break if not is_valid: @@ -440,7 +441,8 @@ def _filter_fuzzer_functions( def _filter_fuzzer_blockers( self, blocker_functions: List[function_profile.FunctionProfile], - fuzzer_name_list: List[str]) -> List[function_profile.FunctionProfile]: + fuzzer_name_list: List[str] + ) -> List[function_profile.FunctionProfile]: """ Filter invalid blocker functions that are located in any fuzzer source files @@ -453,7 +455,7 @@ def _filter_fuzzer_blockers( is_valid = True for fuzzer_name in fuzzer_name_list: - if fuzzer_name in func.function_source_file : + if fuzzer_name in func.function_source_file: is_valid = False break if is_valid: @@ -466,8 +468,8 @@ def _handle_callpath_dict( callpath_dict: Dict[function_profile.FunctionProfile, List[List[function_profile.FunctionProfile]]], proj_profile: project_profile.MergedProjectProfile, - target_func: function_profile.FunctionProfile, - target_lang: str, fuzzer_name_list: List[str]) -> Optional[str]: + target_func: function_profile.FunctionProfile, target_lang: str, + fuzzer_name_list: List[str]) -> Optional[str]: """ Pretty print index of callpath and generate also 
generate separate html page for displaying @@ -550,8 +552,8 @@ def _retrieve_content_rows( self, functions: List[function_profile.FunctionProfile], proj_profile: project_profile.MergedProjectProfile, target_lang: str, func_callsites: Dict[str, List[str]], - coverage: code_coverage.CoverageProfile, - cwe: str, fuzzer_name_list: List[str]) -> Tuple[str, str]: + coverage: code_coverage.CoverageProfile, cwe: str, + fuzzer_name_list: List[str]) -> Tuple[str, str]: """ Retrieve the content for this analyser for a specific cwe in two formats. One in normal html table rows string and the @@ -571,7 +573,8 @@ def _retrieve_content_rows( if len(fd.reached_by_fuzzers) == 0: fuzzer_callpath = self._handle_callpath_dict( - callpath_dict, proj_profile, fd, target_lang, fuzzer_name_list) + callpath_dict, proj_profile, fd, target_lang, + fuzzer_name_list) if not fuzzer_callpath: # No reachable call path found for this sink @@ -689,7 +692,6 @@ def analysis_func(self, function_callsite_dict = self._map_function_callsite( function_list, callsite_list) - # Generate html section header for sink analyser html_string = "
" @@ -706,8 +708,8 @@ def analysis_func(self, # Retrieve table content rows html_rows, json_row = self._retrieve_content_rows( function_list, proj_profile, profiles[0].target_lang, - function_callsite_dict, proj_profile.runtime_coverage, - cwe, fuzzer_name_list) + function_callsite_dict, proj_profile.runtime_coverage, cwe, + fuzzer_name_list) self.set_json_string_result(json_row)