diff --git a/safety/scan/command.py b/safety/scan/command.py index 181015c1..a1dd3228 100644 --- a/safety/scan/command.py +++ b/safety/scan/command.py @@ -27,7 +27,7 @@ SYSTEM_SCAN_TARGET_HELP, SCAN_APPLY_FIXES, SCAN_DETAILED_OUTPUT, CLI_SCAN_COMMAND_HELP, CLI_SYSTEM_SCAN_COMMAND_HELP from safety.scan.decorators import inject_metadata, scan_project_command_init, scan_system_command_init from safety.scan.finder.file_finder import should_exclude -from safety.scan.main import load_policy_file, load_unverified_project_from_config, process_files, save_report_as +from safety.scan.main import get_report, load_policy_file, load_unverified_project_from_config, process_files, save_report_as from safety.scan.models import ScanExport, ScanOutput, SystemScanExport, SystemScanOutput from safety.scan.render import print_detected_ecosystems_section, print_fixes_section, print_summary, render_scan_html, render_scan_spdx, render_to_console from safety.scan.util import Stage @@ -995,7 +995,7 @@ def scan(ctx: typer.Context, with console.status(wait_msg, spinner=DEFAULT_SPINNER): if use_server_matching: - response_json = process_files(paths=file_paths, config=config, use_server_matching=True, obj=ctx.obj, target=target) + response_json = get_report(paths=file_paths, config=config, use_server_matching=True, obj=ctx.obj, target=target) projects = safe_get(response_json, ["scan_results", "projects"], []) for project in projects: project_files = project.get("files", []) @@ -1050,30 +1050,35 @@ def scan(ctx: typer.Context, if vulns_found != 0: msg = generate_vulnerability_message(spec_name, spec_raw, vulns_found, critical_vulns_count, vuln_word) console.print(Padding(f"{msg}]", PADDING_VALUES), emoji=True, overflow="crop") - - if detailed_output: - render_vulnerabilities_json(vulns_to_report, console, detailed_output) - lines, resolved_vulns, fixes = generate_remediation_details_json(spec, spec_name, vuln_word, critical_vulns_count) - total_resolved_vulns += resolved_vulns - fixes_count += fixes - - for line in lines: - console.print(Padding(line, PADDING_VALUES), emoji=True) - - # construct the url HERE - # https://data.safetycli.com/p/pypi/django/eda/?from=2.2&to=4.2.17 - # Construct this with 3 inputs- django, raw, - remediation = safe_get(spec, ["vulnerabilities", "remediation"], {}) - recommended = remediation.get("recommended") - URL_PREFIX = "https://data.safetycli.com/p/pypi/" - URL_MIDDLE = "/eda/?from=" - URL_END = "&to=" - more_info_url = URL_PREFIX+spec_name+URL_MIDDLE+spec_raw.split("==")[1]+URL_END+recommended - - if more_info_url: - console.print( - Padding(MSG_LEARN_MORE.format(more_info_url), PADDING_VALUES), emoji=True - ) + lines, resolved_vulns, fixes = generate_remediation_details_json(spec, spec_name, vuln_word, critical_vulns_count) + total_resolved_vulns += resolved_vulns + fixes_count += fixes + + for line in lines: + console.print(Padding(line, PADDING_VALUES), emoji=True) + + # construct the url HERE + # https://data.safetycli.com/p/pypi/django/eda/?from=2.2&to=4.2.17 + # Construct this with 3 inputs- django, raw, + remediation = safe_get(spec, ["vulnerabilities", "remediation"], {}) + recommended = remediation.get("recommended") + URL_PREFIX = "https://data.safetycli.com/p/pypi/" + URL_MIDDLE = "/eda/?from=" + URL_END = "&to=" + more_info_url = URL_PREFIX+spec_name+URL_MIDDLE+spec_raw.split("==")[1]+URL_END+recommended + + if more_info_url: + console.print( + Padding(MSG_LEARN_MORE.format(more_info_url), PADDING_VALUES), emoji=True + ) + + # # Track whether to suggest applying fixes + # display_apply_fix_suggestion = should_display_fix_suggestion_json( + # ctx, file_data, affected_specifications, apply_updates + # ) + if detailed_output: + render_vulnerabilities_json(vulns_to_report, console, detailed_output) + else: # Handle files with no issues console.print() @@ -1083,10 +1088,7 @@ def scan(ctx: typer.Context, emoji=True ) - # Track whether to suggest applying fixes - display_apply_fix_suggestion = should_display_fix_suggestion_json( - ctx, file_data, affected_specifications, apply_updates - ) + # Track if a requirements.txt file was found if not requirements_txt_found and file_data.get("type") == "requirements.txt": diff --git a/safety/scan/main.py b/safety/scan/main.py index a24a8cdd..d50e3d1c 100644 --- a/safety/scan/main.py +++ b/safety/scan/main.py @@ -247,42 +247,8 @@ def build_meta(target: Path) -> Dict[str, Any]: "client": client_metadata, } -def process_files(paths: Dict[str, Set[Path]], config: Optional[ConfigModel] = None, use_server_matching: bool = False, obj=None, target=Path(".")): - """ - Processes the files and either yields each file path along with its inspectable file (GET implementation), - or sends all files in a single POST request (new POST implementation). - - Args: - paths (Dict[str, Set[Path]]): A dictionary of file paths by file type. - config (Optional[ConfigModel]): The configuration model (optional). - use_server_matching (bool): Whether to use server-side matching (POST implementation). - obj: Context object containing the authentication client. - target (Path): Target directory for metadata. - Returns: - Union[Generator[Tuple[Path, InspectableFile], None, None], Dict[str, Any]]: - - Generator of file path and inspectable file for the GET implementation. - - JSON response from the server for the POST implementation. - """ - print("1") - if not config: - config = ConfigModel() - - # # old GET implementation - # if not use_server_matching: - # for file_type_key, f_paths in paths.items(): - # file_type = FileType(file_type_key) - # if not file_type or not file_type.ecosystem: - # continue - # for f_path in f_paths: - # with InspectableFileContext(f_path, file_type=file_type) as inspectable_file: - # if inspectable_file and inspectable_file.file_type: - # inspectable_file.inspect(config=config) - # inspectable_file.remediate() - # yield f_path, inspectable_file - - # new POST implementation - # else: +def get_report(paths: Dict[str, Set[Path]], config: Optional[ConfigModel] = None, use_server_matching: bool = False, obj=None, target=Path(".")): files = [] meta = build_meta(target) meta["project_id"] = obj.project.id @@ -311,3 +277,39 @@ def process_files(paths: Dict[str, Set[Path]], config: Optional[ConfigModel] = N LOG.error(f"Failed to send scan payload to the API. Status code: {response.status_code}") LOG.error(f"Response: {response.text}") raise SafetyError(f"Failed to send scan payload to the API. Status code: {response.status_code}, Response: {response.text}") + + + + +def process_files(paths: Dict[str, Set[Path]], config: Optional[ConfigModel] = None, use_server_matching: bool = False, obj=None, target=Path(".")): + """ + Processes the files and either yields each file path along with its inspectable file (GET implementation), + or sends all files in a single POST request (new POST implementation). + + Args: + paths (Dict[str, Set[Path]]): A dictionary of file paths by file type. + config (Optional[ConfigModel]): The configuration model (optional). + use_server_matching (bool): Whether to use server-side matching (POST implementation). + obj: Context object containing the authentication client. + target (Path): Target directory for metadata. + + Returns: + Union[Generator[Tuple[Path, InspectableFile], None, None], Dict[str, Any]]: + - Generator of file path and inspectable file for the GET implementation. + - JSON response from the server for the POST implementation. + """ + + if not config: + config = ConfigModel() + + if not use_server_matching: + for file_type_key, f_paths in paths.items(): + file_type = FileType(file_type_key) + if not file_type or not file_type.ecosystem: + continue + for f_path in f_paths: + with InspectableFileContext(f_path, file_type=file_type) as inspectable_file: + if inspectable_file and inspectable_file.file_type: + inspectable_file.inspect(config=config) + inspectable_file.remediate() + yield f_path, inspectable_file