fix!: Validate paths for Job Bundles (#171)
Breaking change!

Adds validation to Job Bundles, including the following:
- default PATH values in Job Bundle templates MUST be relative
  AND must resolve within the bundle directory
- that includes any symlinks within the bundle itself
- if any paths in the overall submission resolve outside of
  the job bundle directory or any configured storage profile
  filesystem locations, a warning message is raised
- `S3AssetManager` changes:
  - `prepare_paths_for_upload` was added and needs to be called
    before `hash_assets_and_create_manifest`
  - the `hash_assets_and_create_manifest` interface changed
    (see the migration sketch below)
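
For reference, a minimal migration sketch based on the examples/submit_job.py change below (asset_manager setup assumed):

    # Old (single call):
    #   (_, manifests) = asset_manager.hash_assets_and_create_manifest(inputs, [outputDir], [])
    # New (group and validate paths first, then hash):
    upload_group = asset_manager.prepare_paths_for_upload(".", inputs, [outputDir], [])
    (_, manifests) = asset_manager.hash_assets_and_create_manifest(
        upload_group.asset_groups, upload_group.total_input_files, upload_group.total_input_bytes
    )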

Signed-off-by: Caden Marofke <marofke@amazon.com>
marofke authored Feb 7, 2024
1 parent fe4db33 commit 278e4f6
Showing 20 changed files with 742 additions and 237 deletions.
7 changes: 5 additions & 2 deletions examples/submit_job.py
@@ -42,7 +42,10 @@ def process_job_attachments(farm_id, queue_id, inputs, outputDir, deadline_client):
         queue_id=queue_id,
         job_attachment_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]),
     )
-    (_, manifests) = asset_manager.hash_assets_and_create_manifest(inputs, [outputDir], [])
+    upload_group = asset_manager.prepare_paths_for_upload(".", inputs, [outputDir], [])
+    (_, manifests) = asset_manager.hash_assets_and_create_manifest(
+        upload_group.asset_groups, upload_group.total_input_files, upload_group.total_input_bytes
+    )
     (_, attachments) = asset_manager.upload_assets(manifests)
     attachments = attachments.to_dict()
     total = time.perf_counter() - start
@@ -167,7 +170,7 @@ def submit_custom_job(
     deadline_client = boto3.client(
         "deadline",
         region_name="us-west-2",
-        endpoint_url="https://management.gamma.bealine-dev.us-west-2.amazonaws.com",
+        endpoint_url="https://management.deadline.us-west-2.amazonaws.com",
     )
 
     attachments = process_job_attachments(
9 changes: 6 additions & 3 deletions scripted_tests/upload_cancel_test.py
@@ -99,10 +99,13 @@ def run():
 
     try:
         print("\nStart hashing...")
+        upload_group = asset_manager.prepare_paths_for_upload(
+            ".", files, [root_path / "outputs"], []
+        )
         (summary_statistics_hashing, manifests) = asset_manager.hash_assets_and_create_manifest(
-            input_paths=files,
-            output_paths=[root_path / "outputs"],
-            referenced_paths=[],
+            asset_groups=upload_group.asset_groups,
+            total_input_files=upload_group.total_input_files,
+            total_input_bytes=upload_group.total_input_bytes,
             on_preparing_to_submit=mock_on_preparing_to_submit,
         )
         print(f"Hashing Summary Statistics:\n{summary_statistics_hashing}")
5 changes: 4 additions & 1 deletion scripted_tests/upload_scale_test.py
@@ -112,8 +112,11 @@
     print("\nStarting upload test...")
     start = time.perf_counter()
 
+    upload_group = asset_manager.prepare_paths_for_upload(".", files, [root_path / "outputs"], [])
     (summary_statistics_hashing, manifests) = asset_manager.hash_assets_and_create_manifest(
-        files, [root_path / "outputs"], []
+        asset_groups=upload_group.asset_groups,
+        total_input_files=upload_group.total_input_files,
+        total_input_bytes=upload_group.total_input_bytes,
     )
     print(f"Summary Statistics for file hashing:\n{summary_statistics_hashing}")
48 changes: 33 additions & 15 deletions src/deadline/client/api/_submit_job_bundle.py
@@ -23,6 +23,7 @@
     read_yaml_or_json,
     read_yaml_or_json_object,
     parse_yaml_or_json_content,
+    validate_directory_symlink_containment,
 )
 from ..job_bundle.parameters import (
     apply_job_parameters,
@@ -33,6 +34,7 @@
 from ..job_bundle.submission import AssetReferences, split_parameter_args
 from ...job_attachments.models import (
     JobAttachmentsFileSystem,
+    AssetRootGroup,
     AssetRootManifest,
     JobAttachmentS3Settings,
 )
@@ -55,8 +57,8 @@ def create_job_from_job_bundle(
     max_retries_per_task: Optional[int] = None,
     print_function_callback: Callable[[str], None] = lambda msg: None,
     decide_cancel_submission_callback: Callable[
-        [AssetReferences, SummaryStatistics], bool
-    ] = lambda ar, hs: False,
+        [dict[str, int], int, int], bool
+    ] = lambda paths, files, bytes: False,
     hashing_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
     upload_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
     create_job_result_callback: Optional[Callable[[], bool]] = None,
@@ -117,7 +119,7 @@
         max_retries_per_task (int, optional): explicit value for the maximum retries per task.
         print_function_callback (Callable str -> None, optional): Callback to print messages produced in this function.
             Used in the CLI to print to stdout using click.echo. By default ignores messages.
-        decide_cancel_submission_callback (Callable AssetReferences, SummaryStatstics -> bool): If the job has job
+        decide_cancel_submission_callback (Callable dict[str, int], int, int -> bool): If the job has job
             attachments, decide whether or not to cancel the submission given what assets will
             or will not be uploaded. If returns true, the submission is canceled. If False,
             the submission continues. By default the submission always continues.
@@ -128,6 +130,9 @@
             ProgressReport as a parameter, which can be used for projecting remaining time, as in done in the CLI.
     """
 
+    # Ensure the job bundle doesn't contain files that resolve outside of the bundle directory
+    validate_directory_symlink_containment(job_bundle_dir)
+
     # Read in the job template
     file_contents, file_type = read_yaml_or_json(job_bundle_dir, "template", required=True)
 
@@ -223,18 +228,31 @@
             session=queue_role_session,
         )
 
-        hash_summary, asset_manifests = _hash_attachments(
-            asset_manager=asset_manager,
-            asset_references=asset_references,
+        upload_group = asset_manager.prepare_paths_for_upload(
+            job_bundle_path=job_bundle_dir,
+            input_paths=sorted(asset_references.input_filenames),
+            output_paths=sorted(asset_references.output_directories),
+            referenced_paths=sorted(asset_references.referenced_paths),
             storage_profile_id=storage_profile_id,
-            print_function_callback=print_function_callback,
-            hashing_progress_callback=hashing_progress_callback,
         )
 
-        if decide_cancel_submission_callback(asset_references, hash_summary):
+        if decide_cancel_submission_callback(
+            upload_group.num_outside_files_by_root,
+            upload_group.total_input_files,
+            upload_group.total_input_bytes,
+        ):
             print_function_callback("Job submission canceled.")
             return None
 
+        _, asset_manifests = _hash_attachments(
+            asset_manager=asset_manager,
+            asset_groups=upload_group.asset_groups,
+            total_input_files=upload_group.total_input_files,
+            total_input_bytes=upload_group.total_input_bytes,
+            print_function_callback=print_function_callback,
+            hashing_progress_callback=hashing_progress_callback,
+        )
+
         attachment_settings = _upload_attachments(
             asset_manager, asset_manifests, print_function_callback, upload_progress_callback
         )
@@ -338,8 +356,9 @@ def wait_for_create_job_to_complete(
 
 def _hash_attachments(
     asset_manager: S3AssetManager,
-    asset_references: AssetReferences,
-    storage_profile_id: Optional[str] = None,
+    asset_groups: list[AssetRootGroup],
+    total_input_files: int,
+    total_input_bytes: int,
     print_function_callback: Callable = lambda msg: None,
     hashing_progress_callback: Optional[Callable] = None,
     config: Optional[ConfigParser] = None,
@@ -356,10 +375,9 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool:
         hashing_progress_callback = _default_update_hash_progress
 
     hashing_summary, manifests = asset_manager.hash_assets_and_create_manifest(
-        input_paths=sorted(asset_references.input_filenames),
-        output_paths=sorted(asset_references.output_directories),
-        referenced_paths=sorted(asset_references.referenced_paths),
-        storage_profile_id=storage_profile_id,
+        asset_groups=asset_groups,
+        total_input_files=total_input_files,
+        total_input_bytes=total_input_bytes,
         hash_cache_dir=os.path.expanduser(os.path.join("~", ".deadline", "cache")),
         on_preparing_to_submit=hashing_progress_callback,
     )
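
A sketch of a caller-supplied decide_cancel_submission_callback under the new signature (the policy below is illustrative, not part of this change):

    def my_decide_cancel_submission(
        num_outside_files_by_root: dict,  # per-root count of files outside storage profile locations
        total_input_files: int,
        total_input_bytes: int,
    ) -> bool:
        # Returning True cancels the submission; returning False lets it continue.
        if num_outside_files_by_root:
            print(f"Files outside configured locations: {num_outside_files_by_root}")
            return True  # illustrative policy: refuse to upload stray files
        return False

    # Passed via create_job_from_job_bundle(decide_cancel_submission_callback=my_decide_cancel_submission, ...)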
32 changes: 25 additions & 7 deletions src/deadline/client/cli/_groups/bundle_group.py
@@ -112,16 +112,34 @@ def bundle_submit(
     def _check_create_job_wait_canceled() -> bool:
         return sigint_handler.continue_operation
 
-    def _decide_cancel_submission(asset_references, hash_summary):
+    def _decide_cancel_submission(
+        deviated_file_count_by_root: dict[str, int],
+        num_files: int,
+        upload_size: int,
+    ):
+        message_text = (
+            f"Job submission contains {num_files} files totaling {_human_readable_file_size(upload_size)}. "
+            " All files will be uploaded to S3 if they are not already present in the job attachments bucket."
+        )
+        if deviated_file_count_by_root:
+            root_by_count_message = "\n\n".join(
+                [
+                    f"{file_count} files from : '{directory}'"
+                    for directory, file_count in deviated_file_count_by_root.items()
+                ]
+            )
+            message_text += (
+                f"\n\nFiles were found outside of the configured storage profile location(s). "
+                " Please confirm that you intend to upload files from the following directories:\n\n"
+                f"{root_by_count_message}"
+            )
+        message_text += "\n\nDo you wish to proceed?"
         return (
             not (yes or config_file.str2bool(get_setting("settings.auto_accept", config=config)))
-            and len(asset_references.input_filenames) > 0
+            and num_files > 0
             and not click.confirm(
-                f"Job submission contains {hash_summary.total_files} files "
-                f"totaling {_human_readable_file_size(hash_summary.total_bytes)}. "
-                "All files will be uploaded to S3 if they are not already present in the job attachments bucket. "
-                "Do you wish to proceed?",
-                default=True,
+                message_text,
+                default=not deviated_file_count_by_root,
            )
        )
 
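
With deviated files present, the assembled prompt renders along these lines (hypothetical counts and directory; the default answer flips to No whenever deviated files exist):

    Job submission contains 1000 files totaling 2.5 GB.  All files will be uploaded to S3 if they are not already present in the job attachments bucket.

    Files were found outside of the configured storage profile location(s).  Please confirm that you intend to upload files from the following directories:

    12 files from : '/home/user/unmapped_textures'

    Do you wish to proceed? [y/N]: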
4 changes: 4 additions & 0 deletions src/deadline/client/exceptions.py
@@ -11,3 +11,7 @@ class DeadlineOperationError(Exception):
 
 class CreateJobWaiterCanceled(Exception):
     """Error for when the waiter on CreateJob is interrupted"""
+
+
+class UserInitiatedCancel(Exception):
+    """Error for when the user requests cancelation"""
24 changes: 24 additions & 0 deletions src/deadline/client/job_bundle/loader.py
@@ -6,17 +6,41 @@
     "read_yaml_or_json",
     "read_yaml_or_json_object",
     "parse_yaml_or_json_content",
+    "validate_directory_symlink_containment",
 ]
 
 import json
 import os
+from itertools import chain
 from typing import Any, Dict, Optional, Tuple
 
 import yaml
 
 from ..exceptions import DeadlineOperationError
+
+
+def validate_directory_symlink_containment(job_bundle_dir: str) -> None:
+    """
+    Validates the integrity of the job bundle, validating that all files
+    contained in the job bundle resolve inside of the job bundle directory.
+    """
+    # The job bundle could itself be a symlink dir
+    resolved_root = os.path.realpath(job_bundle_dir)
+    if not os.path.isdir(resolved_root):
+        raise DeadlineOperationError(
+            f"Job bundle path provided is not a directory:\n{job_bundle_dir}"
+        )
+    for root_dir, dir_names, file_names in os.walk(resolved_root):
+        for path in chain(dir_names, file_names):
+            norm_path = os.path.normpath(os.path.join(root_dir, path))
+            resolved_path = os.path.realpath(norm_path)
+            common_path = os.path.commonpath([resolved_root, resolved_path])
+            if common_path != resolved_root:
+                raise DeadlineOperationError(
+                    f"Job bundle cannot contain a path that resolves outside of the resolved bundle directory:\n{resolved_root}\n\nPath in bundle:\n{norm_path}\nResolves to:\n{resolved_path}"
+                )
 
 
 def read_yaml_or_json(job_bundle_dir: str, filename: str, required: bool) -> Tuple[str, str]:
     """
     Checks whether {filename}.json or {filename}.yaml exist in the provided
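
A sketch of the new containment check in action (hypothetical paths; assumes the symlink target lies outside the bundle):

    import os
    from deadline.client.exceptions import DeadlineOperationError
    from deadline.client.job_bundle.loader import validate_directory_symlink_containment

    os.symlink("/etc", "/tmp/my_bundle/escape")  # symlink resolving outside the bundle
    try:
        validate_directory_symlink_containment("/tmp/my_bundle")
    except DeadlineOperationError as exc:
        print(exc)  # "Job bundle cannot contain a path that resolves outside of the resolved bundle directory: ..."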
27 changes: 19 additions & 8 deletions src/deadline/client/job_bundle/parameters.py
@@ -581,7 +581,7 @@ def apply_job_parameters(
         # Apply the job_parameters value if available
         parameter_value = param_dict.pop(parameter_name, None)
         if parameter_value is not None:
-            #  Make PATH parameter values that are not constrained by allowedValues
+            # Make PATH parameter values that are not constrained by allowedValues
             # absolute by joining with the current working directory
             if parameter_type == "PATH" and "allowedValues" not in parameter:
                 if parameter_value == "":
@@ -608,7 +608,8 @@
                     # This path is referenced, but its contents are not necessarily
                     # input or output.
                     asset_references.referenced_paths.add(parameter_value)
-                else:
+                elif parameter_value != "":
+                    # While empty parameters are allowed, we don't want to add them to asset references
                     object_type = parameter.get("objectType")
 
                     if "IN" in data_flow:
@@ -685,22 +686,32 @@ def read_job_bundle_parameters(bundle_dir: str) -> list[JobParameter]:
             # values such as "deadline:*"
             template_parameters[name] = parameter_value
 
-    # Make valueless PATH parameters with default but not constrained
-    # by allowedValues, absolute by joining with the job bundle directory
-    for parameter in template_parameters.values():
+    # Make valueless PATH parameters with 'default' (but not constrained
+    # by allowedValues) absolute by joining with the job bundle directory
+    for name, parameter in template_parameters.items():
         if (
             "value" not in parameter
             and parameter["type"] == "PATH"
             and "allowedValues" not in parameter
         ):
             default = parameter.get("default")
             if default:
+                if os.path.isabs(default):
+                    raise DeadlineOperationError(
+                        f"Job Template for job bundle {bundle_dir}:\nDefault PATH '{default}' for parameter '{name}' is absolute.\nPATH values must be relative, and must resolve within the Job Bundle directory."
+                    )
+                bundle_real_path = os.path.realpath(bundle_dir)
+                default_real_path = os.path.realpath(os.path.join(bundle_real_path, default))
+                common_path = os.path.commonpath([bundle_real_path, default_real_path])
+                if common_path != bundle_real_path:
+                    raise DeadlineOperationError(
+                        f"Job Template for job bundle {bundle_dir}:\nDefault PATH '{default_real_path}' for parameter '{name}' specifies files outside of Job Bundle directory '{bundle_real_path}'.\nPATH values must be relative, and must resolve within the Job Bundle directory."
+                    )
+
                 default_absolute = os.path.normpath(
                     os.path.abspath(os.path.join(bundle_dir, default))
                 )
-
-                if default_absolute != default:
-                    parameter["value"] = default_absolute
+                parameter["value"] = default_absolute
 
     # Rearrange the dict from the template into a list
     return [
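
The effect of the new checks on PATH defaults can be sketched as follows (hypothetical bundle directory and defaults, reusing the same realpath/commonpath logic as the code above):

    import os

    bundle_dir = "/home/user/my_bundle"  # hypothetical
    for default in ("assets/scene.blend", "/abs/elsewhere", "../escape.txt"):
        if os.path.isabs(default):
            print(f"{default!r}: rejected, absolute defaults are not allowed")
            continue
        bundle_real_path = os.path.realpath(bundle_dir)
        default_real_path = os.path.realpath(os.path.join(bundle_real_path, default))
        if os.path.commonpath([bundle_real_path, default_real_path]) != bundle_real_path:
            print(f"{default!r}: rejected, resolves outside the bundle")
        else:
            print(f"{default!r}: accepted, made absolute against the bundle directory")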