diff --git a/CHANGELOG.md b/CHANGELOG.md index 72b9df39..8c93c88c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +Unreleased +---------------------- + +Features: +* [#368](https://github.com/godaddy/tartufo/pull/368) - Add update-signatures command to migrate deprecated signatures + v3.2.0 - 6 July 2022 ---------------------- diff --git a/docs/source/upgrading3.rst b/docs/source/upgrading3.rst index d4436b27..346b0123 100644 --- a/docs/source/upgrading3.rst +++ b/docs/source/upgrading3.rst @@ -154,6 +154,21 @@ Deprecated Options required at this time, replacing these options with their newer equivalents will reduce future disruptions when they are retired. +Updating Signatures +------------------- + +``tartufo`` release 3.2.0 deprecated a number of signatures that were generated +with the leading `+`/`-` from the git diff erroneously. These signatures will no +longer work in release 4. An additional command ``tartufo update-signatures`` was +added which scans a local repository, automatically updates the deprecated +exclude-signatures in your tartufo config file, and removes any resulting duplicates. + +Use ``--no-update-configuration`` to prevent ``tartufo`` from overwriting your config. +Use ``--no-remove-duplicates`` to prevent ``tartufo`` from removing duplicate signatures. + +When removing duplicate signatures ``tartufo`` will keep the first signature it finds +and discard the rest. + External Rules Files ++++++++++++++++++++ diff --git a/tartufo/commands/update_signatures.py b/tartufo/commands/update_signatures.py new file mode 100644 index 00000000..7ff5350e --- /dev/null +++ b/tartufo/commands/update_signatures.py @@ -0,0 +1,238 @@ +import contextlib +import functools +import io +import pathlib +import re +from typing import ( + Any, + MutableMapping, + MutableSet, + Optional, + Sequence, + Tuple, +) + +import click +import tomlkit + +from tartufo import types, util +from tartufo.config import load_config_from_path +from tartufo.scanner import GitRepoScanner + +DeprecationSetT = MutableSet[Sequence[str]] + + +def scan_local_repo( + options: types.GlobalOptions, + repo_path: str, + since_commit: Optional[str], + max_depth: int, + branch: Optional[str], + include_submodules: bool, +) -> Tuple[Optional[GitRepoScanner], io.StringIO]: + """A reworked version of the scan-local-repo command. + + :param options: The options provided to the top-level tartufo command + :param repo_path: The local filesystem path pointing to the repository + :param since_commit: A commit hash to treat as a starting point in history for the scan + :param max_depth: A maximum depth, or maximum number of commits back in history, to scan + :param branch: A specific branch to scan + :param include_submodules: Whether to also scan submodules of the repository + :returns: A tuple containing optional scanner, stdout, and stderr from the scan + """ + stdout = io.StringIO() + stderr = io.StringIO() + scanner = None + + git_options = types.GitOptions( + since_commit=since_commit, + max_depth=max_depth, + branch=branch, + include_submodules=include_submodules, + ) + + with contextlib.redirect_stdout(stdout): + with contextlib.redirect_stderr(stderr): + try: + scanner = GitRepoScanner(options, git_options, str(repo_path)) + util.process_issues(repo_path, scanner, options) + except types.GitLocalException: + message = f"{repo_path} is not a valid git repository." + click.echo(util.style_error(message), err=True) + except types.TartufoException as exc: + click.echo(util.style_error(str(exc)), err=True) + + del stdout + return scanner, stderr + + +def get_deprecations(stderr: io.StringIO) -> DeprecationSetT: + """Finds the deprecated signatures from the given input. + + :param stderr: Stderr output from the scan-local-repo subcommand + :returns: A set of tuples each containing the old and new signature + """ + deprecations: DeprecationSetT = set() + deprecation_rgx = re.compile( + r"DeprecationWarning: Signature (\w+) was.*use signature (\w+) instead\." + ) + + # Start at the beginning of the buffer + stderr.seek(0) + for line in stderr: + match = deprecation_rgx.search(line) + + if match: + # This line had a deprecation warning + deprecations.add(match.groups()) + + return deprecations + + +def replace_deprecated_signatures( + deprecations: DeprecationSetT, config_data: MutableMapping[str, Any] +) -> int: + """Update the old deprecated signatures with the new signatures. + + :param deprecations: The deprecated, and replacement signatures + :param config_data: The current tartufo config data + :returns: The number of items replaced in config_data + """ + updated = 0 + + for old_sig, new_sig in deprecations: + targets = functools.partial(lambda o, s: o == s["signature"], old_sig) + # Iterate all the deprecations and update them everywhere + # they are found in the exclude-signatures section of config + for target_signature in filter(targets, config_data["exclude_signatures"]): + updated += 1 + click.echo(f"{updated}) {old_sig!r} -> {new_sig!r}") + target_signature["signature"] = new_sig + + return updated + + +def write_updated_signatures( + config_path: pathlib.Path, config_data: MutableMapping[str, Any] +) -> None: + """Read the current config file and update it with the new data. + + :param config_path: The path to the tartufo config file + :param config_data: The updated config data + """ + with open(str(config_path), "r") as file: + result = tomlkit.loads(file.read()) + + # Assign the new signatures and write it to the config + result["tool"]["tartufo"]["exclude-signatures"] = config_data[ # type: ignore + "exclude_signatures" + ] + + with open(str(config_path), "w") as file: + file.write(tomlkit.dumps(result)) + + +def remove_duplicated_entries(config_data: MutableMapping[str, Any]) -> int: + """Removed any duplicated signature entries in the config. + Keeps the first instance of each duplicate. + + :param config_data: The config data to check for duplicates + """ + seen = set() + count = 0 + + for i, exclude in enumerate(config_data["exclude_signatures"].copy()): + if exclude["signature"] in seen: + # Remove this duplicated signature + del config_data["exclude_signatures"][i] + count += 1 + else: + # Mark this signature as seen + seen.add(exclude["signature"]) + + return count + + +@click.command("update-signatures") +@click.option("--since-commit", help="Only scan from a given commit hash.", hidden=True) +@click.option( + "--max-depth", + default=1000000, + show_default=True, + help="The max commit depth to go back when searching for secrets.", + hidden=True, +) +@click.option("--branch", help="Specify a branch name to scan only that branch.") +@click.argument( + "repo-path", + type=click.Path(exists=True, file_okay=False, resolve_path=True, allow_dash=False), +) +@click.option( + "--include-submodules/--exclude-submodules", + is_flag=True, + default=False, + show_default=True, + help="Controls whether the contents of git submodules are scanned", +) +@click.option( + "--update-configuration/--no-update-configuration", + is_flag=True, + default=True, + show_default=True, + help="Whether or not to overwrite the tartufo config file.", +) +@click.option( + "--remove-duplicates/--no-remove-duplicates", + is_flag=True, + default=True, + show_default=True, + help="Whether or not to remove duplicated signatures.", +) +@click.pass_obj +@click.pass_context +def main( + ctx: click.Context, + options: types.GlobalOptions, + repo_path: str, + since_commit: Optional[str], + max_depth: int, + branch: Optional[str], + include_submodules: bool, + update_configuration: bool, + remove_duplicates: bool, +) -> GitRepoScanner: + """Update deprecated signatures for a local repository.""" + config_path, config_data = load_config_from_path(pathlib.Path(repo_path)) + if not config_data.get("exclude_signatures"): + util.fail( + util.style_warning("No signatures found in configuration, exiting..."), + ctx, + code=0, + ) + + scanner, stderr = scan_local_repo( + options, repo_path, since_commit, max_depth, branch, include_submodules + ) + + if not scanner: + # Explicitly fail if we didn't get a scanner back + util.fail(util.style_error("Unable to update signatures"), ctx) + + plural = lambda n: "" if n == 1 else "s" + deprecations = get_deprecations(stderr) + ndeps = len(deprecations) + click.echo(f"Found {ndeps} deprecated signature{plural(ndeps)}.") + updated = replace_deprecated_signatures(deprecations, config_data) + + dups = 0 + if remove_duplicates: + dups = remove_duplicated_entries(config_data) + + if update_configuration and (deprecations or dups): + # Only rewrite the config if we have altered it + write_updated_signatures(config_path, config_data) + + click.echo(f"Removed {dups} duplicated signature{plural(dups)}.") + click.echo(f"Updated {updated} deprecated signature{plural(updated)}.") + + return scanner diff --git a/tartufo/util.py b/tartufo/util.py index ea2481da..014cfc62 100644 --- a/tartufo/util.py +++ b/tartufo/util.py @@ -17,6 +17,7 @@ Generator, List, Optional, + NoReturn, Tuple, TYPE_CHECKING, Pattern, @@ -170,7 +171,7 @@ def _style_func(msg: str, *_: Any, **__: Any) -> str: style_ok = style_error = style_warning = partial(_style_func) -def fail(msg: str, ctx: click.Context, code: int = 1) -> None: +def fail(msg: str, ctx: click.Context, code: int = 1) -> NoReturn: """Print out a styled error message and exit. :param msg: The message to print out to the user diff --git a/tests/test_update_signatures.py b/tests/test_update_signatures.py new file mode 100644 index 00000000..5677fd8b --- /dev/null +++ b/tests/test_update_signatures.py @@ -0,0 +1,391 @@ +import io +import textwrap +from os import remove +from pathlib import Path +from typing import Sequence, Set + +from unittest import mock, TestCase +import tomlkit +from click.testing import CliRunner + +from tartufo import cli, types +from tartufo.commands import update_signatures + + +class UpdateSignaturesTests(TestCase): + @mock.patch("tartufo.commands.update_signatures.load_config_from_path") + def test_with_no_signatures_in_config( + self, mock_load_config: mock.MagicMock + ) -> None: + mock_load_config.return_value = Path("."), {"test": None} + + runner = CliRunner() + with runner.isolated_filesystem(): + result = runner.invoke(cli.main, ["update-signatures", "."]) + + mock_load_config.assert_called_once() + self.assertEqual( + result.output, "No signatures found in configuration, exiting...\n" + ) + + @mock.patch("tartufo.commands.update_signatures.GitRepoScanner") + @mock.patch("tartufo.commands.update_signatures.load_config_from_path") + def test_with_no_deprecated_signatures( + self, mock_load_config: mock.MagicMock, mock_scanner: mock.MagicMock + ) -> None: + mock_load_config.return_value = Path("."), { + "exclude_signatures": [{"signature": "a"}] + } + + runner = CliRunner() + with runner.isolated_filesystem(): + result = runner.invoke(cli.main, ["update-signatures", "."]) + + mock_scanner.assert_called_once() + mock_load_config.assert_called_once() + self.assertEqual(result.output, "Found 0 deprecated signatures.\n") + + @mock.patch("tartufo.commands.update_signatures.scan_local_repo") + @mock.patch("tartufo.commands.update_signatures.load_config_from_path") + def test_update_signatures_when_scanner_is_none( + self, mock_load_config: mock.MagicMock, mock_scan_local: mock.MagicMock + ) -> None: + mock_scan_local.return_value = None, "b" + mock_load_config.return_value = Path("."), { + "exclude_signatures": [{"signature": "a"}] + } + + runner = CliRunner() + with runner.isolated_filesystem(): + result = runner.invoke(cli.main, ["update-signatures", "."]) + + mock_scan_local.assert_called_once() + mock_load_config.assert_called_once() + self.assertGreater(result.exit_code, 0) + self.assertEqual(result.output, "Unable to update signatures\n") + + @mock.patch("tartufo.commands.update_signatures.write_updated_signatures") + @mock.patch("tartufo.commands.update_signatures.get_deprecations") + @mock.patch("tartufo.commands.update_signatures.remove_duplicated_entries") + @mock.patch("tartufo.commands.update_signatures.scan_local_repo") + @mock.patch("tartufo.commands.update_signatures.load_config_from_path") + def test_when_no_remove_duplicates( + self, + mock_load_config: mock.MagicMock, + mock_scan_local: mock.MagicMock, + mock_remove_dups: mock.MagicMock, + mock_get_deprecations: mock.MagicMock, + mock_write: mock.MagicMock, + ) -> None: + mock_scanner = mock.MagicMock() + mock_write.return_value = None + mock_get_deprecations.return_value = {("123", "abc"), ("456", "def")} + mock_scan_local.return_value = mock_scanner, "b" + mock_load_config.return_value = Path("."), { + "exclude_signatures": [{"signature": "a"}] + } + + runner = CliRunner() + with runner.isolated_filesystem(): + runner.invoke( + cli.main, ["update-signatures", ".", "--no-remove-duplicates"] + ) + + mock_get_deprecations.assert_called_once() + mock_remove_dups.assert_not_called() + mock_scan_local.assert_called_once() + mock_load_config.assert_called_once() + mock_write.assert_called_once() + + @mock.patch("tartufo.commands.update_signatures.write_updated_signatures") + @mock.patch("tartufo.commands.update_signatures.get_deprecations") + @mock.patch("tartufo.commands.update_signatures.remove_duplicated_entries") + @mock.patch("tartufo.commands.update_signatures.scan_local_repo") + @mock.patch("tartufo.commands.update_signatures.load_config_from_path") + def test_when_remove_duplicates( + self, + mock_load_config: mock.MagicMock, + mock_scan_local: mock.MagicMock, + mock_remove_dups: mock.MagicMock, + mock_get_deprecations: mock.MagicMock, + mock_write: mock.MagicMock, + ) -> None: + mock_scanner = mock.MagicMock() + mock_scan_local.return_value = mock_scanner, "b" + mock_write.return_value = None + mock_get_deprecations.return_value = {("123", "abc"), ("456", "def")} + mock_load_config.return_value = Path("."), { + "exclude_signatures": [{"signature": "a"}] + } + + runner = CliRunner() + with runner.isolated_filesystem(): + runner.invoke(cli.main, ["update-signatures", "."]) + + mock_get_deprecations.assert_called_once() + mock_remove_dups.assert_called_once() + mock_scan_local.assert_called_once() + mock_load_config.assert_called_once() + mock_write.assert_called_once() + + @mock.patch("tartufo.commands.update_signatures.types.GlobalOptions") + @mock.patch("tartufo.commands.update_signatures.util.process_issues") + def test_scan_local_with_git_local_exc( + self, mock_process_issues: mock.MagicMock, mock_global_options: mock.MagicMock + ) -> None: + mock_process_issues.side_effect = types.GitLocalException() + + scanner, stderr = update_signatures.scan_local_repo( + mock_global_options, ".", None, 1, None, False + ) + + mock_process_issues.assert_called_once_with(".", scanner, mock_global_options) + self.assertEqual(stderr.getvalue(), ". is not a valid git repository.\n") + + @mock.patch("tartufo.commands.update_signatures.types.GlobalOptions") + @mock.patch("tartufo.commands.update_signatures.util.process_issues") + def test_scan_local_with_tartufo_exc( + self, mock_process_issues: mock.MagicMock, mock_global_options: mock.MagicMock + ) -> None: + mock_process_issues.side_effect = types.TartufoException("TARTUFO EXC") + + scanner, stderr = update_signatures.scan_local_repo( + mock_global_options, ".", None, 1, None, False + ) + + mock_process_issues.assert_called_once_with(".", scanner, mock_global_options) + self.assertEqual(stderr.getvalue(), "TARTUFO EXC\n") + + @mock.patch("tartufo.commands.update_signatures.write_updated_signatures") + @mock.patch("tartufo.commands.update_signatures.get_deprecations") + @mock.patch("tartufo.commands.update_signatures.scan_local_repo") + @mock.patch("tartufo.commands.update_signatures.load_config_from_path") + def test_found_output_with_signatures( + self, + mock_load_config: mock.MagicMock, + mock_scan_local: mock.MagicMock, + mock_get_deprecations: mock.MagicMock, + mock_write: mock.MagicMock, + ) -> None: + mock_scanner = mock.MagicMock() + mock_write.return_value = None + mock_get_deprecations.return_value = {("123", "abc"), ("456", "def")} + mock_scan_local.return_value = ( + mock_scanner, + ( + "DeprecationWarning: Signature 123 was ... use signature abc instead.\n" + "DeprecationWarning: Signature 456 was ... use signature def instead." + ), + ) + + mock_load_config.return_value = Path("."), { + "exclude_signatures": [{"signature": "123"}, {"signature": "456"}] + } + + runner = CliRunner() + with runner.isolated_filesystem(): + result = runner.invoke(cli.main, ["update-signatures", "."]) + + mock_write.assert_called_once() + mock_get_deprecations.assert_called_once() + mock_scan_local.assert_called_once() + mock_load_config.assert_called_once() + self.assertTrue(result.output.startswith("Found 2 deprecated signatures.\n")) + + # The numbers before the paren can vary so we leave them out of the test + self.assertTrue(") '123' -> 'abc'\n" in result.output) + self.assertTrue(") '456' -> 'def'\n" in result.output) + self.assertTrue("Removed 0 duplicated signatures.\n" in result.output) + self.assertTrue(result.output.endswith("Updated 2 deprecated signatures.\n")) + + @mock.patch("tartufo.commands.update_signatures.write_updated_signatures") + @mock.patch("tartufo.commands.update_signatures.get_deprecations") + @mock.patch("tartufo.commands.update_signatures.scan_local_repo") + @mock.patch("tartufo.commands.update_signatures.load_config_from_path") + def test_found_output_with_duplicated_signatures( + self, + mock_load_config: mock.MagicMock, + mock_scan_local: mock.MagicMock, + mock_get_deprecations: mock.MagicMock, + mock_write: mock.MagicMock, + ) -> None: + mock_scanner = mock.MagicMock() + mock_write.return_value = None + mock_get_deprecations.return_value = { + ("123", "abc"), + ("456", "def"), + ("789", "abc"), + } + + mock_scan_local.return_value = ( + mock_scanner, + ( + "DeprecationWarning: Signature 123 was ... use signature abc instead.\n" + "DeprecationWarning: Signature 456 was ... use signature def instead.\n" + "DeprecationWarning: Signature 789 was ... use signature abc instead." + ), + ) + + mock_load_config.return_value = Path("."), { + "exclude_signatures": [ + {"signature": "123"}, + {"signature": "456"}, + {"signature": "789"}, + ] + } + + runner = CliRunner() + with runner.isolated_filesystem(): + result = runner.invoke(cli.main, ["update-signatures", "."]) + + mock_write.assert_called_once() + mock_get_deprecations.assert_called_once() + mock_scan_local.assert_called_once() + mock_load_config.assert_called_once() + self.assertTrue(result.output.startswith("Found 3 deprecated signatures.\n")) + + # The numbers before the paren can vary so we leave them out of the test + self.assertTrue(") '123' -> 'abc'\n" in result.output) + self.assertTrue(") '456' -> 'def'\n" in result.output) + self.assertTrue(") '789' -> 'abc'\n" in result.output) + self.assertTrue("Removed 1 duplicated signature.\n" in result.output) + self.assertTrue(result.output.endswith("Updated 3 deprecated signatures.\n")) + + @mock.patch("tartufo.commands.update_signatures.write_updated_signatures") + @mock.patch("tartufo.commands.update_signatures.replace_deprecated_signatures") + @mock.patch("tartufo.commands.update_signatures.get_deprecations") + @mock.patch("tartufo.commands.update_signatures.scan_local_repo") + @mock.patch("tartufo.commands.update_signatures.load_config_from_path") + def test_found_output_with_no_signatures( + self, + mock_load_config: mock.MagicMock, + mock_scan_local: mock.MagicMock, + mock_get_deprecations: mock.MagicMock, + mock_replace: mock.MagicMock, + mock_write: mock.MagicMock, + ) -> None: + mock_scanner = mock.MagicMock() + mock_write.return_value = None + mock_replace.return_value = 2 + mock_get_deprecations.return_value = {} + mock_scan_local.return_value = mock_scanner, "" + mock_load_config.return_value = Path("."), {"exclude_signatures": "a"} + + runner = CliRunner() + with runner.isolated_filesystem(): + result = runner.invoke(cli.main, ["update-signatures", "."]) + + mock_replace.assert_called_once() + mock_get_deprecations.assert_called_once() + mock_scan_local.assert_called_once() + self.assertEqual(result.output, "Found 0 deprecated signatures.\n") + + def test_remove_duplicated_entries(self) -> None: + initial_data = { + "exclude_signatures": [ + {"signature": "123", "reason": "first"}, + {"signature": "123", "reason": "second"}, + {"signature": "456"}, + ] + } + + expected_data = { + "exclude_signatures": [ + {"signature": "123", "reason": "first"}, + {"signature": "456"}, + ] + } + + update_signatures.remove_duplicated_entries(initial_data) + self.assertEqual(initial_data, expected_data) + + def test_get_deprecations_with_deprecations(self) -> None: + expected_deprecations = {("123", "abc"), ("456", "def"), ("789", "ghi")} + stderr = io.StringIO( + "DeprecationWarning: Signature 123 was ... use signature abc instead.\n" + "DeprecationWarning: Signature 456 was ... use signature def instead.\n" + "DeprecationWarning: Signature 789 was ... use signature ghi instead." + ) + + deprecations = update_signatures.get_deprecations(stderr) + + self.assertEqual(len(deprecations), 3) + self.assertTrue(isinstance(deprecations, set)) + self.assertEqual(deprecations, expected_deprecations) + + def test_get_deprecations_with_no_deprecations(self) -> None: + stderr = io.StringIO("No deprecations here :).") + + deprecations = update_signatures.get_deprecations(stderr) + + self.assertTrue(isinstance(deprecations, set)) + self.assertEqual(len(deprecations), 0) + + def test_replace_deprecated_signatures(self) -> None: + deprecations: Set[Sequence[str]] = set() + deprecations.update((("123", "abc"), ("456", "def"), ("789", "ghi"))) + expected_result = { + "exclude_signatures": [ + {"signature": "abc"}, + {"signature": "def"}, + {"signature": "ghi"}, + ] + } + + config_data = { + "exclude_signatures": [ + {"signature": "123"}, + {"signature": "456"}, + {"signature": "789"}, + ] + } + + update_signatures.replace_deprecated_signatures(deprecations, config_data) + self.assertEqual(config_data, expected_result) + + def test_write_updated_signatures(self) -> None: + def build_file_content(*signatures: str) -> str: + return textwrap.dedent( + """[tool.tartufo] + exclude-signatures = [ + {signature = '%s'}, + {signature = '%s'}, + {signature = '%s'} + ] + """ # pylint: disable=C0209 + % signatures + ) + + file_name = Path("test.toml") + initial_file_content = build_file_content("123", "456", "789") + expected_deprecations: Set[Sequence[str]] = set() + expected_deprecations.update((("123", "abc"), ("456", "def"), ("789", "ghi"))) + config_data = { + "exclude_signatures": [ + {"signature": "123"}, + {"signature": "456"}, + {"signature": "789"}, + ] + } + + with open(file_name, "w") as config_file: + config_file.write(initial_file_content) + + update_signatures.replace_deprecated_signatures( + expected_deprecations, config_data + ) + update_signatures.write_updated_signatures(file_name, config_data) + + result_config_data = { + "tool": { + "tartufo": { + key.replace("_", "-"): value for key, value in config_data.items() + } + } + } + + with open(file_name, "r") as config_file: + file_content = config_file.read() + self.assertEqual(result_config_data, tomlkit.loads(file_content)) + + remove(file_name)