From ef782bffad18786d58363ed4bc339d4b4a237479 Mon Sep 17 00:00:00 2001 From: Nathan Matthews <29493311+natmatn@users.noreply.github.com> Date: Thu, 8 Feb 2024 15:22:06 -0800 Subject: [PATCH] fix: Removing VFS termination from sync_outputs (#175) * fix: Removing VFS termination from sync_outputs Signed-off-by: Nathan Matthews * Adding session cleanup function to terminate fus3 processes Signed-off-by: Nathan Matthews * Fixing lint warning Signed-off-by: Nathan Matthews --------- Signed-off-by: Nathan Matthews --- src/deadline/job_attachments/asset_sync.py | 125 +++++++++--------- .../test_asset_sync.py | 21 +++ 2 files changed, 84 insertions(+), 62 deletions(-) diff --git a/src/deadline/job_attachments/asset_sync.py b/src/deadline/job_attachments/asset_sync.py index eb8f1b06..e99261bf 100644 --- a/src/deadline/job_attachments/asset_sync.py +++ b/src/deadline/job_attachments/asset_sync.py @@ -485,71 +485,72 @@ def sync_outputs( storage_profiles_source_paths = list(storage_profiles_path_mapping_rules.keys()) - try: - for manifest_properties in attachments.manifests: - local_root: Path = Path() - if ( - len(storage_profiles_path_mapping_rules) > 0 - and manifest_properties.fileSystemLocationName - ): - if manifest_properties.rootPath in storage_profiles_source_paths: - local_root = Path( - storage_profiles_path_mapping_rules[manifest_properties.rootPath] - ) - else: - raise AssetSyncError( - "Error occurred while attempting to sync output files: " - f"No path mapping rule found for the source path {manifest_properties.rootPath}" - ) - else: - dir_name: str = _get_unique_dest_dir_name(manifest_properties.rootPath) - local_root = session_dir.joinpath(dir_name) - - output_files: List[OutputFile] = self._get_output_files( - manifest_properties, - s3_settings, - local_root, - start_time, - ) - if output_files: - output_manifest = self._generate_output_manifest(output_files) - session_action_id_with_time_stamp = ( - f"{_float_to_iso_datetime_string(start_time)}_{session_action_id}" - ) - full_output_prefix = s3_settings.full_output_prefix( - farm_id=self.farm_id, - queue_id=queue_id, - job_id=job_id, - step_id=step_id, - task_id=task_id, - session_action_id=session_action_id_with_time_stamp, + for manifest_properties in attachments.manifests: + local_root: Path = Path() + if ( + len(storage_profiles_path_mapping_rules) > 0 + and manifest_properties.fileSystemLocationName + ): + if manifest_properties.rootPath in storage_profiles_source_paths: + local_root = Path( + storage_profiles_path_mapping_rules[manifest_properties.rootPath] ) - self._upload_output_manifest_to_s3( - s3_settings=s3_settings, - output_manifest=output_manifest, - full_output_prefix=full_output_prefix, - root_path=manifest_properties.rootPath, - file_system_location_name=manifest_properties.fileSystemLocationName, + else: + raise AssetSyncError( + "Error occurred while attempting to sync output files: " + f"No path mapping rule found for the source path {manifest_properties.rootPath}" ) - all_output_files.extend(output_files) + else: + dir_name: str = _get_unique_dest_dir_name(manifest_properties.rootPath) + local_root = session_dir.joinpath(dir_name) - if all_output_files: - num_output_files = len(all_output_files) - self.logger.info( - f"Uploading {num_output_files} output file{'' if num_output_files == 1 else 's'}" - f" to S3: {s3_settings.s3BucketName}/{s3_settings.full_cas_prefix()}" + output_files: List[OutputFile] = self._get_output_files( + manifest_properties, + s3_settings, + local_root, + start_time, + ) + if output_files: + output_manifest = self._generate_output_manifest(output_files) + session_action_id_with_time_stamp = ( + f"{_float_to_iso_datetime_string(start_time)}_{session_action_id}" ) - summary_stats: SummaryStatistics = self._upload_output_files_to_s3( - s3_settings, all_output_files, on_uploading_files + full_output_prefix = s3_settings.full_output_prefix( + farm_id=self.farm_id, + queue_id=queue_id, + job_id=job_id, + step_id=step_id, + task_id=task_id, + session_action_id=session_action_id_with_time_stamp, ) - else: - summary_stats = SummaryStatistics() - finally: - if attachments.fileSystem == JobAttachmentsFileSystem.VIRTUAL.value: - try: - Fus3ProcessManager.find_fus3() - # Shutdown all running Fus3 processes since task is completed - Fus3ProcessManager.kill_all_processes(session_dir=session_dir) - except Fus3ExecutableMissingError: - logger.error("Virtual File System not found, no processes to kill.") + self._upload_output_manifest_to_s3( + s3_settings=s3_settings, + output_manifest=output_manifest, + full_output_prefix=full_output_prefix, + root_path=manifest_properties.rootPath, + file_system_location_name=manifest_properties.fileSystemLocationName, + ) + all_output_files.extend(output_files) + + if all_output_files: + num_output_files = len(all_output_files) + self.logger.info( + f"Uploading {num_output_files} output file{'' if num_output_files == 1 else 's'}" + f" to S3: {s3_settings.s3BucketName}/{s3_settings.full_cas_prefix()}" + ) + summary_stats: SummaryStatistics = self._upload_output_files_to_s3( + s3_settings, all_output_files, on_uploading_files + ) + else: + summary_stats = SummaryStatistics() return summary_stats + + def cleanup_session(self, session_dir: Path, file_system: JobAttachmentsFileSystem): + if file_system == JobAttachmentsFileSystem.COPIED.value: + return + try: + Fus3ProcessManager.find_fus3() + # Shutdown all running Deadline VFS processes since session is complete + Fus3ProcessManager.kill_all_processes(session_dir=session_dir) + except Fus3ExecutableMissingError: + logger.error("Virtual File System not found, no processes to kill.") diff --git a/test/unit/deadline_job_attachments/test_asset_sync.py b/test/unit/deadline_job_attachments/test_asset_sync.py index 09cf0f77..4d659200 100644 --- a/test/unit/deadline_job_attachments/test_asset_sync.py +++ b/test/unit/deadline_job_attachments/test_asset_sync.py @@ -26,6 +26,7 @@ from deadline.job_attachments.models import ( Attachments, Job, + JobAttachmentsFileSystem, JobAttachmentS3Settings, ManifestProperties, PathFormat, @@ -882,3 +883,23 @@ def test_sync_inputs_successful_using_vfs_fallback( } ] mock_mount_vfs.assert_not_called() + + def test_cleanup_session_fus3_terminate_called(self, tmp_path): + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.Fus3ProcessManager.find_fus3", + ) as mock_find_fus3, patch( + f"{deadline.__package__}.job_attachments.asset_sync.Fus3ProcessManager.kill_all_processes", + ): + self.default_asset_sync.cleanup_session( + session_dir=tmp_path, + file_system=JobAttachmentsFileSystem.COPIED, + ) + + mock_find_fus3.assert_not_called() + + self.default_asset_sync.cleanup_session( + session_dir=tmp_path, + file_system=JobAttachmentsFileSystem.VIRTUAL, + ) + + mock_find_fus3.assert_called_once()