Skip to content

Commit

Permalink
fix(JobAttachments): Ignore empty lists for job attachments (#181)
Browse files Browse the repository at this point in the history
Signed-off-by: Caden Marofke <marofke@amazon.com>
  • Loading branch information
marofke authored and YutongLi291 committed Feb 21, 2024
1 parent 346c81e commit f849152
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 46 deletions.
49 changes: 25 additions & 24 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,30 +235,31 @@ def create_job_from_job_bundle(
referenced_paths=sorted(asset_references.referenced_paths),
storage_profile_id=storage_profile_id,
)

if decide_cancel_submission_callback(
upload_group.num_outside_files_by_root,
upload_group.total_input_files,
upload_group.total_input_bytes,
):
print_function_callback("Job submission canceled.")
return None

_, asset_manifests = _hash_attachments(
asset_manager=asset_manager,
asset_groups=upload_group.asset_groups,
total_input_files=upload_group.total_input_files,
total_input_bytes=upload_group.total_input_bytes,
print_function_callback=print_function_callback,
hashing_progress_callback=hashing_progress_callback,
)

attachment_settings = _upload_attachments(
asset_manager, asset_manifests, print_function_callback, upload_progress_callback
)

attachment_settings["fileSystem"] = JobAttachmentsFileSystem(job_attachments_file_system)
create_job_args["attachments"] = attachment_settings
if upload_group.asset_groups:
if decide_cancel_submission_callback(
upload_group.num_outside_files_by_root,
upload_group.total_input_files,
upload_group.total_input_bytes,
):
print_function_callback("Job submission canceled.")
return None

_, asset_manifests = _hash_attachments(
asset_manager=asset_manager,
asset_groups=upload_group.asset_groups,
total_input_files=upload_group.total_input_files,
total_input_bytes=upload_group.total_input_bytes,
print_function_callback=print_function_callback,
hashing_progress_callback=hashing_progress_callback,
)

attachment_settings = _upload_attachments(
asset_manager, asset_manifests, print_function_callback, upload_progress_callback
)
attachment_settings["fileSystem"] = JobAttachmentsFileSystem(
job_attachments_file_system
)
create_job_args["attachments"] = attachment_settings

create_job_args.update(app_parameters_formatted)

Expand Down
34 changes: 18 additions & 16 deletions src/deadline/client/ui/dialogs/submit_job_progress_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,26 +254,28 @@ def _start_submission(self):
referenced_paths=sorted(self.asset_references.referenced_paths),
storage_profile_id=self._storage_profile_id,
)
if (
not self._auto_accept
and not self._confirm_asset_references_outside_storage_profile(
upload_group.num_outside_files_by_root,
# If we find any Job Attachments, start a background thread
if upload_group.asset_groups:
if (
not self._auto_accept
and not self._confirm_asset_references_outside_storage_profile(
upload_group.num_outside_files_by_root,
upload_group.total_input_files,
upload_group.total_input_bytes,
)
):
raise UserInitiatedCancel("Submission canceled.")

self._start_hashing(
upload_group.asset_groups,
upload_group.total_input_files,
upload_group.total_input_bytes,
)
):
raise UserInitiatedCancel("Submission canceled.")
return

self._start_hashing(
upload_group.asset_groups,
upload_group.total_input_files,
upload_group.total_input_bytes,
)

else:
self.hashing_progress.setVisible(False)
self.upload_progress.setVisible(False)
self._start_create_job()
self.hashing_progress.setVisible(False)
self.upload_progress.setVisible(False)
self._start_create_job()

def _hashing_background_thread(
self,
Expand Down
117 changes: 113 additions & 4 deletions test/unit/deadline_client/api/test_job_bundle_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from deadline.client import api, config
from deadline.client.api import _submit_job_bundle
from deadline.job_attachments.models import (
AssetRootGroup,
AssetUploadGroup,
Attachments,
JobAttachmentsFileSystem,
Expand Down Expand Up @@ -452,7 +453,9 @@ def test_create_job_from_job_bundle_job_attachments(
client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE]
client_mock().create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE]
client_mock().get_job.side_effect = [MOCK_GET_JOB_RESPONSE]
expected_upload_group = AssetUploadGroup(total_input_files=3, total_input_bytes=256)
expected_upload_group = AssetUploadGroup(
total_input_files=3, total_input_bytes=256, asset_groups=[AssetRootGroup()]
)
mock_prepare_paths.return_value = expected_upload_group
mock_upload_assets.return_value = [
SummaryStatistics(),
Expand Down Expand Up @@ -521,7 +524,7 @@ def fake_print_callback(msg: str) -> None:
)
mock_hash_attachments.assert_called_once_with(
asset_manager=ANY,
asset_groups=[],
asset_groups=[AssetRootGroup()],
total_input_files=3,
total_input_bytes=256,
print_function_callback=fake_print_callback,
Expand All @@ -541,6 +544,110 @@ def fake_print_callback(msg: str) -> None:
)


def test_create_job_from_job_bundle_empty_job_attachments(
fresh_deadline_config, temp_job_bundle_dir, temp_assets_dir
):
"""
Test that when we have asset references that do not fall under Job Attachments
(for example, if under a SHARED Storage Profile Filesystem Location), no Job
Attachments calls are made.
"""
# Use a temporary directory for the job bundle
with patch.object(_submit_job_bundle.api, "get_boto3_session"), patch.object(
_submit_job_bundle.api, "get_boto3_client"
) as client_mock, patch.object(
_submit_job_bundle.api, "get_queue_user_boto3_session"
), patch.object(
api._submit_job_bundle, "_hash_attachments", return_value=(None, None)
) as mock_hash_attachments, patch.object(
S3AssetManager,
"prepare_paths_for_upload",
) as mock_prepare_paths, patch.object(
S3AssetManager, "upload_assets"
) as mock_upload_assets, patch.object(
_submit_job_bundle.api, "get_telemetry_client"
):
client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE]
client_mock().create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE]
client_mock().get_job.side_effect = [MOCK_GET_JOB_RESPONSE]

# When this function returns an empty object, we skip Job Attachments calls
expected_upload_group = AssetUploadGroup()
mock_prepare_paths.return_value = expected_upload_group

config.set_setting("defaults.farm_id", MOCK_FARM_ID)
config.set_setting("defaults.queue_id", MOCK_QUEUE_ID)
config.set_setting("settings.storage_profile_id", MOCK_STORAGE_PROFILE_ID)

# Write a JSON template
with open(os.path.join(temp_job_bundle_dir, "template.json"), "w", encoding="utf8") as f:
f.write(MOCK_JOB_TEMPLATE_CASES["MINIMAL_JSON"][1])

# Create some files in the assets dir
asset_contents = {
"asset-1.txt": "This is asset 1",
"somedir/asset-2.txt": "Asset 2",
"somedir/asset-3.bat": "@echo asset 3",
}
_write_asset_files(temp_assets_dir, asset_contents)

# Write the asset_references file
asset_references = {
"inputs": {
"filenames": [os.path.join(temp_assets_dir, "asset-1.txt")],
"directories": [os.path.join(temp_assets_dir, "somedir")],
},
"outputs": {"directories": [os.path.join(temp_assets_dir, "somedir")]},
}
with open(
os.path.join(temp_job_bundle_dir, "asset_references.json"), "w", encoding="utf8"
) as f:
json.dump({"assetReferences": asset_references}, f)

def fake_hashing_callback(metadata: ProgressReportMetadata) -> bool:
return True

def fake_upload_callback(metadata: ProgressReportMetadata) -> bool:
return True

def fake_print_callback(msg: str) -> None:
pass

# This is the function we're testing
api.create_job_from_job_bundle(
temp_job_bundle_dir,
print_function_callback=fake_print_callback,
hashing_progress_callback=fake_hashing_callback,
upload_progress_callback=fake_upload_callback,
queue_parameter_definitions=[],
)

mock_prepare_paths.assert_called_once_with(
job_bundle_path=temp_job_bundle_dir,
input_paths=sorted(
[
os.path.join(temp_assets_dir, "asset-1.txt"),
os.path.join(temp_assets_dir, os.path.normpath("somedir/asset-2.txt")),
os.path.join(temp_assets_dir, os.path.normpath("somedir/asset-3.bat")),
]
),
output_paths=[os.path.join(temp_assets_dir, "somedir")],
referenced_paths=[],
storage_profile_id=MOCK_STORAGE_PROFILE_ID,
)
mock_hash_attachments.assert_not_called()
mock_upload_assets.assert_not_called()
# Should not be called with Job Attachments
client_mock().create_job.assert_called_once_with(
farmId=MOCK_FARM_ID,
queueId=MOCK_QUEUE_ID,
template=ANY,
templateType=ANY,
priority=50,
storageProfileId=MOCK_STORAGE_PROFILE_ID,
)


def test_create_job_from_job_bundle_with_empty_asset_references(
fresh_deadline_config, temp_job_bundle_dir
):
Expand Down Expand Up @@ -621,7 +728,9 @@ def test_create_job_from_job_bundle_with_single_asset_file(
client_mock().create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE]
client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE]
client_mock().get_job.side_effect = [MOCK_GET_JOB_RESPONSE]
expected_upload_group = AssetUploadGroup(total_input_files=1, total_input_bytes=1)
expected_upload_group = AssetUploadGroup(
total_input_files=1, total_input_bytes=1, asset_groups=[AssetRootGroup()]
)
mock_prepare_paths.return_value = expected_upload_group
mock_upload_assets.return_value = [
SummaryStatistics(),
Expand Down Expand Up @@ -690,7 +799,7 @@ def fake_print_callback(msg: str) -> None:
)
mock_hash_attachments.assert_called_once_with(
asset_manager=ANY,
asset_groups=[],
asset_groups=[AssetRootGroup()],
total_input_files=1,
total_input_bytes=1,
print_function_callback=fake_print_callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from deadline.client.exceptions import DeadlineOperationError
from deadline.job_attachments.models import (
Attachments,
AssetRootGroup,
AssetUploadGroup,
JobAttachmentsFileSystem,
AssetRootManifest,
Expand Down Expand Up @@ -262,7 +263,7 @@ def test_create_job_from_job_bundle_with_all_asset_ref_variants(
client_mock().create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE]
client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE]
client_mock().get_job.side_effect = [MOCK_GET_JOB_RESPONSE]
mock_prepare_paths.return_value = AssetUploadGroup()
mock_prepare_paths.return_value = AssetUploadGroup(asset_groups=[AssetRootGroup()])
mock_hash_assets.return_value = [SummaryStatistics(), AssetRootManifest()]
mock_upload_assets.return_value = [
SummaryStatistics(),
Expand Down Expand Up @@ -382,7 +383,7 @@ def test_create_job_from_job_bundle_with_all_asset_ref_variants(
storage_profile_id="",
)
mock_hash_assets.assert_called_once_with(
asset_groups=[],
asset_groups=[AssetRootGroup()],
total_input_files=0,
total_input_bytes=0,
hash_cache_dir=os.path.expanduser(os.path.join("~", ".deadline", "cache")),
Expand Down

0 comments on commit f849152

Please sign in to comment.