Skip to content

Commit

Permalink
[Data] Remove ineffective retry code in plan_read_op (#47456)
Browse files Browse the repository at this point in the history
Currently, Ray Data calls read tasks with retries. The intended purpose
is to retry transient errors while reading data.


https://github.com/ray-project/ray/blob/eda6d092973831523693be15535872ed8ea14fdd/python/ray/data/_internal/planner/plan_read_op.py#L103-L109

However, the code doesn't achieve the intended result because read tasks
return generator objects, and Python will never raise runtime errors
while returning a generator (Python might raise runtime errors when the
programs iterates over the returned generator).

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
  • Loading branch information
bveeramani authored Sep 7, 2024
1 parent 53f6408 commit 542f51a
Showing 1 changed file with 2 additions and 16 deletions.
18 changes: 2 additions & 16 deletions python/ray/data/_internal/planner/plan_read_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@
)
from ray.data._internal.execution.util import memory_string
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.util import _warn_on_high_parallelism, call_with_retry
from ray.data._internal.util import _warn_on_high_parallelism
from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import ReadTask
from ray.util.debug import log_once

TASK_SIZE_WARN_THRESHOLD_BYTES = 1024 * 1024 # 1 MiB

# Transient errors that can occur during longer reads. Trigger retry when these occur.
READ_FILE_RETRY_ON_ERRORS = ["AWS Error NETWORK_CONNECTION", "AWS Error ACCESS_DENIED"]
READ_FILE_MAX_ATTEMPTS = 10
READ_FILE_RETRY_MAX_BACKOFF_SECONDS = 32

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -96,17 +91,8 @@ def get_input_data(target_max_block_size) -> List[RefBundle]:
)

def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]:
"""Yield from read tasks, with retry logic upon transient read errors."""
for read_task in blocks:
read_fn_name = read_task._read_fn.__name__

yield from call_with_retry(
f=read_task,
description=f"read file {read_fn_name}",
match=READ_FILE_RETRY_ON_ERRORS,
max_attempts=READ_FILE_MAX_ATTEMPTS,
max_backoff_s=READ_FILE_RETRY_MAX_BACKOFF_SECONDS,
)
yield from read_task()

# Create a MapTransformer for a read operator
transform_fns: List[MapTransformFn] = [
Expand Down

0 comments on commit 542f51a

Please sign in to comment.