From f0836236433bfcfb49ee49ea9c67feb922b5f2d7 Mon Sep 17 00:00:00 2001 From: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com> Date: Tue, 12 Mar 2024 17:46:13 +0000 Subject: [PATCH] feat(job_attachments): enhance handling S3 timeout errors and BotoCoreError Improve error handling for S3 requests by - adding "retries" configuration to the S3 client - adding BotoCoreError handling to cover S3 timeout errors (e.g., ReadTimeoutError, ConnectTimeoutError) Signed-off-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com> --- scripted_tests/upload_cancel_test.py | 30 ++++- .../job_attachments/_aws/aws_clients.py | 2 + .../job_attachments/_aws/aws_config.py | 1 + src/deadline/job_attachments/download.py | 31 ++++- src/deadline/job_attachments/exceptions.py | 17 +++ src/deadline/job_attachments/upload.py | 22 +++- .../deadline_job_attachments/test_download.py | 121 +++++++++++++++++- .../deadline_job_attachments/test_upload.py | 85 +++++++++++- 8 files changed, 301 insertions(+), 8 deletions(-) diff --git a/scripted_tests/upload_cancel_test.py b/scripted_tests/upload_cancel_test.py index 6490f917..53a621d0 100644 --- a/scripted_tests/upload_cancel_test.py +++ b/scripted_tests/upload_cancel_test.py @@ -32,6 +32,7 @@ "To stop the hash/upload process, please hit 'k' key and then 'Enter' key in succession.\n" ) +NUM_TINY_FILES = 0 NUM_SMALL_FILES = 0 NUM_MEDIUM_FILES = 0 NUM_LARGE_FILES = 1 @@ -62,13 +63,21 @@ def run(): root_path = pathlib.Path("/tmp/test_submit") root_path.mkdir(parents=True, exist_ok=True) + if NUM_TINY_FILES > 0: + for i in range(0, NUM_TINY_FILES): + file_path = root_path / f"tiny_test{i}.txt" + if not os.path.exists(file_path): + with file_path.open("wb") as f: + f.write(os.urandom(2 * (1024**2))) # 2 MB files + files.append(str(file_path)) + # Make small files if NUM_SMALL_FILES > 0: for i in range(0, NUM_SMALL_FILES): file_path = root_path / f"small_test{i}.txt" if not os.path.exists(file_path): - with file_path.open("w", encoding="utf-8") as f: - f.write(f"test value: {i}") + with file_path.open("wb") as f: + f.write(os.urandom(10 * (1024**2))) # 10 MB files files.append(str(file_path)) # Make medium-sized files @@ -86,7 +95,7 @@ def run(): file_path = root_path / f"large_test{i}.txt" if not os.path.exists(file_path): with file_path.open("ab") as f: - f.write(os.urandom(1 * (1024**3))) # Write 1 GB at a time + _create_large_file_with_chunks(file_path, 20 * (1024**3), 10**9) files.append(str(file_path)) queue = get_queue(farm_id=farm_id, queue_id=queue_id) @@ -130,6 +139,21 @@ def run(): main_terminated = True +def _create_large_file_with_chunks(file_path: str, total_size: int, chunk_size: int) -> None: + """ + Creates a large file of a given total size by writing in chunks with random data. + It prevents MemoryError by dividing the size into manageable chunks and writing + each chunk sequentially. + """ + with open(file_path, "wb") as f: + num_chunks = total_size // chunk_size + for _ in range(num_chunks): + f.write(os.urandom(chunk_size)) + remaining = total_size % chunk_size + if remaining > 0: + f.write(os.urandom(remaining)) + + def mock_on_preparing_to_submit(metadata): print(metadata) return mock_on_cancellation_check() diff --git a/src/deadline/job_attachments/_aws/aws_clients.py b/src/deadline/job_attachments/_aws/aws_clients.py index fd3f6629..4a603216 100644 --- a/src/deadline/job_attachments/_aws/aws_clients.py +++ b/src/deadline/job_attachments/_aws/aws_clients.py @@ -18,6 +18,7 @@ from .aws_config import ( S3_CONNECT_TIMEOUT_IN_SECS, S3_READ_TIMEOUT_IN_SECS, + S3_RETRIES_MODE, VENDOR_CODE, ) @@ -66,6 +67,7 @@ def get_s3_client(session: Optional[boto3.Session] = None) -> BaseClient: signature_version="s3v4", connect_timeout=S3_CONNECT_TIMEOUT_IN_SECS, read_timeout=S3_READ_TIMEOUT_IN_SECS, + retries={"mode": S3_RETRIES_MODE}, user_agent_extra=f"S3A/Deadline/NA/JobAttachments/{version}", max_pool_connections=s3_max_pool_connections, ), diff --git a/src/deadline/job_attachments/_aws/aws_config.py b/src/deadline/job_attachments/_aws/aws_config.py index a8cdeb1a..489c8e04 100644 --- a/src/deadline/job_attachments/_aws/aws_config.py +++ b/src/deadline/job_attachments/_aws/aws_config.py @@ -6,3 +6,4 @@ # S3 related S3_CONNECT_TIMEOUT_IN_SECS: int = 30 S3_READ_TIMEOUT_IN_SECS: int = 30 +S3_RETRIES_MODE: str = "standard" diff --git a/src/deadline/job_attachments/download.py b/src/deadline/job_attachments/download.py index f15779f9..dcacb9e2 100644 --- a/src/deadline/job_attachments/download.py +++ b/src/deadline/job_attachments/download.py @@ -19,7 +19,7 @@ import boto3 from boto3.s3.transfer import ProgressCallbackInvoker from botocore.client import BaseClient -from botocore.exceptions import ClientError +from botocore.exceptions import BotoCoreError, ClientError from .asset_manifests.base_manifest import BaseAssetManifest, BaseManifestPath as RelativeFilePath from .asset_manifests.hash_algorithms import HashAlgorithm @@ -28,6 +28,7 @@ COMMON_ERROR_GUIDANCE_FOR_S3, AssetSyncError, AssetSyncCancelledError, + JobAttachmentS3BotoCoreError, JobAttachmentsS3ClientError, PathOutsideDirectoryError, JobAttachmentsError, @@ -106,6 +107,13 @@ def get_manifest_from_s3( key_or_prefix=manifest_key, message=f"{status_code_guidance.get(status_code, '')} {str(exc)}", ) from exc + except BotoCoreError as bce: + raise JobAttachmentS3BotoCoreError( + action="downloading binary file", + error_details=str(bce), + ) from bce + except Exception as e: + raise AssetSyncError from e def _get_output_manifest_prefix( @@ -195,6 +203,13 @@ def _get_tasks_manifests_keys_from_s3( key_or_prefix=manifest_prefix, message=f"{status_code_guidance.get(status_code, '')} {str(exc)}", ) from exc + except BotoCoreError as bce: + raise JobAttachmentS3BotoCoreError( + action="listing bucket contents", + error_details=str(bce), + ) from bce + except Exception as e: + raise AssetSyncError from e # 2. Select all files in the last subfolder (alphabetically) under each "task-{any}" folder. for task_folder, files in task_prefixes.items(): @@ -476,12 +491,17 @@ def process_client_error(exc: ClientError, status_code: int): if progress_tracker and progress_tracker.continue_reporting is False: raise AssetSyncCancelledError("File download cancelled.") else: - raise AssetSyncError("File upload failed.", ce) from ce + raise AssetSyncError("File download failed.", ce) from ce except ClientError as secondExc: status_code = int(exc.response["ResponseMetadata"]["HTTPStatusCode"]) process_client_error(secondExc, status_code) else: process_client_error(exc, status_code) + except BotoCoreError as bce: + raise JobAttachmentS3BotoCoreError( + action="downloading file", + error_details=str(bce), + ) from bce except Exception as e: raise AssetSyncError from e @@ -607,6 +627,13 @@ def _get_asset_root_from_s3( key_or_prefix=manifest_key, message=f"{status_code_guidance.get(status_code, '')} {str(exc)}", ) from exc + except BotoCoreError as bce: + raise JobAttachmentS3BotoCoreError( + action="checking for the existence of an object in the S3 bucket", + error_details=str(bce), + ) from bce + except Exception as e: + raise AssetSyncError from e return head["Metadata"].get("asset-root", None) diff --git a/src/deadline/job_attachments/exceptions.py b/src/deadline/job_attachments/exceptions.py index 959bd442..83938b25 100644 --- a/src/deadline/job_attachments/exceptions.py +++ b/src/deadline/job_attachments/exceptions.py @@ -57,6 +57,23 @@ def __init__( super().__init__(", ".join(message_parts)) +class JobAttachmentS3BotoCoreError(AssetSyncError): + """ + Exception to wrap any botocore.exceptions.BotoCoreError. + """ + + def __init__(self, action: str, error_details: str) -> None: + self.action = action + message = ( + f"An issue occurred with AWS service request while {action}: " + f"{error_details}\n" + "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. " + "Please verify your credentials and network connection. If the problem persists, try again later" + " or contact support for further assistance." + ) + super().__init__(message) + + class MissingS3BucketError(JobAttachmentsError): """ Exception raised when attempting to use Job Attachments but the S3 bucket is not set in Queue. diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py index d07d57ee..c3559c2f 100644 --- a/src/deadline/job_attachments/upload.py +++ b/src/deadline/job_attachments/upload.py @@ -17,7 +17,7 @@ import boto3 from boto3.s3.transfer import ProgressCallbackInvoker -from botocore.exceptions import ClientError +from botocore.exceptions import BotoCoreError, ClientError from deadline.client.config import config_file @@ -42,6 +42,7 @@ COMMON_ERROR_GUIDANCE_FOR_S3, AssetSyncCancelledError, AssetSyncError, + JobAttachmentS3BotoCoreError, JobAttachmentsS3ClientError, MissingS3BucketError, MissingS3RootPrefixError, @@ -401,6 +402,11 @@ def handler(bytes_uploaded): key_or_prefix=s3_upload_key, message=f"{status_code_guidance.get(status_code, '')} {str(exc)} (Failed to upload {str(local_path)})", ) from exc + except BotoCoreError as bce: + raise JobAttachmentS3BotoCoreError( + action="uploading file", + error_details=str(bce), + ) from bce except Exception as e: raise AssetSyncError from e @@ -425,6 +431,13 @@ def file_already_uploaded(self, bucket: str, key: str) -> bool: "checking if object exists", error_code, bucket, key, message ) from exc return False + except BotoCoreError as bce: + raise JobAttachmentS3BotoCoreError( + action="checking for the existence of an object in the S3 bucket", + error_details=str(bce), + ) from bce + except Exception as e: + raise AssetSyncError from e def upload_bytes_to_s3( self, @@ -470,6 +483,13 @@ def upload_bytes_to_s3( key_or_prefix=key, message=f"{status_code_guidance.get(status_code, '')} {str(exc)}", ) from exc + except BotoCoreError as bce: + raise JobAttachmentS3BotoCoreError( + action="uploading binary file", + error_details=str(bce), + ) from bce + except Exception as e: + raise AssetSyncError from e class S3AssetManager: diff --git a/test/unit/deadline_job_attachments/test_download.py b/test/unit/deadline_job_attachments/test_download.py index e5c47313..0cf9a6ae 100644 --- a/test/unit/deadline_job_attachments/test_download.py +++ b/test/unit/deadline_job_attachments/test_download.py @@ -16,7 +16,7 @@ from unittest.mock import MagicMock, call, patch import boto3 -from botocore.exceptions import ClientError +from botocore.exceptions import BotoCoreError, ClientError, ReadTimeoutError from botocore.stub import Stubber import pytest @@ -47,6 +47,7 @@ _get_tasks_manifests_keys_from_s3, ) from deadline.job_attachments.exceptions import ( + AssetSyncError, JobAttachmentsError, JobAttachmentsS3ClientError, MissingAssetRootError, @@ -1733,6 +1734,30 @@ def test_get_asset_root_from_s3_error_message_on_not_found(self): "HTTP Status Code: 404, Not found. " ) in str(err.value) + @mock_sts + def test_get_asset_root_from_s3_error_message_on_timeout(self): + """ + Test that the appropriate error is raised when a ReadTimeoutError occurs + during an S3 client's head_object call. + """ + mock_s3_client = MagicMock() + mock_s3_client.head_object.side_effect = ReadTimeoutError(endpoint_url="test_url") + + with patch( + f"{deadline.__package__}.job_attachments.download.get_s3_client", + return_value=mock_s3_client, + ): + with pytest.raises(AssetSyncError) as exc: + _get_asset_root_from_s3("test-key", "test-bucket") + assert isinstance(exc.value.__cause__, BotoCoreError) + assert ( + "An issue occurred with AWS service request while checking for the existence of an object in the S3 bucket: " + 'Read timeout on endpoint URL: "test_url"\n' + "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. " + "Please verify your credentials and network connection. If the problem persists, try again later" + " or contact support for further assistance." + ) in str(exc.value) + @mock_sts def test_get_manifest_from_s3_error_message_on_access_denied(self): """ @@ -1762,6 +1787,30 @@ def test_get_manifest_from_s3_error_message_on_access_denied(self): "HTTP Status Code: 403, Forbidden or Access denied. " ) in str(exc.value) + @mock_sts + def test_get_manifest_from_s3_error_message_on_timeout(self): + """ + Test that the appropriate error is raised when a ReadTimeoutError occurs + during an S3 client's download_fileobj call. + """ + mock_s3_client = MagicMock() + mock_s3_client.download_fileobj.side_effect = ReadTimeoutError(endpoint_url="test_url") + + with patch( + f"{deadline.__package__}.job_attachments.download.get_s3_client", + return_value=mock_s3_client, + ): + with pytest.raises(AssetSyncError) as exc: + get_manifest_from_s3("test-key", "test-bucket") + assert isinstance(exc.value.__cause__, BotoCoreError) + assert ( + "An issue occurred with AWS service request while downloading binary file: " + 'Read timeout on endpoint URL: "test_url"\n' + "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. " + "Please verify your credentials and network connection. If the problem persists, try again later" + " or contact support for further assistance." + ) in str(exc.value) + @mock_sts def test_get_tasks_manifests_keys_from_s3_error_message_on_access_denied(self): """ @@ -1794,6 +1843,33 @@ def test_get_tasks_manifests_keys_from_s3_error_message_on_access_denied(self): "HTTP Status Code: 403, Forbidden or Access denied. " ) in str(exc.value) + @mock_sts + def test_get_tasks_manifests_keys_from_s3_error_message_on_timeout(self): + """ + Test that the appropriate error is raised when S3 client's get_paginator call triggers + a ReadTimeoutError while getting the keys of task output manifests from S3. + """ + mock_s3_client = MagicMock() + mock_s3_client.get_paginator.side_effect = ReadTimeoutError(endpoint_url="test_url") + + with patch( + f"{deadline.__package__}.job_attachments.download.get_s3_client", + return_value=mock_s3_client, + ): + with pytest.raises(AssetSyncError) as exc: + _get_tasks_manifests_keys_from_s3( + "assetRoot", + "test-bucket", + ) + assert isinstance(exc.value.__cause__, BotoCoreError) + assert ( + "An issue occurred with AWS service request while listing bucket contents: " + 'Read timeout on endpoint URL: "test_url"\n' + "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. " + "Please verify your credentials and network connection. If the problem persists, try again later" + " or contact support for further assistance." + ) in str(exc.value) + @mock_sts def test_download_file_error_message_on_access_denied(self): """ @@ -1836,6 +1912,49 @@ def test_download_file_error_message_on_access_denied(self): failed_file_path = Path("/home/username/assets/inputs/input1.txt") assert (f"(Failed to download the file to {str(failed_file_path)})") in str(exc.value) + @mock_sts + def test_download_file_error_message_on_timeout(self): + """ + Test that the appropriate error is raised when a ReadTimeoutError occurs + during a transfer manager's download operation. + """ + mock_s3_client = MagicMock() + mock_future = MagicMock() + mock_transfer_manager = MagicMock() + mock_transfer_manager.download.return_value = mock_future + mock_future.result.side_effect = ReadTimeoutError(endpoint_url="test_url") + + file_path = ManifestPathv2023_03_03( + path="inputs/input1.txt", hash="input1", size=1, mtime=1234000000 + ) + + with patch( + f"{deadline.__package__}.job_attachments.download.get_s3_client", + return_value=mock_s3_client, + ), patch( + f"{deadline.__package__}.job_attachments.download.get_s3_transfer_manager", + return_value=mock_transfer_manager, + ), patch( + f"{deadline.__package__}.job_attachments.download.Path.mkdir" + ): + with pytest.raises(AssetSyncError) as exc: + download_file( + file_path, + HashAlgorithm.XXH128, + "/home/username/assets", + "test-bucket", + "rootPrefix/Data", + mock_s3_client, + ) + assert isinstance(exc.value.__cause__, BotoCoreError) + assert ( + "An issue occurred with AWS service request while downloading file: " + 'Read timeout on endpoint URL: "test_url"\n' + "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. " + "Please verify your credentials and network connection. If the problem persists, try again later" + " or contact support for further assistance." + ) in str(exc.value) + @pytest.mark.parametrize("manifest_version", [ManifestVersion.v2023_03_03]) class TestFullDownloadPrefixesWithSlashes: diff --git a/test/unit/deadline_job_attachments/test_upload.py b/test/unit/deadline_job_attachments/test_upload.py index 7c62d89b..355e5af7 100644 --- a/test/unit/deadline_job_attachments/test_upload.py +++ b/test/unit/deadline_job_attachments/test_upload.py @@ -17,7 +17,7 @@ import boto3 import py.path import pytest -from botocore.exceptions import ClientError +from botocore.exceptions import BotoCoreError, ClientError, ReadTimeoutError from botocore.stub import Stubber from moto import mock_sts @@ -1357,6 +1357,29 @@ def test_file_already_uploaded_bucket_in_different_account(self): "and your AWS IAM Role or User has the 's3:ListBucket' permission for this bucket." ) in str(err.value) + @mock_sts + def test_file_already_uploaded_timeout(self): + """ + Test that the appropriate error is raised when a ReadTimeoutError occurs + during an S3 request to check file existence in an S3 bucket. + """ + mock_s3_client = MagicMock() + mock_s3_client.head_object.side_effect = ReadTimeoutError(endpoint_url="test_url") + + uploader = S3AssetUploader() + uploader._s3 = mock_s3_client + + with pytest.raises(AssetSyncError) as err: + uploader.file_already_uploaded(self.job_attachment_s3_settings.s3BucketName, "test_key") + assert isinstance(err.value.__cause__, BotoCoreError) + assert ( + "An issue occurred with AWS service request while checking for the existence of an object in the S3 bucket: " + 'Read timeout on endpoint URL: "test_url"\n' + "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. " + "Please verify your credentials and network connection. If the problem persists, try again later" + " or contact support for further assistance." + ) in str(err.value) + @mock_sts def test_upload_bytes_to_s3_bucket_in_different_account(self): """ @@ -1392,6 +1415,31 @@ def test_upload_bytes_to_s3_bucket_in_different_account(self): "HTTP Status Code: 403, Forbidden or Access denied. " ) in str(err.value) + @mock_sts + def test_upload_bytes_to_s3_timeout(self): + """ + Test that the appropriate error is raised when a ReadTimeoutError occurs + during an S3 request to upload a binary file to an S3 bucket. + """ + mock_s3_client = MagicMock() + mock_s3_client.upload_fileobj.side_effect = ReadTimeoutError(endpoint_url="test_url") + + uploader = S3AssetUploader() + uploader._s3 = mock_s3_client + + with pytest.raises(AssetSyncError) as err: + uploader.upload_bytes_to_s3( + BytesIO(), self.job_attachment_s3_settings.s3BucketName, "test_key" + ) + assert isinstance(err.value.__cause__, BotoCoreError) + assert ( + "An issue occurred with AWS service request while uploading binary file: " + 'Read timeout on endpoint URL: "test_url"\n' + "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. " + "Please verify your credentials and network connection. If the problem persists, try again later" + " or contact support for further assistance." + ) in str(err.value) + @mock_sts def test_upload_file_to_s3_bucket_in_different_account(self, tmp_path: Path): """ @@ -1470,6 +1518,41 @@ def test_upload_file_to_s3_bucket_has_kms_permissions_error(self, tmp_path: Path ) in str(err.value) assert (f"(Failed to upload {str(file)})") in str(err.value) + @mock_sts + def test_upload_file_to_s3_timeout(self, tmp_path: Path): + """ + Test that the appropriate error is raised when a ReadTimeoutError occurs + during an S3 request to upload a file to an S3 bucket. + """ + mock_future = MagicMock() + mock_transfer_manager = MagicMock() + mock_transfer_manager.upload.return_value = mock_future + mock_future.result.side_effect = ReadTimeoutError(endpoint_url="test_url") + + s3 = boto3.client("s3") + uploader = S3AssetUploader() + uploader._s3 = s3 + + file = tmp_path / "test_file" + file.write_text("") + + with patch( + f"{deadline.__package__}.job_attachments.upload.get_s3_transfer_manager", + return_value=mock_transfer_manager, + ): + with pytest.raises(AssetSyncError) as err: + uploader.upload_file_to_s3( + file, self.job_attachment_s3_settings.s3BucketName, "test_key" + ) + assert isinstance(err.value.__cause__, BotoCoreError) + assert ( + "An issue occurred with AWS service request while uploading file: " + 'Read timeout on endpoint URL: "test_url"\n' + "This could be due to temporary issues with AWS, internet connection, or your AWS credentials. " + "Please verify your credentials and network connection. If the problem persists, try again later" + " or contact support for further assistance." + ) in str(err.value) + @pytest.mark.parametrize( "manifest_version", [