feat(job_attachments): enhance handling of S3 timeout errors and BotoCoreError #206

Merged 1 commit on Mar 19, 2024
Changes from all commits
30 changes: 27 additions & 3 deletions scripted_tests/upload_cancel_test.py
@@ -32,6 +32,7 @@
     "To stop the hash/upload process, please hit 'k' key and then 'Enter' key in succession.\n"
 )
 
+NUM_TINY_FILES = 0
 NUM_SMALL_FILES = 0
 NUM_MEDIUM_FILES = 0
 NUM_LARGE_FILES = 1
@@ -62,13 +63,21 @@ def run():
     root_path = pathlib.Path("/tmp/test_submit")
     root_path.mkdir(parents=True, exist_ok=True)
 
+    if NUM_TINY_FILES > 0:
+        for i in range(0, NUM_TINY_FILES):
+            file_path = root_path / f"tiny_test{i}.txt"
+            if not os.path.exists(file_path):
+                with file_path.open("wb") as f:
+                    f.write(os.urandom(2 * (1024**2)))  # 2 MB files
+            files.append(str(file_path))
+
     # Make small files
     if NUM_SMALL_FILES > 0:
         for i in range(0, NUM_SMALL_FILES):
             file_path = root_path / f"small_test{i}.txt"
             if not os.path.exists(file_path):
-                with file_path.open("w", encoding="utf-8") as f:
-                    f.write(f"test value: {i}")
+                with file_path.open("wb") as f:
+                    f.write(os.urandom(10 * (1024**2)))  # 10 MB files
             files.append(str(file_path))
 
     # Make medium-sized files
@@ -86,7 +95,7 @@ def run():
             file_path = root_path / f"large_test{i}.txt"
             if not os.path.exists(file_path):
                 with file_path.open("ab") as f:
-                    f.write(os.urandom(1 * (1024**3)))  # Write 1 GB at a time
+                    _create_large_file_with_chunks(file_path, 20 * (1024**3), 10**9)
             files.append(str(file_path))
 
     queue = get_queue(farm_id=farm_id, queue_id=queue_id)
@@ -130,6 +139,21 @@ def run():
     main_terminated = True
 
 
+def _create_large_file_with_chunks(file_path: str, total_size: int, chunk_size: int) -> None:
+    """
+    Creates a large file of a given total size by writing random data in chunks.
+    This prevents MemoryError by dividing the size into manageable chunks and
+    writing each chunk sequentially.
+    """
+    with open(file_path, "wb") as f:
+        num_chunks = total_size // chunk_size
+        for _ in range(num_chunks):
+            f.write(os.urandom(chunk_size))
+        remaining = total_size % chunk_size
+        if remaining > 0:
+            f.write(os.urandom(remaining))
+
+
 def mock_on_preparing_to_submit(metadata):
     print(metadata)
     return mock_on_cancellation_check()
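
As a sanity check on the pattern above: asking os.urandom for the full 20 GiB in one call would have to materialize the entire buffer in memory, which is exactly the MemoryError the helper avoids. A scaled-down sketch of the same chunked-write logic (sizes shrunk so it runs in milliseconds; the file path is illustrative):

import os
import pathlib
import tempfile

# Write 25 MB in 10 MB chunks, then verify the size on disk. This mirrors
# _create_large_file_with_chunks: full chunks first, then the remainder.
path = pathlib.Path(tempfile.gettempdir()) / "chunked_write_demo.bin"
total_size, chunk_size = 25 * (1024**2), 10 * (1024**2)
with path.open("wb") as f:
    for _ in range(total_size // chunk_size):
        f.write(os.urandom(chunk_size))
    if total_size % chunk_size:
        f.write(os.urandom(total_size % chunk_size))
assert path.stat().st_size == total_size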
2 changes: 2 additions & 0 deletions src/deadline/job_attachments/_aws/aws_clients.py
@@ -18,6 +18,7 @@
 from .aws_config import (
     S3_CONNECT_TIMEOUT_IN_SECS,
     S3_READ_TIMEOUT_IN_SECS,
+    S3_RETRIES_MODE,
     VENDOR_CODE,
 )
@@ -66,6 +67,7 @@ def get_s3_client(session: Optional[boto3.Session] = None) -> BaseClient:
             signature_version="s3v4",
             connect_timeout=S3_CONNECT_TIMEOUT_IN_SECS,
             read_timeout=S3_READ_TIMEOUT_IN_SECS,
+            retries={"mode": S3_RETRIES_MODE},
             user_agent_extra=f"S3A/Deadline/NA/JobAttachments/{version}",
             max_pool_connections=s3_max_pool_connections,
         ),
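
The new retries entry is standard botocore Config usage; everything else in the Config is unchanged. A minimal standalone sketch of the same client settings outside the project's helpers (values inlined for illustration):

import boto3
from botocore.config import Config

# "standard" retry mode retries a broader, SDK-consistent set of transient
# failures (connect/read timeouts, throttling, 5xx responses) than the older
# "legacy" default, capped at 3 total attempts unless "max_attempts" is raised.
s3 = boto3.client(
    "s3",
    config=Config(
        signature_version="s3v4",
        connect_timeout=30,
        read_timeout=30,
        retries={"mode": "standard"},
    ),
)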
1 change: 1 addition & 0 deletions src/deadline/job_attachments/_aws/aws_config.py
@@ -6,3 +6,4 @@
 # S3 related
 S3_CONNECT_TIMEOUT_IN_SECS: int = 30
 S3_READ_TIMEOUT_IN_SECS: int = 30
+S3_RETRIES_MODE: str = "standard"
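
For reference, botocore accepts three values here: "legacy" (the historical default), "standard", and "adaptive" ("standard" plus client-side rate limiting). Pinning "standard" in code means job attachments behave consistently even if a user's AWS_RETRY_MODE environment variable or retry_mode setting in ~/.aws/config says otherwise, since an explicit Config value takes precedence.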
31 changes: 29 additions & 2 deletions src/deadline/job_attachments/download.py
@@ -19,7 +19,7 @@
 import boto3
 from boto3.s3.transfer import ProgressCallbackInvoker
 from botocore.client import BaseClient
-from botocore.exceptions import ClientError
+from botocore.exceptions import BotoCoreError, ClientError
 
 from .asset_manifests.base_manifest import BaseAssetManifest, BaseManifestPath as RelativeFilePath
 from .asset_manifests.hash_algorithms import HashAlgorithm
@@ -28,6 +28,7 @@
     COMMON_ERROR_GUIDANCE_FOR_S3,
     AssetSyncError,
     AssetSyncCancelledError,
+    JobAttachmentS3BotoCoreError,
     JobAttachmentsS3ClientError,
     PathOutsideDirectoryError,
     JobAttachmentsError,
@@ -106,6 +107,13 @@ def get_manifest_from_s3(
             key_or_prefix=manifest_key,
             message=f"{status_code_guidance.get(status_code, '')} {str(exc)}",
         ) from exc
+    except BotoCoreError as bce:
+        raise JobAttachmentS3BotoCoreError(
+            action="downloading binary file",
+            error_details=str(bce),
+        ) from bce
+    except Exception as e:
+        raise AssetSyncError from e
 
 
 def _get_output_manifest_prefix(
@@ -195,6 +203,13 @@ def _get_tasks_manifests_keys_from_s3(
             key_or_prefix=manifest_prefix,
             message=f"{status_code_guidance.get(status_code, '')} {str(exc)}",
         ) from exc
+    except BotoCoreError as bce:
+        raise JobAttachmentS3BotoCoreError(
+            action="listing bucket contents",
+            error_details=str(bce),
+        ) from bce
+    except Exception as e:
+        raise AssetSyncError from e
 
     # 2. Select all files in the last subfolder (alphabetically) under each "task-{any}" folder.
     for task_folder, files in task_prefixes.items():
@@ -476,12 +491,17 @@ def process_client_error(exc: ClientError, status_code: int):
                 if progress_tracker and progress_tracker.continue_reporting is False:
                     raise AssetSyncCancelledError("File download cancelled.")
                 else:
-                    raise AssetSyncError("File upload failed.", ce) from ce
+                    raise AssetSyncError("File download failed.", ce) from ce
             except ClientError as secondExc:
                 status_code = int(exc.response["ResponseMetadata"]["HTTPStatusCode"])
                 process_client_error(secondExc, status_code)
         else:
             process_client_error(exc, status_code)
+    except BotoCoreError as bce:
+        raise JobAttachmentS3BotoCoreError(
+            action="downloading file",
+            error_details=str(bce),
+        ) from bce
     except Exception as e:
         raise AssetSyncError from e
 
@@ -607,6 +627,13 @@ def _get_asset_root_from_s3(
             key_or_prefix=manifest_key,
             message=f"{status_code_guidance.get(status_code, '')} {str(exc)}",
         ) from exc
+    except BotoCoreError as bce:
+        raise JobAttachmentS3BotoCoreError(
+            action="checking for the existence of an object in the S3 bucket",
+            error_details=str(bce),
+        ) from bce
+    except Exception as e:
+        raise AssetSyncError from e
 
     return head["Metadata"].get("asset-root", None)
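
A note on what the new except arms actually catch, since the botocore hierarchy is easy to misread: ClientError (S3 responded, but with an HTTP error) is not a BotoCoreError subclass, while timeout and connection failures (ReadTimeoutError, ConnectTimeoutError, EndpointConnectionError) are. So the ordering above inspects S3's HTTP errors first and wraps transport-level failures second. A minimal sketch of the same layering with a hypothetical fetch helper (RuntimeError stands in for the project's exception types):

from botocore.exceptions import BotoCoreError, ClientError

def fetch_object_body(s3_client, bucket: str, key: str) -> bytes:
    try:
        return s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    except ClientError as exc:
        # S3 responded, but with an error status (403, 404, ...).
        status_code = int(exc.response["ResponseMetadata"]["HTTPStatusCode"])
        raise RuntimeError(f"S3 returned HTTP {status_code} for {key}") from exc
    except BotoCoreError as bce:
        # The request never completed: timeouts, connection failures, etc.
        raise RuntimeError(f"AWS request failed while downloading {key}: {bce}") from bce
    except Exception as e:
        # Anything unexpected still surfaces with a wrapped cause.
        raise RuntimeError(f"Unexpected error downloading {key}") from e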
17 changes: 17 additions & 0 deletions src/deadline/job_attachments/exceptions.py
@@ -57,6 +57,23 @@ def __init__(
         super().__init__(", ".join(message_parts))
 
 
+class JobAttachmentS3BotoCoreError(AssetSyncError):
+    """
+    Exception to wrap any botocore.exceptions.BotoCoreError.
+    """
+
+    def __init__(self, action: str, error_details: str) -> None:
+        self.action = action
+        message = (
+            f"An issue occurred with AWS service request while {action}: "
+            f"{error_details}\n"
+            "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. "
+            "Please verify your credentials and network connection. If the problem persists, try again later"
+            " or contact support for further assistance."
+        )
+        super().__init__(message)
+
+
 class MissingS3BucketError(JobAttachmentsError):
     """
     Exception raised when attempting to use Job Attachments but the S3 bucket is not set in Queue.
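
A quick illustration of what the new exception reads like when raised (the detail string is made up):

from deadline.job_attachments.exceptions import JobAttachmentS3BotoCoreError

err = JobAttachmentS3BotoCoreError(
    action="uploading file",
    error_details="Connect timeout on endpoint URL",
)
# str(err) starts with:
#   An issue occurred with AWS service request while uploading file:
#   Connect timeout on endpoint URL
# followed by the credentials/network guidance, and err.action is kept
# for callers that want to branch on which operation failed.
print(err.action)  # -> "uploading file"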
22 changes: 21 additions & 1 deletion src/deadline/job_attachments/upload.py
@@ -17,7 +17,7 @@
 
 import boto3
 from boto3.s3.transfer import ProgressCallbackInvoker
-from botocore.exceptions import ClientError
+from botocore.exceptions import BotoCoreError, ClientError
 
 from deadline.client.config import config_file
 
@@ -42,6 +42,7 @@
     COMMON_ERROR_GUIDANCE_FOR_S3,
     AssetSyncCancelledError,
     AssetSyncError,
+    JobAttachmentS3BotoCoreError,
     JobAttachmentsS3ClientError,
     MissingS3BucketError,
     MissingS3RootPrefixError,
@@ -401,6 +402,11 @@ def handler(bytes_uploaded):
                 key_or_prefix=s3_upload_key,
                 message=f"{status_code_guidance.get(status_code, '')} {str(exc)} (Failed to upload {str(local_path)})",
             ) from exc
+        except BotoCoreError as bce:
+            raise JobAttachmentS3BotoCoreError(
+                action="uploading file",
+                error_details=str(bce),
+            ) from bce
         except Exception as e:
             raise AssetSyncError from e
 
@@ -425,6 +431,13 @@ def file_already_uploaded(self, bucket: str, key: str) -> bool:
                     "checking if object exists", error_code, bucket, key, message
                 ) from exc
             return False
+        except BotoCoreError as bce:
+            raise JobAttachmentS3BotoCoreError(
+                action="checking for the existence of an object in the S3 bucket",
+                error_details=str(bce),
+            ) from bce
+        except Exception as e:
+            raise AssetSyncError from e
 
     def upload_bytes_to_s3(
         self,
@@ -470,6 +483,13 @@ def upload_bytes_to_s3(
                 key_or_prefix=key,
                 message=f"{status_code_guidance.get(status_code, '')} {str(exc)}",
             ) from exc
+        except BotoCoreError as bce:
+            raise JobAttachmentS3BotoCoreError(
+                action="uploading binary file",
+                error_details=str(bce),
+            ) from bce
+        except Exception as e:
+            raise AssetSyncError from e
 
 
 class S3AssetManager:
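
One way to exercise the new path in a unit test, sketched with unittest.mock (the attribute name _s3 and the no-argument construction are assumptions about S3AssetUploader's internals, not shown in this diff):

from unittest.mock import MagicMock

import pytest
from botocore.exceptions import ConnectTimeoutError

from deadline.job_attachments.exceptions import JobAttachmentS3BotoCoreError
from deadline.job_attachments.upload import S3AssetUploader

def test_file_already_uploaded_wraps_botocore_error():
    uploader = S3AssetUploader()
    # Replace the S3 client so head_object raises a BotoCoreError subclass.
    uploader._s3 = MagicMock()
    uploader._s3.head_object.side_effect = ConnectTimeoutError(
        endpoint_url="https://s3.us-west-2.amazonaws.com"
    )
    with pytest.raises(JobAttachmentS3BotoCoreError) as excinfo:
        uploader.file_already_uploaded("some-bucket", "some/key")
    assert "checking for the existence of an object" in str(excinfo.value)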