Skip to content

Commit

Permalink
fix!: Add empty paths to referenced paths
Browse files Browse the repository at this point in the history
Signed-off-by: Caden Marofke <132690522+marofke@users.noreply.github.com>
  • Loading branch information
marofke committed Apr 25, 2024
1 parent 1df086b commit 491a523
Show file tree
Hide file tree
Showing 12 changed files with 372 additions and 186 deletions.
48 changes: 21 additions & 27 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
JobAttachmentsFileSystem,
AssetRootGroup,
AssetRootManifest,
AssetUploadGroup,
JobAttachmentS3Settings,
)
from ...job_attachments.progress_tracker import SummaryStatistics, ProgressReportMetadata
Expand All @@ -58,11 +59,12 @@ def create_job_from_job_bundle(
max_retries_per_task: Optional[int] = None,
print_function_callback: Callable[[str], None] = lambda msg: None,
decide_cancel_submission_callback: Callable[
[dict[str, int], int, int], bool
] = lambda paths, files, bytes: False,
[AssetUploadGroup], bool
] = lambda upload_group: False,
hashing_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
upload_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
create_job_result_callback: Optional[Callable[[], bool]] = None,
require_paths_exist: bool = False,
) -> Union[str, None]:
"""
Creates a job in the AWS Deadline Cloud farm/queue configured as default for the
Expand Down Expand Up @@ -212,10 +214,15 @@ def create_job_from_job_bundle(
if asset_references and "jobAttachmentSettings" in queue:
# Extend input_filenames with all the files in the input_directories
missing_directories: set[str] = set()
empty_directories: set[str] = set()
for directory in asset_references.input_directories:
if not os.path.isdir(directory):
missing_directories.add(directory)
if require_paths_exist:
missing_directories.add(directory)
else:
logger.info(
f"Input path '{directory}' does not exist. Adding to referenced paths."
)
asset_references.referenced_paths.add(directory)
continue

is_dir_empty = True
Expand All @@ -226,30 +233,21 @@ def create_job_from_job_bundle(
asset_references.input_filenames.update(
os.path.normpath(os.path.join(root, file)) for file in files
)
# Empty directories just become references since there's nothing to upload
if is_dir_empty:
empty_directories.add(directory)
logger.info(f"Input directory '{directory}' is empty. Adding to referenced paths.")
asset_references.referenced_paths.add(directory)
asset_references.input_directories.clear()

misconfigured_directories = missing_directories or empty_directories
if misconfigured_directories:
all_misconfigured_inputs = ""
if missing_directories:
all_missing_directories = "\n\t".join(sorted(list(missing_directories)))
misconfigured_directories_msg = (
"Job submission contains misconfigured input directories and cannot be submitted."
" All input directories must exist and cannot be empty."
" All input directories must exist."
f"\nNon-existent directories:\n\t{all_missing_directories}"
)

if missing_directories:
missing_directory_list = sorted(list(missing_directories))
all_missing_directories = "\n\t".join(missing_directory_list)
all_misconfigured_inputs += (
f"\nNon-existent directories:\n\t{all_missing_directories}"
)
if empty_directories:
empty_directory_list = sorted(list(empty_directories))
all_empty_directories = "\n\t".join(empty_directory_list)
all_misconfigured_inputs += f"\nEmpty directories:\n\t{all_empty_directories}"

raise MisconfiguredInputsError(misconfigured_directories_msg + all_misconfigured_inputs)
raise MisconfiguredInputsError(misconfigured_directories_msg)

queue_role_session = api.get_queue_user_boto3_session(
deadline=deadline,
Expand All @@ -267,18 +265,14 @@ def create_job_from_job_bundle(
)

upload_group = asset_manager.prepare_paths_for_upload(
job_bundle_path=job_bundle_dir,
input_paths=sorted(asset_references.input_filenames),
output_paths=sorted(asset_references.output_directories),
referenced_paths=sorted(asset_references.referenced_paths),
storage_profile=storage_profile,
require_paths_exist=require_paths_exist,
)
if upload_group.asset_groups:
if decide_cancel_submission_callback(
upload_group.num_outside_files_by_root,
upload_group.total_input_files,
upload_group.total_input_bytes,
):
if decide_cancel_submission_callback(upload_group):
print_function_callback("Job submission canceled.")
return None

Expand Down
79 changes: 52 additions & 27 deletions src/deadline/client/cli/_groups/bundle_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
from botocore.exceptions import ClientError

from deadline.client import api
from deadline.client.config import set_setting
from deadline.client.config import config_file, get_setting, set_setting
from deadline.job_attachments.exceptions import (
AssetSyncError,
AssetSyncCancelledError,
MisconfiguredInputsError,
)
from deadline.job_attachments.models import JobAttachmentsFileSystem
from deadline.job_attachments.models import AssetUploadGroup, JobAttachmentsFileSystem
from deadline.job_attachments.progress_tracker import ProgressReportMetadata
from deadline.job_attachments._utils import _human_readable_file_size

Expand Down Expand Up @@ -91,6 +91,11 @@ def validate_parameters(ctx, param, value):
is_flag=True,
help="Skip any confirmation prompts",
)
@click.option(
"--require-paths-exist",
is_flag=True,
help="Require all input paths to exist",
)
@click.argument("job_bundle_dir")
@_handle_error
def bundle_submit(
Expand All @@ -102,6 +107,7 @@ def bundle_submit(
priority,
max_failed_tasks_count,
max_retries_per_task,
require_paths_exist,
**args,
):
"""
Expand All @@ -116,36 +122,54 @@ def bundle_submit(
def _check_create_job_wait_canceled() -> bool:
return sigint_handler.continue_operation

def _decide_cancel_submission(
deviated_file_count_by_root: dict[str, int],
num_files: int,
upload_size: int,
):
def _decide_cancel_submission(upload_group: AssetUploadGroup) -> bool:
"""
Callback to decide if submission should be cancelled or not. Return 'True' to cancel.
Prints a warning that requires confirmation if paths are found outside of configured storage profile locations.
"""
warning_message = ""
for group in upload_group.asset_groups:
if not group.file_system_location_name:
warning_message += f"\n\nUnder the directory '{group.root_path}':"
warning_message += (
f"\n\t{len(group.inputs)} input file{'' if len(group.inputs) == 1 else 's'}"
if len(group.inputs) > 0
else ""
)
warning_message += (
f"\n\t{len(group.outputs)} output director{'y' if len(group.outputs) == 1 else 'ies'}"
if len(group.outputs) > 0
else ""
)
warning_message += (
f"\n\t{len(group.references)} referenced file{'' if len(group.references) == 1 else 's'} and/or director{'y' if len(group.outputs) == 1 else 'ies'}"
if len(group.references) > 0
else ""
)

# Exit early if there are no warnings and we've either set auto accept or there's no files to confirm
if not warning_message and (
yes
or config_file.str2bool(get_setting("settings.auto_accept", config=config))
or upload_group.total_input_files == 0
):
return False

message_text = (
f"Job submission contains {num_files} files totaling {_human_readable_file_size(upload_size)}. "
" All files will be uploaded to S3 if they are not already present in the job attachments bucket."
f"Job submission contains {upload_group.total_input_files} input files totaling {_human_readable_file_size(upload_group.total_input_bytes)}. "
" All input files will be uploaded to S3 if they are not already present in the job attachments bucket."
)
if deviated_file_count_by_root:
root_by_count_message = "\n\n".join(
[
f"{file_count} files from: '{directory}'"
for directory, file_count in deviated_file_count_by_root.items()
]
)
if warning_message:
message_text += (
f"\n\nFiles were found outside of the configured storage profile location(s). "
" Please confirm that you intend to upload files from the following directories:\n\n"
f"{root_by_count_message}\n\n"
"To permanently remove this warning you must only upload files located within a storage profile location."
f"\n\nFiles were specified outside of the configured storage profile location(s). "
" Please confirm that you intend to submit a job that uses files from the following directories:"
f"{warning_message}\n\n"
"To permanently remove this warning you must only use files located within a storage profile location."
)
message_text += "\n\nDo you wish to proceed?"
return (
not yes
and num_files > 0
and not click.confirm(
message_text,
default=not deviated_file_count_by_root,
)
return not click.confirm(
message_text,
default=not warning_message,
)

try:
Expand All @@ -163,6 +187,7 @@ def _decide_cancel_submission(
create_job_result_callback=_check_create_job_wait_canceled,
print_function_callback=click.echo,
decide_cancel_submission_callback=_decide_cancel_submission,
require_paths_exist=require_paths_exist,
)

# Check Whether the CLI options are modifying any of the default settings that affect
Expand Down
Loading

0 comments on commit 491a523

Please sign in to comment.