Retry transient network errors in WebDatasetDatasource (#46892)
## Why are these changes needed?

#46685 didn't include retry handling for webdatasets.
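
For context, a hypothetical usage sketch of what this enables: after this change, webdataset reads retry errors whose messages match `DataContext.retried_io_errors`, so that list can be extended for additional transient errors. The bucket path and the extra error string below are placeholders, not part of this PR.

```python
import ray

# Assumption for illustration: extend the retryable-error list before reading.
ctx = ray.data.DataContext.get_current()
ctx.retried_io_errors.append("Connection reset by peer")  # placeholder message

# Placeholder S3 path; any webdataset shard location is handled the same way.
ds = ray.data.read_webdataset("s3://example-bucket/shards/")
```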

## Related issue number

Fixes #43803 for webdatasets

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [x] This PR is not tested :(

---------

Signed-off-by: Eric Meier <eric@meier.sh>
Co-authored-by: Scott Lee <scottjlee@users.noreply.github.com>
BitPhinix and scottjlee authored Sep 3, 2024
1 parent d4a52ea commit d1a0f99
Showing 1 changed file with 15 additions and 5 deletions.
python/ray/data/_internal/datasource/webdataset_datasource.py
@@ -8,6 +8,8 @@
 from functools import partial
 from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

+import ray
+from ray.data._internal.util import iterate_with_retry
 from ray.data.block import BlockAccessor
 from ray.data.datasource.file_based_datasource import FileBasedDatasource

@@ -342,12 +344,20 @@ def _read_stream(self, stream: "pyarrow.NativeFile", path: str):
         """
         import pandas as pd

-        files = _tar_file_iterator(
-            stream,
-            fileselect=self.fileselect,
-            filerename=self.filerename,
-            verbose_open=self.verbose_open,
+        def get_tar_file_iterator():
+            return _tar_file_iterator(
+                stream,
+                fileselect=self.fileselect,
+                filerename=self.filerename,
+                verbose_open=self.verbose_open,
+            )
+
+        # S3 can raise transient errors during iteration
+        ctx = ray.data.DataContext.get_current()
+        files = iterate_with_retry(
+            get_tar_file_iterator, "iterate tar file", match=ctx.retried_io_errors
         )

         samples = _group_by_keys(files, meta=dict(__url__=path), suffixes=self.suffixes)
         for sample in samples:
             if self.decoder is not None:
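
For readers unfamiliar with the helper, here is a minimal sketch of the retry-on-iteration pattern that `iterate_with_retry` provides, assuming a restartable iterable factory. This is not Ray's actual implementation; the `retry_iterate` name, `max_attempts` parameter, and backoff policy are illustrative assumptions.

```python
import time
from typing import Callable, Iterable, Iterator, List, Optional


def retry_iterate(
    iterable_factory: Callable[[], Iterable],
    description: str,
    *,
    match: Optional[List[str]] = None,
    max_attempts: int = 5,
) -> Iterator:
    """Yield from iterable_factory(), restarting iteration on matching errors."""
    num_yielded = 0
    for attempt in range(max_attempts):
        try:
            for i, item in enumerate(iterable_factory()):
                if i < num_yielded:
                    # Skip items already produced before the previous failure,
                    # so the caller never sees duplicates after a retry.
                    continue
                num_yielded += 1
                yield item
            return
        except Exception as e:
            retryable = match is None or any(m in str(e) for m in match)
            if not retryable or attempt == max_attempts - 1:
                raise
            print(f"Retrying {description} after attempt {attempt + 1}: {e}")
            # Back off briefly before recreating the iterator from the factory.
            time.sleep(2**attempt)
```

In the diff above, the factory is `get_tar_file_iterator` and the match list comes from `DataContext.retried_io_errors`, so only errors Ray already treats as transient trigger a retry.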
