diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 2badee7158f6f..1d13ad2016218 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -1,3 +1,4 @@ +import itertools import logging import pathlib import posixpath @@ -13,12 +14,17 @@ Optional, Tuple, Union, + TypeVar, ) +import numpy as np + from ray.data._internal.arrow_block import ArrowRow from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder from ray.data._internal.execution.interfaces import TaskContext from ray.data._internal.output_buffer import BlockOutputBuffer +from ray.data._internal.progress_bar import ProgressBar +from ray.data._internal.remote_fn import cached_remote_fn from ray.data._internal.util import _check_pyarrow_version, _resolve_custom_scheme from ray.data.block import Block, BlockAccessor from ray.data.context import DatasetContext @@ -45,6 +51,13 @@ logger = logging.getLogger(__name__) +# We should parallelize file size fetch operations beyond this threshold. +FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD = 16 + +# 16 file size fetches from S3 takes ~1.5 seconds with Arrow's S3FileSystem. +PATHS_PER_FILE_SIZE_FETCH_TASK = 16 + + @DeveloperAPI class BlockWritePathProvider: """Abstract callable that provides concrete output paths when writing @@ -288,9 +301,7 @@ def write( def write_block(write_path: str, block: Block): logger.debug(f"Writing {write_path} file.") - fs = filesystem - if isinstance(fs, _S3FileSystemWrapper): - fs = fs.unwrap() + fs = _unwrap_s3_serialization_workaround(filesystem) if _block_udf is not None: block = _block_udf(block) @@ -373,8 +384,9 @@ def __init__( self._block_udf = _block_udf self._reader_args = reader_args paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem) - self._paths, self._file_sizes = meta_provider.expand_paths( - paths, self._filesystem + self._paths, self._file_sizes = map( + list, + zip(*meta_provider.expand_paths(paths, self._filesystem, partitioning)), ) if self._partition_filter is not None: # Use partition filter to skip files which are not needed. @@ -418,8 +430,7 @@ def read_files( fs: Union["pyarrow.fs.FileSystem", _S3FileSystemWrapper], ) -> Iterable[Block]: logger.debug(f"Reading {len(read_paths)} files.") - if isinstance(fs, _S3FileSystemWrapper): - fs = fs.unwrap() + fs = _unwrap_s3_serialization_workaround(filesystem) ctx = DatasetContext.get_current() output_buffer = BlockOutputBuffer( block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size @@ -672,48 +683,6 @@ def _resolve_paths_and_filesystem( return resolved_paths, filesystem -def _expand_directory( - path: str, - filesystem: "pyarrow.fs.FileSystem", - exclude_prefixes: Optional[List[str]] = None, -) -> List[str]: - """ - Expand the provided directory path to a list of file paths. - - Args: - path: The directory path to expand. - filesystem: The filesystem implementation that should be used for - reading these files. - exclude_prefixes: The file relative path prefixes that should be - excluded from the returned file set. Default excluded prefixes are - "." and "_". - - Returns: - A list of file paths contained in the provided directory. 
- """ - if exclude_prefixes is None: - exclude_prefixes = [".", "_"] - - from pyarrow.fs import FileSelector - - selector = FileSelector(path, recursive=True) - files = filesystem.get_file_info(selector) - base_path = selector.base_dir - filtered_paths = [] - for file_ in files: - if not file_.is_file: - continue - file_path = file_.path - if not file_path.startswith(base_path): - continue - relative = file_path[len(base_path) :] - if any(relative.startswith(prefix) for prefix in exclude_prefixes): - continue - filtered_paths.append((file_path, file_)) - # We sort the paths to guarantee a stable order. - return zip(*sorted(filtered_paths, key=lambda x: x[0])) - - def _is_url(path) -> bool: return urllib.parse.urlparse(path).scheme != "" @@ -752,6 +721,15 @@ def _wrap_s3_serialization_workaround(filesystem: "pyarrow.fs.FileSystem"): return filesystem +def _unwrap_s3_serialization_workaround( + filesystem: Union["pyarrow.fs.FileSystem", "_S3FileSystemWrapper"] +): + if isinstance(filesystem, _S3FileSystemWrapper): + return filesystem.unwrap() + else: + return filesystem + + class _S3FileSystemWrapper: def __init__(self, fs: "pyarrow.fs.S3FileSystem"): self._fs = fs @@ -792,3 +770,31 @@ def _resolve_kwargs( kwarg_overrides = kwargs_fn() kwargs.update(kwarg_overrides) return kwargs + + +Uri = TypeVar("Uri") +Meta = TypeVar("Meta") + + +def _fetch_metadata_parallel( + uris: List[Uri], + fetch_func: Callable[[List[Uri]], List[Meta]], + desired_uris_per_task: int, + **ray_remote_args, +) -> Iterator[Meta]: + """Fetch file metadata in parallel using Ray tasks.""" + remote_fetch_func = cached_remote_fn(fetch_func, num_cpus=0.5) + if ray_remote_args: + remote_fetch_func = remote_fetch_func.options(**ray_remote_args) + # Choose a parallelism that results in a # of metadata fetches per task that + # dominates the Ray task overhead while ensuring good parallelism. + # Always launch at least 2 parallel fetch tasks. + parallelism = max(len(uris) // desired_uris_per_task, 2) + metadata_fetch_bar = ProgressBar("Metadata Fetch Progress", total=parallelism) + fetch_tasks = [] + for uri_chunk in np.array_split(uris, parallelism): + if len(uri_chunk) == 0: + continue + fetch_tasks.append(remote_fetch_func.remote(uri_chunk)) + results = metadata_fetch_bar.fetch_until_complete(fetch_tasks) + yield from itertools.chain.from_iterable(results) diff --git a/python/ray/data/datasource/file_meta_provider.py b/python/ray/data/datasource/file_meta_provider.py index f1fc90027b0b8..68a6bc14f1170 100644 --- a/python/ray/data/datasource/file_meta_provider.py +++ b/python/ray/data/datasource/file_meta_provider.py @@ -1,18 +1,23 @@ +import itertools import logging +import pathlib +import os import re from typing import ( List, Optional, Union, - TYPE_CHECKING, + Iterator, Tuple, Any, + TYPE_CHECKING, ) if TYPE_CHECKING: import pyarrow from ray.data.block import BlockMetadata +from ray.data.datasource.partitioning import Partitioning from ray.util.annotations import DeveloperAPI logger = logging.getLogger(__name__) @@ -97,7 +102,8 @@ def expand_paths( self, paths: List[str], filesystem: Optional["pyarrow.fs.FileSystem"], - ) -> Tuple[List[str], List[Optional[int]]]: + partitioning: Optional[Partitioning] = None, + ) -> Iterator[Tuple[str, int]]: """Expands all paths into concrete file paths by walking directories. Also returns a sidecar of file sizes. @@ -112,11 +118,9 @@ def expand_paths( expanding all paths and reading their files. 
Returns: - A tuple whose first item contains the list of file paths discovered, - and whose second item contains the size of each file. `None` may be - returned if a file size is either unknown or will be fetched later - by `_get_block_metadata()`, but the length of both lists must be - equal. + An iterator of (file_path, file_size) pairs. None may be returned for the + file size if it is either unknown or will be fetched later by + `_get_block_metadata()`, but the length of both lists must be equal. """ raise NotImplementedError @@ -154,10 +158,8 @@ def expand_paths( self, paths: List[str], filesystem: "pyarrow.fs.FileSystem", - ) -> Tuple[List[str], List[Optional[int]]]: - from pyarrow.fs import FileType - from ray.data.datasource.file_based_datasource import _expand_directory - + partitioning: Optional[Partitioning] = None, + ) -> Iterator[Tuple[str, int]]: if len(paths) > 1: logger.warning( f"Expanding {len(paths)} path(s). This may be a HIGH LATENCY " @@ -165,24 +167,8 @@ def expand_paths( f"all point to files and never directories, try rerunning this read " f"with `meta_provider=FastFileMetadataProvider()`." ) - expanded_paths = [] - file_infos = [] - for path in paths: - try: - file_info = filesystem.get_file_info(path) - except OSError as e: - _handle_read_os_error(e, path) - if file_info.type == FileType.Directory: - paths, file_infos_ = _expand_directory(path, filesystem) - expanded_paths.extend(paths) - file_infos.extend(file_infos_) - elif file_info.type == FileType.File: - expanded_paths.append(path) - file_infos.append(file_info) - else: - raise FileNotFoundError(path) - file_sizes = [file_info.size for file_info in file_infos] - return expanded_paths, file_sizes + + yield from _expand_paths(paths, filesystem, partitioning) @DeveloperAPI @@ -201,15 +187,15 @@ def expand_paths( self, paths: List[str], filesystem: "pyarrow.fs.FileSystem", - ) -> Tuple[List[str], List[Optional[int]]]: + partitioning: Optional[Partitioning] = None, + ) -> Iterator[Tuple[str, int]]: logger.warning( f"Skipping expansion of {len(paths)} path(s). If your paths contain " f"directories or if file size collection is required, try rerunning this " f"read with `meta_provider=DefaultFileMetadataProvider()`." ) - import numpy as np - return paths, np.empty(len(paths), dtype=object) + yield from zip(paths, itertools.repeat(None, len(paths))) @DeveloperAPI @@ -322,12 +308,25 @@ def prefetch_file_metadata( ) -> Optional[List["pyarrow.parquet.FileMetaData"]]: from ray.data.datasource.parquet_datasource import ( PARALLELIZE_META_FETCH_THRESHOLD, - _fetch_metadata_remotely, + PIECES_PER_META_FETCH, + _SerializedPiece, + _fetch_metadata_serialization_wrapper, _fetch_metadata, ) + from ray.data.datasource.file_based_datasource import _fetch_metadata_parallel if len(pieces) > PARALLELIZE_META_FETCH_THRESHOLD: - return _fetch_metadata_remotely(pieces, **ray_remote_args) + # Wrap Parquet fragments in serialization workaround. + pieces = [_SerializedPiece(piece) for piece in pieces] + # Fetch Parquet metadata in parallel using Ray tasks. 
+ return list( + _fetch_metadata_parallel( + pieces, + _fetch_metadata_serialization_wrapper, + PIECES_PER_META_FETCH, + **ray_remote_args, + ) + ) else: return _fetch_metadata(pieces) @@ -365,3 +364,161 @@ def _handle_read_os_error(error: OSError, paths: Union[str, List[str]]) -> str: ) else: raise error + + +def _expand_paths( + paths: List[str], + filesystem: "pyarrow.fs.FileSystem", + partitioning: Optional[Partitioning], +) -> Iterator[Tuple[str, int]]: + """Get the file sizes for all provided file paths.""" + from pyarrow.fs import LocalFileSystem + from ray.data.datasource.file_based_datasource import ( + FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD, + _unwrap_protocol, + ) + + # We break down our processing paths into a few key cases: + # 1. If len(paths) < threshold, fetch the file info for the individual files/paths + # serially. + # 2. If all paths are contained under the same parent directory (or base directory, + # if using partitioning), fetch all file infos at this prefix and filter to the + # provided paths on the client; this should be a single file info request. + # 3. If more than threshold requests required, parallelize them via Ray tasks. + + # 1. Small # of paths case. + if ( + len(paths) < FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD + # Local file systems are very fast to hit. + or isinstance(filesystem, LocalFileSystem) + ): + yield from _get_file_infos_serial(paths, filesystem) + else: + # 2. Common path prefix case. + # Get longest common path of all paths. + common_path = os.path.commonpath(paths) + # If parent directory (or base directory, if using partitioning) is common to + # all paths, fetch all file infos at that prefix and filter the response to the + # provided paths. + if ( + partitioning is not None + and common_path == _unwrap_protocol(partitioning.base_dir) + ) or all(str(pathlib.Path(path).parent) == common_path for path in paths): + yield from _get_file_infos_common_path_prefix( + paths, common_path, filesystem + ) + # 3. Parallelization case. + else: + # Parallelize requests via Ray tasks. + yield from _get_file_infos_parallel(paths, filesystem) + + +def _get_file_infos_serial( + paths: List[str], + filesystem: "pyarrow.fs.FileSystem", +) -> Iterator[Tuple[str, int]]: + for path in paths: + yield from _get_file_infos(path, filesystem) + + +def _get_file_infos_common_path_prefix( + paths: List[str], + common_path: str, + filesystem: "pyarrow.fs.FileSystem", +) -> Iterator[Tuple[str, int]]: + path_to_size = {path: None for path in paths} + for path, file_size in _get_file_infos(common_path, filesystem): + if path in path_to_size: + path_to_size[path] = file_size + # Dictionaries are insertion-ordered, so this path + size pairs should be + # yielded in the order of the original paths arg. + for path, size in path_to_size.items(): + assert size is not None + yield path, size + + +def _get_file_infos_parallel( + paths: List[str], + filesystem: "pyarrow.fs.FileSystem", +) -> Iterator[Tuple[str, int]]: + from ray.data.datasource.file_based_datasource import ( + PATHS_PER_FILE_SIZE_FETCH_TASK, + _wrap_s3_serialization_workaround, + _unwrap_s3_serialization_workaround, + _fetch_metadata_parallel, + ) + + # Capture the filesystem in the fetcher func closure, but wrap it in our + # serialization workaround to make sure that the pickle roundtrip works as expected. 
+ filesystem = _wrap_s3_serialization_workaround(filesystem) + + def _file_infos_fetcher(paths: List[str]) -> List[Tuple[str, int]]: + fs = _unwrap_s3_serialization_workaround(filesystem) + return list( + itertools.chain.from_iterable(_get_file_infos(path, fs) for path in paths) + ) + + yield from _fetch_metadata_parallel( + paths, _file_infos_fetcher, PATHS_PER_FILE_SIZE_FETCH_TASK + ) + + +def _get_file_infos( + path: str, + filesystem: "pyarrow.fs.FileSystem", +) -> Iterator[Tuple[str, int]]: + """Get the file info for all files at or under the provided path.""" + from pyarrow.fs import FileType + + try: + file_info = filesystem.get_file_info(path) + except OSError as e: + _handle_read_os_error(e, path) + if file_info.type == FileType.Directory: + yield from _expand_directory(path, filesystem) + elif file_info.type == FileType.File: + yield path, file_info.size + else: + raise FileNotFoundError(path) + + +def _expand_directory( + path: str, + filesystem: "pyarrow.fs.FileSystem", + exclude_prefixes: Optional[List[str]] = None, +) -> Iterator[Tuple[str, int]]: + """ + Expand the provided directory path to a list of file paths. + + Args: + path: The directory path to expand. + filesystem: The filesystem implementation that should be used for + reading these files. + exclude_prefixes: The file relative path prefixes that should be + excluded from the returned file set. Default excluded prefixes are + "." and "_". + + Returns: + An iterator of (file_path, file_size) tuples. + """ + if exclude_prefixes is None: + exclude_prefixes = [".", "_"] + + from pyarrow.fs import FileSelector + + selector = FileSelector(path, recursive=True) + files = filesystem.get_file_info(selector) + base_path = selector.base_dir + out = [] + for file_ in files: + if not file_.is_file: + continue + file_path = file_.path + if not file_path.startswith(base_path): + continue + relative = file_path[len(base_path) :] + if any(relative.startswith(prefix) for prefix in exclude_prefixes): + continue + out.append((file_path, file_.size)) + # We sort the paths to guarantee a stable order. 
+ yield from sorted(out) diff --git a/python/ray/data/datasource/parquet_datasource.py b/python/ray/data/datasource/parquet_datasource.py index e7f9849e6935c..ef2a42246163d 100644 --- a/python/ray/data/datasource/parquet_datasource.py +++ b/python/ray/data/datasource/parquet_datasource.py @@ -1,4 +1,3 @@ -import itertools import logging from typing import TYPE_CHECKING, Callable, Iterator, List, Optional, Union @@ -18,7 +17,6 @@ _handle_read_os_error, ) from ray.data.datasource.parquet_base_datasource import ParquetBaseDatasource -from ray.types import ObjectRef from ray.util.annotations import PublicAPI import ray.cloudpickle as cloudpickle @@ -410,29 +408,8 @@ def _read_pieces( yield output_buffer.next() -def _fetch_metadata_remotely( - pieces: List["pyarrow._dataset.ParquetFileFragment"], - **ray_remote_args, -) -> List[ObjectRef["pyarrow.parquet.FileMetaData"]]: - - remote_fetch_metadata = cached_remote_fn(_fetch_metadata_serialization_wrapper) - metas = [] - parallelism = min(len(pieces) // PIECES_PER_META_FETCH, 100) - meta_fetch_bar = ProgressBar("Metadata Fetch Progress", total=parallelism) - for pcs in np.array_split(pieces, parallelism): - if len(pcs) == 0: - continue - metas.append( - remote_fetch_metadata.options(**ray_remote_args).remote( - [_SerializedPiece(p) for p in pcs] - ) - ) - metas = meta_fetch_bar.fetch_until_complete(metas) - return list(itertools.chain.from_iterable(metas)) - - def _fetch_metadata_serialization_wrapper( - pieces: str, + pieces: _SerializedPiece, ) -> List["pyarrow.parquet.FileMetaData"]: pieces: List[ diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py index a0928a67fe0fa..8fd97b15ce726 100644 --- a/python/ray/data/tests/conftest.py +++ b/python/ray/data/tests/conftest.py @@ -185,10 +185,12 @@ def _write_partitioned_df( partition_keys, partition_path_encoder, file_writer_fn, + file_name_suffix="_1", ): import urllib.parse df_partitions = [df for _, df in df.groupby(partition_keys, as_index=False)] + paths = [] for df_partition in df_partitions: partition_values = [] for key in partition_keys: @@ -197,12 +199,15 @@ def _write_partitioned_df( partition_path_encoder.scheme.resolved_filesystem.create_dir(path) base_dir = partition_path_encoder.scheme.base_dir parsed_base_dir = urllib.parse.urlparse(base_dir) + file_name = f"test_{file_name_suffix}.tmp" if parsed_base_dir.scheme: # replace the protocol removed by the partition path generator - path = posixpath.join(f"{parsed_base_dir.scheme}://{path}", "test.tmp") + path = posixpath.join(f"{parsed_base_dir.scheme}://{path}", file_name) else: - path = os.path.join(path, "test.tmp") + path = os.path.join(path, file_name) file_writer_fn(df_partition, path) + paths.append(path) + return paths yield _write_partitioned_df @@ -274,7 +279,7 @@ def _remove_whitespace(ds_str): ), f"{ds._plan.execute()._num_computed()} != {num_computed}" # Force a data read. 
- values = ds_take_transform_fn(ds.take()) + values = ds_take_transform_fn(ds.take_all()) if num_computed is not None: assert ( ds._plan.execute()._num_computed() == num_computed diff --git a/python/ray/data/tests/test_dataset_csv.py b/python/ray/data/tests/test_dataset_csv.py index 3a2362d4f15f2..4ca6fa23bbf1e 100644 --- a/python/ray/data/tests/test_dataset_csv.py +++ b/python/ray/data/tests/test_dataset_csv.py @@ -1,3 +1,4 @@ +import itertools import os import shutil from functools import partial @@ -24,6 +25,7 @@ PathPartitionFilter, ) from ray.data.datasource.file_based_datasource import ( + FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD, FileExtensionFilter, _unwrap_protocol, ) @@ -123,7 +125,7 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url): ds = ray.data.read_csv(path, filesystem=fs, partitioning=None) df = pd.concat([df1, df2], ignore_index=True) dsdf = ds.to_pandas() - assert df.equals(dsdf) + pd.testing.assert_frame_equal(df, dsdf) if fs is None: shutil.rmtree(path) else: @@ -259,6 +261,157 @@ def test_csv_read_meta_provider( ) +@pytest.mark.parametrize( + "fs,data_path,endpoint_url", + [ + (None, lazy_fixture("local_path"), None), + (lazy_fixture("local_fs"), lazy_fixture("local_path"), None), + (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")), + ], +) +def test_csv_read_many_files_basic( + ray_start_regular_shared, + fs, + data_path, + endpoint_url, +): + if endpoint_url is None: + storage_options = {} + else: + storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url)) + + paths = [] + dfs = [] + num_dfs = 4 * FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD + for i in range(num_dfs): + df = pd.DataFrame({"one": list(range(i * 3, (i + 1) * 3))}) + dfs.append(df) + path = os.path.join(data_path, f"test_{i}.csv") + paths.append(path) + df.to_csv(path, index=False, storage_options=storage_options) + ds = ray.data.read_csv(paths, filesystem=fs) + + dsdf = ds.to_pandas() + df = pd.concat(dfs).reset_index(drop=True) + pd.testing.assert_frame_equal(df, dsdf) + + +@pytest.mark.parametrize( + "fs,data_path,endpoint_url", + [ + (None, lazy_fixture("local_path"), None), + (lazy_fixture("local_fs"), lazy_fixture("local_path"), None), + (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")), + ], +) +def test_csv_read_many_files_partitioned( + ray_start_regular_shared, + fs, + data_path, + endpoint_url, + write_partitioned_df, + assert_base_partitioned_ds, +): + if endpoint_url is None: + storage_options = {} + else: + storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url)) + + partition_keys = ["one"] + partition_path_encoder = PathPartitionEncoder.of( + base_dir=data_path, + field_names=partition_keys, + filesystem=fs, + ) + paths = [] + dfs = [] + num_dfs = FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD + num_rows = 6 * num_dfs + num_files = 2 * num_dfs + for i in range(num_dfs): + df = pd.DataFrame( + {"one": [1, 1, 1, 3, 3, 3], "two": list(range(6 * i, 6 * (i + 1)))} + ) + df_paths = write_partitioned_df( + df, + partition_keys, + partition_path_encoder, + partial(df_to_csv, storage_options=storage_options, index=False), + file_name_suffix=i, + ) + dfs.append(df) + paths.extend(df_paths) + + ds = ray.data.read_csv( + paths, + filesystem=fs, + partitioning=partition_path_encoder.scheme, + parallelism=num_files, + ) + + assert_base_partitioned_ds( + ds, + count=num_rows, + num_input_files=num_files, + num_rows=num_rows, + schema="{one: int64, two: int64}", + num_computed=num_files, + 
sorted_values=sorted( + itertools.chain.from_iterable( + list( + map(list, zip([1, 1, 1, 3, 3, 3], list(range(6 * i, 6 * (i + 1))))) + ) + for i in range(num_dfs) + ) + ), + ) + + +@pytest.mark.parametrize( + "fs,data_path,endpoint_url", + [ + (None, lazy_fixture("local_path"), None), + (lazy_fixture("local_fs"), lazy_fixture("local_path"), None), + (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")), + ], +) +def test_csv_read_many_files_diff_dirs( + ray_start_regular_shared, + fs, + data_path, + endpoint_url, +): + if endpoint_url is None: + storage_options = {} + else: + storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url)) + + dir1 = os.path.join(data_path, "dir1") + dir2 = os.path.join(data_path, "dir2") + if fs is None: + os.mkdir(dir1) + os.mkdir(dir2) + else: + fs.create_dir(_unwrap_protocol(dir1)) + fs.create_dir(_unwrap_protocol(dir2)) + + paths = [] + dfs = [] + num_dfs = 2 * FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD + for i, dir_path in enumerate([dir1, dir2]): + for j in range(num_dfs * i, num_dfs * (i + 1)): + df = pd.DataFrame({"one": list(range(3 * j, 3 * (j + 1)))}) + dfs.append(df) + path = os.path.join(dir_path, f"test_{j}.csv") + paths.append(path) + df.to_csv(path, index=False, storage_options=storage_options) + ds = ray.data.read_csv(paths, filesystem=fs) + + dsdf = ds.to_pandas() + df = pd.concat(dfs).reset_index(drop=True) + pd.testing.assert_frame_equal(df, dsdf) + + @pytest.mark.parametrize( "fs,data_path,endpoint_url", [ diff --git a/python/ray/data/tests/test_metadata_provider.py b/python/ray/data/tests/test_metadata_provider.py index b9d0f9ef3713d..f8d7ab6c79688 100644 --- a/python/ray/data/tests/test_metadata_provider.py +++ b/python/ray/data/tests/test_metadata_provider.py @@ -1,14 +1,26 @@ +from functools import partial +import logging +import os import pytest import posixpath +from unittest.mock import patch import urllib.parse -import os -import logging import pyarrow as pa +from pyarrow.fs import LocalFileSystem import pandas as pd import pyarrow.parquet as pq from pytest_lazyfixture import lazy_fixture -from ray.data.datasource.file_based_datasource import _resolve_paths_and_filesystem +from ray.data.datasource.file_based_datasource import ( + FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD, + _resolve_paths_and_filesystem, + _unwrap_protocol, +) +from ray.data.datasource.file_meta_provider import ( + _get_file_infos_serial, + _get_file_infos_common_path_prefix, + _get_file_infos_parallel, +) from ray.tests.conftest import * # noqa from ray.data.datasource import ( @@ -18,11 +30,16 @@ DefaultFileMetadataProvider, DefaultParquetMetadataProvider, FastFileMetadataProvider, + PathPartitionEncoder, ) from ray.data.tests.conftest import * # noqa +def df_to_csv(dataframe, path, **kwargs): + dataframe.to_csv(path, **kwargs) + + def _get_parquet_file_meta_size_bytes(file_metas): return sum( sum(m.row_group(i).total_byte_size for i in range(m.num_row_groups)) @@ -125,7 +142,9 @@ def test_default_parquet_metadata_provider(fs, data_path): ), ], ) -def test_default_file_metadata_provider(caplog, fs, data_path, endpoint_url): +def test_default_file_metadata_provider( + propagate_logs, caplog, fs, data_path, endpoint_url +): storage_options = ( {} if endpoint_url is None @@ -144,8 +163,12 @@ def test_default_file_metadata_provider(caplog, fs, data_path, endpoint_url): df2.to_csv(path2, index=False, storage_options=storage_options) meta_provider = DefaultFileMetadataProvider() - with caplog.at_level(logging.WARNING): - file_paths, 
file_sizes = meta_provider.expand_paths(paths, fs) + with caplog.at_level(logging.WARNING), patch( + "ray.data.datasource.file_meta_provider._get_file_infos_serial", + wraps=_get_file_infos_serial, + ) as mock_get: + file_paths, file_sizes = map(list, zip(*meta_provider.expand_paths(paths, fs))) + mock_get.assert_called_once_with(paths, fs) assert "meta_provider=FastFileMetadataProvider()" in caplog.text assert file_paths == paths expected_file_sizes = _get_file_sizes_bytes(paths, fs) @@ -164,6 +187,195 @@ def test_default_file_metadata_provider(caplog, fs, data_path, endpoint_url): assert meta.schema is None +@pytest.mark.parametrize( + "fs,data_path,endpoint_url", + [ + (lazy_fixture("local_fs"), lazy_fixture("local_path"), None), + (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")), + ], +) +def test_default_file_metadata_provider_many_files_basic( + propagate_logs, + caplog, + fs, + data_path, + endpoint_url, +): + if endpoint_url is None: + storage_options = {} + else: + storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url)) + + paths = [] + dfs = [] + num_dfs = 4 * FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD + for i in range(num_dfs): + df = pd.DataFrame({"one": list(range(i * 3, (i + 1) * 3))}) + dfs.append(df) + path = os.path.join(data_path, f"test_{i}.csv") + paths.append(path) + df.to_csv(path, index=False, storage_options=storage_options) + paths, fs = _resolve_paths_and_filesystem(paths, fs) + + meta_provider = DefaultFileMetadataProvider() + if isinstance(fs, LocalFileSystem): + patcher = patch( + "ray.data.datasource.file_meta_provider._get_file_infos_serial", + wraps=_get_file_infos_serial, + ) + else: + patcher = patch( + "ray.data.datasource.file_meta_provider._get_file_infos_common_path_prefix", + wraps=_get_file_infos_common_path_prefix, + ) + with caplog.at_level(logging.WARNING), patcher as mock_get: + file_paths, file_sizes = map(list, zip(*meta_provider.expand_paths(paths, fs))) + if isinstance(fs, LocalFileSystem): + mock_get.assert_called_once_with(paths, fs) + else: + mock_get.assert_called_once_with(paths, _unwrap_protocol(data_path), fs) + assert "meta_provider=FastFileMetadataProvider()" in caplog.text + assert file_paths == paths + expected_file_sizes = _get_file_sizes_bytes(paths, fs) + assert file_sizes == expected_file_sizes + + +@pytest.mark.parametrize( + "fs,data_path,endpoint_url", + [ + (lazy_fixture("local_fs"), lazy_fixture("local_path"), None), + (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")), + ], +) +def test_default_file_metadata_provider_many_files_partitioned( + propagate_logs, + caplog, + fs, + data_path, + endpoint_url, + write_partitioned_df, + assert_base_partitioned_ds, +): + if endpoint_url is None: + storage_options = {} + else: + storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url)) + + partition_keys = ["one"] + partition_path_encoder = PathPartitionEncoder.of( + base_dir=data_path, + field_names=partition_keys, + filesystem=fs, + ) + paths = [] + dfs = [] + num_dfs = FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD + for i in range(num_dfs): + df = pd.DataFrame( + {"one": [1, 1, 1, 3, 3, 3], "two": list(range(6 * i, 6 * (i + 1)))} + ) + df_paths = write_partitioned_df( + df, + partition_keys, + partition_path_encoder, + partial(df_to_csv, storage_options=storage_options, index=False), + file_name_suffix=i, + ) + dfs.append(df) + paths.extend(df_paths) + paths, fs = _resolve_paths_and_filesystem(paths, fs) + partitioning = partition_path_encoder.scheme + + 
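# Aside: these tests patch the expansion helpers with wraps= so the real implementation
# still runs while call arguments are recorded for later assertions. The core of that
# idiom in isolation (add() is just a toy example):
from unittest.mock import MagicMock

def add(a, b):
    return a + b

mock_add = MagicMock(wraps=add)
assert mock_add(1, 2) == 3          # the wrapped real function still executes
mock_add.assert_called_once_with(1, 2)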
meta_provider = DefaultFileMetadataProvider() + if isinstance(fs, LocalFileSystem): + patcher = patch( + "ray.data.datasource.file_meta_provider._get_file_infos_serial", + wraps=_get_file_infos_serial, + ) + else: + patcher = patch( + "ray.data.datasource.file_meta_provider._get_file_infos_common_path_prefix", + wraps=_get_file_infos_common_path_prefix, + ) + with caplog.at_level(logging.WARNING), patcher as mock_get: + file_paths, file_sizes = map( + list, zip(*meta_provider.expand_paths(paths, fs, partitioning)) + ) + if isinstance(fs, LocalFileSystem): + mock_get.assert_called_once_with(paths, fs) + else: + mock_get.assert_called_once_with( + paths, + _unwrap_protocol(partitioning.base_dir), + fs, + ) + assert "meta_provider=FastFileMetadataProvider()" in caplog.text + assert file_paths == paths + expected_file_sizes = _get_file_sizes_bytes(paths, fs) + assert file_sizes == expected_file_sizes + + +@pytest.mark.parametrize( + "fs,data_path,endpoint_url", + [ + (lazy_fixture("local_fs"), lazy_fixture("local_path"), None), + (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")), + ], +) +def test_default_file_metadata_provider_many_files_diff_dirs( + ray_start_regular, + propagate_logs, + caplog, + fs, + data_path, + endpoint_url, +): + if endpoint_url is None: + storage_options = {} + else: + storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url)) + + dir1 = os.path.join(data_path, "dir1") + dir2 = os.path.join(data_path, "dir2") + if fs is None: + os.mkdir(dir1) + os.mkdir(dir2) + else: + fs.create_dir(_unwrap_protocol(dir1)) + fs.create_dir(_unwrap_protocol(dir2)) + + paths = [] + dfs = [] + num_dfs = 2 * FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD + for i, dir_path in enumerate([dir1, dir2]): + for j in range(num_dfs * i, num_dfs * (i + 1)): + df = pd.DataFrame({"one": list(range(3 * j, 3 * (j + 1)))}) + dfs.append(df) + path = os.path.join(dir_path, f"test_{j}.csv") + paths.append(path) + df.to_csv(path, index=False, storage_options=storage_options) + paths, fs = _resolve_paths_and_filesystem(paths, fs) + + meta_provider = DefaultFileMetadataProvider() + if isinstance(fs, LocalFileSystem): + patcher = patch( + "ray.data.datasource.file_meta_provider._get_file_infos_serial", + wraps=_get_file_infos_serial, + ) + else: + patcher = patch( + "ray.data.datasource.file_meta_provider._get_file_infos_parallel", + wraps=_get_file_infos_parallel, + ) + with caplog.at_level(logging.WARNING), patcher as mock_get: + file_paths, file_sizes = map(list, zip(*meta_provider.expand_paths(paths, fs))) + mock_get.assert_called_once_with(paths, fs) + assert "meta_provider=FastFileMetadataProvider()" in caplog.text + assert file_paths == paths + expected_file_sizes = _get_file_sizes_bytes(paths, fs) + assert file_sizes == expected_file_sizes + + @pytest.mark.parametrize( "fs,data_path,endpoint_url", [ @@ -182,7 +394,9 @@ def test_default_file_metadata_provider(caplog, fs, data_path, endpoint_url): ), ], ) -def test_fast_file_metadata_provider(caplog, fs, data_path, endpoint_url): +def test_fast_file_metadata_provider( + propagate_logs, caplog, fs, data_path, endpoint_url +): storage_options = ( {} if endpoint_url is None @@ -202,7 +416,7 @@ def test_fast_file_metadata_provider(caplog, fs, data_path, endpoint_url): meta_provider = FastFileMetadataProvider() with caplog.at_level(logging.WARNING): - file_paths, file_sizes = meta_provider.expand_paths(paths, fs) + file_paths, file_sizes = map(list, zip(*meta_provider.expand_paths(paths, fs))) assert 
"meta_provider=DefaultFileMetadataProvider()" in caplog.text assert file_paths == paths assert len(file_sizes) == len(file_paths)