[data] move developerapi datasource classes to internal #46328

Merged (3 commits, Jun 28, 2024)
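The change is mechanical: each datasource/datasink class moves from the public ray.data.datasource package into ray.data._internal.datasource and loses its leading underscore, since the _internal module path now carries the "private" signal that the underscore used to. A minimal before/after sketch of the effect on imports (module and class names taken from the diff below):

# Before this PR: the concrete sink was exported from the public package,
# marked private only by its leading underscore.
from ray.data.datasource import _ParquetDatasink

# After this PR: the same class lives under _internal, without the underscore.
from ray.data._internal.datasource.parquet_datasink import ParquetDatasink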
@@ -20,7 +20,7 @@
 RATE_LIMIT_EXCEEDED_SLEEP_TIME = 11


-class _BigQueryDatasink(Datasink):
+class BigQueryDatasink(Datasink):
     def __init__(
         self,
         project_id: str,
@@ -7,7 +7,7 @@
 from ray.data.datasource.file_datasink import BlockBasedFileDatasink


-class _CSVDatasink(BlockBasedFileDatasink):
+class CSVDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
@@ -6,7 +6,7 @@
 from ray.data.datasource.file_datasink import RowBasedFileDatasink


-class _ImageDatasink(RowBasedFileDatasink):
+class ImageDatasink(RowBasedFileDatasink):
     def __init__(
         self, path: str, column: str, file_format: str, **file_datasink_kwargs
     ):
@@ -10,7 +10,6 @@
 from ray.data.block import Block, BlockMetadata
 from ray.data.datasource.file_based_datasource import FileBasedDatasource
 from ray.data.datasource.file_meta_provider import DefaultFileMetadataProvider
-from ray.util.annotations import DeveloperAPI

 if TYPE_CHECKING:
     import pyarrow
@@ -27,7 +26,6 @@
 IMAGE_ENCODING_RATIO_ESTIMATE_LOWER_BOUND = 0.5


-@DeveloperAPI
 class ImageDatasource(FileBasedDatasource):
     """A datasource that lets you read images."""
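For context, @DeveloperAPI is Ray's annotation for lower-stability APIs aimed at advanced users; dropping it here, together with the move under _internal, takes the class out of the annotated public surface entirely. A sketch of the pattern being deleted (annotation semantics per ray.util.annotations; the class name is hypothetical):

from ray.util.annotations import DeveloperAPI

@DeveloperAPI  # marks an API as public but lower-stability, for advanced users
class MyCustomDatasource:
    ...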
@@ -7,7 +7,7 @@
 from ray.data.datasource.file_datasink import BlockBasedFileDatasink


-class _JSONDatasink(BlockBasedFileDatasink):
+class JSONDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
@@ -6,7 +6,6 @@
 from ray.data._internal.util import _check_import
 from ray.data.block import BlockMetadata
 from ray.data.datasource.datasource import Datasource, ReadTask
-from ray.util.annotations import DeveloperAPI

 if TYPE_CHECKING:
     import pyarrow
@@ -15,7 +14,6 @@
 logger = logging.getLogger(__name__)


-@DeveloperAPI
 class LanceDatasource(Datasource):
     """Lance datasource, for reading Lance dataset."""
@@ -13,7 +13,7 @@
 logger = logging.getLogger(__name__)


-class _MongoDatasink(Datasink):
+class MongoDatasink(Datasink):
     def __init__(self, uri: str, database: str, collection: str) -> None:
         _check_import(self, module="pymongo", package="pymongo")
         _check_import(self, module="pymongoarrow", package="pymongoarrow")
@@ -5,7 +5,7 @@
 from ray.data.datasource.file_datasink import BlockBasedFileDatasink


-class _NumpyDatasink(BlockBasedFileDatasink):
+class NumpyDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
@@ -18,7 +18,7 @@
 logger = logging.getLogger(__name__)


-class _ParquetDatasink(_FileDatasink):
+class ParquetDatasink(_FileDatasink):
     def __init__(
         self,
         path: str,
@@ -6,7 +6,7 @@
 from ray.data.datasource.datasink import Datasink


-class _SQLDatasink(Datasink):
+class SQLDatasink(Datasink):

     _MAX_ROWS_PER_WRITE = 128
@@ -13,7 +13,7 @@
 from tensorflow_metadata.proto.v0 import schema_pb2


-class _TFRecordDatasink(BlockBasedFileDatasink):
+class TFRecordDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
@@ -3,7 +3,6 @@
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data.block import BlockMetadata
 from ray.data.datasource.datasource import Datasource, ReadTask
-from ray.util.annotations import DeveloperAPI

 if TYPE_CHECKING:
     import torch
@@ -12,7 +11,6 @@
 TORCH_DATASOURCE_READER_BATCH_SIZE = 32


-@DeveloperAPI
 class TorchDatasource(Datasource):
     """Torch datasource, for reading from `Torch
     datasets <https://pytorch.org/docs/stable/data.html/>`_.
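Most users never touch TorchDatasource directly: it is exercised through the public ray.data.from_torch() helper, which is why the class can safely go internal. A usage sketch (assuming ray.data.from_torch per the Ray Data API; it is not shown in this diff):

import ray
import torch

# Any torch.utils.data.Dataset works; this tiny tensor dataset is a placeholder.
torch_ds = torch.utils.data.TensorDataset(torch.arange(8))

# from_torch() constructs the now-internal TorchDatasource on the caller's behalf.
ds = ray.data.from_torch(torch_ds)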
@@ -15,7 +15,7 @@
 from ray.data.datasource.file_datasink import BlockBasedFileDatasink


-class _WebDatasetDatasink(BlockBasedFileDatasink):
+class WebDatasetDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
46 changes: 21 additions & 25 deletions python/ray/data/dataset.py
@@ -29,6 +29,16 @@
 from ray._private.usage import usage_lib
 from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray
 from ray.data._internal.compute import ComputeStrategy
+from ray.data._internal.datasource.bigquery_datasink import BigQueryDatasink
+from ray.data._internal.datasource.csv_datasink import CSVDatasink
+from ray.data._internal.datasource.image_datasink import ImageDatasink
+from ray.data._internal.datasource.json_datasink import JSONDatasink
+from ray.data._internal.datasource.mongo_datasink import MongoDatasink
+from ray.data._internal.datasource.numpy_datasink import NumpyDatasink
+from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
+from ray.data._internal.datasource.sql_datasink import SQLDatasink
+from ray.data._internal.datasource.tfrecords_datasink import TFRecordDatasink
+from ray.data._internal.datasource.webdataset_datasink import WebDatasetDatasink
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.equalize import _equalize
 from ray.data._internal.execution.interfaces import RefBundle
@@ -74,21 +84,7 @@
     _apply_batch_size,
 )
 from ray.data.context import DataContext
-from ray.data.datasource import (
-    Connection,
-    Datasink,
-    FilenameProvider,
-    _BigQueryDatasink,
-    _CSVDatasink,
-    _ImageDatasink,
-    _JSONDatasink,
-    _MongoDatasink,
-    _NumpyDatasink,
-    _ParquetDatasink,
-    _SQLDatasink,
-    _TFRecordDatasink,
-    _WebDatasetDatasink,
-)
+from ray.data.datasource import Connection, Datasink, FilenameProvider
 from ray.data.iterator import DataIterator
 from ray.data.random_access_dataset import RandomAccessDataset
 from ray.types import ObjectRef
@@ -2706,7 +2702,7 @@ def write_parquet(
             #pyarrow.parquet.write_table>`_, which is used to write out each
             block to a file.
         """  # noqa: E501
-        datasink = _ParquetDatasink(
+        datasink = ParquetDatasink(
             path,
             arrow_parquet_args_fn=arrow_parquet_args_fn,
             arrow_parquet_args=arrow_parquet_args,
@@ -2814,7 +2810,7 @@ def write_json(
             :class:`~ray.data.Dataset` block. These
             are dict(orient="records", lines=True) by default.
         """
-        datasink = _JSONDatasink(
+        datasink = JSONDatasink(
             path,
             pandas_json_args_fn=pandas_json_args_fn,
             pandas_json_args=pandas_json_args,
@@ -2887,7 +2883,7 @@ def write_images(
                 total number of tasks run. By default, concurrency is dynamically
                 decided based on the available resources.
         """  # noqa: E501
-        datasink = _ImageDatasink(
+        datasink = ImageDatasink(
             path,
             column,
             file_format,
@@ -2992,7 +2988,7 @@ def write_csv(
             #pyarrow.csv.write_csv>`_
             when writing each block to a file.
         """
-        datasink = _CSVDatasink(
+        datasink = CSVDatasink(
             path,
             arrow_csv_args_fn=arrow_csv_args_fn,
             arrow_csv_args=arrow_csv_args,
@@ -3088,7 +3084,7 @@ def write_tfrecords(
                 decided based on the available resources.

         """
-        datasink = _TFRecordDatasink(
+        datasink = TFRecordDatasink(
             path=path,
             tf_schema=tf_schema,
             num_rows_per_file=num_rows_per_file,
@@ -3173,7 +3169,7 @@ def write_webdataset(
                 decided based on the available resources.

         """
-        datasink = _WebDatasetDatasink(
+        datasink = WebDatasetDatasink(
             path,
             encoder=encoder,
             num_rows_per_file=num_rows_per_file,
@@ -3260,7 +3256,7 @@ def write_numpy(
                 decided based on the available resources.
         """

-        datasink = _NumpyDatasink(
+        datasink = NumpyDatasink(
             path,
             column,
             num_rows_per_file=num_rows_per_file,
@@ -3337,7 +3333,7 @@ def write_sql(
                 total number of tasks run. By default, concurrency is dynamically
                 decided based on the available resources.
         """  # noqa: E501
-        datasink = _SQLDatasink(sql=sql, connection_factory=connection_factory)
+        datasink = SQLDatasink(sql=sql, connection_factory=connection_factory)
         self.write_datasink(
             datasink,
             ray_remote_args=ray_remote_args,
@@ -3408,7 +3404,7 @@ def write_mongo(
             ValueError: if ``database`` doesn't exist.
             ValueError: if ``collection`` doesn't exist.
         """
-        datasink = _MongoDatasink(
+        datasink = MongoDatasink(
             uri=uri,
             database=database,
             collection=collection,
@@ -3481,7 +3477,7 @@ def write_bigquery(
         else:
             ray_remote_args["max_retries"] = 0

-        datasink = _BigQueryDatasink(
+        datasink = BigQueryDatasink(
             project_id=project_id,
             dataset=dataset,
             max_retry_cnt=max_retry_cnt,
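None of these one-line swaps changes behavior: each public write_*() method builds the same sink under its new name. A quick usage sketch showing that callers are unaffected (the output path is a placeholder):

import ray

# write_parquet() instantiates the now-internal ParquetDatasink itself;
# user code never imports the moved class.
ds = ray.data.range(100)
ds.write_parquet("/tmp/ray_out")  # placeholder path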
26 changes: 0 additions & 26 deletions python/ray/data/datasource/__init__.py
@@ -1,6 +1,4 @@
 from ray.data._internal.datasource.sql_datasource import Connection
-from ray.data.datasource.bigquery_datasink import _BigQueryDatasink
-from ray.data.datasource.csv_datasink import _CSVDatasink
 from ray.data.datasource.datasink import Datasink, DummyOutputDatasink
 from ray.data.datasource.datasource import (
     Datasource,
@@ -23,63 +21,39 @@
     FileMetadataProvider,
 )
 from ray.data.datasource.filename_provider import FilenameProvider
-from ray.data.datasource.image_datasink import _ImageDatasink
-from ray.data.datasource.image_datasource import ImageDatasource
-from ray.data.datasource.json_datasink import _JSONDatasink
-from ray.data.datasource.lance_datasource import LanceDatasource
-from ray.data.datasource.mongo_datasink import _MongoDatasink
-from ray.data.datasource.numpy_datasink import _NumpyDatasink
-from ray.data.datasource.parquet_datasink import _ParquetDatasink
 from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider
 from ray.data.datasource.partitioning import (
     Partitioning,
     PartitionStyle,
     PathPartitionFilter,
     PathPartitionParser,
 )
-from ray.data.datasource.sql_datasink import _SQLDatasink
-from ray.data.datasource.tfrecords_datasink import _TFRecordDatasink
 from ray.data.datasource.tfrecords_datasource import TFRecordDatasource
-from ray.data.datasource.torch_datasource import TorchDatasource
-from ray.data.datasource.webdataset_datasink import _WebDatasetDatasink

 # Note: HuggingFaceDatasource should NOT be imported here, because
 # we want to only import the Hugging Face datasets library when we use
 # ray.data.from_huggingface() or HuggingFaceDatasource() directly.
 __all__ = [
     "BaseFileMetadataProvider",
-    "_BigQueryDatasink",
     "BlockBasedFileDatasink",
     "Connection",
-    "_CSVDatasink",
     "Datasink",
     "Datasource",
-    "_SQLDatasink",
     "DefaultFileMetadataProvider",
     "DummyOutputDatasink",
     "FastFileMetadataProvider",
     "FileBasedDatasource",
     "FileMetadataProvider",
     "FilenameProvider",
-    "_ImageDatasink",
-    "ImageDatasource",
-    "_JSONDatasink",
-    "LanceDatasource",
-    "_NumpyDatasink",
-    "_ParquetDatasink",
     "ParquetMetadataProvider",
     "PartitionStyle",
     "PathPartitionFilter",
     "PathPartitionParser",
     "Partitioning",
     "RandomIntRowDatasource",
-    "_MongoDatasink",
     "ReadTask",
     "Reader",
     "RowBasedFileDatasink",
-    "_TFRecordDatasink",
     "TFRecordDatasource",
-    "TorchDatasource",
-    "_WebDatasetDatasink",
     "_S3FileSystemWrapper",
 ]
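After this cleanup, ray.data.datasource exports only the extension points plus the metadata and partitioning helpers. A sketch of the resulting public surface:

# The abstract extension points survive and remain public:
from ray.data.datasource import Datasink, Datasource, FilenameProvider

# Concrete sinks no longer resolve from the public package; after this PR,
# the following raises ImportError:
#   from ray.data.datasource import _ParquetDatasink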
2 changes: 1 addition & 1 deletion python/ray/data/datasource/_default_metadata_providers.py
@@ -1,10 +1,10 @@
 from typing import List, Optional

+from ray.data._internal.datasource.image_datasource import _ImageFileMetadataProvider
 from ray.data.datasource.file_meta_provider import (
     DefaultFileMetadataProvider,
     FastFileMetadataProvider,
 )
-from ray.data.datasource.image_datasource import _ImageFileMetadataProvider
 from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider
6 changes: 3 additions & 3 deletions python/ray/data/read_api.py
@@ -25,14 +25,17 @@
 from ray.data._internal.datasource.bigquery_datasource import BigQueryDatasource
 from ray.data._internal.datasource.binary_datasource import BinaryDatasource
 from ray.data._internal.datasource.csv_datasource import CSVDatasource
+from ray.data._internal.datasource.image_datasource import ImageDatasource
 from ray.data._internal.datasource.json_datasource import JSONDatasource
+from ray.data._internal.datasource.lance_datasource import LanceDatasource
 from ray.data._internal.datasource.mongo_datasource import MongoDatasource
 from ray.data._internal.datasource.numpy_datasource import NumpyDatasource
 from ray.data._internal.datasource.parquet_bulk_datasource import ParquetBulkDatasource
 from ray.data._internal.datasource.parquet_datasource import ParquetDatasource
 from ray.data._internal.datasource.range_datasource import RangeDatasource
 from ray.data._internal.datasource.sql_datasource import SQLDatasource
 from ray.data._internal.datasource.text_datasource import TextDatasource
+from ray.data._internal.datasource.torch_datasource import TorchDatasource
 from ray.data._internal.datasource.webdataset_datasource import WebDatasetDatasource
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.logical.operators.from_operators import (
@@ -62,12 +65,9 @@
     BaseFileMetadataProvider,
     Connection,
     Datasource,
-    ImageDatasource,
-    LanceDatasource,
     ParquetMetadataProvider,
     PathPartitionFilter,
     TFRecordDatasource,
-    TorchDatasource,
 )
 from ray.data.datasource._default_metadata_providers import (
     get_generic_metadata_provider,
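Reads are likewise unaffected: read_api.py now pulls these datasources from _internal, but callers keep using the public read functions. A usage sketch (assuming ray.data.read_images per the Ray Data API; the directory is a placeholder):

import ray

# read_images() resolves the now-internal ImageDatasource itself;
# callers never import the moved class.
ds = ray.data.read_images("/tmp/images")  # placeholder directory of images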