diff --git a/src/deadline/job_attachments/asset_sync.py b/src/deadline/job_attachments/asset_sync.py index 7b82fe6c..d5f430fc 100644 --- a/src/deadline/job_attachments/asset_sync.py +++ b/src/deadline/job_attachments/asset_sync.py @@ -56,7 +56,7 @@ PathFormat, ) from .upload import S3AssetUploader -from .os_file_permission import FileSystemPermissionSettings +from .os_file_permission import FileSystemPermissionSettings, PosixFileSystemPermissionSettings from ._utils import ( _float_to_iso_datetime_string, _get_unique_dest_dir_name, @@ -461,6 +461,7 @@ def sync_inputs( and fs_permission_settings is not None and os_env_vars is not None and "AWS_PROFILE" in os_env_vars + and isinstance(fs_permission_settings, PosixFileSystemPermissionSettings) ): try: VFSProcessManager.find_vfs() @@ -469,8 +470,7 @@ def sync_inputs( manifests_by_root=merged_manifests_by_root, boto3_session=self.session, session_dir=session_dir, - os_user=fs_permission_settings.os_user, # type: ignore[union-attr] - os_group=getattr(fs_permission_settings, "os_group", ""), + fs_permission_settings=fs_permission_settings, # type: ignore[arg-type] os_env_vars=os_env_vars, # type: ignore[arg-type] cas_prefix=s3_settings.full_cas_prefix(), ) diff --git a/src/deadline/job_attachments/download.py b/src/deadline/job_attachments/download.py index dc4f2aef..be51c8e8 100644 --- a/src/deadline/job_attachments/download.py +++ b/src/deadline/job_attachments/download.py @@ -66,6 +66,14 @@ S3_DOWNLOAD_MAX_CONCURRENCY = 10 VFS_CACHE_REL_PATH_IN_SESSION = ".vfs_object_cache" +VFS_MERGED_MANIFEST_FOLDER_IN_SESSION = ".vfs_manifests" + +VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS = PosixFileSystemPermissionSettings( + os_user="", + os_group="", + dir_mode=0o31, + file_mode=0o64, +) def get_manifest_from_s3( @@ -878,14 +886,26 @@ def merge_asset_manifests(manifests: list[BaseAssetManifest]) -> BaseAssetManife return output_manifest -def write_manifest_to_temp_file(manifest: BaseAssetManifest) -> str: +def _write_manifest_to_temp_file(manifest: BaseAssetManifest, dir: Path) -> str: with NamedTemporaryFile( - suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w" + suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w", dir=dir ) as file: file.write(manifest.encode()) return file.name +def _read_manifest_file(input_manifest_path: Path): + """ + Given a manifest path, open the file at that location and decode + Args: + input_manifest_path: Path to manifest + Returns: + BaseAssetManifest : Single decoded manifest + """ + with open(input_manifest_path) as input_manifest_file: + return decode_manifest(input_manifest_file.read()) + + def handle_existing_vfs( manifest: BaseAssetManifest, session_dir: Path, mount_point: str, os_user: str ) -> BaseAssetManifest: @@ -901,15 +921,14 @@ def handle_existing_vfs( Returns: BaseAssetManifest : A single manifest containing the merged paths or the original manifest """ - if not os.path.ismount(mount_point): + if not VFSProcessManager.is_mount(mount_point): return manifest input_manifest_path: Optional[Path] = VFSProcessManager.get_manifest_path_for_mount( session_dir=session_dir, mount_point=mount_point ) if input_manifest_path is not None: - with open(input_manifest_path) as input_manifest_file: - input_manifest: BaseAssetManifest = decode_manifest(input_manifest_file.read()) + input_manifest = _read_manifest_file(input_manifest_path) merged_input_manifest: Optional[BaseAssetManifest] = merge_asset_manifests( [input_manifest, manifest] @@ -931,9 +950,8 @@ def mount_vfs_from_manifests( manifests_by_root: dict[str, BaseAssetManifest], boto3_session: boto3.Session, session_dir: Path, - os_user: str, - os_group: str, os_env_vars: dict[str, str], + fs_permission_settings: FileSystemPermissionSettings, cas_prefix: Optional[str] = None, ) -> None: """ @@ -952,7 +970,8 @@ def mount_vfs_from_manifests( Returns: None """ - + if not isinstance(fs_permission_settings, PosixFileSystemPermissionSettings): + raise TypeError("VFS can only be mounted from manifests on posix file systems.") vfs_cache_dir: Path = session_dir / VFS_CACHE_REL_PATH_IN_SESSION asset_cache_hash_path: Path = vfs_cache_dir if cas_prefix is not None: @@ -961,26 +980,39 @@ def mount_vfs_from_manifests( asset_cache_hash_path.mkdir(parents=True, exist_ok=True) + _set_fs_group([str(asset_cache_hash_path)], str(vfs_cache_dir), fs_permission_settings) + + manifest_dir: Path = session_dir / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION + manifest_dir.mkdir(parents=True, exist_ok=True) + manifest_dir_permissions = VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS + manifest_dir_permissions.os_user = fs_permission_settings.os_user + manifest_dir_permissions.os_group = fs_permission_settings.os_group + + _set_fs_group([str(manifest_dir)], str(manifest_dir), manifest_dir_permissions) + for mount_point, manifest in manifests_by_root.items(): # Validate the file paths to see if they are under the given download directory. _ensure_paths_within_directory( mount_point, [path.path for path in manifest.paths] # type: ignore ) final_manifest: BaseAssetManifest = handle_existing_vfs( - manifest=manifest, session_dir=session_dir, mount_point=mount_point, os_user=os_user + manifest=manifest, + session_dir=session_dir, + mount_point=mount_point, + os_user=fs_permission_settings.os_user, ) # Write out a temporary file with the contents of the newly merged manifest - manifest_path: str = write_manifest_to_temp_file(final_manifest) + manifest_path: str = _write_manifest_to_temp_file(final_manifest, dir=manifest_dir) vfs_manager: VFSProcessManager = VFSProcessManager( s3_bucket, boto3_session.region_name, manifest_path, mount_point, - os_user, + fs_permission_settings.os_user, os_env_vars, - os_group, + getattr(fs_permission_settings, "os_group", ""), cas_prefix, str(vfs_cache_dir), ) diff --git a/test/unit/deadline_job_attachments/test_asset_sync.py b/test/unit/deadline_job_attachments/test_asset_sync.py index 85e82080..30a1ea6d 100644 --- a/test/unit/deadline_job_attachments/test_asset_sync.py +++ b/test/unit/deadline_job_attachments/test_asset_sync.py @@ -432,7 +432,7 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix( ), patch( f"{deadline.__package__}.job_attachments.asset_sync.merge_asset_manifests", ) as merge_manifests_mock, patch( - f"{deadline.__package__}.job_attachments.download.write_manifest_to_temp_file", + f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file", return_value="tmp_manifest", ), patch( "sys.platform", "linux" diff --git a/test/unit/deadline_job_attachments/test_download.py b/test/unit/deadline_job_attachments/test_download.py index 5beac7d0..df7bfe4c 100644 --- a/test/unit/deadline_job_attachments/test_download.py +++ b/test/unit/deadline_job_attachments/test_download.py @@ -12,6 +12,7 @@ import json from pathlib import Path import sys +import tempfile from typing import Any, Callable, List from unittest.mock import MagicMock, call, patch @@ -41,10 +42,15 @@ get_job_input_paths_by_asset_root, get_job_output_paths_by_asset_root, get_manifest_from_s3, + handle_existing_vfs, + mount_vfs_from_manifests, merge_asset_manifests, _ensure_paths_within_directory, _get_asset_root_from_s3, _get_tasks_manifests_keys_from_s3, + VFS_CACHE_REL_PATH_IN_SESSION, + VFS_MERGED_MANIFEST_FOLDER_IN_SESSION, + VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS, ) from deadline.job_attachments.exceptions import ( AssetSyncError, @@ -2255,3 +2261,140 @@ def download_file(*args): ) assert sorted(downloaded_files) == ["a.txt", "b.txt", "c.txt", "d.txt"] + + +def test_handle_existing_vfs_no_mount_returns(test_manifest_one: dict): + """ + Test that handling an existing manifest for a non existent mount returns the manifest + """ + manifest = decode_manifest(json.dumps(test_manifest_one)) + with patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.is_mount", + return_value=False, + ) as mock_is_mount: + result_manifest = handle_existing_vfs( + manifest, Path("/some/session/dir"), "/not/a/mount", "test-user" + ) + mock_is_mount.assert_called_once_with("/not/a/mount") + assert manifest == result_manifest + + +def test_handle_existing_vfs_success( + test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict +): + """ + Test that handling an existing manifest for a mount which exists attempts to merge the manifests and + shut down the mount + """ + manifest_one = decode_manifest(json.dumps(test_manifest_one)) + manifest_two = decode_manifest(json.dumps(test_manifest_two)) + merged_decoded = decode_manifest(json.dumps(merged_manifest)) + session_path = Path("/some/session/dir") + with patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.is_mount", + return_value=True, + ) as mock_is_mount, patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.get_manifest_path_for_mount", + return_value="/some/manifest/path", + ) as mock_get_manifest_path, patch( + f"{deadline.__package__}.job_attachments.download._read_manifest_file", + return_value=manifest_one, + ) as mock_decode_manifest, patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.kill_process_at_mount", + ) as mock_kill_process: + result_manifest = handle_existing_vfs( + manifest_two, session_path, "/some/mount", "test-user" + ) + mock_is_mount.assert_called_once_with("/some/mount") + mock_get_manifest_path.assert_called_once_with( + session_dir=session_path, mount_point="/some/mount" + ) + mock_decode_manifest.assert_called_once_with("/some/manifest/path") + mock_kill_process.assert_called_once_with( + session_dir=session_path, mount_point="/some/mount", os_user="test-user" + ) + assert result_manifest == merged_decoded + + +@pytest.mark.skipif( + sys.platform == "win32", + reason="This VFS test is currently not valid for windows - VFS is a linux only feature currently.", +) +def test_mount_vfs_from_manifests( + test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict +): + """ + Test that handling an existing manifest for a mount which exists attempts to merge the manifests and + shut down the mount + """ + manifest_one = decode_manifest(json.dumps(test_manifest_one)) + manifest_two = decode_manifest(json.dumps(test_manifest_two)) + merged_decoded = decode_manifest(json.dumps(merged_manifest)) + temp_dir = tempfile.TemporaryDirectory() + temp_dir_path = Path(temp_dir.name) + manifests_by_root = {"/some/root/one": manifest_one, "/some/root/two": manifest_two} + fs_permissions = PosixFileSystemPermissionSettings("test-user", "test-group", 0o31, 0o66) + manifest_permissions = PosixFileSystemPermissionSettings( + fs_permissions.os_user, + fs_permissions.os_group, + VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.dir_mode, + VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.file_mode, + ) + + cache_path = temp_dir_path / VFS_CACHE_REL_PATH_IN_SESSION + manifest_path = temp_dir_path / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION + with patch( + f"{deadline.__package__}.job_attachments.download._set_fs_group", + ) as mock_set_vs_group, patch( + f"{deadline.__package__}.job_attachments.download.handle_existing_vfs", + return_value=merged_decoded, + ) as mock_handle_existing, patch( + f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file", + ) as mock_write_manifest, patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.start", + ) as mock_vfs_start: + mount_vfs_from_manifests( + "test-bucket", + manifests_by_root, + boto3_session=boto3.Session(region_name="us-west-2"), + session_dir=temp_dir_path, + os_env_vars={}, + fs_permission_settings=fs_permissions, + cas_prefix="cas/test", + ) + # Were the cache and manifest folders created + assert os.path.isdir(cache_path) + assert os.path.isdir(manifest_path) + + # + # Did we attempt to assign the expected permissions + mock_set_vs_group.assert_has_calls( + [ + call([str(cache_path / "cas/test")], str(cache_path), fs_permissions), + call([str(manifest_path)], str(manifest_path), manifest_permissions), + ] + ) + + mock_handle_existing.assert_has_calls( + [ + call( + manifest=manifest_one, + session_dir=temp_dir_path, + mount_point="/some/root/one", + os_user="test-user", + ), + call( + manifest=manifest_two, + session_dir=temp_dir_path, + mount_point="/some/root/two", + os_user="test-user", + ), + ] + ) + + mock_write_manifest.assert_has_calls( + [call(merged_decoded, dir=manifest_path), call(merged_decoded, dir=manifest_path)] + ) + mock_vfs_start.assert_has_calls( + [call(session_dir=temp_dir_path), call(session_dir=temp_dir_path)] + )