Commit e4c309d

feat(job_attachments): enhance handling S3 timeout errors and BotoCoreError

Improve error handling for S3 requests by
- adding "retries" configuration to the S3 client
- adding BotoCoreError handling to cover S3 timeout errors (e.g., ReadTimeoutError, ConnectTimeoutError)

Signed-off-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com>
gahyusuh committed Mar 15, 2024
1 parent 739cb20 commit e4c309d
Showing 8 changed files with 301 additions and 8 deletions.
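In outline, the commit pairs a retry-enabled client configuration with a broader except clause around each S3 call. A minimal sketch of that pattern, under the assumption of botocore's documented Config/retries API; the helper names here are illustrative, not the literal implementation in the diffs below:

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError


def make_retrying_s3_client(session: boto3.Session):
    # "standard" retry mode lets botocore retry transient failures
    # (throttling, connection resets) before any exception surfaces.
    return session.client("s3", config=Config(retries={"mode": "standard"}))


def download_manifest(s3, bucket: str, key: str) -> bytes:
    try:
        return s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    except ClientError:
        # HTTP-level errors (403, 404, ...) keep their dedicated handling.
        raise
    except BotoCoreError as bce:
        # ReadTimeoutError and ConnectTimeoutError are BotoCoreError
        # subclasses, so timeouts that exhaust the retries are wrapped
        # with guidance instead of escaping as raw botocore exceptions.
        raise RuntimeError(f"S3 request failed while downloading the manifest: {bce}") from bce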
30 changes: 27 additions & 3 deletions scripted_tests/upload_cancel_test.py
@@ -32,6 +32,7 @@
"To stop the hash/upload process, please hit 'k' key and then 'Enter' key in succession.\n"
)

NUM_TINY_FILES = 0
NUM_SMALL_FILES = 0
NUM_MEDIUM_FILES = 0
NUM_LARGE_FILES = 1
@@ -62,13 +63,21 @@ def run():
root_path = pathlib.Path("/tmp/test_submit")
root_path.mkdir(parents=True, exist_ok=True)

if NUM_TINY_FILES > 0:
for i in range(0, NUM_TINY_FILES):
file_path = root_path / f"tiny_test{i}.txt"
if not os.path.exists(file_path):
with file_path.open("wb") as f:
f.write(os.urandom(2 * (1024**2))) # 2 MB files
files.append(str(file_path))

# Make small files
if NUM_SMALL_FILES > 0:
for i in range(0, NUM_SMALL_FILES):
file_path = root_path / f"small_test{i}.txt"
if not os.path.exists(file_path):
with file_path.open("w", encoding="utf-8") as f:
f.write(f"test value: {i}")
with file_path.open("wb") as f:
f.write(os.urandom(10 * (1024**2))) # 10 MB files
files.append(str(file_path))

# Make medium-sized files
@@ -86,7 +95,7 @@ def run():
file_path = root_path / f"large_test{i}.txt"
if not os.path.exists(file_path):
with file_path.open("ab") as f:
f.write(os.urandom(1 * (1024**3))) # Write 1 GB at a time
_create_large_file_with_chunks(file_path, 20 * (1024**3), 10**9)
files.append(str(file_path))

queue = get_queue(farm_id=farm_id, queue_id=queue_id)
@@ -130,6 +139,21 @@ def run():
main_terminated = True


def _create_large_file_with_chunks(file_path: str, total_size: int, chunk_size: int) -> None:
"""
Creates a large file of a given total size by writing in chunks with random data.
It prevents MemoryError by dividing the size into manageable chunks and writing
each chunk sequentially.
"""
with open(file_path, "wb") as f:
num_chunks = total_size // chunk_size
for _ in range(num_chunks):
f.write(os.urandom(chunk_size))
remaining = total_size % chunk_size
if remaining > 0:
f.write(os.urandom(remaining))


def mock_on_preparing_to_submit(metadata):
print(metadata)
return mock_on_cancellation_check()
2 changes: 2 additions & 0 deletions src/deadline/job_attachments/_aws/aws_clients.py
@@ -18,6 +18,7 @@
from .aws_config import (
S3_CONNECT_TIMEOUT_IN_SECS,
S3_READ_TIMEOUT_IN_SECS,
S3_RETRIES_MODE,
VENDOR_CODE,
)

@@ -66,6 +67,7 @@ def get_s3_client(session: Optional[boto3.Session] = None) -> BaseClient:
signature_version="s3v4",
connect_timeout=S3_CONNECT_TIMEOUT_IN_SECS,
read_timeout=S3_READ_TIMEOUT_IN_SECS,
retries={"mode": S3_RETRIES_MODE},
user_agent_extra=f"S3A/Deadline/NA/JobAttachments/{version}",
max_pool_connections=s3_max_pool_connections,
),
1 change: 1 addition & 0 deletions src/deadline/job_attachments/_aws/aws_config.py
@@ -6,3 +6,4 @@
# S3 related
S3_CONNECT_TIMEOUT_IN_SECS: int = 30
S3_READ_TIMEOUT_IN_SECS: int = 30
S3_RETRIES_MODE: str = "standard"
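For reference, botocore accepts "legacy", "standard", and "adaptive" retry modes, and "standard" caps attempts at 3 by default. A sketch of overriding that cap, assuming the documented botocore retries keys (the value 5 is illustrative, not part of this commit):

from botocore.config import Config

# Same "standard" mode as S3_RETRIES_MODE above, with an explicit attempt cap.
config = Config(retries={"mode": "standard", "max_attempts": 5})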
31 changes: 29 additions & 2 deletions src/deadline/job_attachments/download.py
@@ -18,7 +18,7 @@
import boto3
from boto3.s3.transfer import ProgressCallbackInvoker
from botocore.client import BaseClient
from botocore.exceptions import ClientError
from botocore.exceptions import BotoCoreError, ClientError

from .asset_manifests.base_manifest import BaseAssetManifest, BaseManifestPath as RelativeFilePath
from .asset_manifests.hash_algorithms import HashAlgorithm
@@ -27,6 +27,7 @@
COMMON_ERROR_GUIDANCE_FOR_S3,
AssetSyncError,
AssetSyncCancelledError,
JobAttachmentS3BotoCoreError,
JobAttachmentsS3ClientError,
PathOutsideDirectoryError,
JobAttachmentsError,
@@ -101,6 +102,13 @@ def get_manifest_from_s3(
key_or_prefix=manifest_key,
message=f"{status_code_guidance.get(status_code, '')} {str(exc)}",
) from exc
except BotoCoreError as bce:
raise JobAttachmentS3BotoCoreError(
action="downloading binary file",
error_details=str(bce),
) from bce
except Exception as e:
raise AssetSyncError from e


def _get_output_manifest_prefix(
@@ -190,6 +198,13 @@ def _get_tasks_manifests_keys_from_s3(
key_or_prefix=manifest_prefix,
message=f"{status_code_guidance.get(status_code, '')} {str(exc)}",
) from exc
except BotoCoreError as bce:
raise JobAttachmentS3BotoCoreError(
action="listing bucket contents",
error_details=str(bce),
) from bce
except Exception as e:
raise AssetSyncError from e

# 2. Select all files in the last subfolder (alphabetically) under each "task-{any}" folder.
for task_folder, files in task_prefixes.items():
@@ -473,12 +488,17 @@ def process_client_error(exc: ClientError, status_code: int):
if progress_tracker and progress_tracker.continue_reporting is False:
raise AssetSyncCancelledError("File download cancelled.")
else:
raise AssetSyncError("File upload failed.", ce) from ce
raise AssetSyncError("File download failed.", ce) from ce
except ClientError as secondExc:
status_code = int(exc.response["ResponseMetadata"]["HTTPStatusCode"])
process_client_error(secondExc, status_code)
else:
process_client_error(exc, status_code)
except BotoCoreError as bce:
raise JobAttachmentS3BotoCoreError(
action="downloading file",
error_details=str(bce),
) from bce
except Exception as e:
raise AssetSyncError from e

@@ -604,6 +624,13 @@ def _get_asset_root_from_s3(
key_or_prefix=manifest_key,
message=f"{status_code_guidance.get(status_code, '')} {str(exc)}",
) from exc
except BotoCoreError as bce:
raise JobAttachmentS3BotoCoreError(
action="checking for the existence of an object in the S3 bucket",
error_details=str(bce),
) from bce
except Exception as e:
raise AssetSyncError from e

return head["Metadata"].get("asset-root", None)

17 changes: 17 additions & 0 deletions src/deadline/job_attachments/exceptions.py
@@ -57,6 +57,23 @@ def __init__(
super().__init__(", ".join(message_parts))


class JobAttachmentS3BotoCoreError(AssetSyncError):
"""
Exception to wrap any botocore.exceptions.BotoCoreError.
"""

def __init__(self, action: str, error_details: str) -> None:
self.action = action
message = (
f"An issue occurred with AWS service request while {action}: "
f"{error_details}\n"
"This could be due to temporary issues with AWS, internet connection, or your AWS credentials. "
"Please verify your credentials and network connection. If the problem persists, try again later"
" or contact support for further assistance."
)
super().__init__(message)


class MissingS3BucketError(JobAttachmentsError):
"""
Exception raised when attempting to use Job Attachments but the S3 bucket is not set in Queue.
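With the new wrapper in place, downstream code can distinguish timeout-style failures from HTTP-level S3 errors. A hypothetical caller, assuming only the import path shown in the diff headers (the function itself is illustrative):

from deadline.job_attachments.exceptions import (
    JobAttachmentS3BotoCoreError,
    JobAttachmentsS3ClientError,
)


def report_sync_failure(sync):
    try:
        sync()
    except JobAttachmentS3BotoCoreError as exc:
        # Wraps BotoCoreError subclasses such as ReadTimeoutError and
        # ConnectTimeoutError, with remediation guidance in the message.
        print(f"Transient AWS failure: {exc}")
    except JobAttachmentsS3ClientError as exc:
        # HTTP-level S3 errors keep their dedicated wrapper.
        print(f"S3 rejected the request: {exc}")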
22 changes: 21 additions & 1 deletion src/deadline/job_attachments/upload.py
@@ -17,7 +17,7 @@

import boto3
from boto3.s3.transfer import ProgressCallbackInvoker
from botocore.exceptions import ClientError
from botocore.exceptions import BotoCoreError, ClientError

from deadline.client.config import config_file

@@ -42,6 +42,7 @@
COMMON_ERROR_GUIDANCE_FOR_S3,
AssetSyncCancelledError,
AssetSyncError,
JobAttachmentS3BotoCoreError,
JobAttachmentsS3ClientError,
MissingS3BucketError,
MissingS3RootPrefixError,
@@ -401,6 +402,11 @@ def handler(bytes_uploaded):
key_or_prefix=s3_upload_key,
message=f"{status_code_guidance.get(status_code, '')} {str(exc)} (Failed to upload {str(local_path)})",
) from exc
except BotoCoreError as bce:
raise JobAttachmentS3BotoCoreError(
action="uploading file",
error_details=str(bce),
) from bce
except Exception as e:
raise AssetSyncError from e

@@ -425,6 +431,13 @@ def file_already_uploaded(self, bucket: str, key: str) -> bool:
"checking if object exists", error_code, bucket, key, message
) from exc
return False
except BotoCoreError as bce:
raise JobAttachmentS3BotoCoreError(
action="checking for the existence of an object in the S3 bucket",
error_details=str(bce),
) from bce
except Exception as e:
raise AssetSyncError from e

def upload_bytes_to_s3(
self,
@@ -470,6 +483,13 @@ def upload_bytes_to_s3(
key_or_prefix=key,
message=f"{status_code_guidance.get(status_code, '')} {str(exc)}",
) from exc
except BotoCoreError as bce:
raise JobAttachmentS3BotoCoreError(
action="uploading binary file",
error_details=str(bce),
) from bce
except Exception as e:
raise AssetSyncError from e


class S3AssetManager: