From 82d36cbbbb14a927f7c7775cffea5f810d5d32df Mon Sep 17 00:00:00 2001 From: Miroslav Shubernetskiy Date: Thu, 7 Nov 2024 12:20:06 -0500 Subject: [PATCH] test: removing validate.py in favor of in-line runner validations (#441) * test: removing validate.py in favor of in-line runner validations Moving most of the functionality into the runner utils (e.g. insert, extract). It checks that common keys are present such as `_OPERATION` for each wrapped command and other bits such as if it's not virtual chalking, virtual chalk json should be absent. As this is validated in each invocation of the runner, we don't need to separately call additional validation utilities in tests. The only thing each test needs to assert is specific keys (if any) from either report/chalkmark. To make that simpler, added some utility properties like `marks_by_path` which group all chalkmarks by the path of the artifacts hence allowing tests to assert that all expected files were chalked as expected. * build: fixing buildx 0.18 compatibility in compose file --- Makefile | 4 + docker-compose.yml | 10 +- tests/functional/chalk/runner.py | 151 ++++++++++++-- tests/functional/chalk/validate.py | 169 --------------- .../data/configs/validation/custom_report.c4m | 2 +- tests/functional/setup.cfg | 2 + tests/functional/test_command.py | 63 +----- tests/functional/test_composable.py | 16 +- tests/functional/test_config.py | 37 ++-- tests/functional/test_docker.py | 157 +++++--------- tests/functional/test_elf.py | 55 ++--- tests/functional/test_plugins.py | 119 +++-------- tests/functional/test_py.py | 80 ++++---- tests/functional/test_sink.py | 17 +- tests/functional/test_zip.py | 194 +++++------------- 15 files changed, 380 insertions(+), 696 deletions(-) delete mode 100644 tests/functional/chalk/validate.py diff --git a/Makefile b/Makefile index 235d1673..c4315026 100644 --- a/Makefile +++ b/Makefile @@ -131,11 +131,15 @@ ifneq "$(shell which systemctl 2> /dev/null)" "" || echo Please restart docker daemon after 
changing docker config endif +$(HOME)/.pdbrc.py: + touch $@ + .PHONY: docker-setup docker-setup: /etc/docker/daemon.json .PHONY: tests tests: DOCKER=$(_DOCKER) # force rebuilds to use docker to match tests +tests: $(HOME)/.pdbrc.py tests: docker-setup tests: $(BINARY) # note this will rebuild chalk if necessary docker compose run --rm tests $(make_args) $(args) diff --git a/docker-compose.yml b/docker-compose.yml index 77e75c15..0fd73d31 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,7 +27,7 @@ services: # -------------------------------------------------------------------------- # SERVER - server: &server + server: build: context: ./server target: deps @@ -50,8 +50,13 @@ services: interval: 1s server-tls: - <<: *server + build: + context: ./server + target: deps command: run -r -p 5858 --domain=tls.chalk.local --keyfile=cert.key --certfile=cert.pem --use-existing-cert + working_dir: /chalk/server + volumes: + - .:/chalk ports: - 5858:5858 networks: @@ -122,6 +127,7 @@ services: - seccomp=unconfined # for gdb volumes: - $PWD:$PWD + - $HOME/.pdbrc.py:/root/.pdbrc.py - /var/run/docker.sock:/var/run/docker.sock - /etc/buildkit:/etc/buildkit - /etc/docker:/etc/docker diff --git a/tests/functional/chalk/runner.py b/tests/functional/chalk/runner.py index fdbe7113..d1e70893 100644 --- a/tests/functional/chalk/runner.py +++ b/tests/functional/chalk/runner.py @@ -11,7 +11,7 @@ from ..conf import MAGIC from ..utils.bin import sha256 -from ..utils.dict import ContainsMixin +from ..utils.dict import ContainsMixin, MISSING, ANY, IfExists from ..utils.docker import Docker from ..utils.log import get_logger from ..utils.os import CalledProcessError, Program, run @@ -37,17 +37,58 @@ logger = get_logger() +def artifact_type(path: Path) -> str: + if path.suffix == ".py": + return "python" + elif path.suffix == ".zip": + return "ZIP" + else: + return "ELF" + + class ChalkReport(ContainsMixin, dict): name = "report" def __init__(self, report: dict[str, Any]): 
super().__init__(**report) + def deterministic(self, ignore: Optional[set[str]] = None): + return self.__class__( + { + k: v + for k, v in self.items() + if k + not in { + "_TIMESTAMP", + "_DATETIME", + "_ACTION_ID", + "_ARGV", + "_OP_ARGV", + "_EXEC_ID", + # docker does not have deterministic output + # insecure registries are not consistently ordered + "_DOCKER_INFO", + } + | (ignore or set()) + } + ) + @property def marks(self): assert len(self["_CHALKS"]) > 0 return [ChalkMark(i, report=self) for i in self["_CHALKS"]] + @property + def marks_by_path(self): + return ContainsMixin( + { + i.get("PATH_WHEN_CHALKED", i.get("_OP_ARTIFACT_PATH")): i + for i in self.marks + # paths can be missing for example in minimum report profile + if "PATH_WHEN_CHALKED" in i or "_OP_ARTIFACT_PATH" in i + } + ) + @property def mark(self): assert len(self.marks) == 1 @@ -179,6 +220,11 @@ def report(self): assert len(self.reports) == 1 return self.reports[0] + @property + def first_report(self): + assert len(self.reports) > 0 + return self.reports[0] + @property def mark(self): return self.report.mark @@ -187,6 +233,27 @@ def mark(self): def marks(self): return self.report.marks + @property + def marks_by_path(self): + return self.report.marks_by_path + + @property + def virtual_path(self): + return Path.cwd() / "virtual-chalk.json" + + @property + def vmarks(self): + assert self.virtual_path.exists() + return [ + ChalkMark.from_json(i) for i in self.virtual_path.read_text().splitlines() + ] + + @property + def vmark(self): + marks = self.vmarks + assert len(marks) == 1 + return marks[0] + class Chalk: def __init__( @@ -284,22 +351,22 @@ def run( # if chalk outputs report, sanity check its operation matches chalk_cmd if expecting_report: - try: - report = result.report - except Exception: - pass - else: - # report could be silenced on the profile level - if report: - operation = cast(str, command) - # when calling docker, the arg after docker is the operation - if not operation and 
"docker" in params: - try: - operation = params[params.index("buildx") + 1] - except ValueError: - operation = params[params.index("docker") + 1] - if operation: - assert report.has(_OPERATION=operation) + report = result.first_report + operation = cast(str, command) + # when calling docker, the arg after docker is the operation + if not operation and "docker" in params: + try: + operation = params[params.index("buildx") + 1] + except ValueError: + operation = params[params.index("docker") + 1] + if operation: + assert report.has(_OPERATION=IfExists(operation)) + if "_CHALKS" in report: + for mark in report.marks: + assert mark.has_if( + operation in {"insert", "build"}, + _VIRTUAL=IfExists(virtual), + ) return result @@ -313,8 +380,10 @@ def insert( log_level: ChalkLogLevel = "trace", env: Optional[dict[str, str]] = None, ignore_errors: bool = False, + expecting_report: bool = True, + expecting_chalkmarks: bool = True, ) -> ChalkProgram: - return self.run( + result = self.run( command="insert", target=artifact, config=config, @@ -322,7 +391,27 @@ def insert( log_level=log_level, env=env, ignore_errors=ignore_errors, + expecting_report=expecting_report, ) + if expecting_report: + if expecting_chalkmarks: + for chalk in result.marks: + assert chalk.has(_VIRTUAL=IfExists(virtual)) + if virtual: + assert result.virtual_path.exists() + for mark in result.vmarks: + assert mark.has( + CHALK_ID=ANY, + MAGIC=MAGIC, + ) + else: + assert result.report.has( + _CHALKS=MISSING, + _UNMARKED=IfExists(ANY), + ) + if not virtual: + assert not result.virtual_path.exists() + return result def extract( self, @@ -332,8 +421,10 @@ def extract( config: Optional[Path] = None, log_level: ChalkLogLevel = "trace", env: Optional[dict[str, str]] = None, + virtual: bool = False, + expecting_chalkmarks: bool = True, ) -> ChalkProgram: - return self.run( + result = self.run( command="extract", target=artifact, log_level=log_level, @@ -342,6 +433,22 @@ def extract( config=config, env=env, ) + if 
virtual: + assert result.report.has( + _CHALKS=MISSING, + _UNMARKED=IfExists(ANY), + ) + else: + if Path(artifact).exists() and expecting_chalkmarks: + for path, chalk in result.marks_by_path.items(): + assert chalk.has( + ARTIFACT_TYPE=artifact_type(Path(path)), + PLATFORM_WHEN_CHALKED=result.report["_OP_PLATFORM"], + INJECTOR_COMMIT_ID=result.report["_OP_CHALKER_COMMIT_ID"], + ) + if not expecting_chalkmarks: + assert "_CHALKS" not in result.report + return result def exec( self, @@ -373,7 +480,7 @@ def dump(self, path: Optional[Path] = None) -> ChalkProgram: if path is not None: assert not path.is_file() args = [str(path)] - result = self.run(command="dump", params=args) + result = self.run(command="dump", params=args, expecting_report=False) if path is not None: assert path.is_file() return result @@ -487,10 +594,13 @@ def docker_build( ) ) if expecting_report and expected_success and image_hash: + assert result.report.has(_VIRTUAL=IfExists(virtual)) if platforms: assert len(result.marks) == len(platforms) else: assert len(result.marks) == 1 + for chalk in result.marks: + assert chalk.has(_OP_ARTIFACT_TYPE="Docker Image") # sanity check that chalk mark includes basic chalk keys assert image_hash in [i["_CURRENT_HASH"] for i in result.marks] assert image_hash in [i["_IMAGE_ID"] for i in result.marks] @@ -517,4 +627,5 @@ def docker_push(self, image: str, buildkit: bool = True): def docker_pull(self, image: str): return self.run( params=["docker", "pull", image], + expecting_report=False, ) diff --git a/tests/functional/chalk/validate.py b/tests/functional/chalk/validate.py deleted file mode 100644 index 1b847588..00000000 --- a/tests/functional/chalk/validate.py +++ /dev/null @@ -1,169 +0,0 @@ -# Copyright (c) 2023, Crash Override, Inc. 
-# -# This file is part of Chalk -# (see https://crashoverride.com/docs/chalk) -from dataclasses import dataclass, field -from pathlib import Path -from typing import Any, Optional - -from ..conf import MAGIC, SHEBANG -from ..utils.dict import ANY, MISSING, Contains, IfExists, Length -from ..utils.log import get_logger -from .runner import ChalkMark, ChalkReport - - -logger = get_logger() - - -@dataclass -class ArtifactInfo: - type: str - chalk_info: dict[str, Any] = field(default_factory=dict) - host_info: dict[str, Any] = field(default_factory=dict) - - @classmethod - def path_type(cls, path: Path) -> str: - if path.suffix == ".py": - return "python" - else: - return "ELF" - - @classmethod - def one_elf( - cls, - path: Path, - chalk_info: Optional[dict[str, Any]] = None, - host_info: Optional[dict[str, Any]] = None, - ): - return { - str(path): cls( - type=cls.path_type(path), - chalk_info=chalk_info or {}, - host_info=host_info or {}, - ) - } - - @classmethod - def all_shebangs(cls): - return { - str(i.resolve()): cls(type=cls.path_type(i)) - for i in Path().iterdir() - if i.is_file() and i.read_text().startswith(SHEBANG) - } - - -# `virtual-chalk.json` file found after chalking with `--virtual` enabled -def validate_virtual_chalk( - tmp_data_dir: Path, artifact_map: dict[str, ArtifactInfo], virtual: bool -) -> dict[str, Any]: - vjsonf = tmp_data_dir / "virtual-chalk.json" - if not virtual or not artifact_map: - assert not vjsonf.is_file(), "virtual-chalk.json should not have been created!" 
- return {} - - assert vjsonf.is_file(), "virtual-chalk.json not found" - # jsonl is one json object per line, NOT array of json - # number of json objects is number of artifacts chalked - all_vchalks = [ChalkMark.from_json(i) for i in vjsonf.read_text().splitlines()] - - for vchalk in all_vchalks: - assert vchalk.has( - CHALK_ID=ANY, - MAGIC=MAGIC, - ) - - # return first one - return all_vchalks[0] - - -# chalk report is created after `chalk insert` operation -def validate_chalk_report( - chalk_report: ChalkReport, - artifact_map: dict[str, ArtifactInfo], - virtual: bool, - chalk_action: str = "insert", -): - assert chalk_report.has(_OPERATION=chalk_action) - - if not artifact_map: - assert chalk_report.has(_CHALKS=MISSING) - return - - assert chalk_report.has(_CHALKS=Length(len(artifact_map))) - - # check arbitrary host report values - for artifact in artifact_map.values(): - assert chalk_report.contains(artifact.host_info) - - for mark in chalk_report.marks: - path = mark.lifted["PATH_WHEN_CHALKED"] - assert path in artifact_map, "chalked artifact incorrect" - artifact = artifact_map[path] - - assert mark.lifted.has( - ARTIFACT_TYPE=artifact.type, - **artifact.chalk_info, - ) - assert mark.lifted.has_if( - chalk_action == "insert", - _VIRTUAL=virtual, - ) - - -# slightly different from above -def validate_docker_chalk_report( - chalk_report: ChalkReport, - artifact: ArtifactInfo, - virtual: bool, - chalk_action: str = "build", -): - assert chalk_report.has(_OPERATION=chalk_action, _CHALKS=Length(1)) - assert chalk_report.contains(artifact.host_info) - - for chalk in chalk_report.marks: - assert chalk.has( - # chalk id should always exist - CHALK_ID=ANY, - _OP_ARTIFACT_TYPE=artifact.type, - ) - assert chalk.contains(artifact.chalk_info) - assert chalk.has_if( - chalk_action == "build", - _VIRTUAL=virtual, - ) - - -# extracted chalk is created after `chalk extract` operation -def validate_extracted_chalk( - extracted_chalk: ChalkReport, - artifact_map: dict[str, 
ArtifactInfo], - virtual: bool, -) -> None: - # there should not be operation errors - assert extracted_chalk.has(_OPERATION="extract", _OP_ERRORS=IfExists(Length(0))) - - if len(artifact_map) == 0: - assert extracted_chalk.has(_CHALKS=MISSING) - return - - if virtual: - assert extracted_chalk.has( - _CHALKS=MISSING, - _UNMARKED=Contains(set(artifact_map)), - ) - - else: - # okay to have _UNMARKED as long as the chalk mark is still there - assert extracted_chalk.has(_CHALKS=Length(len(artifact_map))) - - for chalk in extracted_chalk.marks: - path = chalk["_OP_ARTIFACT_PATH"] - assert path in artifact_map, "path not found" - artifact_info = artifact_map[path] - - assert chalk.has( - ARTIFACT_TYPE=artifact_info.type, - # top level vs chalk-level sanity check - PLATFORM_WHEN_CHALKED=extracted_chalk["_OP_PLATFORM"], - INJECTOR_COMMIT_ID=extracted_chalk["_OP_CHALKER_COMMIT_ID"], - ) diff --git a/tests/functional/data/configs/validation/custom_report.c4m b/tests/functional/data/configs/validation/custom_report.c4m index b5683b2b..1e85ae61 100644 --- a/tests/functional/data/configs/validation/custom_report.c4m +++ b/tests/functional/data/configs/validation/custom_report.c4m @@ -11,7 +11,7 @@ report_template test_report_template { sink_config test_file_out { sink: "file" - filename: "/tmp/custom_report.log" + filename: env("LOG_FILE") enabled: true } diff --git a/tests/functional/setup.cfg b/tests/functional/setup.cfg index f286eafa..a548519e 100644 --- a/tests/functional/setup.cfg +++ b/tests/functional/setup.cfg @@ -4,6 +4,8 @@ ignore = # black is in charge of line length E501 + # black formats all operators + E231 # binary operator on new line W503 # whitespace before : in array slices diff --git a/tests/functional/test_command.py b/tests/functional/test_command.py index eda1731f..82649037 100644 --- a/tests/functional/test_command.py +++ b/tests/functional/test_command.py @@ -15,12 +15,6 @@ import pytest from .chalk.runner import Chalk -from .chalk.validate import ( - 
ArtifactInfo, - validate_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) from .conf import CONFIGS, DATE_PATH, LS_PATH from .utils.dict import ANY from .utils.log import get_logger @@ -32,24 +26,13 @@ # tests multiple insertions and extractions on the same binary @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) -def test_insert_extract_repeated( - tmp_data_dir: Path, copy_files: list[Path], chalk: Chalk -): +def test_insert_extract_repeated(copy_files: list[Path], chalk: Chalk): artifact = copy_files[0] - artifact_info = ArtifactInfo.one_elf(artifact) insert = chalk.insert(artifact=artifact, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) + insert.marks_by_path.contains({str(artifact): {}}) extract = chalk.extract(artifact=artifact) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) assert extract.report.datetime > extract.mark.datetime @@ -61,14 +44,9 @@ def test_insert_extract_repeated( # repeat the above process re-chalking the same binary and assert that the # fields are appropriately updated insert2 = chalk.insert(artifact=artifact, virtual=False) - validate_chalk_report( - chalk_report=insert2.report, artifact_map=artifact_info, virtual=False - ) + insert2.marks_by_path.contains({str(artifact): {}}) extract2 = chalk.extract(artifact=artifact) - validate_extracted_chalk( - extracted_chalk=extract2.report, artifact_map=artifact_info, virtual=False - ) # but this time timestamps and random values should be different rand2 = extract2.mark.lifted["CHALK_RAND"] @@ -79,9 +57,6 @@ def test_insert_extract_repeated( # do one final extraction extract3 = chalk.extract(artifact=artifact) - validate_extracted_chalk( - extracted_chalk=extract3.report, artifact_map=artifact_info, virtual=False - ) # report 
datetime is diff as its at extraction time # but chalkarm should stay consistent @@ -99,53 +74,37 @@ def test_insert_extract_directory( ): ls_artifact, date_artifact = copy_files - artifact_info = { - **ArtifactInfo.one_elf(ls_artifact), - **ArtifactInfo.one_elf(date_artifact), - } - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False + assert insert.marks_by_path.contains( + { + str(ls_artifact): {}, + str(date_artifact): {}, + } ) - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) + assert chalk.extract(artifact=tmp_data_dir) @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) def test_insert_extract_delete(copy_files: list[Path], chalk: Chalk): artifact = copy_files[0] - artifact_info = ArtifactInfo.one_elf(artifact) # insert insert = chalk.insert(artifact=artifact, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) + assert insert.marks_by_path.contains({str(artifact): {}}) insert_1_hash = insert.report["_CHALKS"][0]["HASH"] # extract extract = chalk.extract(artifact=artifact) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) # delete delete = chalk.run(command="delete", target=artifact) - assert delete.report["_OPERATION"] == "delete" for key in ["HASH", "_OP_ARTIFACT_PATH", "_OP_ARTIFACT_TYPE"]: assert extract.mark[key] == delete.mark[key] # extract again and we shouldn't get anything this time - nop_extract = chalk.extract(artifact=artifact) - assert "_CHALKS" not in nop_extract.report + assert chalk.extract(artifact=artifact, expecting_chalkmarks=False) # insert again and check that hash is the same as first insert 
insert2 = chalk.insert(artifact=artifact, virtual=False) diff --git a/tests/functional/test_composable.py b/tests/functional/test_composable.py index ba11232a..300c4bf0 100644 --- a/tests/functional/test_composable.py +++ b/tests/functional/test_composable.py @@ -45,13 +45,11 @@ def test_composable_valid( replace: bool, ): # load the composable config - _load = chalk_copy.load( + chalk_copy.load( config=(configs / test_config_file).absolute(), replace=replace, stdin=b"\n" * 2**15, ) - assert _load.report["_OPERATION"] == "load" - assert "_OP_ERRORS" not in _load.report # check chalk dump to validate that loaded config matches current_config_path = tmp_data_dir / "output.c4m" @@ -65,18 +63,14 @@ def test_composable_valid( # basic check insert operation bin_path = copy_files[0] - _insert = chalk_copy.insert( + assert chalk_copy.insert( artifact=bin_path, # compliance by default sends reports to localhost # which will error here ignore_errors=True, + # with full replace, testing config is not loaded hence no reports + expecting_report=not replace, ) - for report in _insert.reports: - assert report["_OPERATION"] == "insert" - - if "_OP_ERRORS" in report: - logger.error("report has unexpected errors", errors=report["_OP_ERRORS"]) - assert "_OP_ERRORS" not in report @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) @@ -202,7 +196,7 @@ def test_composable_reload( first_load_config = get_current_config(tmp_data_dir, chalk_copy) # load default config - chalk_copy.run(command="load", params=["default"]) + chalk_copy.load("default") default_load_config = get_current_config(tmp_data_dir, chalk_copy) # reload sample valid config and ensure default is overwritten diff --git a/tests/functional/test_config.py b/tests/functional/test_config.py index f94d8a36..aa964d7d 100644 --- a/tests/functional/test_config.py +++ b/tests/functional/test_config.py @@ -208,6 +208,7 @@ def test_external_configs( command="env", config=CONFIGS / config_path, 
expected_success=expected_success, + expecting_report=expected_success, ignore_errors=True, ) if expected_error: @@ -220,6 +221,7 @@ def test_external_configs( result_external = chalk_copy.run( command="env", expected_success=expected_success, + expecting_report=expected_success, ignore_errors=True, ) if expected_error: @@ -232,25 +234,25 @@ def test_custom_report( chalk_copy: Chalk, copy_files: list[Path], test_config_file: Path, + tmp_file: Path, ): bin_path = copy_files[0] # config sets custom report file output here - report_path = Path("/tmp/custom_report.log") # expecting a report for insert - assert chalk_copy.run( + assert chalk_copy.insert( + bin_path, config=test_config_file, - target=bin_path, - command="insert", virtual=False, + env={"LOG_FILE": str(tmp_file)}, ).report # expecting a report for extract - assert chalk_copy.run( + assert chalk_copy.extract( + bin_path, config=test_config_file, - target=bin_path, - command="extract", virtual=False, + env={"LOG_FILE": str(tmp_file)}, ).report # not expecting a report for env in report file @@ -259,9 +261,10 @@ def test_custom_report( config=test_config_file, command="env", virtual=False, + env={"LOG_FILE": str(tmp_file)}, ).reports - log_lines = report_path.read_text().splitlines() + log_lines = tmp_file.read_text().splitlines() reports = [ChalkReport.from_json(i) for i in log_lines] # only expecting report for insert and extract @@ -452,12 +455,12 @@ def validate_report_keys(report: dict[str, Any], expected_keys: set[str]): # tests outconf profiles for non-docker operations @pytest.mark.parametrize( - "test_config_file", + "test_config_file, expecting_chalkmarks", [ - ("profiles/empty_profile.c4m"), - ("profiles/default.c4m"), - ("profiles/minimal_profile.c4m"), - ("profiles/large_profile.c4m"), + ("profiles/empty_profile.c4m", False), + ("profiles/default.c4m", True), + ("profiles/minimal_profile.c4m", True), + ("profiles/large_profile.c4m", True), ], ) @pytest.mark.parametrize( @@ -473,6 +476,7 @@ def 
test_profiles( chalk_copy: Chalk, test_config_file: str, use_embedded: bool, + expecting_chalkmarks: bool, ): bin_path = copy_files[0] configs = merged_configs(CONFIGS / test_config_file) @@ -481,7 +485,7 @@ def test_profiles( chalk_copy.load(CONFIGS / test_config_file, use_embedded=use_embedded) # insert report should have keys listed - insert = chalk_copy.insert(bin_path) + insert = chalk_copy.insert(bin_path, expecting_chalkmarks=expecting_chalkmarks) validate_chalk_report_keys(insert.report, configs["insert"]) # check that binary has the correct chalk mark @@ -501,7 +505,7 @@ def test_profiles( validate_report_keys(chalk_mark, configs["insert"]["mark_template"] | minimal_chalk) # extract - extract = chalk_copy.extract(bin_path) + extract = chalk_copy.extract(bin_path, expecting_chalkmarks=expecting_chalkmarks) validate_chalk_report_keys(extract.report, configs["extract"]) # exec @@ -518,7 +522,7 @@ def test_no_certs(chalk_default: Chalk, server_chalkdust: str): chalk should be able to connect to chalkdust even when system has no system certs by using bundled mozilla root CA store """ - assert Docker.run( + _, build = Docker.run( # busybox does not ship with any system certs vs for example alpine image="busybox", entrypoint="/bin/sh", @@ -529,6 +533,7 @@ def test_no_certs(chalk_default: Chalk, server_chalkdust: str): tty=False, volumes={chalk_default.binary: "/chalk"}, ) + assert build @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) diff --git a/tests/functional/test_docker.py b/tests/functional/test_docker.py index bb74aaeb..542a400e 100644 --- a/tests/functional/test_docker.py +++ b/tests/functional/test_docker.py @@ -14,23 +14,17 @@ import pytest from .chalk.runner import Chalk, ChalkMark, ChalkProgram -from .chalk.validate import ( - MAGIC, - MISSING, - ArtifactInfo, - validate_docker_chalk_report, - validate_virtual_chalk, -) from .conf import ( CONFIGS, DOCKERFILES, DOCKER_SSH_REPO, DOCKER_TOKEN_REPO, + MAGIC, MARKS, REGISTRY, ROOT, ) 
-from .utils.dict import ANY, MISSING, Contains +from .utils.dict import ANY, MISSING, Contains, IfExists from .utils.docker import Docker from .utils.log import get_logger from .utils.os import run @@ -707,12 +701,8 @@ def test_virtual_valid( virtual=True, env={"SINK_TEST_OUTPUT_FILE": "/tmp/sink_file.json"}, ) - - # artifact is the docker image - # keys to check - artifact_info = ArtifactInfo( - type="Docker Image", - chalk_info={ + assert build.mark.contains( + { "_CURRENT_HASH": image_hash, "_IMAGE_ID": image_hash, "_REPO_TAGS": Contains({f"{tag}:latest"}), @@ -724,24 +714,8 @@ def test_virtual_valid( "DOCKER_TAGS": Contains({f"{tag}:latest"}), }, ) - validate_docker_chalk_report( - chalk_report=build.report, - artifact=artifact_info, - virtual=True, - ) - chalk_version = build.mark["CHALK_VERSION"] - metadata_id = build.mark["METADATA_ID"] - - vchalk = validate_virtual_chalk( - tmp_data_dir, artifact_map={image_hash: artifact_info}, virtual=True - ) - - # required keys in min chalk mark - assert "CHALK_ID" in vchalk - assert vchalk["MAGIC"] == MAGIC - assert vchalk["CHALK_VERSION"] == chalk_version - assert vchalk["METADATA_ID"] == metadata_id + assert build.vmark.contains({k: IfExists(v) for k, v in build.mark.items()}) _, result = Docker.run( image=image_hash, @@ -785,12 +759,8 @@ def test_nonvirtual_valid(chalk: Chalk, test_file: str, random_hex: str): tag=tag, config=CONFIGS / "docker_wrap.c4m", ) - - # artifact is the docker image - artifact_info = ArtifactInfo( - type="Docker Image", - # keys to check - chalk_info={ + assert build.mark.contains( + { "_CURRENT_HASH": image_hash, "_IMAGE_ID": image_hash, "_REPO_TAGS": Contains({f"{tag}:latest"}), @@ -802,24 +772,17 @@ def test_nonvirtual_valid(chalk: Chalk, test_file: str, random_hex: str): "DOCKER_TAGS": Contains({f"{tag}:latest"}), }, ) - validate_docker_chalk_report( - chalk_report=build.report, artifact=artifact_info, virtual=False - ) - - chalk_version = build.mark["CHALK_VERSION"] - metadata_id = 
build.mark["METADATA_ID"] _, result = Docker.run( image=image_hash, entrypoint="cat", params=["chalk.json"], ) - chalk_json = result.json() - - assert "CHALK_ID" in chalk_json - assert chalk_json["MAGIC"] == MAGIC, "chalk magic value incorrect" - assert chalk_json["CHALK_VERSION"] == chalk_version - assert chalk_json["METADATA_ID"] == metadata_id + chalk_json = ChalkMark(result.json()) + # ensure required keys are present + assert chalk_json.has(MAGIC=MAGIC, CHALK_VERSION=ANY, CHALK_ID=ANY, METADATA_ID=ANY) + # ensure all values match with build report + assert build.mark.contains({k: IfExists(v) for k, v in chalk_json.items()}) @pytest.mark.parametrize("test_file", ["invalid/sample_1", "invalid/sample_2"]) @@ -1237,39 +1200,27 @@ def test_extract(chalk: Chalk, random_hex: str): tag=tag, ) - # artifact info should be consistent - image_artifact = ArtifactInfo( - type="Docker Image", - host_info={ + # extract chalk from image id and image name + extract_by_name = chalk.extract(tag) + assert extract_by_name.report.contains( + { "_OPERATION": "extract", "_OP_EXE_NAME": chalk.binary.name, "_OP_UNMARKED_COUNT": 0, "_OP_CHALK_COUNT": 1, - }, - chalk_info={ + } + ) + assert extract_by_name.mark.contains( + { "_OP_ARTIFACT_TYPE": "Docker Image", "_IMAGE_ID": image_id, "_CURRENT_HASH": image_id, "_REPO_TAGS": Contains({f"{tag}:latest"}), - }, - ) - - # extract chalk from image id and image name - extract_by_name = chalk.extract(tag) - validate_docker_chalk_report( - chalk_report=extract_by_name.report, - artifact=image_artifact, - virtual=False, - chalk_action="extract", + } ) extract_by_id = chalk.extract(image_id[:12]) - validate_docker_chalk_report( - chalk_report=extract_by_id.report, - artifact=image_artifact, - virtual=False, - chalk_action="extract", - ) + assert extract_by_id.report.contains(extract_by_name.report.deterministic()) # run container and keep alive via tail container_id, _ = Docker.run( @@ -1283,64 +1234,60 @@ def test_extract(chalk: Chalk, random_hex: 
str): # let container start time.sleep(2) - # new artifact for running container - artifact_container = ArtifactInfo( - type="Docker Container", - host_info={ + # extract on container name and validate + extract_container_name = chalk.extract(container_name) + assert extract_container_name.report.contains( + { "_OPERATION": "extract", "_OP_EXE_NAME": chalk.binary.name, "_OP_UNMARKED_COUNT": 0, "_OP_CHALK_COUNT": 1, - }, - chalk_info={ + } + ) + assert extract_container_name.mark.contains( + { "_OP_ARTIFACT_TYPE": "Docker Container", "_IMAGE_ID": image_id, "_CURRENT_HASH": image_id, "_INSTANCE_CONTAINER_ID": container_id, "_INSTANCE_NAME": container_name, "_INSTANCE_STATUS": "running", - }, - ) - - # extract on container name and validate - extract_container_name = chalk.extract(container_name) - validate_docker_chalk_report( - chalk_report=extract_container_name.report, - artifact=artifact_container, - virtual=False, - chalk_action="extract", + } ) # extract on container id and validate extract_container_id = chalk.extract(container_id) - validate_docker_chalk_report( - chalk_report=extract_container_id.report, - artifact=artifact_container, - virtual=False, - chalk_action="extract", + assert extract_container_id.report.contains( + extract_container_name.report.deterministic() ) # shut down container Docker.stop_containers([container_name]) - # update artifact info - artifact_container.chalk_info["_INSTANCE_STATUS"] = "exited" - # extract on container name and container id now that container is stopped extract_container_name_stopped = chalk.extract(container_name) - validate_docker_chalk_report( - chalk_report=extract_container_name_stopped.report, - artifact=artifact_container, - virtual=False, - chalk_action="extract", + assert extract_container_name_stopped.report.contains( + { + "_OPERATION": "extract", + "_OP_EXE_NAME": chalk.binary.name, + "_OP_UNMARKED_COUNT": 0, + "_OP_CHALK_COUNT": 1, + } + ) + assert extract_container_name_stopped.mark.contains( + { + 
"_OP_ARTIFACT_TYPE": "Docker Container", + "_IMAGE_ID": image_id, + "_CURRENT_HASH": image_id, + "_INSTANCE_CONTAINER_ID": container_id, + "_INSTANCE_NAME": container_name, + "_INSTANCE_STATUS": "exited", + } ) extract_container_id_stopped = chalk.extract(container_id) - validate_docker_chalk_report( - chalk_report=extract_container_id_stopped.report, - artifact=artifact_container, - virtual=False, - chalk_action="extract", + assert extract_container_id_stopped.report.contains( + extract_container_name_stopped.report.deterministic() ) diff --git a/tests/functional/test_elf.py b/tests/functional/test_elf.py index 96d57b19..769581d2 100644 --- a/tests/functional/test_elf.py +++ b/tests/functional/test_elf.py @@ -7,12 +7,6 @@ import pytest from .chalk.runner import Chalk -from .chalk.validate import ( - ArtifactInfo, - validate_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) from .conf import DATE_PATH, GDB, LS_PATH, UNAME_PATH from .utils.log import get_logger from .utils.os import run @@ -21,51 +15,28 @@ logger = get_logger() -# XXX parameterizing this in case we need ELF files with different properties -# but we don't want to simply run different binaries like date/ls/cat/uname -# if we don't expect the behavior to vary @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) -def test_virtual_valid(copy_files: list[Path], tmp_data_dir: Path, chalk: Chalk): +@pytest.mark.parametrize("virtual", [True, False]) +def test_valid( + copy_files: list[Path], + tmp_data_dir: Path, + chalk: Chalk, + virtual: bool, +): artifact = copy_files[0] - artifact_info = ArtifactInfo.one_elf(artifact) - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True - ) + insert = chalk.insert(artifact=tmp_data_dir, virtual=virtual) + assert insert.report.marks_by_path.contains({str(artifact): {}}) - extract = chalk.extract(artifact=tmp_data_dir) - 
validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - ) + extract = chalk.extract(artifact=tmp_data_dir, virtual=virtual) + if not virtual: + assert extract.report.marks_by_path.contains({str(artifact): {}}) # compare extractions - extract2 = chalk.extract(artifact=tmp_data_dir) + extract2 = chalk.extract(artifact=tmp_data_dir, virtual=virtual) assert extract.report.datetime < extract2.report.datetime -@pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) -def test_nonvirtual_valid(copy_files: list[Path], tmp_data_dir: Path, chalk: Chalk): - artifact = copy_files[0] - artifact_info = ArtifactInfo.one_elf(artifact) - - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) - - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) - - @pytest.mark.requires_gdb @pytest.mark.parametrize( "copy_files", diff --git a/tests/functional/test_plugins.py b/tests/functional/test_plugins.py index 88d61994..934d44ab 100644 --- a/tests/functional/test_plugins.py +++ b/tests/functional/test_plugins.py @@ -10,13 +10,6 @@ import pytest from .chalk.runner import Chalk, ChalkMark -from .chalk.validate import ( - ArtifactInfo, - validate_chalk_report, - validate_docker_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) from .conf import CODEOWNERS, CONFIGS, DATA, DOCKERFILES, LS_PATH, PYS from .utils.dict import ANY, MISSING from .utils.docker import Docker @@ -44,47 +37,22 @@ def test_codeowners(tmp_data_dir: Path, chalk: Chalk): folder = CODEOWNERS / "raw1" expected_owners = (folder / 
"CODEOWNERS").read_text() shutil.copytree(folder, tmp_data_dir, dirs_exist_ok=True) - artifact_info = ArtifactInfo.all_shebangs() - assert len(artifact_info) == 1 - artifact = Path(list(artifact_info.keys())[0]) Git(tmp_data_dir).init().add().commit() + artifact = tmp_data_dir / "helloworld.py" # chalk reports generated by insertion, json array that has one element insert = chalk.insert(artifact=artifact, virtual=True) - assert insert.mark["CODE_OWNERS"] == expected_owners - # check chalk report - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True + assert insert.marks_by_path.contains( + {str(artifact): {"CODE_OWNERS": expected_owners}} ) # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - ) + assert chalk.extract(artifact=tmp_data_dir, virtual=True) @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) def test_github(copy_files: list[Path], chalk: Chalk, server_imds: str): bin_path = copy_files[0] - artifact = ArtifactInfo.one_elf( - bin_path, - host_info={ - "BUILD_ID": "1658821493", - "BUILD_COMMIT_ID": "ffac537e6cbbf934b08745a378932722df287a53", - "BUILD_TRIGGER": "tag", - "BUILD_CONTACT": ["octocat"], - "BUILD_URI": "https://github.com/octocat/Hello-World/actions/runs/1658821493/attempts/5", - "BUILD_API_URI": server_imds, - "BUILD_ORIGIN_ID": "123", - "BUILD_ORIGIN_KEY": "abc", - "BUILD_ORIGIN_OWNER_ID": "456", - "BUILD_ORIGIN_OWNER_KEY": "xyz", - }, - ) insert = chalk.insert( bin_path, env={ @@ -105,31 +73,25 @@ def test_github(copy_files: list[Path], chalk: Chalk, server_imds: str): "GITHUB_REF_TYPE": "tag", }, ) - - validate_chalk_report( - chalk_report=insert.report, - artifact_map=artifact, - virtual=False, - 
chalk_action="insert", + assert insert.report.contains( + { + "BUILD_ID": "1658821493", + "BUILD_COMMIT_ID": "ffac537e6cbbf934b08745a378932722df287a53", + "BUILD_TRIGGER": "tag", + "BUILD_CONTACT": ["octocat"], + "BUILD_URI": "https://github.com/octocat/Hello-World/actions/runs/1658821493/attempts/5", + "BUILD_API_URI": server_imds, + "BUILD_ORIGIN_ID": "123", + "BUILD_ORIGIN_KEY": "abc", + "BUILD_ORIGIN_OWNER_ID": "456", + "BUILD_ORIGIN_OWNER_KEY": "xyz", + } ) @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) def test_gitlab(copy_files: list[Path], chalk: Chalk): bin_path = copy_files[0] - artifact = ArtifactInfo.one_elf( - bin_path, - host_info={ - "BUILD_ID": "4999820578", - "BUILD_COMMIT_ID": "ffac537e6cbbf934b08745a378932722df287a53", - "BUILD_TRIGGER": "push", - "BUILD_CONTACT": ["user"], - "BUILD_URI": "https://gitlab.com/gitlab-org/gitlab/-/jobs/4999820578", - "BUILD_API_URI": "https://gitlab.com/api/v4", - "BUILD_ORIGIN_ID": "123", - "BUILD_ORIGIN_OWNER_ID": "456", - }, - ) insert = chalk.insert( bin_path, env={ @@ -146,11 +108,17 @@ def test_gitlab(copy_files: list[Path], chalk: Chalk): "CI_PROJECT_NAMESPACE_ID": "456", }, ) - validate_chalk_report( - chalk_report=insert.report, - artifact_map=artifact, - virtual=False, - chalk_action="insert", + assert insert.report.contains( + { + "BUILD_ID": "4999820578", + "BUILD_COMMIT_ID": "ffac537e6cbbf934b08745a378932722df287a53", + "BUILD_TRIGGER": "push", + "BUILD_CONTACT": ["user"], + "BUILD_URI": "https://gitlab.com/gitlab-org/gitlab/-/jobs/4999820578", + "BUILD_API_URI": "https://gitlab.com/api/v4", + "BUILD_ORIGIN_ID": "123", + "BUILD_ORIGIN_OWNER_ID": "456", + } ) @@ -872,16 +840,6 @@ def test_syft_docker(chalk_copy: Chalk, test_file: str, random_hex: str): assert build.report.contains(sbom_data) assert build.mark.has(SBOM=MISSING) - # artifact is the docker image - artifact_info = ArtifactInfo( - type="Docker Image", - # keys to check - host_info=sbom_data, - ) - 
validate_docker_chalk_report( - chalk_report=build.report, artifact=artifact_info, virtual=False - ) - # check sbom data from running container _, result = Docker.run( image=image_hash, @@ -921,15 +879,9 @@ def test_syft_binary(copy_files: list[Path], chalk_copy: Chalk, use_docker: bool } } - artifact = ArtifactInfo.one_elf(bin_path, chalk_info=sbom_data) - insert = chalk.insert(bin_path, env={"EXTERNAL_TOOL_USE_DOCKER": str(use_docker)}) - validate_chalk_report( - chalk_report=insert.report, - artifact_map=artifact, - virtual=False, - chalk_action="insert", - ) + assert insert.marks_by_path.contains({str(bin_path): {}}) + assert insert.report.contains(sbom_data) if use_docker: assert "ghcr.io/anchore/syft" in insert.logs else: @@ -1018,19 +970,12 @@ def test_semgrep( } } } - artifact = ArtifactInfo.one_elf( - tmp_data_dir / "helloworld.py", chalk_info=sast_data - ) insert = chalk.insert( artifact=tmp_data_dir, env={"EXTERNAL_TOOL_USE_DOCKER": str(use_docker)} ) - validate_chalk_report( - chalk_report=insert.report, - artifact_map=artifact, - virtual=False, - chalk_action="insert", - ) + assert insert.marks_by_path.contains({str(tmp_data_dir / "helloworld.py"): {}}) + assert insert.report.contains(sast_data) if use_docker: assert "semgrep/semgrep" in insert.logs else: diff --git a/tests/functional/test_py.py b/tests/functional/test_py.py index afdfb113..61ad07d7 100644 --- a/tests/functional/test_py.py +++ b/tests/functional/test_py.py @@ -4,18 +4,12 @@ # (see https://crashoverride.com/docs/chalk) import shutil from pathlib import Path +from typing import Optional import pytest from .chalk.runner import Chalk -from .chalk.validate import ( - MAGIC, - ArtifactInfo, - validate_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) -from .conf import PYS, SHEBANG +from .conf import MAGIC, PYS, SHEBANG from .utils.log import get_logger @@ -23,68 +17,70 @@ @pytest.mark.parametrize( - "test_file", + "test_file, shebang", [ - "sample_1", - "sample_2", - 
"sample_3", - "sample_4", + ("sample_1", "helloworld.py"), + ("sample_2", "main.py"), + ("sample_3", None), + ("sample_4", None), ], ) -def test_virtual_valid(tmp_data_dir: Path, chalk: Chalk, test_file: str): +def test_virtual_valid( + tmp_data_dir: Path, chalk: Chalk, test_file: str, shebang: Optional[str] +): shutil.copytree(PYS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact_info = ArtifactInfo.all_shebangs() # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True + insert = chalk.insert( + artifact=tmp_data_dir, + virtual=True, + expecting_chalkmarks=bool(shebang), ) + if shebang: + assert insert.marks_by_path.contains({str(tmp_data_dir / shebang): {}}) # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - ) + assert chalk.extract(artifact=tmp_data_dir, virtual=True) @pytest.mark.parametrize( - "test_file", + "test_file, shebang", [ - "sample_1", - "sample_2", - "sample_3", - "sample_4", + ("sample_1", "helloworld.py"), + ("sample_2", "main.py"), + ("sample_3", None), + ("sample_4", None), ], ) -def test_nonvirtual_valid(tmp_data_dir: Path, chalk: Chalk, test_file: str): +def test_nonvirtual_valid( + tmp_data_dir: Path, chalk: Chalk, test_file: str, shebang: Optional[str] +): shutil.copytree(PYS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact_info = ArtifactInfo.all_shebangs() # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, 
virtual=False + insert = chalk.insert( + artifact=tmp_data_dir, + virtual=False, + expecting_chalkmarks=bool(shebang), ) + if shebang: + assert insert.marks_by_path.contains({str(tmp_data_dir / shebang): {}}) # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False + extract = chalk.extract( + artifact=tmp_data_dir, + virtual=False, + expecting_chalkmarks=bool(shebang), ) + if shebang: + assert extract.marks_by_path.contains({str(tmp_data_dir / shebang): {}}) # check that first line shebangs are not clobbered in non-virtual chalk for file in tmp_data_dir.iterdir(): if file.suffix in {"key", "pub"}: continue - is_artifact = str(file) in artifact_info + is_artifact = file.name == shebang text = file.read_text() lines = text.splitlines() first_line = next(iter(lines), "") diff --git a/tests/functional/test_sink.py b/tests/functional/test_sink.py index 80c04468..286f7486 100644 --- a/tests/functional/test_sink.py +++ b/tests/functional/test_sink.py @@ -43,28 +43,25 @@ def _validate_chalk( # TODO add a test for the file not being present @pytest.mark.parametrize("copy_files", [[CAT_PATH]], indirect=True) -def test_file_present(tmp_data_dir: Path, chalk: Chalk, copy_files: list[Path]): +def test_file_present( + tmp_data_dir: Path, chalk: Chalk, copy_files: list[Path], tmp_file +): artifact = copy_files[0] # prep config file - file_output_path = Path("/tmp/sink_file.json") - if not file_output_path.is_file(): - # touch the file - open(file_output_path, "a").close() - os.utime(file_output_path, None) - assert file_output_path.is_file(), "file sink path must be a valid path" + assert tmp_file.is_file(), "file sink path must be a valid path" config = SINK_CONFIGS / "file.c4m" chalk.insert( config=config, 
artifact=artifact, - env={"SINK_TEST_OUTPUT_FILE": str(file_output_path)}, + env={"SINK_TEST_OUTPUT_FILE": str(tmp_file)}, ) # check that file output is correct - assert file_output_path.is_file(), "file sink should exist after chalk operation" + assert tmp_file.is_file(), "file sink should exist after chalk operation" - contents = file_output_path.read_text() + contents = tmp_file.read_text() assert contents chalks = json.loads(contents) assert len(chalks) == 1 diff --git a/tests/functional/test_zip.py b/tests/functional/test_zip.py index 9154f9af..0cd9e7b7 100644 --- a/tests/functional/test_zip.py +++ b/tests/functional/test_zip.py @@ -2,21 +2,12 @@ # # This file is part of Chalk # (see https://crashoverride.com/docs/chalk) -import shutil from pathlib import Path import pytest from .chalk.runner import Chalk -from .chalk.validate import ( - ArtifactInfo, - validate_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) from .conf import ZIPS -from .utils.dict import ANY -from .utils.git import Git from .utils.log import get_logger @@ -25,158 +16,83 @@ @pytest.mark.slow() @pytest.mark.parametrize( - "test_file", + "copy_files", [ - "nodejs", - "python", + [ZIPS / "nodejs" / "function.zip"], + [ZIPS / "python" / "my_deployment_package.zip"], ], + indirect=True, ) -def test_virtual_valid_slow(tmp_data_dir: Path, chalk: Chalk, test_file: str): - shutil.copytree(ZIPS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact = next((ZIPS / test_file).iterdir()) - - # we are only checking the ZIP chalk mark, not any of the subchalks - artifact_info = { - str(tmp_data_dir / artifact.name): ArtifactInfo(type="ZIP"), - } +@pytest.mark.parametrize("virtual", [True, False]) +def test_valid_slow( + tmp_data_dir: Path, + chalk: Chalk, + copy_files: list[Path], + virtual: bool, +): + test_file = copy_files[0] # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - 
validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True - ) + insert = chalk.insert(artifact=tmp_data_dir, virtual=virtual) + assert insert.report.marks_by_path.contains({str(test_file): {}}) # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True - ) - # FIXME: virtual chalks not currently validated as every subfile in zip gets chalked - # generating too many chalks to check - # validate_virtual_chalk( - # tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - # ) - - -def test_virtual_empty(tmp_data_dir: Path, chalk: Chalk): - # empty zip file does not get chalked, so no artifact info - shutil.copytree(ZIPS / "empty", tmp_data_dir, dirs_exist_ok=True) - - # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - - # check chalk report -- operation is the only thing we can check since no _CHALK will be generated - # on an unchalked empty zip - assert insert.report["_OPERATION"] == "insert" - assert not insert.report.get("_CHALK") - - # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - - # check chalk extract -- operation is the only thing we can check since no _CHALK will be generated - # on an unchalked empty zip - assert extract.report["_OPERATION"] == "extract" - assert not insert.report.get("_CHALK") + extract = chalk.extract(artifact=tmp_data_dir, virtual=virtual) + if not virtual: + assert extract.report.marks_by_path.contains({str(test_file): {}}) @pytest.mark.parametrize( - "test_file", + "copy_files", [ - "misc", - "golang", + # empty zip file does not get chalked, so no artifact info + [ZIPS / "empty" / "empty.zip"], ], + indirect=True, ) -def test_virtual_valid(tmp_data_dir: Path, 
chalk: Chalk, test_file: str): - shutil.copytree(ZIPS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact = next((ZIPS / test_file).iterdir()) - - # we are only checking the ZIP chalk mark, not any of the subchalks - artifact_info = { - str(tmp_data_dir / artifact.name): ArtifactInfo(type="ZIP"), - } - - # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True +@pytest.mark.parametrize("virtual", [True, False]) +def test_empty( + tmp_data_dir: Path, + copy_files: list[Path], + chalk: Chalk, + virtual: bool, +): + # no _CHALK will be generated on an unchalked empty zip + assert chalk.insert( + artifact=tmp_data_dir, + virtual=virtual, + expecting_chalkmarks=False, ) - - # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True + assert chalk.extract( + artifact=tmp_data_dir, + virtual=virtual, + expecting_chalkmarks=False, ) - # FIXME: virtual chalks not currently validated as every subfile in zip gets chalked - # generating too many chalks to check - # validate_virtual_chalk( - # tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - # ) -@pytest.mark.slow() @pytest.mark.parametrize( - "test_file", + "copy_files", [ - "nodejs", - "python", + [ZIPS / "misc" / "misc.zip"], + [ZIPS / "golang" / "myFunction.zip"], ], + indirect=True, ) -def test_nonvirtual_valid_slow(tmp_data_dir: Path, chalk: Chalk, test_file: str): - shutil.copytree(ZIPS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact = next((ZIPS / test_file).iterdir()) - - # we are only checking the ZIP chalk mark, not any of the subchalks - artifact_info = { - str(tmp_data_dir / artifact.name): ArtifactInfo(type="ZIP"), - } 
+@pytest.mark.parametrize("virtual", [True, False]) +def test_valid( + tmp_data_dir: Path, + chalk: Chalk, + copy_files: list[Path], + virtual: bool, +): + test_file = copy_files[0] # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) - - # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - # validation here okay as we are just checking that virtual-chalk.json file doesn't exist - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) - - -@pytest.mark.parametrize( - "test_file", - [ - "misc", - "golang", - ], -) -def test_nonvirtual_valid(tmp_data_dir: Path, chalk: Chalk, test_file: str): - shutil.copytree(ZIPS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact = next((ZIPS / test_file).iterdir()) - - Git(tmp_data_dir).init().add().commit() - + insert = chalk.insert(artifact=tmp_data_dir, virtual=virtual) # we are only checking the ZIP chalk mark, not any of the subchalks - artifact_info = { - str(tmp_data_dir / artifact.name): ArtifactInfo(type="ZIP"), - } - - # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - assert insert.mark.has(COMMIT_ID=ANY) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) + assert insert.report.marks_by_path.contains({str(test_file): {}}) - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - # validation here okay as we are just checking that virtual-chalk.json file doesn't exist - 
validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) + # array of json chalk objects as output, of which we are only expecting one + extract = chalk.extract(artifact=tmp_data_dir, virtual=virtual) + if not virtual: + assert extract.report.marks_by_path.contains({str(test_file): {}})