Skip to content

Commit

Permalink
fix: Removing VFS termination from sync_outputs (#175)
Browse files Browse the repository at this point in the history
* fix: Removing VFS termination from sync_outputs

Signed-off-by: Nathan Matthews <natmatn@amazon.com>

* Adding session cleanup function to terminate fus3 processes

Signed-off-by: Nathan Matthews <natmatn@amazon.com>

* Fixing lint warning

Signed-off-by: Nathan Matthews <natmatn@amazon.com>

---------

Signed-off-by: Nathan Matthews <natmatn@amazon.com>
  • Loading branch information
natmatn committed Feb 8, 2024
1 parent 99ebaea commit ef782bf
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 62 deletions.
125 changes: 63 additions & 62 deletions src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,71 +485,72 @@ def sync_outputs(

storage_profiles_source_paths = list(storage_profiles_path_mapping_rules.keys())

try:
for manifest_properties in attachments.manifests:
local_root: Path = Path()
if (
len(storage_profiles_path_mapping_rules) > 0
and manifest_properties.fileSystemLocationName
):
if manifest_properties.rootPath in storage_profiles_source_paths:
local_root = Path(
storage_profiles_path_mapping_rules[manifest_properties.rootPath]
)
else:
raise AssetSyncError(
"Error occurred while attempting to sync output files: "
f"No path mapping rule found for the source path {manifest_properties.rootPath}"
)
else:
dir_name: str = _get_unique_dest_dir_name(manifest_properties.rootPath)
local_root = session_dir.joinpath(dir_name)

output_files: List[OutputFile] = self._get_output_files(
manifest_properties,
s3_settings,
local_root,
start_time,
)
if output_files:
output_manifest = self._generate_output_manifest(output_files)
session_action_id_with_time_stamp = (
f"{_float_to_iso_datetime_string(start_time)}_{session_action_id}"
)
full_output_prefix = s3_settings.full_output_prefix(
farm_id=self.farm_id,
queue_id=queue_id,
job_id=job_id,
step_id=step_id,
task_id=task_id,
session_action_id=session_action_id_with_time_stamp,
for manifest_properties in attachments.manifests:
local_root: Path = Path()
if (
len(storage_profiles_path_mapping_rules) > 0
and manifest_properties.fileSystemLocationName
):
if manifest_properties.rootPath in storage_profiles_source_paths:
local_root = Path(
storage_profiles_path_mapping_rules[manifest_properties.rootPath]
)
self._upload_output_manifest_to_s3(
s3_settings=s3_settings,
output_manifest=output_manifest,
full_output_prefix=full_output_prefix,
root_path=manifest_properties.rootPath,
file_system_location_name=manifest_properties.fileSystemLocationName,
else:
raise AssetSyncError(
"Error occurred while attempting to sync output files: "
f"No path mapping rule found for the source path {manifest_properties.rootPath}"
)
all_output_files.extend(output_files)
else:
dir_name: str = _get_unique_dest_dir_name(manifest_properties.rootPath)
local_root = session_dir.joinpath(dir_name)

if all_output_files:
num_output_files = len(all_output_files)
self.logger.info(
f"Uploading {num_output_files} output file{'' if num_output_files == 1 else 's'}"
f" to S3: {s3_settings.s3BucketName}/{s3_settings.full_cas_prefix()}"
output_files: List[OutputFile] = self._get_output_files(
manifest_properties,
s3_settings,
local_root,
start_time,
)
if output_files:
output_manifest = self._generate_output_manifest(output_files)
session_action_id_with_time_stamp = (
f"{_float_to_iso_datetime_string(start_time)}_{session_action_id}"
)
summary_stats: SummaryStatistics = self._upload_output_files_to_s3(
s3_settings, all_output_files, on_uploading_files
full_output_prefix = s3_settings.full_output_prefix(
farm_id=self.farm_id,
queue_id=queue_id,
job_id=job_id,
step_id=step_id,
task_id=task_id,
session_action_id=session_action_id_with_time_stamp,
)
else:
summary_stats = SummaryStatistics()
finally:
if attachments.fileSystem == JobAttachmentsFileSystem.VIRTUAL.value:
try:
Fus3ProcessManager.find_fus3()
# Shutdown all running Fus3 processes since task is completed
Fus3ProcessManager.kill_all_processes(session_dir=session_dir)
except Fus3ExecutableMissingError:
logger.error("Virtual File System not found, no processes to kill.")
self._upload_output_manifest_to_s3(
s3_settings=s3_settings,
output_manifest=output_manifest,
full_output_prefix=full_output_prefix,
root_path=manifest_properties.rootPath,
file_system_location_name=manifest_properties.fileSystemLocationName,
)
all_output_files.extend(output_files)

if all_output_files:
num_output_files = len(all_output_files)
self.logger.info(
f"Uploading {num_output_files} output file{'' if num_output_files == 1 else 's'}"
f" to S3: {s3_settings.s3BucketName}/{s3_settings.full_cas_prefix()}"
)
summary_stats: SummaryStatistics = self._upload_output_files_to_s3(
s3_settings, all_output_files, on_uploading_files
)
else:
summary_stats = SummaryStatistics()
return summary_stats

def cleanup_session(self, session_dir: Path, file_system: JobAttachmentsFileSystem):
if file_system == JobAttachmentsFileSystem.COPIED.value:
return
try:
Fus3ProcessManager.find_fus3()
# Shutdown all running Deadline VFS processes since session is complete
Fus3ProcessManager.kill_all_processes(session_dir=session_dir)
except Fus3ExecutableMissingError:
logger.error("Virtual File System not found, no processes to kill.")
21 changes: 21 additions & 0 deletions test/unit/deadline_job_attachments/test_asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from deadline.job_attachments.models import (
Attachments,
Job,
JobAttachmentsFileSystem,
JobAttachmentS3Settings,
ManifestProperties,
PathFormat,
Expand Down Expand Up @@ -882,3 +883,23 @@ def test_sync_inputs_successful_using_vfs_fallback(
}
]
mock_mount_vfs.assert_not_called()

def test_cleanup_session_fus3_terminate_called(self, tmp_path):
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.Fus3ProcessManager.find_fus3",
) as mock_find_fus3, patch(
f"{deadline.__package__}.job_attachments.asset_sync.Fus3ProcessManager.kill_all_processes",
):
self.default_asset_sync.cleanup_session(
session_dir=tmp_path,
file_system=JobAttachmentsFileSystem.COPIED,
)

mock_find_fus3.assert_not_called()

self.default_asset_sync.cleanup_session(
session_dir=tmp_path,
file_system=JobAttachmentsFileSystem.VIRTUAL,
)

mock_find_fus3.assert_called_once()

0 comments on commit ef782bf

Please sign in to comment.