diff --git a/python/ray/data/datasource/bigquery_datasink.py b/python/ray/data/_internal/datasource/bigquery_datasink.py
similarity index 99%
rename from python/ray/data/datasource/bigquery_datasink.py
rename to python/ray/data/_internal/datasource/bigquery_datasink.py
index 33550f0791cb..7491540a5d73 100644
--- a/python/ray/data/datasource/bigquery_datasink.py
+++ b/python/ray/data/_internal/datasource/bigquery_datasink.py
@@ -20,7 +20,7 @@
 RATE_LIMIT_EXCEEDED_SLEEP_TIME = 11
-class _BigQueryDatasink(Datasink):
+class BigQueryDatasink(Datasink):
     def __init__(
         self,
         project_id: str,
diff --git a/python/ray/data/datasource/csv_datasink.py b/python/ray/data/_internal/datasource/csv_datasink.py
similarity index 96%
rename from python/ray/data/datasource/csv_datasink.py
rename to python/ray/data/_internal/datasource/csv_datasink.py
index 91b374bd3836..5a84e42b3916 100644
--- a/python/ray/data/datasource/csv_datasink.py
+++ b/python/ray/data/_internal/datasource/csv_datasink.py
@@ -7,7 +7,7 @@
 from ray.data.datasource.file_datasink import BlockBasedFileDatasink
-class _CSVDatasink(BlockBasedFileDatasink):
+class CSVDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
diff --git a/python/ray/data/datasource/image_datasink.py b/python/ray/data/_internal/datasource/image_datasink.py
similarity index 93%
rename from python/ray/data/datasource/image_datasink.py
rename to python/ray/data/_internal/datasource/image_datasink.py
index d9ec1ef63c86..ac561fbaaa9f 100644
--- a/python/ray/data/datasource/image_datasink.py
+++ b/python/ray/data/_internal/datasource/image_datasink.py
@@ -6,7 +6,7 @@
 from ray.data.datasource.file_datasink import RowBasedFileDatasink
-class _ImageDatasink(RowBasedFileDatasink):
+class ImageDatasink(RowBasedFileDatasink):
     def __init__(
         self, path: str, column: str, file_format: str, **file_datasink_kwargs
     ):
diff --git a/python/ray/data/datasource/json_datasink.py b/python/ray/data/_internal/datasource/json_datasink.py
similarity index 96%
rename from python/ray/data/datasource/json_datasink.py
rename to python/ray/data/_internal/datasource/json_datasink.py
index e8f0ca91c97d..09eb7f7108a4 100644
--- a/python/ray/data/datasource/json_datasink.py
+++ b/python/ray/data/_internal/datasource/json_datasink.py
@@ -7,7 +7,7 @@
 from ray.data.datasource.file_datasink import BlockBasedFileDatasink
-class _JSONDatasink(BlockBasedFileDatasink):
+class JSONDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
diff --git a/python/ray/data/datasource/mongo_datasink.py b/python/ray/data/_internal/datasource/mongo_datasink.py
similarity index 98%
rename from python/ray/data/datasource/mongo_datasink.py
rename to python/ray/data/_internal/datasource/mongo_datasink.py
index 0f26167c8742..5f731134f808 100644
--- a/python/ray/data/datasource/mongo_datasink.py
+++ b/python/ray/data/_internal/datasource/mongo_datasink.py
@@ -13,7 +13,7 @@
 logger = logging.getLogger(__name__)
-class _MongoDatasink(Datasink):
+class MongoDatasink(Datasink):
     def __init__(self, uri: str, database: str, collection: str) -> None:
         _check_import(self, module="pymongo", package="pymongo")
         _check_import(self, module="pymongoarrow", package="pymongoarrow")
diff --git a/python/ray/data/datasource/numpy_datasink.py b/python/ray/data/_internal/datasource/numpy_datasink.py
similarity index 92%
rename from python/ray/data/datasource/numpy_datasink.py
rename to python/ray/data/_internal/datasource/numpy_datasink.py
index 0d232f66bd46..5fc6f3d4df43 100644
--- a/python/ray/data/datasource/numpy_datasink.py
+++ b/python/ray/data/_internal/datasource/numpy_datasink.py
@@ -5,7 +5,7 @@
 from ray.data.datasource.file_datasink import BlockBasedFileDatasink
-class _NumpyDatasink(BlockBasedFileDatasink):
+class NumpyDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
diff --git a/python/ray/data/datasource/parquet_datasink.py b/python/ray/data/_internal/datasource/parquet_datasink.py
similarity index 98%
rename from python/ray/data/datasource/parquet_datasink.py
rename to python/ray/data/_internal/datasource/parquet_datasink.py
index 8d36d8a72ece..dd83ad9e74e6 100644
--- a/python/ray/data/datasource/parquet_datasink.py
+++ b/python/ray/data/_internal/datasource/parquet_datasink.py
@@ -18,7 +18,7 @@
 logger = logging.getLogger(__name__)
-class _ParquetDatasink(_FileDatasink):
+class ParquetDatasink(_FileDatasink):
     def __init__(
         self,
         path: str,
diff --git a/python/ray/data/datasource/sql_datasink.py b/python/ray/data/_internal/datasource/sql_datasink.py
similarity index 97%
rename from python/ray/data/datasource/sql_datasink.py
rename to python/ray/data/_internal/datasource/sql_datasink.py
index 887f578d992d..5efd6edb7927 100644
--- a/python/ray/data/datasource/sql_datasink.py
+++ b/python/ray/data/_internal/datasource/sql_datasink.py
@@ -6,7 +6,7 @@
 from ray.data.datasource.datasink import Datasink
-class _SQLDatasink(Datasink):
+class SQLDatasink(Datasink):
     _MAX_ROWS_PER_WRITE = 128
diff --git a/python/ray/data/datasource/tfrecords_datasink.py b/python/ray/data/_internal/datasource/tfrecords_datasink.py
similarity index 96%
rename from python/ray/data/datasource/tfrecords_datasink.py
rename to python/ray/data/_internal/datasource/tfrecords_datasink.py
index 519a88fbea92..8bec333a5948 100644
--- a/python/ray/data/datasource/tfrecords_datasink.py
+++ b/python/ray/data/_internal/datasource/tfrecords_datasink.py
@@ -13,7 +13,7 @@
 from tensorflow_metadata.proto.v0 import schema_pb2
-class _TFRecordDatasink(BlockBasedFileDatasink):
+class TFRecordDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
diff --git a/python/ray/data/datasource/webdataset_datasink.py b/python/ray/data/_internal/datasource/webdataset_datasink.py
similarity index 97%
rename from python/ray/data/datasource/webdataset_datasink.py
rename to python/ray/data/_internal/datasource/webdataset_datasink.py
index b0d96baa6d48..57e44600c63c 100644
--- a/python/ray/data/datasource/webdataset_datasink.py
+++ b/python/ray/data/_internal/datasource/webdataset_datasink.py
@@ -15,7 +15,7 @@
 from ray.data.datasource.file_datasink import BlockBasedFileDatasink
-class _WebDatasetDatasink(BlockBasedFileDatasink):
+class WebDatasetDatasink(BlockBasedFileDatasink):
     def __init__(
         self,
         path: str,
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 14405f6bf836..05f1e343d96a 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -29,6 +29,16 @@
 from ray._private.usage import usage_lib
 from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray
 from ray.data._internal.compute import ComputeStrategy
+from ray.data._internal.datasource.bigquery_datasink import BigQueryDatasink
+from ray.data._internal.datasource.csv_datasink import CSVDatasink
+from ray.data._internal.datasource.image_datasink import ImageDatasink
+from ray.data._internal.datasource.json_datasink import JSONDatasink
+from ray.data._internal.datasource.mongo_datasink import MongoDatasink
+from ray.data._internal.datasource.numpy_datasink import NumpyDatasink
+from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
+from ray.data._internal.datasource.sql_datasink import SQLDatasink
+from ray.data._internal.datasource.tfrecords_datasink import TFRecordDatasink
+from ray.data._internal.datasource.webdataset_datasink import WebDatasetDatasink
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.equalize import _equalize
 from ray.data._internal.execution.interfaces import RefBundle
@@ -74,21 +84,7 @@
     _apply_batch_size,
 )
 from ray.data.context import DataContext
-from ray.data.datasource import (
-    Connection,
-    Datasink,
-    FilenameProvider,
-    _BigQueryDatasink,
-    _CSVDatasink,
-    _ImageDatasink,
-    _JSONDatasink,
-    _MongoDatasink,
-    _NumpyDatasink,
-    _ParquetDatasink,
-    _SQLDatasink,
-    _TFRecordDatasink,
-    _WebDatasetDatasink,
-)
+from ray.data.datasource import Connection, Datasink, FilenameProvider
 from ray.data.iterator import DataIterator
 from ray.data.random_access_dataset import RandomAccessDataset
 from ray.types import ObjectRef
@@ -2706,7 +2702,7 @@ def write_parquet(
             #pyarrow.parquet.write_table>`_, which is used to write out each block to a file.
         """  # noqa: E501
-        datasink = _ParquetDatasink(
+        datasink = ParquetDatasink(
             path,
             arrow_parquet_args_fn=arrow_parquet_args_fn,
             arrow_parquet_args=arrow_parquet_args,
@@ -2814,7 +2810,7 @@ def write_json(
             :class:`~ray.data.Dataset` block. These are dict(orient="records", lines=True) by default.
         """
-        datasink = _JSONDatasink(
+        datasink = JSONDatasink(
             path,
             pandas_json_args_fn=pandas_json_args_fn,
             pandas_json_args=pandas_json_args,
@@ -2887,7 +2883,7 @@ def write_images(
             total number of tasks run. By default, concurrency is dynamically decided based on the available resources.
         """  # noqa: E501
-        datasink = _ImageDatasink(
+        datasink = ImageDatasink(
             path,
             column,
             file_format,
@@ -2992,7 +2988,7 @@ def write_csv(
             #pyarrow.csv.write_csv>`_ when writing each block to a file.
         """
-        datasink = _CSVDatasink(
+        datasink = CSVDatasink(
             path,
             arrow_csv_args_fn=arrow_csv_args_fn,
             arrow_csv_args=arrow_csv_args,
@@ -3088,7 +3084,7 @@ def write_tfrecords(
             decided based on the available resources.
         """
-        datasink = _TFRecordDatasink(
+        datasink = TFRecordDatasink(
             path=path,
             tf_schema=tf_schema,
             num_rows_per_file=num_rows_per_file,
@@ -3173,7 +3169,7 @@ def write_webdataset(
             decided based on the available resources.
         """
-        datasink = _WebDatasetDatasink(
+        datasink = WebDatasetDatasink(
             path,
             encoder=encoder,
             num_rows_per_file=num_rows_per_file,
@@ -3260,7 +3256,7 @@ def write_numpy(
             decided based on the available resources.
         """
-        datasink = _NumpyDatasink(
+        datasink = NumpyDatasink(
             path,
             column,
             num_rows_per_file=num_rows_per_file,
@@ -3337,7 +3333,7 @@ def write_sql(
             total number of tasks run. By default, concurrency is dynamically decided based on the available resources.
         """  # noqa: E501
-        datasink = _SQLDatasink(sql=sql, connection_factory=connection_factory)
+        datasink = SQLDatasink(sql=sql, connection_factory=connection_factory)
         self.write_datasink(
             datasink,
             ray_remote_args=ray_remote_args,
@@ -3408,7 +3404,7 @@ def write_mongo(
             ValueError: if ``database`` doesn't exist.
             ValueError: if ``collection`` doesn't exist.
""" - datasink = _MongoDatasink( + datasink = MongoDatasink( uri=uri, database=database, collection=collection, @@ -3481,7 +3477,7 @@ def write_bigquery( else: ray_remote_args["max_retries"] = 0 - datasink = _BigQueryDatasink( + datasink = BigQueryDatasink( project_id=project_id, dataset=dataset, max_retry_cnt=max_retry_cnt, diff --git a/python/ray/data/datasource/__init__.py b/python/ray/data/datasource/__init__.py index a19f508305be..f2ff0ca2ddd0 100644 --- a/python/ray/data/datasource/__init__.py +++ b/python/ray/data/datasource/__init__.py @@ -1,6 +1,4 @@ from ray.data._internal.datasource.sql_datasource import Connection -from ray.data.datasource.bigquery_datasink import _BigQueryDatasink -from ray.data.datasource.csv_datasink import _CSVDatasink from ray.data.datasource.datasink import Datasink, DummyOutputDatasink from ray.data.datasource.datasource import ( Datasource, @@ -23,11 +21,6 @@ FileMetadataProvider, ) from ray.data.datasource.filename_provider import FilenameProvider -from ray.data.datasource.image_datasink import _ImageDatasink -from ray.data.datasource.json_datasink import _JSONDatasink -from ray.data.datasource.mongo_datasink import _MongoDatasink -from ray.data.datasource.numpy_datasink import _NumpyDatasink -from ray.data.datasource.parquet_datasink import _ParquetDatasink from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider from ray.data.datasource.partitioning import ( Partitioning, @@ -35,45 +28,32 @@ PathPartitionFilter, PathPartitionParser, ) -from ray.data.datasource.sql_datasink import _SQLDatasink -from ray.data.datasource.tfrecords_datasink import _TFRecordDatasink from ray.data.datasource.tfrecords_datasource import TFRecordDatasource -from ray.data.datasource.webdataset_datasink import _WebDatasetDatasink # Note: HuggingFaceDatasource should NOT be imported here, because # we want to only import the Hugging Face datasets library when we use # ray.data.from_huggingface() or HuggingFaceDatasource() directly. 
 __all__ = [
     "BaseFileMetadataProvider",
-    "_BigQueryDatasink",
     "BlockBasedFileDatasink",
     "Connection",
-    "_CSVDatasink",
     "Datasink",
     "Datasource",
-    "_SQLDatasink",
     "DefaultFileMetadataProvider",
     "DummyOutputDatasink",
     "FastFileMetadataProvider",
     "FileBasedDatasource",
     "FileMetadataProvider",
     "FilenameProvider",
-    "_ImageDatasink",
-    "_JSONDatasink",
-    "_NumpyDatasink",
-    "_ParquetDatasink",
     "ParquetMetadataProvider",
     "PartitionStyle",
     "PathPartitionFilter",
     "PathPartitionParser",
     "Partitioning",
     "RandomIntRowDatasource",
-    "_MongoDatasink",
     "ReadTask",
     "Reader",
     "RowBasedFileDatasink",
-    "_TFRecordDatasink",
    "TFRecordDatasource",
-    "_WebDatasetDatasink",
     "_S3FileSystemWrapper",
 ]
diff --git a/python/ray/data/tests/test_bigquery.py b/python/ray/data/tests/test_bigquery.py
index adeb6b0af00a..325de3eaa586 100644
--- a/python/ray/data/tests/test_bigquery.py
+++ b/python/ray/data/tests/test_bigquery.py
@@ -8,8 +8,8 @@
 from google.cloud.bigquery_storage_v1.types import stream as gcbqs_stream
 import ray
+from ray.data._internal.datasource.bigquery_datasink import BigQueryDatasink
 from ray.data._internal.datasource.bigquery_datasource import BigQueryDatasource
-from ray.data.datasource import _BigQueryDatasink
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.mock_http_server import *  # noqa
 from ray.tests.conftest import *  # noqa
@@ -198,7 +198,7 @@ class TestWriteBigQuery:
     """Tests for BigQuery Write."""
     def test_write(self, ray_get_mock):
-        bq_datasink = _BigQueryDatasink(
+        bq_datasink = BigQueryDatasink(
             project_id=_TEST_GCP_PROJECT_ID,
             dataset=_TEST_BQ_DATASET,
         )
@@ -211,7 +211,7 @@ def test_write(self, ray_get_mock):
         assert status == "ok"
     def test_write_dataset_exists(self, ray_get_mock):
-        bq_datasink = _BigQueryDatasink(
+        bq_datasink = BigQueryDatasink(
             project_id=_TEST_GCP_PROJECT_ID,
             dataset="existingdataset"
             + "."
             + _TEST_BQ_TABLE_ID,
         )
diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py
index e090c5473ff1..cc806d905263 100644
--- a/python/ray/data/tests/test_consumption.py
+++ b/python/ray/data/tests/test_consumption.py
@@ -13,13 +13,13 @@
 import ray
 from ray.data._internal.block_builder import BlockBuilder
+from ray.data._internal.datasource.csv_datasink import CSVDatasink
 from ray.data._internal.datasource.csv_datasource import CSVDatasource
 from ray.data._internal.datasource.range_datasource import RangeDatasource
 from ray.data._internal.util import _check_pyarrow_version
 from ray.data.block import BlockAccessor, BlockMetadata
 from ray.data.context import DataContext
 from ray.data.dataset import Dataset, MaterializedDataset
-from ray.data.datasource.csv_datasink import _CSVDatasink
 from ray.data.datasource.datasource import Datasource, ReadTask
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.conftest import (
@@ -1701,7 +1701,7 @@ def _read_stream(self, f: "pa.NativeFile", path: str):
         yield block
-class FlakyCSVDatasink(_CSVDatasink):
+class FlakyCSVDatasink(CSVDatasink):
     def __init__(self, path, **csv_datasink_kwargs):
         super().__init__(path, **csv_datasink_kwargs)
diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py
index 4d2941c44a19..408726fece9f 100644
--- a/python/ray/data/tests/test_execution_optimizer.py
+++ b/python/ray/data/tests/test_execution_optimizer.py
@@ -8,6 +8,7 @@
 import pytest
 import ray
+from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
 from ray.data._internal.execution.interfaces import ExecutionOptions
 from ray.data._internal.execution.operators.base_physical_operator import (
     AllToAllOperator,
@@ -57,7 +58,6 @@
 from ray.data._internal.stats import DatasetStats
 from ray.data.aggregate import Count
 from ray.data.context import DataContext
-from ray.data.datasource.parquet_datasink import _ParquetDatasink
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.test_util import get_parquet_read_logical_op
 from ray.data.tests.util import column_udf, extract_values, named_values
@@ -972,7 +972,7 @@ def test_write_fusion(ray_start_regular_shared, tmp_path):
 def test_write_operator(ray_start_regular_shared, tmp_path):
     concurrency = 2
     planner = Planner()
-    datasink = _ParquetDatasink(tmp_path)
+    datasink = ParquetDatasink(tmp_path)
     read_op = get_parquet_read_logical_op()
     op = Write(
         read_op,