diff --git a/Makefile b/Makefile index 235d1673..c4315026 100644 --- a/Makefile +++ b/Makefile @@ -131,11 +131,15 @@ ifneq "$(shell which systemctl 2> /dev/null)" "" || echo Please restart docker daemon after changing docker config endif +$(HOME)/.pdbrc.py: + touch $@ + .PHONY: docker-setup docker-setup: /etc/docker/daemon.json .PHONY: tests tests: DOCKER=$(_DOCKER) # force rebuilds to use docker to match tests +tests: $(HOME)/.pdbrc.py tests: docker-setup tests: $(BINARY) # note this will rebuild chalk if necessary docker compose run --rm tests $(make_args) $(args) diff --git a/docker-compose.yml b/docker-compose.yml index 77e75c15..0fd73d31 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,7 +27,7 @@ services: # -------------------------------------------------------------------------- # SERVER - server: &server + server: build: context: ./server target: deps @@ -50,8 +50,13 @@ services: interval: 1s server-tls: - <<: *server + build: + context: ./server + target: deps command: run -r -p 5858 --domain=tls.chalk.local --keyfile=cert.key --certfile=cert.pem --use-existing-cert + working_dir: /chalk/server + volumes: + - .:/chalk ports: - 5858:5858 networks: @@ -122,6 +127,7 @@ services: - seccomp=unconfined # for gdb volumes: - $PWD:$PWD + - $HOME/.pdbrc.py:/root/.pdbrc.py - /var/run/docker.sock:/var/run/docker.sock - /etc/buildkit:/etc/buildkit - /etc/docker:/etc/docker diff --git a/tests/functional/chalk/runner.py b/tests/functional/chalk/runner.py index fdbe7113..d1e70893 100644 --- a/tests/functional/chalk/runner.py +++ b/tests/functional/chalk/runner.py @@ -11,7 +11,7 @@ from ..conf import MAGIC from ..utils.bin import sha256 -from ..utils.dict import ContainsMixin +from ..utils.dict import ContainsMixin, MISSING, ANY, IfExists from ..utils.docker import Docker from ..utils.log import get_logger from ..utils.os import CalledProcessError, Program, run @@ -37,17 +37,58 @@ logger = get_logger() +def artifact_type(path: Path) -> str: + if path.suffix == ".py": + return "python" + elif path.suffix == ".zip": + return "ZIP" + else: + return "ELF" + + class ChalkReport(ContainsMixin, dict): name = "report" def __init__(self, report: dict[str, Any]): super().__init__(**report) + def deterministic(self, ignore: Optional[set[str]] = None): + return self.__class__( + { + k: v + for k, v in self.items() + if k + not in { + "_TIMESTAMP", + "_DATETIME", + "_ACTION_ID", + "_ARGV", + "_OP_ARGV", + "_EXEC_ID", + # docker does not have deterministic output + # insecure registries are not consistently ordered + "_DOCKER_INFO", + } + | (ignore or set()) + } + ) + @property def marks(self): assert len(self["_CHALKS"]) > 0 return [ChalkMark(i, report=self) for i in self["_CHALKS"]] + @property + def marks_by_path(self): + return ContainsMixin( + { + i.get("PATH_WHEN_CHALKED", i.get("_OP_ARTIFACT_PATH")): i + for i in self.marks + # paths can be missing for example in minimum report profile + if "PATH_WHEN_CHALKED" in i or "_OP_ARTIFACT_PATH" in i + } + ) + @property def mark(self): assert len(self.marks) == 1 @@ -179,6 +220,11 @@ def report(self): assert len(self.reports) == 1 return self.reports[0] + @property + def first_report(self): + assert len(self.reports) > 0 + return self.reports[0] + @property def mark(self): return self.report.mark @@ -187,6 +233,27 @@ def mark(self): def marks(self): return self.report.marks + @property + def marks_by_path(self): + return self.report.marks_by_path + + @property + def virtual_path(self): + return Path.cwd() / "virtual-chalk.json" + + @property + def vmarks(self): + assert self.virtual_path.exists() + return [ + ChalkMark.from_json(i) for i in self.virtual_path.read_text().splitlines() + ] + + @property + def vmark(self): + marks = self.vmarks + assert len(marks) == 1 + return marks[0] + class Chalk: def __init__( @@ -284,22 +351,22 @@ def run( # if chalk outputs report, sanity check its operation matches chalk_cmd if expecting_report: - try: - report = result.report - except Exception: - pass - else: - # report could be silenced on the profile level - if report: - operation = cast(str, command) - # when calling docker, the arg after docker is the operation - if not operation and "docker" in params: - try: - operation = params[params.index("buildx") + 1] - except ValueError: - operation = params[params.index("docker") + 1] - if operation: - assert report.has(_OPERATION=operation) + report = result.first_report + operation = cast(str, command) + # when calling docker, the arg after docker is the operation + if not operation and "docker" in params: + try: + operation = params[params.index("buildx") + 1] + except ValueError: + operation = params[params.index("docker") + 1] + if operation: + assert report.has(_OPERATION=IfExists(operation)) + if "_CHALKS" in report: + for mark in report.marks: + assert mark.has_if( + operation in {"insert", "build"}, + _VIRTUAL=IfExists(virtual), + ) return result @@ -313,8 +380,10 @@ def insert( log_level: ChalkLogLevel = "trace", env: Optional[dict[str, str]] = None, ignore_errors: bool = False, + expecting_report: bool = True, + expecting_chalkmarks: bool = True, ) -> ChalkProgram: - return self.run( + result = self.run( command="insert", target=artifact, config=config, @@ -322,7 +391,27 @@ def insert( log_level=log_level, env=env, ignore_errors=ignore_errors, + expecting_report=expecting_report, ) + if expecting_report: + if expecting_chalkmarks: + for chalk in result.marks: + assert chalk.has(_VIRTUAL=IfExists(virtual)) + if virtual: + assert result.virtual_path.exists() + for mark in result.vmarks: + assert mark.has( + CHALK_ID=ANY, + MAGIC=MAGIC, + ) + else: + assert result.report.has( + _CHALKS=MISSING, + _UNMARKED=IfExists(ANY), + ) + if not virtual: + assert not result.virtual_path.exists() + return result def extract( self, @@ -332,8 +421,10 @@ def extract( config: Optional[Path] = None, log_level: ChalkLogLevel = "trace", env: Optional[dict[str, str]] = None, + virtual: bool = False, + expecting_chalkmarks: bool = True, ) -> ChalkProgram: - return self.run( + result = self.run( command="extract", target=artifact, log_level=log_level, @@ -342,6 +433,22 @@ def extract( config=config, env=env, ) + if virtual: + assert result.report.has( + _CHALKS=MISSING, + _UNMARKED=IfExists(ANY), + ) + else: + if Path(artifact).exists() and expecting_chalkmarks: + for path, chalk in result.marks_by_path.items(): + assert chalk.has( + ARTIFACT_TYPE=artifact_type(Path(path)), + PLATFORM_WHEN_CHALKED=result.report["_OP_PLATFORM"], + INJECTOR_COMMIT_ID=result.report["_OP_CHALKER_COMMIT_ID"], + ) + if not expecting_chalkmarks: + assert "_CHALKS" not in result.report + return result def exec( self, @@ -373,7 +480,7 @@ def dump(self, path: Optional[Path] = None) -> ChalkProgram: if path is not None: assert not path.is_file() args = [str(path)] - result = self.run(command="dump", params=args) + result = self.run(command="dump", params=args, expecting_report=False) if path is not None: assert path.is_file() return result @@ -487,10 +594,13 @@ def docker_build( ) ) if expecting_report and expected_success and image_hash: + assert result.report.has(_VIRTUAL=IfExists(virtual)) if platforms: assert len(result.marks) == len(platforms) else: assert len(result.marks) == 1 + for chalk in result.marks: + assert chalk.has(_OP_ARTIFACT_TYPE="Docker Image") # sanity check that chalk mark includes basic chalk keys assert image_hash in [i["_CURRENT_HASH"] for i in result.marks] assert image_hash in [i["_IMAGE_ID"] for i in result.marks] @@ -517,4 +627,5 @@ def docker_push(self, image: str, buildkit: bool = True): def docker_pull(self, image: str): return self.run( params=["docker", "pull", image], + expecting_report=False, ) diff --git a/tests/functional/chalk/validate.py b/tests/functional/chalk/validate.py deleted file mode 100644 index 1b847588..00000000 --- a/tests/functional/chalk/validate.py +++ /dev/null @@ -1,169 +0,0 @@ -# Copyright (c) 2023, Crash Override, Inc. -# -# This file is part of Chalk -# (see https://crashoverride.com/docs/chalk) -from dataclasses import dataclass, field -from pathlib import Path -from typing import Any, Optional - -from ..conf import MAGIC, SHEBANG -from ..utils.dict import ANY, MISSING, Contains, IfExists, Length -from ..utils.log import get_logger -from .runner import ChalkMark, ChalkReport - - -logger = get_logger() - - -@dataclass -class ArtifactInfo: - type: str - chalk_info: dict[str, Any] = field(default_factory=dict) - host_info: dict[str, Any] = field(default_factory=dict) - - @classmethod - def path_type(cls, path: Path) -> str: - if path.suffix == ".py": - return "python" - else: - return "ELF" - - @classmethod - def one_elf( - cls, - path: Path, - chalk_info: Optional[dict[str, Any]] = None, - host_info: Optional[dict[str, Any]] = None, - ): - return { - str(path): cls( - type=cls.path_type(path), - chalk_info=chalk_info or {}, - host_info=host_info or {}, - ) - } - - @classmethod - def all_shebangs(cls): - return { - str(i.resolve()): cls(type=cls.path_type(i)) - for i in Path().iterdir() - if i.is_file() and i.read_text().startswith(SHEBANG) - } - - -# `virtual-chalk.json` file found after chalking with `--virtual` enabled -def validate_virtual_chalk( - tmp_data_dir: Path, artifact_map: dict[str, ArtifactInfo], virtual: bool -) -> dict[str, Any]: - vjsonf = tmp_data_dir / "virtual-chalk.json" - if not virtual or not artifact_map: - assert not vjsonf.is_file(), "virtual-chalk.json should not have been created!" - return {} - - assert vjsonf.is_file(), "virtual-chalk.json not found" - # jsonl is one json object per line, NOT array of json - # number of json objects is number of artifacts chalked - all_vchalks = [ChalkMark.from_json(i) for i in vjsonf.read_text().splitlines()] - - for vchalk in all_vchalks: - assert vchalk.has( - CHALK_ID=ANY, - MAGIC=MAGIC, - ) - - # return first one - return all_vchalks[0] - - -# chalk report is created after `chalk insert` operation -def validate_chalk_report( - chalk_report: ChalkReport, - artifact_map: dict[str, ArtifactInfo], - virtual: bool, - chalk_action: str = "insert", -): - assert chalk_report.has(_OPERATION=chalk_action) - - if not artifact_map: - assert chalk_report.has(_CHALKS=MISSING) - return - - assert chalk_report.has(_CHALKS=Length(len(artifact_map))) - - # check arbitrary host report values - for artifact in artifact_map.values(): - assert chalk_report.contains(artifact.host_info) - - for mark in chalk_report.marks: - path = mark.lifted["PATH_WHEN_CHALKED"] - assert path in artifact_map, "chalked artifact incorrect" - artifact = artifact_map[path] - - assert mark.lifted.has( - ARTIFACT_TYPE=artifact.type, - **artifact.chalk_info, - ) - assert mark.lifted.has_if( - chalk_action == "insert", - _VIRTUAL=virtual, - ) - - -# slightly different from above -def validate_docker_chalk_report( - chalk_report: ChalkReport, - artifact: ArtifactInfo, - virtual: bool, - chalk_action: str = "build", -): - assert chalk_report.has(_OPERATION=chalk_action, _CHALKS=Length(1)) - assert chalk_report.contains(artifact.host_info) - - for chalk in chalk_report.marks: - assert chalk.has( - # chalk id should always exist - CHALK_ID=ANY, - _OP_ARTIFACT_TYPE=artifact.type, - ) - assert chalk.contains(artifact.chalk_info) - assert chalk.has_if( - chalk_action == "build", - _VIRTUAL=virtual, - ) - - -# extracted chalk is created after `chalk extract` operation -def validate_extracted_chalk( - extracted_chalk: ChalkReport, - artifact_map: dict[str, ArtifactInfo], - virtual: bool, -) -> None: - # there should not be operation errors - assert extracted_chalk.has(_OPERATION="extract", _OP_ERRORS=IfExists(Length(0))) - - if len(artifact_map) == 0: - assert extracted_chalk.has(_CHALKS=MISSING) - return - - if virtual: - assert extracted_chalk.has( - _CHALKS=MISSING, - _UNMARKED=Contains(set(artifact_map)), - ) - - else: - # okay to have _UNMARKED as long as the chalk mark is still there - assert extracted_chalk.has(_CHALKS=Length(len(artifact_map))) - - for chalk in extracted_chalk.marks: - path = chalk["_OP_ARTIFACT_PATH"] - assert path in artifact_map, "path not found" - artifact_info = artifact_map[path] - - assert chalk.has( - ARTIFACT_TYPE=artifact_info.type, - # top level vs chalk-level sanity check - PLATFORM_WHEN_CHALKED=extracted_chalk["_OP_PLATFORM"], - INJECTOR_COMMIT_ID=extracted_chalk["_OP_CHALKER_COMMIT_ID"], - ) diff --git a/tests/functional/data/configs/validation/custom_report.c4m b/tests/functional/data/configs/validation/custom_report.c4m index b5683b2b..1e85ae61 100644 --- a/tests/functional/data/configs/validation/custom_report.c4m +++ b/tests/functional/data/configs/validation/custom_report.c4m @@ -11,7 +11,7 @@ report_template test_report_template { sink_config test_file_out { sink: "file" - filename: "/tmp/custom_report.log" + filename: env("LOG_FILE") enabled: true } diff --git a/tests/functional/setup.cfg b/tests/functional/setup.cfg index f286eafa..a548519e 100644 --- a/tests/functional/setup.cfg +++ b/tests/functional/setup.cfg @@ -4,6 +4,8 @@ ignore = # black is in charge of line length E501 + # black formats all operators + E231 # binary operator on new line W503 # whitespace before : in array slices diff --git a/tests/functional/test_command.py b/tests/functional/test_command.py index eda1731f..82649037 100644 --- a/tests/functional/test_command.py +++ b/tests/functional/test_command.py @@ -15,12 +15,6 @@ import pytest from .chalk.runner import Chalk -from .chalk.validate import ( - ArtifactInfo, - validate_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) from .conf import CONFIGS, DATE_PATH, LS_PATH from .utils.dict import ANY from .utils.log import get_logger @@ -32,24 +26,13 @@ # tests multiple insertions and extractions on the same binary @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) -def test_insert_extract_repeated( - tmp_data_dir: Path, copy_files: list[Path], chalk: Chalk -): +def test_insert_extract_repeated(copy_files: list[Path], chalk: Chalk): artifact = copy_files[0] - artifact_info = ArtifactInfo.one_elf(artifact) insert = chalk.insert(artifact=artifact, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) + insert.marks_by_path.contains({str(artifact): {}}) extract = chalk.extract(artifact=artifact) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) assert extract.report.datetime > extract.mark.datetime @@ -61,14 +44,9 @@ def test_insert_extract_repeated( # repeat the above process re-chalking the same binary and assert that the # fields are appropriately updated insert2 = chalk.insert(artifact=artifact, virtual=False) - validate_chalk_report( - chalk_report=insert2.report, artifact_map=artifact_info, virtual=False - ) + insert2.marks_by_path.contains({str(artifact): {}}) extract2 = chalk.extract(artifact=artifact) - validate_extracted_chalk( - extracted_chalk=extract2.report, artifact_map=artifact_info, virtual=False - ) # but this time timestamps and random values should be different rand2 = extract2.mark.lifted["CHALK_RAND"] @@ -79,9 +57,6 @@ def test_insert_extract_repeated( # do one final extraction extract3 = chalk.extract(artifact=artifact) - validate_extracted_chalk( - extracted_chalk=extract3.report, artifact_map=artifact_info, virtual=False - ) # report datetime is diff as its at extraction time # but chalkarm should stay consistent @@ -99,53 +74,37 @@ def test_insert_extract_directory( ): ls_artifact, date_artifact = copy_files - artifact_info = { - **ArtifactInfo.one_elf(ls_artifact), - **ArtifactInfo.one_elf(date_artifact), - } - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False + assert insert.marks_by_path.contains( + { + str(ls_artifact): {}, + str(date_artifact): {}, + } ) - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) + assert chalk.extract(artifact=tmp_data_dir) @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) def test_insert_extract_delete(copy_files: list[Path], chalk: Chalk): artifact = copy_files[0] - artifact_info = ArtifactInfo.one_elf(artifact) # insert insert = chalk.insert(artifact=artifact, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) + assert insert.marks_by_path.contains({str(artifact): {}}) insert_1_hash = insert.report["_CHALKS"][0]["HASH"] # extract extract = chalk.extract(artifact=artifact) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) # delete delete = chalk.run(command="delete", target=artifact) - assert delete.report["_OPERATION"] == "delete" for key in ["HASH", "_OP_ARTIFACT_PATH", "_OP_ARTIFACT_TYPE"]: assert extract.mark[key] == delete.mark[key] # extract again and we shouldn't get anything this time - nop_extract = chalk.extract(artifact=artifact) - assert "_CHALKS" not in nop_extract.report + assert chalk.extract(artifact=artifact, expecting_chalkmarks=False) # insert again and check that hash is the same as first insert insert2 = chalk.insert(artifact=artifact, virtual=False) diff --git a/tests/functional/test_composable.py b/tests/functional/test_composable.py index ba11232a..300c4bf0 100644 --- a/tests/functional/test_composable.py +++ b/tests/functional/test_composable.py @@ -45,13 +45,11 @@ def test_composable_valid( replace: bool, ): # load the composable config - _load = chalk_copy.load( + chalk_copy.load( config=(configs / test_config_file).absolute(), replace=replace, stdin=b"\n" * 2**15, ) - assert _load.report["_OPERATION"] == "load" - assert "_OP_ERRORS" not in _load.report # check chalk dump to validate that loaded config matches current_config_path = tmp_data_dir / "output.c4m" @@ -65,18 +63,14 @@ def test_composable_valid( # basic check insert operation bin_path = copy_files[0] - _insert = chalk_copy.insert( + assert chalk_copy.insert( artifact=bin_path, # compliance by default sends reports to localhost # which will error here ignore_errors=True, + # with full replace, testing config is not loaded hence no reports + expecting_report=not replace, ) - for report in _insert.reports: - assert report["_OPERATION"] == "insert" - - if "_OP_ERRORS" in report: - logger.error("report has unexpected errors", errors=report["_OP_ERRORS"]) - assert "_OP_ERRORS" not in report @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) @@ -202,7 +196,7 @@ def test_composable_reload( first_load_config = get_current_config(tmp_data_dir, chalk_copy) # load default config - chalk_copy.run(command="load", params=["default"]) + chalk_copy.load("default") default_load_config = get_current_config(tmp_data_dir, chalk_copy) # reload sample valid config and ensure default is overwritten diff --git a/tests/functional/test_config.py b/tests/functional/test_config.py index f94d8a36..aa964d7d 100644 --- a/tests/functional/test_config.py +++ b/tests/functional/test_config.py @@ -208,6 +208,7 @@ def test_external_configs( command="env", config=CONFIGS / config_path, expected_success=expected_success, + expecting_report=expected_success, ignore_errors=True, ) if expected_error: @@ -220,6 +221,7 @@ def test_external_configs( result_external = chalk_copy.run( command="env", expected_success=expected_success, + expecting_report=expected_success, ignore_errors=True, ) if expected_error: @@ -232,25 +234,25 @@ def test_custom_report( chalk_copy: Chalk, copy_files: list[Path], test_config_file: Path, + tmp_file: Path, ): bin_path = copy_files[0] # config sets custom report file output here - report_path = Path("/tmp/custom_report.log") # expecting a report for insert - assert chalk_copy.run( + assert chalk_copy.insert( + bin_path, config=test_config_file, - target=bin_path, - command="insert", virtual=False, + env={"LOG_FILE": str(tmp_file)}, ).report # expecting a report for extract - assert chalk_copy.run( + assert chalk_copy.extract( + bin_path, config=test_config_file, - target=bin_path, - command="extract", virtual=False, + env={"LOG_FILE": str(tmp_file)}, ).report # not expecting a report for env in report file @@ -259,9 +261,10 @@ def test_custom_report( config=test_config_file, command="env", virtual=False, + env={"LOG_FILE": str(tmp_file)}, ).reports - log_lines = report_path.read_text().splitlines() + log_lines = tmp_file.read_text().splitlines() reports = [ChalkReport.from_json(i) for i in log_lines] # only expecting report for insert and extract @@ -452,12 +455,12 @@ def validate_report_keys(report: dict[str, Any], expected_keys: set[str]): # tests outconf profiles for non-docker operations @pytest.mark.parametrize( - "test_config_file", + "test_config_file, expecting_chalkmarks", [ - ("profiles/empty_profile.c4m"), - ("profiles/default.c4m"), - ("profiles/minimal_profile.c4m"), - ("profiles/large_profile.c4m"), + ("profiles/empty_profile.c4m", False), + ("profiles/default.c4m", True), + ("profiles/minimal_profile.c4m", True), + ("profiles/large_profile.c4m", True), ], ) @pytest.mark.parametrize( @@ -473,6 +476,7 @@ def test_profiles( chalk_copy: Chalk, test_config_file: str, use_embedded: bool, + expecting_chalkmarks: bool, ): bin_path = copy_files[0] configs = merged_configs(CONFIGS / test_config_file) @@ -481,7 +485,7 @@ def test_profiles( chalk_copy.load(CONFIGS / test_config_file, use_embedded=use_embedded) # insert report should have keys listed - insert = chalk_copy.insert(bin_path) + insert = chalk_copy.insert(bin_path, expecting_chalkmarks=expecting_chalkmarks) validate_chalk_report_keys(insert.report, configs["insert"]) # check that binary has the correct chalk mark @@ -501,7 +505,7 @@ def test_profiles( validate_report_keys(chalk_mark, configs["insert"]["mark_template"] | minimal_chalk) # extract - extract = chalk_copy.extract(bin_path) + extract = chalk_copy.extract(bin_path, expecting_chalkmarks=expecting_chalkmarks) validate_chalk_report_keys(extract.report, configs["extract"]) # exec @@ -518,7 +522,7 @@ def test_no_certs(chalk_default: Chalk, server_chalkdust: str): chalk should be able to connect to chalkdust even when system has no system certs by using bundled mozilla root CA store """ - assert Docker.run( + _, build = Docker.run( # busybox does not ship with any system certs vs for example alpine image="busybox", entrypoint="/bin/sh", @@ -529,6 +533,7 @@ def test_no_certs(chalk_default: Chalk, server_chalkdust: str): tty=False, volumes={chalk_default.binary: "/chalk"}, ) + assert build @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) diff --git a/tests/functional/test_docker.py b/tests/functional/test_docker.py index bb74aaeb..542a400e 100644 --- a/tests/functional/test_docker.py +++ b/tests/functional/test_docker.py @@ -14,23 +14,17 @@ import pytest from .chalk.runner import Chalk, ChalkMark, ChalkProgram -from .chalk.validate import ( - MAGIC, - MISSING, - ArtifactInfo, - validate_docker_chalk_report, - validate_virtual_chalk, -) from .conf import ( CONFIGS, DOCKERFILES, DOCKER_SSH_REPO, DOCKER_TOKEN_REPO, + MAGIC, MARKS, REGISTRY, ROOT, ) -from .utils.dict import ANY, MISSING, Contains +from .utils.dict import ANY, MISSING, Contains, IfExists from .utils.docker import Docker from .utils.log import get_logger from .utils.os import run @@ -707,12 +701,8 @@ def test_virtual_valid( virtual=True, env={"SINK_TEST_OUTPUT_FILE": "/tmp/sink_file.json"}, ) - - # artifact is the docker image - # keys to check - artifact_info = ArtifactInfo( - type="Docker Image", - chalk_info={ + assert build.mark.contains( + { "_CURRENT_HASH": image_hash, "_IMAGE_ID": image_hash, "_REPO_TAGS": Contains({f"{tag}:latest"}), @@ -724,24 +714,8 @@ def test_virtual_valid( "DOCKER_TAGS": Contains({f"{tag}:latest"}), }, ) - validate_docker_chalk_report( - chalk_report=build.report, - artifact=artifact_info, - virtual=True, - ) - chalk_version = build.mark["CHALK_VERSION"] - metadata_id = build.mark["METADATA_ID"] - - vchalk = validate_virtual_chalk( - tmp_data_dir, artifact_map={image_hash: artifact_info}, virtual=True - ) - - # required keys in min chalk mark - assert "CHALK_ID" in vchalk - assert vchalk["MAGIC"] == MAGIC - assert vchalk["CHALK_VERSION"] == chalk_version - assert vchalk["METADATA_ID"] == metadata_id + assert build.vmark.contains({k: IfExists(v) for k, v in build.mark.items()}) _, result = Docker.run( image=image_hash, @@ -785,12 +759,8 @@ def test_nonvirtual_valid(chalk: Chalk, test_file: str, random_hex: str): tag=tag, config=CONFIGS / "docker_wrap.c4m", ) - - # artifact is the docker image - artifact_info = ArtifactInfo( - type="Docker Image", - # keys to check - chalk_info={ + assert build.mark.contains( + { "_CURRENT_HASH": image_hash, "_IMAGE_ID": image_hash, "_REPO_TAGS": Contains({f"{tag}:latest"}), @@ -802,24 +772,17 @@ def test_nonvirtual_valid(chalk: Chalk, test_file: str, random_hex: str): "DOCKER_TAGS": Contains({f"{tag}:latest"}), }, ) - validate_docker_chalk_report( - chalk_report=build.report, artifact=artifact_info, virtual=False - ) - - chalk_version = build.mark["CHALK_VERSION"] - metadata_id = build.mark["METADATA_ID"] _, result = Docker.run( image=image_hash, entrypoint="cat", params=["chalk.json"], ) - chalk_json = result.json() - - assert "CHALK_ID" in chalk_json - assert chalk_json["MAGIC"] == MAGIC, "chalk magic value incorrect" - assert chalk_json["CHALK_VERSION"] == chalk_version - assert chalk_json["METADATA_ID"] == metadata_id + chalk_json = ChalkMark(result.json()) + # ensure required keys are present + assert chalk_json.has(MAGIC=MAGIC, CHALK_VERSION=ANY, CHALK_ID=ANY, METADATA_ID=ANY) + # ensure all values match with build report + assert build.mark.contains({k: IfExists(v) for k, v in chalk_json.items()}) @pytest.mark.parametrize("test_file", ["invalid/sample_1", "invalid/sample_2"]) @@ -1237,39 +1200,27 @@ def test_extract(chalk: Chalk, random_hex: str): tag=tag, ) - # artifact info should be consistent - image_artifact = ArtifactInfo( - type="Docker Image", - host_info={ + # extract chalk from image id and image name + extract_by_name = chalk.extract(tag) + assert extract_by_name.report.contains( + { "_OPERATION": "extract", "_OP_EXE_NAME": chalk.binary.name, "_OP_UNMARKED_COUNT": 0, "_OP_CHALK_COUNT": 1, - }, - chalk_info={ + } + ) + assert extract_by_name.mark.contains( + { "_OP_ARTIFACT_TYPE": "Docker Image", "_IMAGE_ID": image_id, "_CURRENT_HASH": image_id, "_REPO_TAGS": Contains({f"{tag}:latest"}), - }, - ) - - # extract chalk from image id and image name - extract_by_name = chalk.extract(tag) - validate_docker_chalk_report( - chalk_report=extract_by_name.report, - artifact=image_artifact, - virtual=False, - chalk_action="extract", + } ) extract_by_id = chalk.extract(image_id[:12]) - validate_docker_chalk_report( - chalk_report=extract_by_id.report, - artifact=image_artifact, - virtual=False, - chalk_action="extract", - ) + assert extract_by_id.report.contains(extract_by_name.report.deterministic()) # run container and keep alive via tail container_id, _ = Docker.run( @@ -1283,64 +1234,60 @@ def test_extract(chalk: Chalk, random_hex: str): # let container start time.sleep(2) - # new artifact for running container - artifact_container = ArtifactInfo( - type="Docker Container", - host_info={ + # extract on container name and validate + extract_container_name = chalk.extract(container_name) + assert extract_container_name.report.contains( + { "_OPERATION": "extract", "_OP_EXE_NAME": chalk.binary.name, "_OP_UNMARKED_COUNT": 0, "_OP_CHALK_COUNT": 1, - }, - chalk_info={ + } + ) + assert extract_container_name.mark.contains( + { "_OP_ARTIFACT_TYPE": "Docker Container", "_IMAGE_ID": image_id, "_CURRENT_HASH": image_id, "_INSTANCE_CONTAINER_ID": container_id, "_INSTANCE_NAME": container_name, "_INSTANCE_STATUS": "running", - }, - ) - - # extract on container name and validate - extract_container_name = chalk.extract(container_name) - validate_docker_chalk_report( - chalk_report=extract_container_name.report, - artifact=artifact_container, - virtual=False, - chalk_action="extract", + } ) # extract on container id and validate extract_container_id = chalk.extract(container_id) - validate_docker_chalk_report( - chalk_report=extract_container_id.report, - artifact=artifact_container, - virtual=False, - chalk_action="extract", + assert extract_container_id.report.contains( + extract_container_name.report.deterministic() ) # shut down container Docker.stop_containers([container_name]) - # update artifact info - artifact_container.chalk_info["_INSTANCE_STATUS"] = "exited" - # extract on container name and container id now that container is stopped extract_container_name_stopped = chalk.extract(container_name) - validate_docker_chalk_report( - chalk_report=extract_container_name_stopped.report, - artifact=artifact_container, - virtual=False, - chalk_action="extract", + assert extract_container_name_stopped.report.contains( + { + "_OPERATION": "extract", + "_OP_EXE_NAME": chalk.binary.name, + "_OP_UNMARKED_COUNT": 0, + "_OP_CHALK_COUNT": 1, + } + ) + assert extract_container_name_stopped.mark.contains( + { + "_OP_ARTIFACT_TYPE": "Docker Container", + "_IMAGE_ID": image_id, + "_CURRENT_HASH": image_id, + "_INSTANCE_CONTAINER_ID": container_id, + "_INSTANCE_NAME": container_name, + "_INSTANCE_STATUS": "exited", + } ) extract_container_id_stopped = chalk.extract(container_id) - validate_docker_chalk_report( - chalk_report=extract_container_id_stopped.report, - artifact=artifact_container, - virtual=False, - chalk_action="extract", + assert extract_container_id_stopped.report.contains( + extract_container_name_stopped.report.deterministic() ) diff --git a/tests/functional/test_elf.py b/tests/functional/test_elf.py index 96d57b19..769581d2 100644 --- a/tests/functional/test_elf.py +++ b/tests/functional/test_elf.py @@ -7,12 +7,6 @@ import pytest from .chalk.runner import Chalk -from .chalk.validate import ( - ArtifactInfo, - validate_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) from .conf import DATE_PATH, GDB, LS_PATH, UNAME_PATH from .utils.log import get_logger from .utils.os import run @@ -21,51 +15,28 @@ logger = get_logger() -# XXX parameterizing this in case we need ELF files with different properties -# but we don't want to simply run different binaries like date/ls/cat/uname -# if we don't expect the behavior to vary @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) -def test_virtual_valid(copy_files: list[Path], tmp_data_dir: Path, chalk: Chalk): +@pytest.mark.parametrize("virtual", [True, False]) +def test_valid( + copy_files: list[Path], + tmp_data_dir: Path, + chalk: Chalk, + virtual: bool, +): artifact = copy_files[0] - artifact_info = ArtifactInfo.one_elf(artifact) - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True - ) + insert = chalk.insert(artifact=tmp_data_dir, virtual=virtual) + assert insert.report.marks_by_path.contains({str(artifact): {}}) - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - ) + extract = chalk.extract(artifact=tmp_data_dir, virtual=virtual) + if not virtual: + assert extract.report.marks_by_path.contains({str(artifact): {}}) # compare extractions - extract2 = chalk.extract(artifact=tmp_data_dir) + extract2 = chalk.extract(artifact=tmp_data_dir, virtual=virtual) assert extract.report.datetime < extract2.report.datetime -@pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) -def test_nonvirtual_valid(copy_files: list[Path], tmp_data_dir: Path, chalk: Chalk): - artifact = copy_files[0] - artifact_info = ArtifactInfo.one_elf(artifact) - - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) - - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) - - @pytest.mark.requires_gdb @pytest.mark.parametrize( "copy_files", diff --git a/tests/functional/test_plugins.py b/tests/functional/test_plugins.py index 88d61994..934d44ab 100644 --- a/tests/functional/test_plugins.py +++ b/tests/functional/test_plugins.py @@ -10,13 +10,6 @@ import pytest from .chalk.runner import Chalk, ChalkMark -from .chalk.validate import ( - ArtifactInfo, - validate_chalk_report, - validate_docker_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) from .conf import CODEOWNERS, CONFIGS, DATA, DOCKERFILES, LS_PATH, PYS from .utils.dict import ANY, MISSING from .utils.docker import Docker @@ -44,47 +37,22 @@ def test_codeowners(tmp_data_dir: Path, chalk: Chalk): folder = CODEOWNERS / "raw1" expected_owners = (folder / "CODEOWNERS").read_text() shutil.copytree(folder, tmp_data_dir, dirs_exist_ok=True) - artifact_info = ArtifactInfo.all_shebangs() - assert len(artifact_info) == 1 - artifact = Path(list(artifact_info.keys())[0]) Git(tmp_data_dir).init().add().commit() + artifact = tmp_data_dir / "helloworld.py" # chalk reports generated by insertion, json array that has one element insert = chalk.insert(artifact=artifact, virtual=True) - assert insert.mark["CODE_OWNERS"] == expected_owners - # check chalk report - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True + assert insert.marks_by_path.contains( + {str(artifact): {"CODE_OWNERS": expected_owners}} ) # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - ) + assert chalk.extract(artifact=tmp_data_dir, virtual=True) @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) def test_github(copy_files: list[Path], chalk: Chalk, server_imds: str): bin_path = copy_files[0] - artifact = ArtifactInfo.one_elf( - bin_path, - host_info={ - "BUILD_ID": "1658821493", - "BUILD_COMMIT_ID": "ffac537e6cbbf934b08745a378932722df287a53", - "BUILD_TRIGGER": "tag", - "BUILD_CONTACT": ["octocat"], - "BUILD_URI": "https://github.com/octocat/Hello-World/actions/runs/1658821493/attempts/5", - "BUILD_API_URI": server_imds, - "BUILD_ORIGIN_ID": "123", - "BUILD_ORIGIN_KEY": "abc", - "BUILD_ORIGIN_OWNER_ID": "456", - "BUILD_ORIGIN_OWNER_KEY": "xyz", - }, - ) insert = chalk.insert( bin_path, env={ @@ -105,31 +73,25 @@ def test_github(copy_files: list[Path], chalk: Chalk, server_imds: str): "GITHUB_REF_TYPE": "tag", }, ) - - validate_chalk_report( - chalk_report=insert.report, - artifact_map=artifact, - virtual=False, - chalk_action="insert", + assert insert.report.contains( + { + "BUILD_ID": "1658821493", + "BUILD_COMMIT_ID": "ffac537e6cbbf934b08745a378932722df287a53", + "BUILD_TRIGGER": "tag", + "BUILD_CONTACT": ["octocat"], + "BUILD_URI": "https://github.com/octocat/Hello-World/actions/runs/1658821493/attempts/5", + "BUILD_API_URI": server_imds, + "BUILD_ORIGIN_ID": "123", + "BUILD_ORIGIN_KEY": "abc", + "BUILD_ORIGIN_OWNER_ID": "456", + "BUILD_ORIGIN_OWNER_KEY": "xyz", + } ) @pytest.mark.parametrize("copy_files", [[LS_PATH]], indirect=True) def test_gitlab(copy_files: list[Path], chalk: Chalk): bin_path = copy_files[0] - artifact = ArtifactInfo.one_elf( - bin_path, - host_info={ - "BUILD_ID": "4999820578", - "BUILD_COMMIT_ID": "ffac537e6cbbf934b08745a378932722df287a53", - "BUILD_TRIGGER": "push", - "BUILD_CONTACT": ["user"], - "BUILD_URI": "https://gitlab.com/gitlab-org/gitlab/-/jobs/4999820578", - "BUILD_API_URI": "https://gitlab.com/api/v4", - "BUILD_ORIGIN_ID": "123", - "BUILD_ORIGIN_OWNER_ID": "456", - }, - ) insert = chalk.insert( bin_path, env={ @@ -146,11 +108,17 @@ def test_gitlab(copy_files: list[Path], chalk: Chalk): "CI_PROJECT_NAMESPACE_ID": "456", }, ) - validate_chalk_report( - chalk_report=insert.report, - artifact_map=artifact, - virtual=False, - chalk_action="insert", + assert insert.report.contains( + { + "BUILD_ID": "4999820578", + "BUILD_COMMIT_ID": "ffac537e6cbbf934b08745a378932722df287a53", + "BUILD_TRIGGER": "push", + "BUILD_CONTACT": ["user"], + "BUILD_URI": "https://gitlab.com/gitlab-org/gitlab/-/jobs/4999820578", + "BUILD_API_URI": "https://gitlab.com/api/v4", + "BUILD_ORIGIN_ID": "123", + "BUILD_ORIGIN_OWNER_ID": "456", + } ) @@ -872,16 +840,6 @@ def test_syft_docker(chalk_copy: Chalk, test_file: str, random_hex: str): assert build.report.contains(sbom_data) assert build.mark.has(SBOM=MISSING) - # artifact is the docker image - artifact_info = ArtifactInfo( - type="Docker Image", - # keys to check - host_info=sbom_data, - ) - validate_docker_chalk_report( - chalk_report=build.report, artifact=artifact_info, virtual=False - ) - # check sbom data from running container _, result = Docker.run( image=image_hash, @@ -921,15 +879,9 @@ def test_syft_binary(copy_files: list[Path], chalk_copy: Chalk, use_docker: bool } } - artifact = ArtifactInfo.one_elf(bin_path, chalk_info=sbom_data) - insert = chalk.insert(bin_path, env={"EXTERNAL_TOOL_USE_DOCKER": str(use_docker)}) - validate_chalk_report( - chalk_report=insert.report, - artifact_map=artifact, - virtual=False, - chalk_action="insert", - ) + assert insert.marks_by_path.contains({str(bin_path): {}}) + assert insert.report.contains(sbom_data) if use_docker: assert "ghcr.io/anchore/syft" in insert.logs else: @@ -1018,19 +970,12 @@ def test_semgrep( } } } - artifact = ArtifactInfo.one_elf( - tmp_data_dir / "helloworld.py", chalk_info=sast_data - ) insert = chalk.insert( artifact=tmp_data_dir, env={"EXTERNAL_TOOL_USE_DOCKER": str(use_docker)} ) - validate_chalk_report( - chalk_report=insert.report, - artifact_map=artifact, - virtual=False, - chalk_action="insert", - ) + assert insert.marks_by_path.contains({str(tmp_data_dir / "helloworld.py"): {}}) + assert insert.report.contains(sast_data) if use_docker: assert "semgrep/semgrep" in insert.logs else: diff --git a/tests/functional/test_py.py b/tests/functional/test_py.py index afdfb113..61ad07d7 100644 --- a/tests/functional/test_py.py +++ b/tests/functional/test_py.py @@ -4,18 +4,12 @@ # (see https://crashoverride.com/docs/chalk) import shutil from pathlib import Path +from typing import Optional import pytest from .chalk.runner import Chalk -from .chalk.validate import ( - MAGIC, - ArtifactInfo, - validate_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) -from .conf import PYS, SHEBANG +from .conf import MAGIC, PYS, SHEBANG from .utils.log import get_logger @@ -23,68 +17,70 @@ @pytest.mark.parametrize( - "test_file", + "test_file, shebang", [ - "sample_1", - "sample_2", - "sample_3", - "sample_4", + ("sample_1", "helloworld.py"), + ("sample_2", "main.py"), + ("sample_3", None), + ("sample_4", None), ], ) -def test_virtual_valid(tmp_data_dir: Path, chalk: Chalk, test_file: str): +def test_virtual_valid( + tmp_data_dir: Path, chalk: Chalk, test_file: str, shebang: Optional[str] +): shutil.copytree(PYS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact_info = ArtifactInfo.all_shebangs() # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True + insert = chalk.insert( + artifact=tmp_data_dir, + virtual=True, + expecting_chalkmarks=bool(shebang), ) + if shebang: + assert insert.marks_by_path.contains({str(tmp_data_dir / shebang): {}}) # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - ) + assert chalk.extract(artifact=tmp_data_dir, virtual=True) @pytest.mark.parametrize( - "test_file", + "test_file, shebang", [ - "sample_1", - "sample_2", - "sample_3", - "sample_4", + ("sample_1", "helloworld.py"), + ("sample_2", "main.py"), + ("sample_3", None), + ("sample_4", None), ], ) -def test_nonvirtual_valid(tmp_data_dir: Path, chalk: Chalk, test_file: str): +def test_nonvirtual_valid( + tmp_data_dir: Path, chalk: Chalk, test_file: str, shebang: Optional[str] +): shutil.copytree(PYS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact_info = ArtifactInfo.all_shebangs() # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False + insert = chalk.insert( + artifact=tmp_data_dir, + virtual=False, + expecting_chalkmarks=bool(shebang), ) + if shebang: + assert insert.marks_by_path.contains({str(tmp_data_dir / shebang): {}}) # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False + extract = chalk.extract( + artifact=tmp_data_dir, + virtual=False, + expecting_chalkmarks=bool(shebang), ) + if shebang: + assert extract.marks_by_path.contains({str(tmp_data_dir / shebang): {}}) # check that first line shebangs are not clobbered in non-virtual chalk for file in tmp_data_dir.iterdir(): if file.suffix in {"key", "pub"}: continue - is_artifact = str(file) in artifact_info + is_artifact = file.name == shebang text = file.read_text() lines = text.splitlines() first_line = next(iter(lines), "") diff --git a/tests/functional/test_sink.py b/tests/functional/test_sink.py index 80c04468..286f7486 100644 --- a/tests/functional/test_sink.py +++ b/tests/functional/test_sink.py @@ -43,28 +43,25 @@ def _validate_chalk( # TODO add a test for the file not being present @pytest.mark.parametrize("copy_files", [[CAT_PATH]], indirect=True) -def test_file_present(tmp_data_dir: Path, chalk: Chalk, copy_files: list[Path]): +def test_file_present( + tmp_data_dir: Path, chalk: Chalk, copy_files: list[Path], tmp_file +): artifact = copy_files[0] # prep config file - file_output_path = Path("/tmp/sink_file.json") - if not file_output_path.is_file(): - # touch the file - open(file_output_path, "a").close() - os.utime(file_output_path, None) - assert file_output_path.is_file(), "file sink path must be a valid path" + assert tmp_file.is_file(), "file sink path must be a valid path" config = SINK_CONFIGS / "file.c4m" chalk.insert( config=config, artifact=artifact, - env={"SINK_TEST_OUTPUT_FILE": str(file_output_path)}, + env={"SINK_TEST_OUTPUT_FILE": str(tmp_file)}, ) # check that file output is correct - assert file_output_path.is_file(), "file sink should exist after chalk operation" + assert tmp_file.is_file(), "file sink should exist after chalk operation" - contents = file_output_path.read_text() + contents = tmp_file.read_text() assert contents chalks = json.loads(contents) assert len(chalks) == 1 diff --git a/tests/functional/test_zip.py b/tests/functional/test_zip.py index 9154f9af..0cd9e7b7 100644 --- a/tests/functional/test_zip.py +++ b/tests/functional/test_zip.py @@ -2,21 +2,12 @@ # # This file is part of Chalk # (see https://crashoverride.com/docs/chalk) -import shutil from pathlib import Path import pytest from .chalk.runner import Chalk -from .chalk.validate import ( - ArtifactInfo, - validate_chalk_report, - validate_extracted_chalk, - validate_virtual_chalk, -) from .conf import ZIPS -from .utils.dict import ANY -from .utils.git import Git from .utils.log import get_logger @@ -25,158 +16,83 @@ @pytest.mark.slow() @pytest.mark.parametrize( - "test_file", + "copy_files", [ - "nodejs", - "python", + [ZIPS / "nodejs" / "function.zip"], + [ZIPS / "python" / "my_deployment_package.zip"], ], + indirect=True, ) -def test_virtual_valid_slow(tmp_data_dir: Path, chalk: Chalk, test_file: str): - shutil.copytree(ZIPS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact = next((ZIPS / test_file).iterdir()) - - # we are only checking the ZIP chalk mark, not any of the subchalks - artifact_info = { - str(tmp_data_dir / artifact.name): ArtifactInfo(type="ZIP"), - } +@pytest.mark.parametrize("virtual", [True, False]) +def test_valid_slow( + tmp_data_dir: Path, + chalk: Chalk, + copy_files: list[Path], + virtual: bool, +): + test_file = copy_files[0] # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True - ) + insert = chalk.insert(artifact=tmp_data_dir, virtual=virtual) + assert insert.report.marks_by_path.contains({str(test_file): {}}) # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True - ) - # FIXME: virtual chalks not currently validated as every subfile in zip gets chalked - # generating too many chalks to check - # validate_virtual_chalk( - # tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - # ) - - -def test_virtual_empty(tmp_data_dir: Path, chalk: Chalk): - # empty zip file does not get chalked, so no artifact info - shutil.copytree(ZIPS / "empty", tmp_data_dir, dirs_exist_ok=True) - - # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - - # check chalk report -- operation is the only thing we can check since no _CHALK will be generated - # on an unchalked empty zip - assert insert.report["_OPERATION"] == "insert" - assert not insert.report.get("_CHALK") - - # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - - # check chalk extract -- operation is the only thing we can check since no _CHALK will be generated - # on an unchalked empty zip - assert extract.report["_OPERATION"] == "extract" - assert not insert.report.get("_CHALK") + extract = chalk.extract(artifact=tmp_data_dir, virtual=virtual) + if not virtual: + assert extract.report.marks_by_path.contains({str(test_file): {}}) @pytest.mark.parametrize( - "test_file", + "copy_files", [ - "misc", - "golang", + # empty zip file does not get chalked, so no artifact info + [ZIPS / "empty" / "empty.zip"], ], + indirect=True, ) -def test_virtual_valid(tmp_data_dir: Path, chalk: Chalk, test_file: str): - shutil.copytree(ZIPS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact = next((ZIPS / test_file).iterdir()) - - # we are only checking the ZIP chalk mark, not any of the subchalks - artifact_info = { - str(tmp_data_dir / artifact.name): ArtifactInfo(type="ZIP"), - } - - # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=True) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=True +@pytest.mark.parametrize("virtual", [True, False]) +def test_empty( + tmp_data_dir: Path, + copy_files: list[Path], + chalk: Chalk, + virtual: bool, +): + # no _CHALK will be generated on an unchalked empty zip + assert chalk.insert( + artifact=tmp_data_dir, + virtual=virtual, + expecting_chalkmarks=False, ) - - # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=True + assert chalk.extract( + artifact=tmp_data_dir, + virtual=virtual, + expecting_chalkmarks=False, ) - # FIXME: virtual chalks not currently validated as every subfile in zip gets chalked - # generating too many chalks to check - # validate_virtual_chalk( - # tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=True - # ) -@pytest.mark.slow() @pytest.mark.parametrize( - "test_file", + "copy_files", [ - "nodejs", - "python", + [ZIPS / "misc" / "misc.zip"], + [ZIPS / "golang" / "myFunction.zip"], ], + indirect=True, ) -def test_nonvirtual_valid_slow(tmp_data_dir: Path, chalk: Chalk, test_file: str): - shutil.copytree(ZIPS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact = next((ZIPS / test_file).iterdir()) - - # we are only checking the ZIP chalk mark, not any of the subchalks - artifact_info = { - str(tmp_data_dir / artifact.name): ArtifactInfo(type="ZIP"), - } +@pytest.mark.parametrize("virtual", [True, False]) +def test_valid( + tmp_data_dir: Path, + chalk: Chalk, + copy_files: list[Path], + virtual: bool, +): + test_file = copy_files[0] # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) - - # array of json chalk objects as output, of which we are only expecting one - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - # validation here okay as we are just checking that virtual-chalk.json file doesn't exist - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) - - -@pytest.mark.parametrize( - "test_file", - [ - "misc", - "golang", - ], -) -def test_nonvirtual_valid(tmp_data_dir: Path, chalk: Chalk, test_file: str): - shutil.copytree(ZIPS / test_file, tmp_data_dir, dirs_exist_ok=True) - artifact = next((ZIPS / test_file).iterdir()) - - Git(tmp_data_dir).init().add().commit() - + insert = chalk.insert(artifact=tmp_data_dir, virtual=virtual) # we are only checking the ZIP chalk mark, not any of the subchalks - artifact_info = { - str(tmp_data_dir / artifact.name): ArtifactInfo(type="ZIP"), - } - - # chalk reports generated by insertion, json array that has one element - insert = chalk.insert(artifact=tmp_data_dir, virtual=False) - assert insert.mark.has(COMMIT_ID=ANY) - validate_chalk_report( - chalk_report=insert.report, artifact_map=artifact_info, virtual=False - ) + assert insert.report.marks_by_path.contains({str(test_file): {}}) - extract = chalk.extract(artifact=tmp_data_dir) - validate_extracted_chalk( - extracted_chalk=extract.report, artifact_map=artifact_info, virtual=False - ) - # validation here okay as we are just checking that virtual-chalk.json file doesn't exist - validate_virtual_chalk( - tmp_data_dir=tmp_data_dir, artifact_map=artifact_info, virtual=False - ) + # array of json chalk objects as output, of which we are only expecting one + extract = chalk.extract(artifact=tmp_data_dir, virtual=virtual) + if not virtual: + assert extract.report.marks_by_path.contains({str(test_file): {}})