[data] Move datasink classes to internal #46331

Merged: 3 commits, Jun 28, 2024
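In short, this PR drops the leading underscore from the datasink class names and moves the classes out of the public ray.data.datasource package into ray.data._internal.datasource. A minimal before/after sketch of the import change, grounded in the diffs below (the "/tmp/out" path is a placeholder):

# Before this PR: the private-prefixed class was exported from the public package.
from ray.data.datasource import _ParquetDatasink

datasink = _ParquetDatasink("/tmp/out")  # placeholder output path

# After this PR: the same class lives in the internal package, without the underscore.
from ray.data._internal.datasource.parquet_datasink import ParquetDatasink

datasink = ParquetDatasink("/tmp/out")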
@@ -20,7 +20,7 @@
RATE_LIMIT_EXCEEDED_SLEEP_TIME = 11


-class _BigQueryDatasink(Datasink):
+class BigQueryDatasink(Datasink):
    def __init__(
        self,
        project_id: str,
@@ -7,7 +7,7 @@
from ray.data.datasource.file_datasink import BlockBasedFileDatasink


-class _CSVDatasink(BlockBasedFileDatasink):
+class CSVDatasink(BlockBasedFileDatasink):
    def __init__(
        self,
        path: str,
@@ -6,7 +6,7 @@
from ray.data.datasource.file_datasink import RowBasedFileDatasink


-class _ImageDatasink(RowBasedFileDatasink):
+class ImageDatasink(RowBasedFileDatasink):
    def __init__(
        self, path: str, column: str, file_format: str, **file_datasink_kwargs
    ):
@@ -7,7 +7,7 @@
from ray.data.datasource.file_datasink import BlockBasedFileDatasink


-class _JSONDatasink(BlockBasedFileDatasink):
+class JSONDatasink(BlockBasedFileDatasink):
    def __init__(
        self,
        path: str,
@@ -13,7 +13,7 @@
logger = logging.getLogger(__name__)


-class _MongoDatasink(Datasink):
+class MongoDatasink(Datasink):
    def __init__(self, uri: str, database: str, collection: str) -> None:
        _check_import(self, module="pymongo", package="pymongo")
        _check_import(self, module="pymongoarrow", package="pymongoarrow")
@@ -5,7 +5,7 @@
from ray.data.datasource.file_datasink import BlockBasedFileDatasink


-class _NumpyDatasink(BlockBasedFileDatasink):
+class NumpyDatasink(BlockBasedFileDatasink):
    def __init__(
        self,
        path: str,
@@ -18,7 +18,7 @@
logger = logging.getLogger(__name__)


-class _ParquetDatasink(_FileDatasink):
+class ParquetDatasink(_FileDatasink):
    def __init__(
        self,
        path: str,
@@ -6,7 +6,7 @@
from ray.data.datasource.datasink import Datasink


-class _SQLDatasink(Datasink):
+class SQLDatasink(Datasink):

    _MAX_ROWS_PER_WRITE = 128

@@ -13,7 +13,7 @@
from tensorflow_metadata.proto.v0 import schema_pb2


-class _TFRecordDatasink(BlockBasedFileDatasink):
+class TFRecordDatasink(BlockBasedFileDatasink):
    def __init__(
        self,
        path: str,
@@ -15,7 +15,7 @@
from ray.data.datasource.file_datasink import BlockBasedFileDatasink


-class _WebDatasetDatasink(BlockBasedFileDatasink):
+class WebDatasetDatasink(BlockBasedFileDatasink):
    def __init__(
        self,
        path: str,
python/ray/data/dataset.py: 21 additions & 25 deletions

@@ -29,6 +29,16 @@
from ray._private.usage import usage_lib
from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray
from ray.data._internal.compute import ComputeStrategy
+from ray.data._internal.datasource.bigquery_datasink import BigQueryDatasink
+from ray.data._internal.datasource.csv_datasink import CSVDatasink
+from ray.data._internal.datasource.image_datasink import ImageDatasink
+from ray.data._internal.datasource.json_datasink import JSONDatasink
+from ray.data._internal.datasource.mongo_datasink import MongoDatasink
+from ray.data._internal.datasource.numpy_datasink import NumpyDatasink
+from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
+from ray.data._internal.datasource.sql_datasink import SQLDatasink
+from ray.data._internal.datasource.tfrecords_datasink import TFRecordDatasink
+from ray.data._internal.datasource.webdataset_datasink import WebDatasetDatasink
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.equalize import _equalize
from ray.data._internal.execution.interfaces import RefBundle
@@ -74,21 +84,7 @@
    _apply_batch_size,
)
from ray.data.context import DataContext
-from ray.data.datasource import (
-    Connection,
-    Datasink,
-    FilenameProvider,
-    _BigQueryDatasink,
-    _CSVDatasink,
-    _ImageDatasink,
-    _JSONDatasink,
-    _MongoDatasink,
-    _NumpyDatasink,
-    _ParquetDatasink,
-    _SQLDatasink,
-    _TFRecordDatasink,
-    _WebDatasetDatasink,
-)
+from ray.data.datasource import Connection, Datasink, FilenameProvider
from ray.data.iterator import DataIterator
from ray.data.random_access_dataset import RandomAccessDataset
from ray.types import ObjectRef
@@ -2706,7 +2702,7 @@ def write_parquet(
                #pyarrow.parquet.write_table>`_, which is used to write out each
                block to a file.
        """  # noqa: E501
-        datasink = _ParquetDatasink(
+        datasink = ParquetDatasink(
            path,
            arrow_parquet_args_fn=arrow_parquet_args_fn,
            arrow_parquet_args=arrow_parquet_args,
@@ -2814,7 +2810,7 @@ def write_json(
                :class:`~ray.data.Dataset` block. These
                are dict(orient="records", lines=True) by default.
        """
-        datasink = _JSONDatasink(
+        datasink = JSONDatasink(
            path,
            pandas_json_args_fn=pandas_json_args_fn,
            pandas_json_args=pandas_json_args,
@@ -2887,7 +2883,7 @@ def write_images(
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
        """  # noqa: E501
-        datasink = _ImageDatasink(
+        datasink = ImageDatasink(
            path,
            column,
            file_format,
@@ -2992,7 +2988,7 @@ def write_csv(
                #pyarrow.csv.write_csv>`_
                when writing each block to a file.
        """
-        datasink = _CSVDatasink(
+        datasink = CSVDatasink(
            path,
            arrow_csv_args_fn=arrow_csv_args_fn,
            arrow_csv_args=arrow_csv_args,
@@ -3088,7 +3084,7 @@ def write_tfrecords(
                decided based on the available resources.

        """
-        datasink = _TFRecordDatasink(
+        datasink = TFRecordDatasink(
            path=path,
            tf_schema=tf_schema,
            num_rows_per_file=num_rows_per_file,
@@ -3173,7 +3169,7 @@ def write_webdataset(
                decided based on the available resources.

        """
-        datasink = _WebDatasetDatasink(
+        datasink = WebDatasetDatasink(
            path,
            encoder=encoder,
            num_rows_per_file=num_rows_per_file,
@@ -3260,7 +3256,7 @@ def write_numpy(
                decided based on the available resources.
        """

-        datasink = _NumpyDatasink(
+        datasink = NumpyDatasink(
            path,
            column,
            num_rows_per_file=num_rows_per_file,
@@ -3337,7 +3333,7 @@ def write_sql(
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
        """  # noqa: E501
-        datasink = _SQLDatasink(sql=sql, connection_factory=connection_factory)
+        datasink = SQLDatasink(sql=sql, connection_factory=connection_factory)
        self.write_datasink(
            datasink,
            ray_remote_args=ray_remote_args,
@@ -3408,7 +3404,7 @@ def write_mongo(
            ValueError: if ``database`` doesn't exist.
            ValueError: if ``collection`` doesn't exist.
        """
-        datasink = _MongoDatasink(
+        datasink = MongoDatasink(
            uri=uri,
            database=database,
            collection=collection,
@@ -3481,7 +3477,7 @@ def write_bigquery(
        else:
            ray_remote_args["max_retries"] = 0

-        datasink = _BigQueryDatasink(
+        datasink = BigQueryDatasink(
            project_id=project_id,
            dataset=dataset,
            max_retry_cnt=max_retry_cnt,
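The Dataset.write_* methods above keep their public signatures; only the datasink classes they construct internally have moved. A small usage sketch for end users (output paths are placeholders):

import ray

ds = ray.data.range(100)

# Public write APIs are unchanged; under the hood they now build
# ParquetDatasink / CSVDatasink from ray.data._internal.datasource.
ds.write_parquet("/tmp/ray_demo_parquet")
ds.write_csv("/tmp/ray_demo_csv")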
python/ray/data/datasource/__init__.py: 0 additions & 20 deletions

@@ -1,6 +1,4 @@
from ray.data._internal.datasource.sql_datasource import Connection
-from ray.data.datasource.bigquery_datasink import _BigQueryDatasink
-from ray.data.datasource.csv_datasink import _CSVDatasink
from ray.data.datasource.datasink import Datasink, DummyOutputDatasink
from ray.data.datasource.datasource import (
    Datasource,
@@ -23,57 +21,39 @@
    FileMetadataProvider,
)
from ray.data.datasource.filename_provider import FilenameProvider
-from ray.data.datasource.image_datasink import _ImageDatasink
-from ray.data.datasource.json_datasink import _JSONDatasink
-from ray.data.datasource.mongo_datasink import _MongoDatasink
-from ray.data.datasource.numpy_datasink import _NumpyDatasink
-from ray.data.datasource.parquet_datasink import _ParquetDatasink
from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider
from ray.data.datasource.partitioning import (
    Partitioning,
    PartitionStyle,
    PathPartitionFilter,
    PathPartitionParser,
)
-from ray.data.datasource.sql_datasink import _SQLDatasink
-from ray.data.datasource.tfrecords_datasink import _TFRecordDatasink
from ray.data.datasource.tfrecords_datasource import TFRecordDatasource
-from ray.data.datasource.webdataset_datasink import _WebDatasetDatasink

# Note: HuggingFaceDatasource should NOT be imported here, because
# we want to only import the Hugging Face datasets library when we use
# ray.data.from_huggingface() or HuggingFaceDatasource() directly.
__all__ = [
    "BaseFileMetadataProvider",
-    "_BigQueryDatasink",
    "BlockBasedFileDatasink",
    "Connection",
-    "_CSVDatasink",
    "Datasink",
    "Datasource",
-    "_SQLDatasink",
    "DefaultFileMetadataProvider",
    "DummyOutputDatasink",
    "FastFileMetadataProvider",
    "FileBasedDatasource",
    "FileMetadataProvider",
    "FilenameProvider",
-    "_ImageDatasink",
-    "_JSONDatasink",
-    "_NumpyDatasink",
-    "_ParquetDatasink",
    "ParquetMetadataProvider",
    "PartitionStyle",
    "PathPartitionFilter",
    "PathPartitionParser",
    "Partitioning",
    "RandomIntRowDatasource",
-    "_MongoDatasink",
    "ReadTask",
    "Reader",
    "RowBasedFileDatasink",
-    "_TFRecordDatasink",
    "TFRecordDatasource",
-    "_WebDatasetDatasink",
    "_S3FileSystemWrapper",
]
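With these exports removed, ray.data.datasource keeps only the public extension points (Datasink, the file datasink base classes, FilenameProvider, and so on), while the concrete datasinks are reachable only through the internal package. A sketch of what remains importable versus what moved, using names and paths taken from the diffs above (internal modules carry no public API guarantee):

# Still exported from the public package:
from ray.data.datasource import (
    BlockBasedFileDatasink,
    Datasink,
    FilenameProvider,
    RowBasedFileDatasink,
)

# No longer exported there; now internal only:
from ray.data._internal.datasource.bigquery_datasink import BigQueryDatasink
from ray.data._internal.datasource.csv_datasink import CSVDatasink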
python/ray/data/tests/test_bigquery.py: 3 additions & 3 deletions

@@ -8,8 +8,8 @@
from google.cloud.bigquery_storage_v1.types import stream as gcbqs_stream

import ray
+from ray.data._internal.datasource.bigquery_datasink import BigQueryDatasink
from ray.data._internal.datasource.bigquery_datasource import BigQueryDatasource
-from ray.data.datasource import _BigQueryDatasink
from ray.data.tests.conftest import *  # noqa
from ray.data.tests.mock_http_server import *  # noqa
from ray.tests.conftest import *  # noqa
@@ -198,7 +198,7 @@ class TestWriteBigQuery:
    """Tests for BigQuery Write."""

    def test_write(self, ray_get_mock):
-        bq_datasink = _BigQueryDatasink(
+        bq_datasink = BigQueryDatasink(
            project_id=_TEST_GCP_PROJECT_ID,
            dataset=_TEST_BQ_DATASET,
        )
@@ -211,7 +211,7 @@ def test_write(self, ray_get_mock):
        assert status == "ok"

    def test_write_dataset_exists(self, ray_get_mock):
-        bq_datasink = _BigQueryDatasink(
+        bq_datasink = BigQueryDatasink(
            project_id=_TEST_GCP_PROJECT_ID,
            dataset="existingdataset" + "." + _TEST_BQ_TABLE_ID,
        )
python/ray/data/tests/test_consumption.py: 2 additions & 2 deletions

@@ -13,13 +13,13 @@

import ray
from ray.data._internal.block_builder import BlockBuilder
+from ray.data._internal.datasource.csv_datasink import CSVDatasink
from ray.data._internal.datasource.csv_datasource import CSVDatasource
from ray.data._internal.datasource.range_datasource import RangeDatasource
from ray.data._internal.util import _check_pyarrow_version
from ray.data.block import BlockAccessor, BlockMetadata
from ray.data.context import DataContext
from ray.data.dataset import Dataset, MaterializedDataset
-from ray.data.datasource.csv_datasink import _CSVDatasink
from ray.data.datasource.datasource import Datasource, ReadTask
from ray.data.tests.conftest import *  # noqa
from ray.data.tests.conftest import (
@@ -1701,7 +1701,7 @@ def _read_stream(self, f: "pa.NativeFile", path: str):
        yield block


-class FlakyCSVDatasink(_CSVDatasink):
+class FlakyCSVDatasink(CSVDatasink):
    def __init__(self, path, **csv_datasink_kwargs):
        super().__init__(path, **csv_datasink_kwargs)

python/ray/data/tests/test_execution_optimizer.py: 2 additions & 2 deletions

@@ -8,6 +8,7 @@
import pytest

import ray
+from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
from ray.data._internal.execution.interfaces import ExecutionOptions
from ray.data._internal.execution.operators.base_physical_operator import (
    AllToAllOperator,
@@ -57,7 +58,6 @@
from ray.data._internal.stats import DatasetStats
from ray.data.aggregate import Count
from ray.data.context import DataContext
-from ray.data.datasource.parquet_datasink import _ParquetDatasink
from ray.data.tests.conftest import *  # noqa
from ray.data.tests.test_util import get_parquet_read_logical_op
from ray.data.tests.util import column_udf, extract_values, named_values
@@ -972,7 +972,7 @@ def test_write_fusion(ray_start_regular_shared, tmp_path):
def test_write_operator(ray_start_regular_shared, tmp_path):
    concurrency = 2
    planner = Planner()
-    datasink = _ParquetDatasink(tmp_path)
+    datasink = ParquetDatasink(tmp_path)
    read_op = get_parquet_read_logical_op()
    op = Write(
        read_op,