🐛 Attempt to fix 400 RequestTimeout when uploading to S3 ⚠️ (#4996)
Co-authored-by: Andrei Neagu <neagu@itis.swiss>
GitHK and Andrei Neagu authored Nov 10, 2023
1 parent 910c1a1 commit d1f2dc6
Showing 9 changed files with 204 additions and 47 deletions.
@@ -72,6 +72,18 @@ def __init__(self, msg: str | None = None):
super().__init__(msg or "Error while transferring to/from S3 storage")


class AwsS3BadRequestRequestTimeoutError(NodeportsException):
"""Sometimes the request to S3 can time out and a 400 with a `RequestTimeout`
reason in the body will be received. For details regarding the error
see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
In this case the entire multipart upload needs to be abandoned and retried.
"""

def __init__(self, body: str):
super().__init__(f"S3 replied with 400 RequestTimeout: {body=}")


class S3InvalidPathError(NodeportsException):
"""S3 transfer error"""

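The new exception's docstring states that a 400 `RequestTimeout` answer invalidates the whole multipart upload, so it is surfaced as its own error type instead of being retried per request like the 5xx cases. As a rough orientation, detecting that condition boils down to something like the sketch below; the helper name is invented here, while the commit itself performs this check inside `_process_batch` further down.

```python
# Illustrative sketch only -- this helper is not part of the commit.
from aiohttp import web

from simcore_sdk.node_ports_common.exceptions import AwsS3BadRequestRequestTimeoutError


def raise_if_s3_request_timeout(status: int, body: str) -> None:
    # S3 signals an idle-connection timeout with HTTP 400 and a
    # <Code>RequestTimeout</Code> element in the XML error body
    if status == web.HTTPBadRequest.status_code and "RequestTimeout" in body:
        raise AwsS3BadRequestRequestTimeoutError(body)
```
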
@@ -5,7 +5,7 @@
from contextlib import AsyncExitStack
from dataclasses import dataclass
from pathlib import Path
- from typing import IO, Any, Final, Protocol, runtime_checkable
+ from typing import IO, Any, Coroutine, Final, Protocol, runtime_checkable

import aiofiles
from aiohttp import (
@@ -242,13 +242,6 @@ def _check_for_aws_http_errors(exc: BaseException) -> bool:
):
return True

- # Sometimes the request to S3 can time out and a 400 with a `RequestTimeout`
- # reason in the body will be received. This also needs retrying,
- # for more information see:
- # see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if exc.status == web.HTTPBadRequest.status_code and "RequestTimeout" in exc.body:
-     return True

return False


@@ -327,6 +320,49 @@ async def _upload_file_part(
raise exceptions.S3TransferError(msg)


def _get_file_size_and_name(
file_to_upload: Path | UploadableFileObject,
) -> tuple[int, str]:
if isinstance(file_to_upload, Path):
file_size = file_to_upload.stat().st_size
file_name = file_to_upload.as_posix()
else:
file_size = file_to_upload.file_size
file_name = file_to_upload.file_name

return file_size, file_name


async def _process_batch(
*,
upload_tasks: list[Coroutine],
max_concurrency: int,
file_name: str,
file_size: int,
file_chunk_size: int,
last_chunk_size: int,
) -> list[UploadedPart]:
results: list[UploadedPart] = []
try:
upload_results = await logged_gather(
*upload_tasks, log=_logger, max_concurrency=max_concurrency
)

for i, e_tag in upload_results:
results.append(UploadedPart(number=i + 1, e_tag=e_tag))
except ExtendedClientResponseError as e:
if e.status == web.HTTPBadRequest.status_code and "RequestTimeout" in e.body:
raise exceptions.AwsS3BadRequestRequestTimeoutError(e.body) from e
except ClientError as exc:
msg = (
f"Could not upload file {file_name} ({file_size=}, "
f"{file_chunk_size=}, {last_chunk_size=}):{exc}"
)
raise exceptions.S3TransferError(msg) from exc

return results


async def upload_file_to_presigned_links(
session: ClientSession,
file_upload_links: FileUploadSchema,
@@ -336,22 +372,15 @@ async def upload_file_to_presigned_links(
io_log_redirect_cb: LogRedirectCB | None,
progress_bar: ProgressBarData,
) -> list[UploadedPart]:
- file_size = 0
- file_name = ""
- if isinstance(file_to_upload, Path):
-     file_size = file_to_upload.stat().st_size
-     file_name = file_to_upload.as_posix()
- else:
-     file_size = file_to_upload.file_size
-     file_name = file_to_upload.file_name
+ file_size, file_name = _get_file_size_and_name(file_to_upload)

# NOTE: when the file object is already created it cannot be duplicated so
# no concurrency is allowed in that case
- max_concurrency = 4 if isinstance(file_to_upload, Path) else 1
+ max_concurrency: int = 4 if isinstance(file_to_upload, Path) else 1

file_chunk_size = int(file_upload_links.chunk_size)
- num_urls = len(file_upload_links.urls)
- last_chunk_size = file_size - file_chunk_size * (num_urls - 1)
+ num_urls: int = len(file_upload_links.urls)
+ last_chunk_size: int = file_size - file_chunk_size * (num_urls - 1)

results: list[UploadedPart] = []
async with AsyncExitStack() as stack:
@@ -372,7 +401,7 @@ for partition_of_indexed_urls in partition_gen(
for partition_of_indexed_urls in partition_gen(
indexed_urls, slice_size=_CONCURRENT_MULTIPART_UPLOADS_COUNT
):
- upload_tasks = []
+ upload_tasks: list[Coroutine] = []
for index, upload_url in partition_of_indexed_urls:
this_file_chunk_size = (
file_chunk_size if (index + 1) < num_urls else last_chunk_size
@@ -391,19 +420,15 @@ async def upload_file_to_presigned_links(
progress_bar=sub_progress,
)
)
- try:
-     upload_results = await logged_gather(
-         *upload_tasks, log=_logger, max_concurrency=max_concurrency
+ results.extend(
+     await _process_batch(
+         upload_tasks=upload_tasks,
+         max_concurrency=max_concurrency,
+         file_name=file_name,
+         file_size=file_chunk_size,
+         file_chunk_size=file_chunk_size,
+         last_chunk_size=last_chunk_size,
)
-
-     for i, e_tag in upload_results:
-         results.append(UploadedPart(number=i + 1, e_tag=e_tag))
-
- except ClientError as exc:  # noqa: PERF203
-     msg = (
-         f"Could not upload file {file_name} ({file_size=}, "
-         f"{file_chunk_size=}, {last_chunk_size=}):{exc}"
-     )
-     raise exceptions.S3TransferError(msg) from exc
+ )

return results
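
For orientation: the new internal helper `_process_batch` gathers coroutines that resolve to `(part_index, e_tag)` tuples and returns the matching `UploadedPart` entries numbered from 1, translating a 400 `RequestTimeout` reply into `AwsS3BadRequestRequestTimeoutError` and other `ClientError`s into `S3TransferError`. A hedged, self-contained usage sketch follows; the stub coroutine and the file metadata values are made up for illustration.

```python
# Illustrative only: drives the private _process_batch helper with stub coroutines.
import asyncio

from simcore_sdk.node_ports_common.file_io_utils import _process_batch


async def _fake_part_upload(index: int) -> tuple[int, str]:
    # stands in for _upload_file_part, which resolves to (part_index, e_tag)
    return index, f"etag-{index}"


async def main() -> None:
    parts = await _process_batch(
        upload_tasks=[_fake_part_upload(i) for i in range(3)],
        max_concurrency=2,
        file_name="example.bin",  # used only in the error message on failure
        file_size=3,
        file_chunk_size=1,
        last_chunk_size=1,
    )
    # part numbers are 1-based, e.g. [(1, 'etag-0'), (2, 'etag-1'), (3, 'etag-2')]
    print([(p.number, p.e_tag) for p in parts])


asyncio.run(main())
```
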
@@ -21,6 +21,12 @@
from servicelib.file_utils import create_sha256_checksum
from servicelib.progress_bar import ProgressBarData
from settings_library.r_clone import RCloneSettings
from tenacity import AsyncRetrying
from tenacity.after import after_log
from tenacity.before_sleep import before_sleep_log
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_random_exponential
from yarl import URL

from ..node_ports_common.client_session_manager import ClientSessionContextManager
@@ -284,6 +290,45 @@ async def upload_path(
:raises exceptions.NodeportsException
:return: stored id, S3 entity_tag
"""
async for attempt in AsyncRetrying(
reraise=True,
wait=wait_random_exponential(),
stop=stop_after_attempt(
NodePortsSettings.create_from_envs().NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS
),
retry=retry_if_exception_type(exceptions.AwsS3BadRequestRequestTimeoutError),
before_sleep=before_sleep_log(_logger, logging.WARNING, exc_info=True),
after=after_log(_logger, log_level=logging.ERROR),
):
with attempt:
result = await _upload_path(
user_id=user_id,
store_id=store_id,
store_name=store_name,
s3_object=s3_object,
path_to_upload=path_to_upload,
io_log_redirect_cb=io_log_redirect_cb,
client_session=client_session,
r_clone_settings=r_clone_settings,
progress_bar=progress_bar,
exclude_patterns=exclude_patterns,
)
return result


async def _upload_path(
*,
user_id: UserID,
store_id: LocationID | None,
store_name: LocationName | None,
s3_object: StorageFileID,
path_to_upload: Path | UploadableFileObject,
io_log_redirect_cb: LogRedirectCB | None,
client_session: ClientSession | None,
r_clone_settings: RCloneSettings | None,
progress_bar: ProgressBarData | None,
exclude_patterns: set[str] | None,
) -> UploadedFile | UploadedFolder:
_logger.debug(
"Uploading %s to %s:%s@%s",
f"{path_to_upload=}",
@@ -385,7 +430,6 @@ async def _upload_to_s3( # pylint: disable=too-many-arguments # noqa: PLR0913
exclude_patterns=exclude_patterns,
)
else:
- # uploading a file
uploaded_parts = await upload_file_to_presigned_links(
session,
upload_links,
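
The `upload_path` wrapper above retries the whole upload only when `AwsS3BadRequestRequestTimeoutError` is raised, using jittered exponential backoff and a bounded number of attempts taken from `NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS`. A minimal, self-contained sketch of that behaviour is shown below; the flaky coroutine and the hard-coded attempt count are invented for illustration only.

```python
# Illustrative only: the flaky coroutine fails once with the new exception
# and succeeds on the second attempt; any other exception is not retried.
import asyncio

from simcore_sdk.node_ports_common.exceptions import AwsS3BadRequestRequestTimeoutError
from tenacity import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_random_exponential


async def flaky_upload(state: dict) -> str:
    if not state.get("already_failed"):
        state["already_failed"] = True
        raise AwsS3BadRequestRequestTimeoutError("<Error><Code>RequestTimeout</Code></Error>")
    return "uploaded"


async def main() -> None:
    state: dict = {}
    async for attempt in AsyncRetrying(
        reraise=True,
        wait=wait_random_exponential(max=2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type(AwsS3BadRequestRequestTimeoutError),
    ):
        with attempt:
            print(await flaky_upload(state))  # prints "uploaded" on the 2nd attempt


asyncio.run(main())
```
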
@@ -1,14 +1,21 @@
from typing import Final

from pydantic import Field, NonNegativeInt, PositiveInt
from settings_library.base import BaseCustomSettings
from settings_library.postgres import PostgresSettings
from settings_library.storage import StorageSettings

from .constants import MINUTE

NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS_DEFAULT_VALUE: Final[NonNegativeInt] = 3


class NodePortsSettings(BaseCustomSettings):
NODE_PORTS_STORAGE: StorageSettings = Field(auto_default_from_env=True)
POSTGRES_SETTINGS: PostgresSettings = Field(auto_default_from_env=True)

NODE_PORTS_MULTIPART_UPLOAD_COMPLETION_TIMEOUT_S: NonNegativeInt = 5 * MINUTE
NODE_PORTS_IO_NUM_RETRY_ATTEMPTS: PositiveInt = 5
NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS: NonNegativeInt = (
NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS_DEFAULT_VALUE
)
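
The new `NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS` field defaults to 3 attempts; like the other fields on these settings classes it should be overridable through an environment variable of the same name, which is how director-v2 forwards it to dynamic sidecars further down in this commit. A hedged sketch of reading such an override, assuming `BaseCustomSettings` picks up scalar fields from the environment as the other settings-library classes do:

```python
# Illustrative only: assumes the field is read from an env var of the same
# name; in a real environment the nested storage/postgres settings also need
# their own env vars for create_from_envs() to validate.
import os

from simcore_sdk.node_ports_common.settings import NodePortsSettings

os.environ["NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS"] = "5"

settings = NodePortsSettings.create_from_envs()
assert settings.NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS == 5
```
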
@@ -6,6 +6,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable
from unittest.mock import AsyncMock

import pytest
from aiobotocore.session import AioBaseClient, get_session
@@ -19,10 +20,13 @@
)
from moto.server import ThreadedMotoServer
from pydantic import AnyUrl, ByteSize, parse_obj_as
from pytest_mock import MockerFixture
from servicelib.progress_bar import ProgressBarData
from simcore_sdk.node_ports_common.exceptions import AwsS3BadRequestRequestTimeoutError
from simcore_sdk.node_ports_common.file_io_utils import (
ExtendedClientResponseError,
_check_for_aws_http_errors,
_process_batch,
_raise_for_status,
upload_file_to_presigned_links,
)
@@ -61,17 +65,6 @@ class _TestParams:
@pytest.mark.parametrize(
"test_params",
[
- _TestParams(
-     will_retry=True,
-     status_code=400,
-     body='<?xml version="1.0" encoding="UTF-8"?><Error><Code>RequestTimeout</Code>'
-     "<Message>Your socket connection to the server was not read from or written to within "
-     "the timeout period. Idle connections will be closed.</Message>"
-     "<RequestId>7EE901348D6C6812</RequestId><HostId>"
-     "FfQE7jdbUt39E6mcQq/"
-     "ZeNR52ghjv60fccNT4gCE4IranXjsGLG+L6FUyiIxx1tAuXL9xtz2NAY7ZlbzMTm94fhY3TBiCBmf"
-     "</HostId></Error>",
- ),
_TestParams(will_retry=True, status_code=500),
_TestParams(will_retry=True, status_code=503),
_TestParams(will_retry=False, status_code=400),
@@ -92,7 +85,64 @@ async def test_check_for_aws_http_errors(
try:
await _raise_for_status(resp)
except ExtendedClientResponseError as exception:
- assert _check_for_aws_http_errors(exception) is test_params.will_retry
+ assert (  # noqa: PT017
+     _check_for_aws_http_errors(exception) is test_params.will_retry
+ )


async def test_process_batch_captures_400_request_timeout_and_wraps_in_error(
aioresponses_mocker: aioresponses, client_session: ClientSession
):
async def _mock_upload_task() -> None:
body = (
'<?xml version="1.0" encoding="UTF-8"?><Error><Code>RequestTimeout</Code>'
"<Message>Your socket connection to the server was not read from or written to within "
"the timeout period. Idle connections will be closed.</Message>"
"<RequestId>7EE901348D6C6812</RequestId><HostId>"
"FfQE7jdbUt39E6mcQq/"
"ZeNR52ghjv60fccNT4gCE4IranXjsGLG+L6FUyiIxx1tAuXL9xtz2NAY7ZlbzMTm94fhY3TBiCBmf"
"</HostId></Error>"
)
aioresponses_mocker.get(A_TEST_ROUTE, body=body, status=400)

async with client_session.get(A_TEST_ROUTE) as resp:
# raises like _session_put does
await _raise_for_status(resp)

with pytest.raises(AwsS3BadRequestRequestTimeoutError):
await _process_batch(
upload_tasks=[_mock_upload_task()],
max_concurrency=1,
file_name="mock_file",
file_size=1,
file_chunk_size=1,
last_chunk_size=1,
)


async def test_upload_file_to_presigned_links_raises_aws_s3_400_request_time_out_error(
mocker: MockerFixture,
create_upload_links: Callable[[int, ByteSize], Awaitable[FileUploadSchema]],
create_file_of_size: Callable[[ByteSize], Path],
):
file_size = ByteSize(1)
upload_links = await create_upload_links(1, file_size)

mocker.patch(
"simcore_sdk.node_ports_common.file_io_utils._upload_file_part",
side_effect=AwsS3BadRequestRequestTimeoutError(body="nothing"),
)

async with ProgressBarData(steps=1) as progress_bar:
with pytest.raises(AwsS3BadRequestRequestTimeoutError):
await upload_file_to_presigned_links(
session=AsyncMock(),
file_upload_links=upload_links,
file_to_upload=create_file_of_size(file_size),
num_retries=0,
io_log_redirect_cb=None,
progress_bar=progress_bar,
)


@pytest.fixture
@@ -19,7 +19,15 @@
ClusterAuthentication,
NoAuthentication,
)
- from pydantic import AnyHttpUrl, AnyUrl, ConstrainedStr, Field, parse_obj_as, validator
+ from pydantic import (
+     AnyHttpUrl,
+     AnyUrl,
+     ConstrainedStr,
+     Field,
+     NonNegativeInt,
+     parse_obj_as,
+     validator,
+ )
from settings_library.base import BaseCustomSettings
from settings_library.catalog import CatalogSettings
from settings_library.docker_registry import RegistrySettings
@@ -34,6 +42,9 @@
from settings_library.storage import StorageSettings
from settings_library.utils_logging import MixinLoggingSettings
from simcore_postgres_database.models.clusters import ClusterType
from simcore_sdk.node_ports_common.settings import (
NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS_DEFAULT_VALUE,
)
from simcore_sdk.node_ports_v2 import FileLinkType

from .dynamic_services_settings import DynamicServicesSettings
@@ -177,6 +188,11 @@ class AppSettings(BaseCustomSettings, MixinLoggingSettings):
description="useful when developing with an alternative registry namespace",
)

DIRECTOR_V2_NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS: NonNegativeInt = Field(
default=NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS_DEFAULT_VALUE,
description="forwarded to sidecars which use nodeports",
)

# monitoring
MONITORING_ENABLED: bool = False

@@ -109,6 +109,7 @@ def _get_environment_variables(
"DY_SIDECAR_SERVICE_VERSION": scheduler_data.version,
"DY_SIDECAR_USER_PREFERENCES_PATH": f"{scheduler_data.user_preferences_path}",
"DY_SIDECAR_PRODUCT_NAME": f"{scheduler_data.product_name}",
"NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS": f"{app_settings.DIRECTOR_V2_NODE_PORTS_400_REQUEST_TIMEOUT_ATTEMPTS}",
}

