Skip to content

Commit

Permalink
fix(job_attachments)!: remove local storage of manifest files (#207)
Browse files Browse the repository at this point in the history
Signed-off-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com>
  • Loading branch information
gahyusuh authored Mar 16, 2024
1 parent e936938 commit 8c5ea38
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 64 deletions.
2 changes: 1 addition & 1 deletion requirements-integ-testing.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
deadline-cloud-test-fixtures ~= 0.5.0
deadline-cloud-test-fixtures ~= 0.5.5
8 changes: 4 additions & 4 deletions scripted_tests/upload_scale_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import time

from deadline.job_attachments._aws.deadline import get_queue
from deadline.job_attachments.asset_manifests.decode import decode_manifest
from deadline.job_attachments.download import download_files_from_manifests, get_manifest_from_s3
from deadline.job_attachments.models import S3_MANIFEST_FOLDER_NAME
from deadline.job_attachments.upload import S3AssetManager
Expand Down Expand Up @@ -132,9 +131,10 @@
print("\nStarting download test...")
start = time.perf_counter()
manifest_key = f"{queue.jobAttachmentSettings.rootPrefix}/{S3_MANIFEST_FOLDER_NAME}/{attachment_settings.manifests[0].inputManifestPath}"
manifest = get_manifest_from_s3(manifest_key, queue.jobAttachmentSettings.s3BucketName)
with open(manifest) as manifest_file:
asset_manifest = decode_manifest(manifest_file.read())
asset_manifest = get_manifest_from_s3(
manifest_key, queue.jobAttachmentSettings.s3BucketName
)

download_files_from_manifests(
s3_bucket=queue.jobAttachmentSettings.s3BucketName,
manifests_by_root={"/tmp/test_download": asset_manifest},
Expand Down
13 changes: 4 additions & 9 deletions src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
SummaryStatistics,
)

from .asset_manifests.decode import decode_manifest
from .asset_manifests import (
BaseAssetManifest,
BaseManifestModel,
Expand Down Expand Up @@ -331,9 +330,7 @@ def sync_inputs(
self.logger.info(f"No attachments configured for Job {job_id}, no inputs to sync.")
return (SummaryStatistics(), [])

grouped_manifests_by_root: DefaultDict[
str, list[tuple[BaseAssetManifest, str]]
] = DefaultDict(list)
grouped_manifests_by_root: DefaultDict[str, list[BaseAssetManifest]] = DefaultDict(list)
pathmapping_rules: Dict[str, Dict[str, str]] = {}

storage_profiles_source_paths = list(storage_profiles_path_mapping_rules.keys())
Expand Down Expand Up @@ -364,14 +361,12 @@ def sync_inputs(
manifest_s3_key = s3_settings.add_root_and_manifest_folder_prefix(
manifest_properties.inputManifestPath
)
manifest_path = get_manifest_from_s3(
manifest = get_manifest_from_s3(
manifest_key=manifest_s3_key,
s3_bucket=s3_settings.s3BucketName,
session=self.session,
)
with open(manifest_path) as manifest_file:
manifest = decode_manifest(manifest_file.read())
grouped_manifests_by_root[local_root].append((manifest, manifest_path))
grouped_manifests_by_root[local_root].append(manifest)

# Handle step-step dependencies.
if step_dependencies:
Expand All @@ -392,7 +387,7 @@ def sync_inputs(
# Merge the manifests in each root into a single manifest
merged_manifests_by_root: dict[str, BaseAssetManifest] = dict()
for root, manifests in grouped_manifests_by_root.items():
merged_manifest = merge_asset_manifests([manifest[0] for manifest in manifests])
merged_manifest = merge_asset_manifests(manifests)

if merged_manifest:
merged_manifests_by_root[root] = merged_manifest
Expand Down
46 changes: 23 additions & 23 deletions src/deadline/job_attachments/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import concurrent.futures
import io
import os
import re
import time
Expand Down Expand Up @@ -67,17 +68,21 @@

def get_manifest_from_s3(
manifest_key: str, s3_bucket: str, session: Optional[boto3.Session] = None
) -> str:
) -> BaseAssetManifest:
s3_client = get_s3_client(session=session)
try:
with NamedTemporaryFile(suffix=".json", prefix="deadline-manifest-", delete=False) as file:
s3_client.download_fileobj(
s3_bucket,
manifest_key,
file,
ExtraArgs={"ExpectedBucketOwner": get_account_id(session=session)},
)
return file.name
file_buffer = io.BytesIO()
s3_client.download_fileobj(
s3_bucket,
manifest_key,
file_buffer,
ExtraArgs={"ExpectedBucketOwner": get_account_id(session=session)},
)
byte_value = file_buffer.getvalue()
string_value = byte_value.decode("utf-8")
asset_manifest = decode_manifest(string_value)
file_buffer.close()
return asset_manifest
except ClientError as exc:
status_code = int(exc.response["ResponseMetadata"]["HTTPStatusCode"])
status_code_guidance = {
Expand Down Expand Up @@ -217,18 +222,16 @@ def get_job_input_paths_by_asset_root(
for manifest_properties in attachments.manifests:
if manifest_properties.inputManifestPath:
key = _join_s3_paths(manifest_properties.inputManifestPath)
manifest = get_manifest_from_s3(
asset_manifest = get_manifest_from_s3(
manifest_key=key,
s3_bucket=s3_settings.s3BucketName,
session=session,
)

root_path = manifest_properties.rootPath
with open(manifest) as manifest_file:
asset_manifest = decode_manifest(manifest_file.read())
if root_path not in inputs:
inputs[root_path] = ManifestPathGroup()
inputs[root_path].add_manifest_to_group(asset_manifest)
if root_path not in inputs:
inputs[root_path] = ManifestPathGroup()
inputs[root_path].add_manifest_to_group(asset_manifest)

return inputs

Expand Down Expand Up @@ -629,8 +632,7 @@ def get_job_output_paths_by_asset_root(

outputs: dict[str, ManifestPathGroup] = {}
for root, manifests in output_manifests_by_root.items():
# manifest path isn't needed here, so a variable isn't necessary
for manifest, _ in manifests:
for manifest in manifests:
if root not in outputs:
outputs[root] = ManifestPathGroup()
outputs[root].add_manifest_to_group(manifest)
Expand All @@ -647,12 +649,12 @@ def get_output_manifests_by_asset_root(
task_id: Optional[str] = None,
session_action_id: Optional[str] = None,
session: Optional[boto3.Session] = None,
) -> dict[str, list[tuple[BaseAssetManifest, str]]]:
) -> dict[str, list[BaseAssetManifest]]:
"""
For a given job/step/task, gets a map from each root path to a corresponding list of
output manifests.
"""
outputs: DefaultDict[str, list[tuple[BaseAssetManifest, str]]] = DefaultDict(list)
outputs: DefaultDict[str, list[BaseAssetManifest]] = DefaultDict(list)
manifest_prefix: str = _get_output_manifest_prefix(
s3_settings, farm_id, queue_id, job_id, step_id, task_id, session_action_id
)
Expand All @@ -664,7 +666,7 @@ def get_output_manifests_by_asset_root(
return outputs

for key in manifests_keys:
manifest_path = get_manifest_from_s3(
asset_manifest = get_manifest_from_s3(
manifest_key=key,
s3_bucket=s3_settings.s3BucketName,
session=session,
Expand All @@ -674,9 +676,7 @@ def get_output_manifests_by_asset_root(
raise MissingAssetRootError(
f"Failed to get asset root from metadata of output manifest: {key}"
)
with open(manifest_path) as manifest_file:
asset_manifest = decode_manifest(manifest_file.read())
outputs[asset_root].append((asset_manifest, manifest_path))
outputs[asset_root].append(asset_manifest)

return outputs

Expand Down
34 changes: 7 additions & 27 deletions test/unit/deadline_job_attachments/test_asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from math import trunc
from pathlib import Path
from typing import Optional, Dict
from unittest.mock import ANY, MagicMock, mock_open, patch
from unittest.mock import ANY, MagicMock, patch

import boto3
from deadline.job_attachments.progress_tracker import ProgressStatus
Expand Down Expand Up @@ -213,10 +213,7 @@ def test_sync_inputs_successful(
# WHEN
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
side_effect=[f"{local_root}/manifest.json"],
), patch("builtins.open", mock_open(read_data="test_manifest_file")), patch(
f"{deadline.__package__}.job_attachments.asset_sync.decode_manifest",
side_effect=["test_manifest_data"],
return_value="test_manifest_data",
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests",
side_effect=[DownloadSummaryStatistics()],
Expand Down Expand Up @@ -287,18 +284,13 @@ def test_sync_inputs_404_error(
job: Job = request.getfixturevalue(job_fixture_name)
s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name)
default_queue.jobAttachmentSettings = s3_settings
session_dir = str(tmp_path)
dest_dir = "assetroot-27bggh78dd2b568ab123"
local_root = str(Path(session_dir) / dest_dir)
assert job.attachments

# WHEN
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
side_effect=[f"{local_root}/manifest.json"],
), patch("builtins.open", mock_open(read_data="test_manifest_file")), patch(
f"{deadline.__package__}.job_attachments.asset_sync.decode_manifest",
side_effect=["test_manifest_data"],
return_value="test_manifest_data",
), patch(
f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name",
side_effect=[dest_dir],
Expand Down Expand Up @@ -349,10 +341,7 @@ def test_sync_inputs_with_step_dependencies(
# WHEN
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
side_effect=[f"{local_root}/manifest.json"],
), patch("builtins.open", mock_open(read_data="test_manifest_file")), patch(
f"{deadline.__package__}.job_attachments.asset_sync.decode_manifest",
side_effect=["test_manifest_data"],
return_value="test_manifest_data",
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests",
side_effect=[DownloadSummaryStatistics()],
Expand Down Expand Up @@ -420,8 +409,8 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix(
# WHEN
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
side_effect=[f"{local_root}/manifest.json"],
), patch("builtins.open", mock_open(read_data=json.dumps(test_manifest_one))), patch(
return_value=json.dumps(test_manifest_one),
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests",
side_effect=[DownloadSummaryStatistics()],
), patch(
Expand Down Expand Up @@ -735,12 +724,6 @@ def test_sync_inputs_with_storage_profiles_path_mapping_rules(
# WHEN
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
side_effect=[
f"{local_root}/manifest_input",
f"{local_root}/manifest-movie1_input",
],
), patch("builtins.open", mock_open(read_data="test_manifest_file")), patch(
f"{deadline.__package__}.job_attachments.asset_sync.decode_manifest",
return_value="test_manifest_data",
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests",
Expand Down Expand Up @@ -817,10 +800,7 @@ def test_sync_inputs_successful_using_vfs_fallback(
# WHEN
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
side_effect=[f"{local_root}/manifest.json"],
), patch("builtins.open", mock_open(read_data="test_manifest_file")), patch(
f"{deadline.__package__}.job_attachments.asset_sync.decode_manifest",
side_effect=["test_manifest_data"],
return_value="test_manifest_data",
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests",
side_effect=[DownloadSummaryStatistics()],
Expand Down

0 comments on commit 8c5ea38

Please sign in to comment.