From 44b953d8f4fe298e3fbb4bc8c74afe8ca758d894 Mon Sep 17 00:00:00 2001 From: Jordan Borean Date: Wed, 1 Apr 2020 06:39:02 +1000 Subject: [PATCH] ansible-galaxy - Fix tar path traversal issue during install - CVE-2020-10691 (#68596) (cherry picked from commit a20a52701402a12f91396549df04ac55809f68e9) --- .../galaxy-install-tar-path-traversal.yaml | 2 + lib/ansible/galaxy/collection.py | 48 +- .../files/build_bad_tar.py | 84 +++ .../tasks/install.yml | 20 + test/units/galaxy/test_collection.py | 582 ++++++++++++++++++ 5 files changed, 719 insertions(+), 17 deletions(-) create mode 100644 changelogs/fragments/galaxy-install-tar-path-traversal.yaml create mode 100644 test/integration/targets/ansible-galaxy-collection/files/build_bad_tar.py diff --git a/changelogs/fragments/galaxy-install-tar-path-traversal.yaml b/changelogs/fragments/galaxy-install-tar-path-traversal.yaml new file mode 100644 index 00000000000000..c2382bf4bf7c64 --- /dev/null +++ b/changelogs/fragments/galaxy-install-tar-path-traversal.yaml @@ -0,0 +1,2 @@ +bugfixes: +- ansible-galaxy - Error when install finds a tar with a file that will be extracted outside the collection install directory - CVE-2020-10691 diff --git a/lib/ansible/galaxy/collection.py b/lib/ansible/galaxy/collection.py index fd50472f79d235..69b62e0c60d837 100644 --- a/lib/ansible/galaxy/collection.py +++ b/lib/ansible/galaxy/collection.py @@ -168,24 +168,34 @@ def install(self, path, b_temp_path): shutil.rmtree(b_collection_path) os.makedirs(b_collection_path) - with tarfile.open(self.b_path, mode='r') as collection_tar: - files_member_obj = collection_tar.getmember('FILES.json') - with _tarfile_extract(collection_tar, files_member_obj) as files_obj: - files = json.loads(to_text(files_obj.read(), errors='surrogate_or_strict')) + try: + with tarfile.open(self.b_path, mode='r') as collection_tar: + files_member_obj = collection_tar.getmember('FILES.json') + with _tarfile_extract(collection_tar, files_member_obj) as files_obj: + files = json.loads(to_text(files_obj.read(), errors='surrogate_or_strict')) - _extract_tar_file(collection_tar, 'MANIFEST.json', b_collection_path, b_temp_path) - _extract_tar_file(collection_tar, 'FILES.json', b_collection_path, b_temp_path) + _extract_tar_file(collection_tar, 'MANIFEST.json', b_collection_path, b_temp_path) + _extract_tar_file(collection_tar, 'FILES.json', b_collection_path, b_temp_path) - for file_info in files['files']: - file_name = file_info['name'] - if file_name == '.': - continue + for file_info in files['files']: + file_name = file_info['name'] + if file_name == '.': + continue - if file_info['ftype'] == 'file': - _extract_tar_file(collection_tar, file_name, b_collection_path, b_temp_path, - expected_hash=file_info['chksum_sha256']) - else: - os.makedirs(os.path.join(b_collection_path, to_bytes(file_name, errors='surrogate_or_strict'))) + if file_info['ftype'] == 'file': + _extract_tar_file(collection_tar, file_name, b_collection_path, b_temp_path, + expected_hash=file_info['chksum_sha256']) + else: + os.makedirs(os.path.join(b_collection_path, to_bytes(file_name, errors='surrogate_or_strict'))) + except Exception: + # Ensure we don't leave the dir behind in case of a failure. + shutil.rmtree(b_collection_path) + + b_namespace_path = os.path.dirname(b_collection_path) + if not os.listdir(b_namespace_path): + os.rmdir(b_namespace_path) + + raise def set_latest_version(self): self.versions = set([self.latest_version]) @@ -925,8 +935,12 @@ def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None): raise AnsibleError("Checksum mismatch for '%s' inside collection at '%s'" % (n_filename, to_native(tar.name))) - b_dest_filepath = os.path.join(b_dest, to_bytes(filename, errors='surrogate_or_strict')) - b_parent_dir = os.path.split(b_dest_filepath)[0] + b_dest_filepath = os.path.abspath(os.path.join(b_dest, to_bytes(filename, errors='surrogate_or_strict'))) + b_parent_dir = os.path.dirname(b_dest_filepath) + if b_parent_dir != b_dest and not b_parent_dir.startswith(b_dest + to_bytes(os.path.sep)): + raise AnsibleError("Cannot extract tar entry '%s' as it will be placed outside the collection directory" + % to_native(filename, errors='surrogate_or_strict')) + if not os.path.exists(b_parent_dir): # Seems like Galaxy does not validate if all file entries have a corresponding dir ftype entry. This check # makes sure we create the parent directory even if it wasn't set in the metadata. diff --git a/test/integration/targets/ansible-galaxy-collection/files/build_bad_tar.py b/test/integration/targets/ansible-galaxy-collection/files/build_bad_tar.py new file mode 100644 index 00000000000000..6182e865db0e02 --- /dev/null +++ b/test/integration/targets/ansible-galaxy-collection/files/build_bad_tar.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python + +# Copyright: (c) 2020, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import hashlib +import io +import json +import os +import sys +import tarfile + +manifest = { + 'collection_info': { + 'namespace': 'suspicious', + 'name': 'test', + 'version': '1.0.0', + 'dependencies': {}, + }, + 'file_manifest_file': { + 'name': 'FILES.json', + 'ftype': 'file', + 'chksum_type': 'sha256', + 'chksum_sha256': None, + 'format': 1 + }, + 'format': 1, +} + +files = { + 'files': [ + { + 'name': '.', + 'ftype': 'dir', + 'chksum_type': None, + 'chksum_sha256': None, + 'format': 1, + }, + ], + 'format': 1, +} + + +def add_file(tar_file, filename, b_content, update_files=True): + tar_info = tarfile.TarInfo(filename) + tar_info.size = len(b_content) + tar_info.mode = 0o0755 + tar_file.addfile(tarinfo=tar_info, fileobj=io.BytesIO(b_content)) + + if update_files: + sha256 = hashlib.sha256() + sha256.update(b_content) + + files['files'].append({ + 'name': filename, + 'ftype': 'file', + 'chksum_type': 'sha256', + 'chksum_sha256': sha256.hexdigest(), + 'format': 1 + }) + + +collection_tar = os.path.join(sys.argv[1], 'suspicious-test-1.0.0.tar.gz') +with tarfile.open(collection_tar, mode='w:gz') as tar_file: + add_file(tar_file, '../../outside.sh', b"#!/usr/bin/env bash\necho \"you got pwned\"") + + b_files = json.dumps(files).encode('utf-8') + b_files_hash = hashlib.sha256() + b_files_hash.update(b_files) + manifest['file_manifest_file']['chksum_sha256'] = b_files_hash.hexdigest() + add_file(tar_file, 'FILES.json', b_files) + add_file(tar_file, 'MANIFEST.json', json.dumps(manifest).encode('utf-8')) + + b_manifest = json.dumps(manifest).encode('utf-8') + + for name, b in [('MANIFEST.json', b_manifest), ('FILES.json', b_files)]: + b_io = io.BytesIO(b) + tar_info = tarfile.TarInfo(name) + tar_info.size = len(b) + tar_info.mode = 0o0644 + tar_file.addfile(tarinfo=tar_info, fileobj=b_io) diff --git a/test/integration/targets/ansible-galaxy-collection/tasks/install.yml b/test/integration/targets/ansible-galaxy-collection/tasks/install.yml index 649fa8faa58e17..3ed1a48d0de7af 100644 --- a/test/integration/targets/ansible-galaxy-collection/tasks/install.yml +++ b/test/integration/targets/ansible-galaxy-collection/tasks/install.yml @@ -133,6 +133,26 @@ - '"Installing ''namespace3.name:1.0.0'' to" in install_tarball.stdout' - (install_tarball_actual.content | b64decode | from_json).collection_info.version == '1.0.0' +- name: setup bad tarball - {{ test_name }} + script: build_bad_tar.py {{ galaxy_dir | quote }} + +- name: fail to install a collection from a bad tarball - {{ test_name }} + command: ansible-galaxy collection install '{{ galaxy_dir }}/suspicious-test-1.0.0.tar.gz' + register: fail_bad_tar + failed_when: fail_bad_tar.rc != 1 and "Cannot extract tar entry '../../outside.sh' as it will be placed outside the collection directory" not in fail_bad_tar.stderr + environment: + ANSIBLE_COLLECTIONS_PATHS: '{{ galaxy_dir }}/ansible_collections' + +- name: get result of failed collection install - {{ test_name }} + stat: + path: '{{ galaxy_dir }}/ansible_collections\suspicious' + register: fail_bad_tar_actual + +- name: assert result of failed collection install - {{ test_name }} + assert: + that: + - not fail_bad_tar_actual.stat.exists + - name: install a collection from a URI - {{ test_name }} command: ansible-galaxy collection install '{{ test_server }}custom/collections/namespace4-name-1.0.0.tar.gz' register: install_uri diff --git a/test/units/galaxy/test_collection.py b/test/units/galaxy/test_collection.py index 18df8bbdcd6d40..41090269dccf5b 100644 --- a/test/units/galaxy/test_collection.py +++ b/test/units/galaxy/test_collection.py @@ -9,6 +9,7 @@ import json import os import pytest +import re import tarfile import uuid @@ -563,3 +564,584 @@ def test_extract_tar_file_missing_parent_dir(tmp_tarfile): collection._extract_tar_file(tfile, filename, output_dir, temp_dir, checksum) os.path.isfile(output_file) + + +def test_extract_tar_file_outside_dir(tmp_path_factory): + filename = u'ÅÑŚÌβŁÈ' + temp_dir = to_bytes(tmp_path_factory.mktemp('test-%s Collections' % to_native(filename))) + tar_file = os.path.join(temp_dir, to_bytes('%s.tar.gz' % filename)) + data = os.urandom(8) + + tar_filename = '../%s.sh' % filename + with tarfile.open(tar_file, 'w:gz') as tfile: + b_io = BytesIO(data) + tar_info = tarfile.TarInfo(tar_filename) + tar_info.size = len(data) + tar_info.mode = 0o0644 + tfile.addfile(tarinfo=tar_info, fileobj=b_io) + + expected = re.escape("Cannot extract tar entry '%s' as it will be placed outside the collection directory" + % to_native(tar_filename)) + with tarfile.open(tar_file, 'r') as tfile: + with pytest.raises(AnsibleError, match=expected): + collection._extract_tar_file(tfile, tar_filename, os.path.join(temp_dir, to_bytes(filename)), temp_dir) + + +def test_require_one_of_collections_requirements_with_both(): + cli = GalaxyCLI(args=['ansible-galaxy', 'collection', 'verify', 'namespace.collection', '-r', 'requirements.yml']) + + with pytest.raises(AnsibleError) as req_err: + cli._require_one_of_collections_requirements(('namespace.collection',), 'requirements.yml') + + with pytest.raises(AnsibleError) as cli_err: + cli.run() + + assert req_err.value.message == cli_err.value.message == 'The positional collection_name arg and --requirements-file are mutually exclusive.' + + +def test_require_one_of_collections_requirements_with_neither(): + cli = GalaxyCLI(args=['ansible-galaxy', 'collection', 'verify']) + + with pytest.raises(AnsibleError) as req_err: + cli._require_one_of_collections_requirements((), '') + + with pytest.raises(AnsibleError) as cli_err: + cli.run() + + assert req_err.value.message == cli_err.value.message == 'You must specify a collection name or a requirements file.' + + +def test_require_one_of_collections_requirements_with_collections(): + cli = GalaxyCLI(args=['ansible-galaxy', 'collection', 'verify', 'namespace1.collection1', 'namespace2.collection1:1.0.0']) + collections = ('namespace1.collection1', 'namespace2.collection1:1.0.0',) + + requirements = cli._require_one_of_collections_requirements(collections, '') + + assert requirements == [('namespace1.collection1', '*', None), ('namespace2.collection1', '1.0.0', None)] + + +@patch('ansible.cli.galaxy.GalaxyCLI._parse_requirements_file') +def test_require_one_of_collections_requirements_with_requirements(mock_parse_requirements_file, galaxy_server): + cli = GalaxyCLI(args=['ansible-galaxy', 'collection', 'verify', '-r', 'requirements.yml', 'namespace.collection']) + mock_parse_requirements_file.return_value = {'collections': [('namespace.collection', '1.0.5', galaxy_server)]} + requirements = cli._require_one_of_collections_requirements((), 'requirements.yml') + + assert mock_parse_requirements_file.call_count == 1 + assert requirements == [('namespace.collection', '1.0.5', galaxy_server)] + + +@patch('ansible.cli.galaxy.GalaxyCLI.execute_verify', spec=True) +def test_call_GalaxyCLI(execute_verify): + galaxy_args = ['ansible-galaxy', 'collection', 'verify', 'namespace.collection'] + + GalaxyCLI(args=galaxy_args).run() + + assert execute_verify.call_count == 1 + + +@patch('ansible.cli.galaxy.GalaxyCLI.execute_verify') +def test_call_GalaxyCLI_with_implicit_role(execute_verify): + galaxy_args = ['ansible-galaxy', 'verify', 'namespace.implicit_role'] + + with pytest.raises(SystemExit): + GalaxyCLI(args=galaxy_args).run() + + assert not execute_verify.called + + +@patch('ansible.cli.galaxy.GalaxyCLI.execute_verify') +def test_call_GalaxyCLI_with_role(execute_verify): + galaxy_args = ['ansible-galaxy', 'role', 'verify', 'namespace.role'] + + with pytest.raises(SystemExit): + GalaxyCLI(args=galaxy_args).run() + + assert not execute_verify.called + + +@patch('ansible.cli.galaxy.verify_collections', spec=True) +def test_execute_verify_with_defaults(mock_verify_collections): + galaxy_args = ['ansible-galaxy', 'collection', 'verify', 'namespace.collection:1.0.4'] + GalaxyCLI(args=galaxy_args).run() + + assert mock_verify_collections.call_count == 1 + + requirements, search_paths, galaxy_apis, validate, ignore_errors = mock_verify_collections.call_args[0] + + assert requirements == [('namespace.collection', '1.0.4', None)] + for install_path in search_paths: + assert install_path.endswith('ansible_collections') + assert galaxy_apis[0].api_server == 'https://galaxy.ansible.com' + assert validate is True + assert ignore_errors is False + + +@patch('ansible.cli.galaxy.verify_collections', spec=True) +def test_execute_verify(mock_verify_collections): + GalaxyCLI(args=[ + 'ansible-galaxy', 'collection', 'verify', 'namespace.collection:1.0.4', '--ignore-certs', + '-p', '~/.ansible', '--ignore-errors', '--server', 'http://galaxy-dev.com', + ]).run() + + assert mock_verify_collections.call_count == 1 + + requirements, search_paths, galaxy_apis, validate, ignore_errors = mock_verify_collections.call_args[0] + + assert requirements == [('namespace.collection', '1.0.4', None)] + for install_path in search_paths: + assert install_path.endswith('ansible_collections') + assert galaxy_apis[0].api_server == 'http://galaxy-dev.com' + assert validate is False + assert ignore_errors is True + + +def test_verify_file_hash_deleted_file(manifest_info): + data = to_bytes(json.dumps(manifest_info)) + digest = sha256(data).hexdigest() + + namespace = manifest_info['collection_info']['namespace'] + name = manifest_info['collection_info']['name'] + version = manifest_info['collection_info']['version'] + server = 'http://galaxy.ansible.com' + + error_queue = [] + + with patch.object(builtins, 'open', mock_open(read_data=data)) as m: + with patch.object(collection.os.path, 'isfile', MagicMock(return_value=False)) as mock_isfile: + collection_req = collection.CollectionRequirement(namespace, name, './', server, [version], version, False) + collection_req._verify_file_hash(b'path/', 'file', digest, error_queue) + + assert mock_isfile.called_once + + assert len(error_queue) == 1 + assert error_queue[0].installed is None + assert error_queue[0].expected == digest + + +def test_verify_file_hash_matching_hash(manifest_info): + + data = to_bytes(json.dumps(manifest_info)) + digest = sha256(data).hexdigest() + + namespace = manifest_info['collection_info']['namespace'] + name = manifest_info['collection_info']['name'] + version = manifest_info['collection_info']['version'] + server = 'http://galaxy.ansible.com' + + error_queue = [] + + with patch.object(builtins, 'open', mock_open(read_data=data)) as m: + with patch.object(collection.os.path, 'isfile', MagicMock(return_value=True)) as mock_isfile: + collection_req = collection.CollectionRequirement(namespace, name, './', server, [version], version, False) + collection_req._verify_file_hash(b'path/', 'file', digest, error_queue) + + assert mock_isfile.called_once + + assert error_queue == [] + + +def test_verify_file_hash_mismatching_hash(manifest_info): + + data = to_bytes(json.dumps(manifest_info)) + digest = sha256(data).hexdigest() + different_digest = 'not_{0}'.format(digest) + + namespace = manifest_info['collection_info']['namespace'] + name = manifest_info['collection_info']['name'] + version = manifest_info['collection_info']['version'] + server = 'http://galaxy.ansible.com' + + error_queue = [] + + with patch.object(builtins, 'open', mock_open(read_data=data)) as m: + with patch.object(collection.os.path, 'isfile', MagicMock(return_value=True)) as mock_isfile: + collection_req = collection.CollectionRequirement(namespace, name, './', server, [version], version, False) + collection_req._verify_file_hash(b'path/', 'file', different_digest, error_queue) + + assert mock_isfile.called_once + + assert len(error_queue) == 1 + assert error_queue[0].installed == digest + assert error_queue[0].expected == different_digest + + +def test_consume_file(manifest): + + manifest_file, checksum = manifest + assert checksum == collection._consume_file(manifest_file) + + +def test_consume_file_and_write_contents(manifest, manifest_info): + + manifest_file, checksum = manifest + + write_to = BytesIO() + actual_hash = collection._consume_file(manifest_file, write_to) + + write_to.seek(0) + assert to_bytes(json.dumps(manifest_info)) == write_to.read() + assert actual_hash == checksum + + +def test_get_tar_file_member(tmp_tarfile): + + temp_dir, tfile, filename, checksum = tmp_tarfile + + with collection._get_tar_file_member(tfile, filename) as tar_file_obj: + assert isinstance(tar_file_obj, tarfile.ExFileObject) + + +def test_get_nonexistent_tar_file_member(tmp_tarfile): + temp_dir, tfile, filename, checksum = tmp_tarfile + + file_does_not_exist = filename + 'nonexistent' + + with pytest.raises(AnsibleError) as err: + collection._get_tar_file_member(tfile, file_does_not_exist) + + assert to_text(err.value.message) == "Collection tar at '%s' does not contain the expected file '%s'." % (to_text(tfile.name), file_does_not_exist) + + +def test_get_tar_file_hash(tmp_tarfile): + temp_dir, tfile, filename, checksum = tmp_tarfile + + assert checksum == collection._get_tar_file_hash(tfile.name, filename) + + +def test_get_json_from_tar_file(tmp_tarfile): + temp_dir, tfile, filename, checksum = tmp_tarfile + + assert 'MANIFEST.json' in tfile.getnames() + + data = collection._get_json_from_tar_file(tfile.name, 'MANIFEST.json') + + assert isinstance(data, dict) + + +def test_verify_collection_not_installed(mock_collection): + + local_collection = mock_collection(local_installed=False) + remote_collection = mock_collection(local=False) + + with patch.object(collection.display, 'display') as mocked_display: + local_collection.verify(remote_collection, './', './') + + assert mocked_display.called + assert mocked_display.call_args[0][0] == "'%s.%s' has not been installed, nothing to verify" % (local_collection.namespace, local_collection.name) + + +def test_verify_successful_debug_info(monkeypatch, mock_collection): + local_collection = mock_collection() + remote_collection = mock_collection(local=False) + + monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock()) + monkeypatch.setattr(collection.CollectionRequirement, '_verify_file_hash', MagicMock()) + monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock()) + + with patch.object(collection.display, 'vvv') as mock_display: + local_collection.verify(remote_collection, './', './') + + namespace = local_collection.namespace + name = local_collection.name + version = local_collection.latest_version + + assert mock_display.call_count == 4 + assert mock_display.call_args_list[0][0][0] == "Verifying '%s.%s:%s'." % (namespace, name, version) + assert mock_display.call_args_list[1][0][0] == "Installed collection found at './%s/%s'" % (namespace, name) + located = "Remote collection found at 'https://galaxy.ansible.com/download/%s-%s-%s.tar.gz'" % (namespace, name, version) + assert mock_display.call_args_list[2][0][0] == located + verified = "Successfully verified that checksums for '%s.%s:%s' match the remote collection" % (namespace, name, version) + assert mock_display.call_args_list[3][0][0] == verified + + +def test_verify_different_versions(mock_collection): + + local_collection = mock_collection(version='0.1.0') + remote_collection = mock_collection(local=False, version='3.0.0') + + with patch.object(collection.display, 'display') as mock_display: + local_collection.verify(remote_collection, './', './') + + namespace = local_collection.namespace + name = local_collection.name + installed_version = local_collection.latest_version + compared_version = remote_collection.latest_version + + msg = "%s.%s has the version '%s' but is being compared to '%s'" % (namespace, name, installed_version, compared_version) + + assert mock_display.call_count == 1 + assert mock_display.call_args[0][0] == msg + + +@patch.object(builtins, 'open', mock_open()) +def test_verify_modified_manifest(monkeypatch, mock_collection, manifest_info): + local_collection = mock_collection() + remote_collection = mock_collection(local=False) + + monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock(side_effect=['manifest_checksum'])) + monkeypatch.setattr(collection, '_consume_file', MagicMock(side_effect=['manifest_checksum_modified', 'files_manifest_checksum'])) + monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock(side_effect=[manifest_info, {'files': []}])) + monkeypatch.setattr(collection.os.path, 'isfile', MagicMock(return_value=True)) + + with patch.object(collection.display, 'display') as mock_display: + with patch.object(collection.display, 'vvv') as mock_debug: + local_collection.verify(remote_collection, './', './') + + namespace = local_collection.namespace + name = local_collection.name + + assert mock_display.call_count == 3 + assert mock_display.call_args_list[0][0][0] == 'Collection %s.%s contains modified content in the following files:' % (namespace, name) + assert mock_display.call_args_list[1][0][0] == '%s.%s' % (namespace, name) + assert mock_display.call_args_list[2][0][0] == ' MANIFEST.json' + + # The -vvv output should show details (the checksums do not match) + assert mock_debug.call_count == 5 + assert mock_debug.call_args_list[-1][0][0] == ' Expected: manifest_checksum\n Found: manifest_checksum_modified' + + +@patch.object(builtins, 'open', mock_open()) +def test_verify_modified_files_manifest(monkeypatch, mock_collection, manifest_info): + local_collection = mock_collection() + remote_collection = mock_collection(local=False) + + monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock(side_effect=['manifest_checksum'])) + monkeypatch.setattr(collection, '_consume_file', MagicMock(side_effect=['manifest_checksum', 'files_manifest_checksum_modified'])) + monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock(side_effect=[manifest_info, {'files': []}])) + monkeypatch.setattr(collection.os.path, 'isfile', MagicMock(return_value=True)) + + with patch.object(collection.display, 'display') as mock_display: + with patch.object(collection.display, 'vvv') as mock_debug: + local_collection.verify(remote_collection, './', './') + + namespace = local_collection.namespace + name = local_collection.name + + assert mock_display.call_count == 3 + assert mock_display.call_args_list[0][0][0] == 'Collection %s.%s contains modified content in the following files:' % (namespace, name) + assert mock_display.call_args_list[1][0][0] == '%s.%s' % (namespace, name) + assert mock_display.call_args_list[2][0][0] == ' FILES.json' + + # The -vvv output should show details (the checksums do not match) + assert mock_debug.call_count == 5 + assert mock_debug.call_args_list[-1][0][0] == ' Expected: files_manifest_checksum\n Found: files_manifest_checksum_modified' + + +@patch.object(builtins, 'open', mock_open()) +def test_verify_modified_files(monkeypatch, mock_collection, manifest_info, files_manifest_info): + + local_collection = mock_collection() + remote_collection = mock_collection(local=False) + + monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock(side_effect=['manifest_checksum'])) + fakehashes = ['manifest_checksum', 'files_manifest_checksum', 'individual_file_checksum_modified'] + monkeypatch.setattr(collection, '_consume_file', MagicMock(side_effect=fakehashes)) + monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock(side_effect=[manifest_info, files_manifest_info])) + monkeypatch.setattr(collection.os.path, 'isfile', MagicMock(return_value=True)) + + with patch.object(collection.display, 'display') as mock_display: + with patch.object(collection.display, 'vvv') as mock_debug: + local_collection.verify(remote_collection, './', './') + + namespace = local_collection.namespace + name = local_collection.name + + assert mock_display.call_count == 3 + assert mock_display.call_args_list[0][0][0] == 'Collection %s.%s contains modified content in the following files:' % (namespace, name) + assert mock_display.call_args_list[1][0][0] == '%s.%s' % (namespace, name) + assert mock_display.call_args_list[2][0][0] == ' README.md' + + # The -vvv output should show details (the checksums do not match) + assert mock_debug.call_count == 5 + assert mock_debug.call_args_list[-1][0][0] == ' Expected: individual_file_checksum\n Found: individual_file_checksum_modified' + + +@patch.object(builtins, 'open', mock_open()) +def test_verify_identical(monkeypatch, mock_collection, manifest_info, files_manifest_info): + + local_collection = mock_collection() + remote_collection = mock_collection(local=False) + + monkeypatch.setattr(collection, '_get_tar_file_hash', MagicMock(side_effect=['manifest_checksum'])) + monkeypatch.setattr(collection, '_consume_file', MagicMock(side_effect=['manifest_checksum', 'files_manifest_checksum', 'individual_file_checksum'])) + monkeypatch.setattr(collection, '_get_json_from_tar_file', MagicMock(side_effect=[manifest_info, files_manifest_info])) + monkeypatch.setattr(collection.os.path, 'isfile', MagicMock(return_value=True)) + + with patch.object(collection.display, 'display') as mock_display: + with patch.object(collection.display, 'vvv') as mock_debug: + local_collection.verify(remote_collection, './', './') + + # Successful verification is quiet + assert mock_display.call_count == 0 + + # The -vvv output should show the checksums not matching + namespace = local_collection.namespace + name = local_collection.name + version = local_collection.latest_version + success_msg = "Successfully verified that checksums for '%s.%s:%s' match the remote collection" % (namespace, name, version) + + assert mock_debug.call_count == 4 + assert mock_debug.call_args_list[-1][0][0] == success_msg + + +@patch.object(collection.CollectionRequirement, 'verify') +def test_verify_collections_not_installed(mock_verify, mock_collection, monkeypatch): + namespace = 'ansible_namespace' + name = 'collection' + version = '1.0.0' + + local_collection = mock_collection(local_installed=False) + + found_remote = MagicMock(return_value=mock_collection(local=False)) + monkeypatch.setattr(collection.CollectionRequirement, 'from_name', found_remote) + + collections = [('%s.%s' % (namespace, name), version, None)] + search_path = './' + validate_certs = False + ignore_errors = False + apis = [local_collection.api] + + with patch.object(collection, '_download_file') as mock_download_file: + with pytest.raises(AnsibleError) as err: + collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors) + + assert err.value.message == "Collection %s.%s is not installed in any of the collection paths." % (namespace, name) + + +@patch.object(collection.CollectionRequirement, 'verify') +def test_verify_collections_not_installed_ignore_errors(mock_verify, mock_collection, monkeypatch): + namespace = 'ansible_namespace' + name = 'collection' + version = '1.0.0' + + local_collection = mock_collection(local_installed=False) + + found_remote = MagicMock(return_value=mock_collection(local=False)) + monkeypatch.setattr(collection.CollectionRequirement, 'from_name', found_remote) + + collections = [('%s.%s' % (namespace, name), version, None)] + search_path = './' + validate_certs = False + ignore_errors = True + apis = [local_collection.api] + + with patch.object(collection, '_download_file') as mock_download_file: + with patch.object(Display, 'warning') as mock_warning: + collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors) + + skip_message = "Failed to verify collection %s.%s but skipping due to --ignore-errors being set." % (namespace, name) + original_err = "Error: Collection %s.%s is not installed in any of the collection paths." % (namespace, name) + + assert mock_warning.called + assert mock_warning.call_args[0][0] == skip_message + " " + original_err + + +@patch.object(os.path, 'isdir', return_value=True) +@patch.object(collection.CollectionRequirement, 'verify') +def test_verify_collections_no_remote(mock_verify, mock_isdir, mock_collection): + namespace = 'ansible_namespace' + name = 'collection' + version = '1.0.0' + + collections = [('%s.%s' % (namespace, name), version, None)] + search_path = './' + validate_certs = False + ignore_errors = False + apis = [] + + with pytest.raises(AnsibleError) as err: + collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors) + + assert err.value.message == "Failed to find remote collection %s.%s:%s on any of the galaxy servers" % (namespace, name, version) + + +@patch.object(os.path, 'isdir', return_value=True) +@patch.object(collection.CollectionRequirement, 'verify') +def test_verify_collections_no_remote_ignore_errors(mock_verify, mock_isdir, mock_collection): + namespace = 'ansible_namespace' + name = 'collection' + version = '1.0.0' + + local_collection = mock_collection(local_installed=False) + + collections = [('%s.%s' % (namespace, name), version, None)] + search_path = './' + validate_certs = False + ignore_errors = True + apis = [] + + with patch.object(Display, 'warning') as mock_warning: + collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors) + + skip_message = "Failed to verify collection %s.%s but skipping due to --ignore-errors being set." % (namespace, name) + original_err = "Error: Failed to find remote collection %s.%s:%s on any of the galaxy servers" % (namespace, name, version) + + assert mock_warning.called + assert mock_warning.call_args[0][0] == skip_message + " " + original_err + + +def test_verify_collections_tarfile(monkeypatch): + + monkeypatch.setattr(os.path, 'isfile', MagicMock(return_value=True)) + + invalid_format = 'ansible_namespace-collection-0.1.0.tar.gz' + collections = [(invalid_format, '*', None)] + + with pytest.raises(AnsibleError) as err: + collection.verify_collections(collections, './', [], False, False) + + msg = "'%s' is not a valid collection name. The format namespace.name is expected." % invalid_format + assert err.value.message == msg + + +def test_verify_collections_path(monkeypatch): + + monkeypatch.setattr(os.path, 'isfile', MagicMock(return_value=False)) + + invalid_format = 'collections/collection_namespace/collection_name' + collections = [(invalid_format, '*', None)] + + with pytest.raises(AnsibleError) as err: + collection.verify_collections(collections, './', [], False, False) + + msg = "'%s' is not a valid collection name. The format namespace.name is expected." % invalid_format + assert err.value.message == msg + + +def test_verify_collections_url(monkeypatch): + + monkeypatch.setattr(os.path, 'isfile', MagicMock(return_value=False)) + + invalid_format = 'https://galaxy.ansible.com/download/ansible_namespace-collection-0.1.0.tar.gz' + collections = [(invalid_format, '*', None)] + + with pytest.raises(AnsibleError) as err: + collection.verify_collections(collections, './', [], False, False) + + msg = "'%s' is not a valid collection name. The format namespace.name is expected." % invalid_format + assert err.value.message == msg + + +@patch.object(os.path, 'isfile', return_value=False) +@patch.object(os.path, 'isdir', return_value=True) +@patch.object(collection.CollectionRequirement, 'verify') +def test_verify_collections_name(mock_verify, mock_isdir, mock_isfile, mock_collection, monkeypatch): + local_collection = mock_collection() + monkeypatch.setattr(collection.CollectionRequirement, 'from_path', MagicMock(return_value=local_collection)) + + located_remote_from_name = MagicMock(return_value=mock_collection(local=False)) + monkeypatch.setattr(collection.CollectionRequirement, 'from_name', located_remote_from_name) + + with patch.object(collection, '_download_file') as mock_download_file: + + collections = [('%s.%s' % (local_collection.namespace, local_collection.name), '%s' % local_collection.latest_version, None)] + search_path = './' + validate_certs = False + ignore_errors = False + apis = [local_collection.api] + + collection.verify_collections(collections, search_path, apis, validate_certs, ignore_errors) + + assert mock_download_file.call_count == 1 + assert located_remote_from_name.call_count == 1 +>>>>>>> a20a527014... ansible-galaxy - Fix tar path traversal issue during install - CVE-2020-10691 (#68596)