diff --git a/safety/scan/command.py b/safety/scan/command.py index bb502b1e..92b09366 100644 --- a/safety/scan/command.py +++ b/safety/scan/command.py @@ -1,9 +1,10 @@ +from collections import Counter from enum import Enum import itertools import logging from pathlib import Path import sys -from typing import Any, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple from typing_extensions import Annotated from safety.constants import EXIT_CODE_VULNERABILITIES_FOUND @@ -28,7 +29,7 @@ from safety.scan.finder.file_finder import should_exclude from safety.scan.main import load_policy_file, load_unverified_project_from_config, process_files, save_report_as from safety.scan.models import ScanExport, ScanOutput, SystemScanExport, SystemScanOutput -from safety.scan.render import print_brief, print_detected_ecosystems_section, print_fixes_section, print_ignore_details, render_scan_html, render_scan_spdx, render_to_console +from safety.scan.render import print_detected_ecosystems_section, print_fixes_section, print_summary, render_scan_html, render_scan_spdx, render_to_console from safety.scan.util import Stage from safety_schemas.models import Ecosystem, FileModel, FileType, ProjectModel, \ ReportModel, ScanType, VulnerabilitySeverityLabels, SecurityUpdates, Vulnerability @@ -305,6 +306,7 @@ def scan(ctx: typer.Context, exit_code = 0 fixes_count = 0 + total_resolved_vulns = 0 to_fix_files = [] fix_file_types = [fix_target[0] if isinstance(fix_target[0], str) else fix_target[0].value for fix_target in fixes_target] requirements_txt_found = False @@ -320,15 +322,6 @@ def scan(ctx: typer.Context, if exit_code == 0 and analyzed_file.dependency_results.failed: exit_code = EXIT_CODE_VULNERABILITIES_FOUND - # Handle ignored vulnerabilities for detailed output - if detailed_output: - vulns_ignored = analyzed_file.dependency_results.ignored_vulns_data \ - .values() - ignored_vulns_data = itertools.chain(vulns_ignored, - ignored_vulns_data) - - ignored.update(analyzed_file.dependency_results.ignored_vulns.keys()) - affected_specifications = analyzed_file.dependency_results.get_affected_specifications() affected_count += len(affected_specifications) @@ -359,7 +352,6 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: [vuln for vuln in spec.vulnerabilities if not vuln.ignored], key=sort_vulns_by_score, reverse=True) - critical_vulns_count = sum(1 for vuln in vulns_to_report if vuln.severity and vuln.severity.cvssv3 and vuln.severity.cvssv3.get("base_severity", "none").lower() == VulnerabilitySeverityLabels.CRITICAL.value.lower()) vulns_found = len(vulns_to_report) @@ -381,6 +373,9 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: detailed_output=detailed_output) lines = [] + + if spec.remediation.recommended: + total_resolved_vulns += spec.remediation.vulnerabilities_found # Put remediation here if not spec.remediation.recommended: @@ -438,13 +433,6 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: print_fixes_section(console, requirements_txt_found, detailed_output) console.print() - print_brief(console, ctx.obj.project, count, affected_count, - fixes_count) - print_ignore_details(console, ctx.obj.project, ignored, - is_detailed_output=detailed_output, - ignored_vulns_data=ignored_vulns_data) - - version = ctx.obj.schema metadata = ctx.obj.metadata telemetry = ctx.obj.telemetry @@ -455,6 +443,20 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: telemetry=telemetry, files=[], projects=[ctx.obj.project]) + + total_issues_with_duplicates, total_ignored_issues = get_vulnerability_summary(report.as_v30()) + + print_summary( + console=console, + total_issues_with_duplicates=total_issues_with_duplicates, + total_ignored_issues=total_ignored_issues, + project=ctx.obj.project, + dependencies_count=count, + fixes_count=fixes_count, + resolved_vulns_per_fix=total_resolved_vulns, + is_detailed_output=detailed_output, + ignored_vulns_data=ignored_vulns_data +) report_url = process_report(ctx.obj, console, report, **{**ctx.params}) project_url = f"{SAFETY_PLATFORM_URL}{ctx.obj.project.url_path}" @@ -789,3 +791,34 @@ def system_scan(ctx: typer.Context, console.print(Padding(detail, (0, 0, 0, 1)), emoji=True, overflow="crop") process_report(ctx.obj, console, report, **{**ctx.params}) + +def get_vulnerability_summary(report: Dict[str, Any]) -> Tuple[int, int, List[str]]: + """ + Summarize vulnerabilities from the given report. + + Args: + report (ReportModel): The report containing vulnerability data. + + Returns: + Tuple[int, int, List[str]]: A tuple containing: + - Total number of issues (including duplicates) + - Total number of ignored issues + - List of unique vulnerability IDs + """ + vulnerability_counter = Counter() + ignored_counter = Counter() + + for project in report.scan_results.projects: + for file in project.files: + for dependency in file.results.dependencies: + for specification in dependency.specifications: + for vulnerability in specification.vulnerabilities.known_vulnerabilities: + vulnerability_counter[vulnerability.id] += 1 + if vulnerability.ignored is not None: + ignored_counter[vulnerability.id] += 1 + + return sum(vulnerability_counter.values()), sum(ignored_counter.values()), + + + + diff --git a/safety/scan/render.py b/safety/scan/render.py index a7f4e277..a44a51ef 100644 --- a/safety/scan/render.py +++ b/safety/scan/render.py @@ -119,35 +119,8 @@ def print_detected_ecosystems_section(console: Console, file_paths: Dict[str, Se msg = f"{ecosystem.name.replace('_', ' ').title()} detected. {brief}" console.print(msg) + -def print_brief(console: Console, project: ProjectModel, dependencies_count: int = 0, affected_count: int = 0, fixes_count: int = 0) -> None: - """ - Print a brief summary of the scan results. - - Args: - console (Console): The console for output. - project (ProjectModel): The project model. - dependencies_count (int): Number of dependencies tested. - affected_count (int): Number of security issues found. - fixes_count (int): Number of fixes suggested. - """ - from ..util import pluralize - - if project.policy: - if project.policy.source is PolicySource.cloud: - policy_msg = f"policy fetched from Safety Platform" - else: - if project.id: - policy_msg = f"local {project.id} project scan policy" - else: - policy_msg = f"local scan policy file" - else: - policy_msg = "default Safety CLI policies" - - console.print(f"Tested [number]{dependencies_count}[/number] {pluralize('dependency', dependencies_count)} for known security " \ - f"issues using {policy_msg}") - console.print( - f"[number]{affected_count}[/number] security {pluralize('issue', affected_count)} found, [number]{fixes_count}[/number] {pluralize('fix', fixes_count)} suggested") def print_fixes_section(console: Console, requirements_txt_found: bool = False, is_detailed_output: bool = False) -> None: """ @@ -179,20 +152,65 @@ def print_fixes_section(console: Console, requirements_txt_found: bool = False, console.print() console.print("-" * console.size.width) - -def print_ignore_details(console: Console, project: ProjectModel, ignored: Set[str], is_detailed_output: bool = False, ignored_vulns_data: Optional[Dict[str, Vulnerability]] = None) -> None: +def print_summary( + console: Console, + total_issues_with_duplicates: int, + total_ignored_issues: int, + project: ProjectModel, + dependencies_count: int = 0, + fixes_count: int = 0, + resolved_vulns_per_fix: int = 0, + is_detailed_output: bool = False, + ignored_vulns_data: Optional[Dict[str, Vulnerability]] = None +) -> None: """ - Print details about ignored vulnerabilities. + Prints a concise summary of scan results including vulnerabilities, fixes, and ignored vulnerabilities. + + This function summarizes the results of a security scan, displaying the number of dependencies scanned, + vulnerabilities found, suggested fixes, and the impact of those fixes. It also optionally provides a + detailed breakdown of ignored vulnerabilities based on predefined policies. Args: - console (Console): The console for output. - project (ProjectModel): The project model. - ignored (Set[str]): Set of ignored vulnerabilities. - is_detailed_output (bool): Indicates if detailed output is enabled. - ignored_vulns_data (Optional[Dict[str, Vulnerability]]): Data of ignored vulnerabilities. + console (Console): The console object used to print formatted output. + total_issues_with_duplicates (int): The total number of security issues, including duplicates. + total_ignored_issues (int): The number of issues that were ignored based on project policies. + project (ProjectModel): The project model containing the scanned project details and policies. + dependencies_count (int, optional): The total number of dependencies scanned for vulnerabilities. Defaults to 0. + fixes_count (int, optional): The number of fixes suggested by the scan. Defaults to 0. + resolved_vulns_per_fix (int, optional): The number of vulnerabilities that can be resolved by the suggested fixes. Defaults to 0. + is_detailed_output (bool, optional): Flag to indicate whether detailed output, especially for ignored vulnerabilities, should be shown. Defaults to False. + ignored_vulns_data (Optional[Dict[str, Vulnerability]], optional): A dictionary of vulnerabilities that were ignored, categorized by their reason for being ignored. Defaults to None. + + Returns: + None: This function does not return any value. It prints the summary to the console. + + Usage: + Call this function after a vulnerability scan to display the results in a clear, formatted manner. + Example: + print_summary(console, unique_issues, 10, 2, project_model, dependencies_count=5, fixes_count=2) + """ + from ..util import pluralize + # Set the policy message based on the project source + if project.policy: + policy_msg = "policy fetched from Safety Platform" if project.policy.source is PolicySource.cloud else f"local {project.id or 'scan policy file'} project scan policy" + else: + policy_msg = "default Safety CLI policies" + + console.print(f"Tested [number]{dependencies_count}[/number] {pluralize('dependency', dependencies_count)} for security issues using {policy_msg}") + + if total_issues_with_duplicates == 0: + console.print("0 security issues found, 0 fixes suggested.") + else: + # Print security issues and ignored vulnerabilities + console.print(f"[number]{total_issues_with_duplicates}[/number] {pluralize('vulnerability', total_issues_with_duplicates)} found, " + f"[number]{total_ignored_issues}[/number] ignored due to policy.") + + console.print( + f"[number]{fixes_count}[/number] {pluralize('fix', fixes_count)} suggested, resolving [number]{resolved_vulns_per_fix}[/number] vulnerabilities.") + if is_detailed_output: if not ignored_vulns_data: ignored_vulns_data = iter([]) @@ -226,21 +244,23 @@ def print_ignore_details(console: Console, project: ProjectModel, ignored: Set[s f"[number]{count}[/number] were manually ignored due to the project policy:") for vuln in manual_ignored.values(): render_to_console(vuln, console, - rich_kwargs={"emoji": True, "overflow": "crop"}, - detailed_output=is_detailed_output) + rich_kwargs={"emoji": True, "overflow": "crop"}, + detailed_output=is_detailed_output) if cvss_severity_ignored: count = len(cvss_severity_ignored) console.print( f"[number]{count}[/number] {pluralize('vulnerability', count)} {pluralize('was', count)} ignored because " \ - "of their severity or exploitability impacted the following" \ + "of their severity or exploitability impacted the following" \ f" {pluralize('package', len(cvss_severity_ignored_pkgs))}: {', '.join(cvss_severity_ignored_pkgs)}" ) + if environment_ignored: count = len(environment_ignored) console.print( f"[number]{count}[/number] {pluralize('vulnerability', count)} {pluralize('was', count)} ignored because " \ - "they are inside an environment dependency." + "they are inside an environment dependency." ) + if unpinned_ignored: count = len(unpinned_ignored) console.print( @@ -249,11 +269,7 @@ def print_ignore_details(console: Console, project: ProjectModel, ignored: Set[s f"{', '.join(unpinned_ignored_pkgs)}" ) - else: - if len(ignored) > 0: - console.print(f"([number]{len(ignored)}[/number] {pluralize('vulnerability', len(ignored))} {pluralize('was', len(ignored))} ignored due to " \ - "project policy)") - + def print_wait_project_verification(console: Console, project_id: str, closure: Tuple[Any, Dict[str, Any]], on_error_delay: int = 1) -> Any: """ diff --git a/tests/scan/test_render.py b/tests/scan/test_render.py index d4d46b6c..ac667309 100644 --- a/tests/scan/test_render.py +++ b/tests/scan/test_render.py @@ -1,11 +1,10 @@ import unittest -from unittest import mock from unittest.mock import MagicMock, Mock, patch from pathlib import Path import datetime -from safety.scan.render import print_announcements, print_ignore_details, render_header -from safety_schemas.models import ProjectModel, IgnoreCodes +from safety.scan.render import print_announcements, print_summary, render_header +from safety_schemas.models import ProjectModel, IgnoreCodes, PolicySource class TestRender(unittest.TestCase): @patch('safety.scan.render.get_safety_version') @@ -56,29 +55,46 @@ def test_print_announcements(self, mock_console, mock_get_basic_announcements, m console.print.assert_any_call("[red]* Error message[/red]") + @patch('safety.scan.render.render_to_console') - def test_print_ignore_details(self, render_to_console_mocked): - render_to_console_mocked.return_value = "render_to_console_mocked" - from safety.console import main_console - console = MagicMock(wraps=main_console) - console.print = MagicMock() - - # Create a fake project - project = ProjectModel(id='prj-id') - - # Create a fake ignored vulnerabilities data - ignored_vulns_data = [ - MagicMock(ignored_code=IgnoreCodes.manual.value, vulnerability_id='v1', package_name='p1'), - MagicMock(ignored_code=IgnoreCodes.cvss_severity.value, vulnerability_id='v2', package_name='p2'), - MagicMock(ignored_code=IgnoreCodes.unpinned_specification.value, vulnerability_id='v3', package_name='p3'), - MagicMock(ignored_code=IgnoreCodes.environment_dependency.value, vulnerability_id='v4', package_name='p4'), - ] - - # Call the function - print_ignore_details(console, project, [], True, ignored_vulns_data) - - # Check that the console.print method was called with the expected arguments - console.print.assert_any_call("[number]1[/number] were manually ignored due to the project policy:") - console.print.assert_any_call("[number]1[/number] vulnerability was ignored because of their severity or exploitability impacted the following package: p2") - console.print.assert_any_call("[number]1[/number] vulnerability was ignored because they are inside an environment dependency.") - console.print.assert_any_call("[number]1[/number] vulnerability was ignored because this package has unpinned specs: p3") + def test_print_summary(self, mock_render_to_console): + console = MagicMock() + project = ProjectModel(id='test-project') + project.policy = MagicMock() + project.policy.source = PolicySource.cloud + + print_summary( + console, + total_issues_with_duplicates=0, + total_ignored_issues=0, + project=project, + dependencies_count=5, + fixes_count=0, + resolved_vulns_per_fix=0 + ) + + console.print.assert_any_call("Tested [number]5[/number] dependencies for security issues using policy fetched from Safety Platform") + console.print.assert_any_call("0 security issues found, 0 fixes suggested.") + + print_summary( + console, + total_issues_with_duplicates=2, + total_ignored_issues=1, + project=project, + dependencies_count=5, + fixes_count=1, + resolved_vulns_per_fix=2 + ) + + console.print.assert_any_call( + "Tested [number]5[/number] dependencies for security issues using policy fetched from Safety Platform" + ) + console.print.assert_any_call( + "[number]2[/number] security issues found, [number]1[/number] fix suggested, resolving [number]2[/number] vulnerabilities." + ) + + # Reset mock + console.reset_mock() + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_cli.py b/tests/test_cli.py index 27655ba4..14082564 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -516,9 +516,29 @@ def test_license_with_file(self, fetch_database_url): @patch('builtins.input', lambda *args: '') @patch('safety.safety.fetch_database', return_value={'vulnerable_packages': []}) def test_debug_flag(self, mock_get_auth_info, mock_is_valid, mock_get_auth_type, mock_fetch_database): + """ + Test the behavior of the CLI when invoked with the '--debug' flag. + + This test invokes the CLI with the 'scan' command and the '--debug' flag enabled, + verifies that the command exits successfully, and checks that the expected output snippet + is present in the CLI output. + + Args: + mock_get_auth_info: Mock for retrieving authentication info. + mock_is_valid: Mock for checking validity of inputs or authentication. + mock_get_auth_type: Mock for retrieving the authentication type. + mock_fetch_database: Mock for database fetching operations. + """ result = self.runner.invoke(cli.cli, ['--debug', 'scan']) - assert result.exit_code == 0, f"CLI exited with code {result.exit_code} and output: {result.output} and error: {result.stderr}" - assert "for known security issues using default" in result.output + assert result.exit_code == 0, ( + f"CLI exited with code {result.exit_code} and output: {result.output} and error: {result.stderr}" + ) + expected_output_snippet = "Safety 3.2.8 scanning" + assert expected_output_snippet in result.output, ( + f"Expected output to contain: {expected_output_snippet}, but got: {result.output}" + ) + + @patch('safety.auth.cli.get_auth_info', return_value={'email': 'test@test.com'}) @patch.object(Auth, 'is_valid', return_value=True)