Commit

[Data] Replace usages of example:// from docs and code snippets with S3 paths (ray-project#37359)

As the first part of ray-project#37347, update docs and code snippets that currently use `example://` to use `s3://anonymous@ray-example-data/` instead. In a future PR, we will deprecate support for `example://` altogether and remove its internal usage from tests.
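For reference, the substitution looks roughly like the sketch below. It is illustrative only, and assumes a Python environment with Ray Data installed plus anonymous (unauthenticated) read access to the public `ray-example-data` S3 bucket, which is what the `anonymous@` component of the URI requests.

```python
import ray

# Old style, reading the bundled example data (to be deprecated in a follow-up PR):
# ds = ray.data.read_csv("example://iris.csv")

# New style, as used throughout this change:
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.show(1)
```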

Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
Signed-off-by: Victor <vctr.y.m@example.com>
2 people authored and Victor committed Oct 11, 2023
1 parent 7700516 commit 0a66ef4
Showing 19 changed files with 156 additions and 142 deletions.
24 changes: 12 additions & 12 deletions doc/source/data/doc_code/batch_formats.py
@@ -5,10 +5,10 @@
# __simple_map_function_start__
import ray

ds = ray.data.read_csv("example://iris.csv")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

def map_function(data):
return data[data["sepal.length"] < 5]
return data[data["sepal length (cm)"] < 5]

batch = ds.take_batch(10, batch_format="pandas")
mapped_batch = map_function(batch)
@@ -20,18 +20,18 @@ def map_function(data):
import ray
import pandas as pd

ds = ray.data.read_csv("example://iris.csv")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.show(1)
# -> {'sepal.length': 5.1, ..., 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal length (cm)': 5.1, ..., 'petal width (cm)': 0.2, 'target': 0}

def transform_pandas(df_batch: pd.DataFrame) -> pd.DataFrame:
df_batch = df_batch[df_batch["variety"] == "Versicolor"]
df_batch.loc[:, "normalized.sepal.length"] = df_batch["sepal.length"] / df_batch["sepal.length"].max()
df_batch = df_batch.drop(columns=["sepal.length"])
df_batch = df_batch[df_batch["target"] == 2]
df_batch.loc[:, "normalized.sepal length (cm)"] = df_batch["sepal length (cm)"] / df_batch["sepal length (cm)"].max()
df_batch = df_batch.drop(columns=["sepal length (cm)"])
return df_batch

ds.map_batches(transform_pandas, batch_format="pandas").show(1)
# -> {..., 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {..., 'target': 2, 'normalized.sepal length (cm)': 1.0}
# __simple_pandas_end__

# __simple_numpy_start__
@@ -62,19 +62,19 @@ def transform_numpy(arr: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
import pyarrow as pa
import pyarrow.compute as pac

ds = ray.data.read_csv("example://iris.csv")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")


def transform_pyarrow(batch: pa.Table) -> pa.Table:
batch = batch.filter(pac.equal(batch["variety"], "Versicolor"))
return batch.drop(["sepal.length"])
batch = batch.filter(pac.equal(batch["target"], 1))
return batch.drop(["sepal length (cm)"])


# test map function on a batch
batch = ds.take_batch(1, batch_format="pyarrow")
mapped_batch = transform_pyarrow(batch)

ds.map_batches(transform_pyarrow, batch_format="pyarrow").show(1)
# -> {'sepal.width': 3.2, ..., 'variety': 'Versicolor'}
# -> {'sepal width (cm)': 3.2, ..., 'target': 1}
# __simple_pyarrow_end__
# fmt: on
6 changes: 3 additions & 3 deletions doc/source/data/doc_code/key_concepts.py
@@ -65,7 +65,7 @@ def map_udf(df):
df["sepal.area"] = df["sepal.length"] * df["sepal.width"]
return df

ds = ray.data.read_parquet("example://iris.parquet") \
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet") \
.lazy() \
.map_batches(map_udf) \
.filter(lambda row: row["sepal.area"] > 15)
@@ -81,15 +81,15 @@ def map_udf(df):
import ray

# ML ingest re-reading from storage on every epoch.
torch_ds = ray.data.read_parquet("example://iris.parquet") \
torch_ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet") \
.repeat() \
.random_shuffle_each_window() \
.iter_torch_batches()

# Streaming batch inference pipeline that pipelines the transforming of a single
# file with the reading of a single file (at most 2 files' worth of data in-flight
# at a time).
infer_ds = ray.data.read_binary_files("example://mnist_subset_partitioned/") \
infer_ds = ray.data.read_binary_files("s3://anonymous@ray-example-data/mnist_subset_partitioned/") \
.window(blocks_per_window=1) \
.map(lambda bytes_: np.asarray(PIL.Image.open(BytesIO(bytes_)).convert("L"))) \
.map_batches(lambda imgs: [img.mean() > 0.5 for img in imgs])
16 changes: 8 additions & 8 deletions doc/source/data/doc_code/tensor.py
@@ -52,14 +52,14 @@ def gen_image_and_embed(batch: pd.DataFrame) -> pd.DataFrame:
# schema={data: numpy.ndarray(shape=(128, 128, 3), dtype=int64)})

# From saved numpy files.
ray.data.read_numpy("example://mnist_subset.npy")
ray.data.read_numpy("s3://anonymous@ray-example-data/mnist_subset.npy")
# -> Dataset(num_blocks=1, num_rows=3,
# schema={data: numpy.ndarray(shape=(28, 28), dtype=uint8)})
# __create_numpy_end__

# __create_parquet_1_begin__
# Reading previously saved Tensor data works out of the box.
ds = ray.data.read_parquet("example://parquet_images_mini")
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: numpy.ndarray(shape=(128, 128, 3), dtype=uint8),
# label: string})
@@ -156,7 +156,7 @@ def cast_udf(block: pa.Table) -> pa.Table:
ds.materialize()

# __create_images_begin__
ds = ray.data.read_images("example://image-datasets/simple")
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={data: numpy.ndarray(shape=(32, 32, 3), dtype=uint8)})

@@ -171,7 +171,7 @@ def cast_udf(block: pa.Table) -> pa.Table:
# __create_images_end__

# __consume_pandas_2_begin__
ds = ray.data.read_parquet("example://parquet_images_mini")
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: numpy.ndarray(shape=(128, 128, 3), dtype=uint8),
# label: string})
@@ -194,7 +194,7 @@ def add_one(batch: pd.DataFrame) -> pd.DataFrame:
# __consume_pyarrow_2_begin__
from ray.data.extensions.tensor_extension import ArrowTensorArray

ds = ray.data.read_parquet("example://parquet_images_mini")
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: numpy.ndarray(shape=(128, 128, 3), dtype=uint8),
# label: object})
@@ -233,7 +233,7 @@ def to_numpy(buf):
# __consume_pyarrow_2_end__

# __consume_numpy_2_begin__
ds = ray.data.read_parquet("example://parquet_images_mini")
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: numpy.ndarray(shape=(128, 128, 3), dtype=uint8),
# label: object})
@@ -270,7 +270,7 @@ def add_one(batch: Dict[str, Any]) -> Dict[str, Any]:

# __write_1_begin__
# Read a multi-column example dataset.
ds = ray.data.read_parquet("example://parquet_images_mini")
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/parquet_images_mini")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: numpy.ndarray(shape=(128, 128, 3), dtype=uint8),
# label: object})
@@ -290,7 +290,7 @@ def add_one(batch: Dict[str, Any]) -> Dict[str, Any]:

# __write_2_begin__
# Read a single-column example dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")
ds = ray.data.read_numpy("s3://anonymous@ray-example-data/mnist_subset.npy")
# -> Dataset(num_blocks=1, num_rows=3,
# schema={data: numpy.ndarray(shape=(28, 28), dtype=uint8)})

82 changes: 41 additions & 41 deletions doc/source/data/doc_code/transforming_data.py
@@ -46,58 +46,58 @@ def to_lowercase(row: Dict[str, Any]) -> Dict[str, Any]:
import numpy as np
from typing import Dict

ds = ray.data.read_csv("example://iris.csv")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

def numpy_transform(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
new_col = batch["sepal.length"] / np.max(batch["sepal.length"])
batch["normalized.sepal.length"] = new_col
del batch["sepal.length"]
new_col = batch["sepal length (cm)"] / np.max(batch["sepal length (cm)"])
batch["normalized.sepal length (cm)"] = new_col
del batch["sepal length (cm)"]
return batch

ds.map_batches(numpy_transform, batch_format="numpy").show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
# 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
# 'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
# -> {'sepal width (cm)': 3.2, 'petal length (cm)': 4.7, 'petal width (cm)': 1.4,
# 'target': 0, 'normalized.sepal length (cm)': 1.0}
# -> {'sepal width (cm)': 3.2, 'petal length (cm)': 4.5, 'petal width (cm)': 1.5,
# 'target': 0, 'normalized.sepal length (cm)': 0.9142857142857144}
# __writing_numpy_udfs_end__

# __writing_pandas_udfs_begin__
import ray
import pandas as pd

ds = ray.data.read_csv("example://iris.csv")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
df = df.drop(columns=["sepal.length"])
df.loc[:, "normalized.sepal length (cm)"] = df["sepal length (cm)"] / df["sepal length (cm)"].max()
df = df.drop(columns=["sepal length (cm)"])
return df

ds.map_batches(pandas_transform, batch_format="pandas").show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
# 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
# 'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
# -> {'sepal width (cm)': 3.2, 'petal length (cm)': 4.7, 'petal width (cm)': 1.4,
# 'target': 0, 'normalized.sepal length (cm)': 1.0}
# -> {'sepal width (cm)': 3.2, 'petal length (cm)': 4.5, 'petal width (cm)': 1.5,
# 'target': 0, 'normalized.sepal length (cm)': 0.9142857142857144}
# __writing_pandas_udfs_end__

# __writing_arrow_udfs_begin__
import ray
import pyarrow as pa
import pyarrow.compute as pac

ds = ray.data.read_csv("example://iris.csv")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

def pyarrow_transform(batch: pa.Table) -> pa.Table:
batch = batch.append_column(
"normalized.sepal.length",
pac.divide(batch["sepal.length"], pac.max(batch["sepal.length"])),
"normalized.sepal length (cm)",
pac.divide(batch["sepal length (cm)"], pac.max(batch["sepal length (cm)"])),
)
return batch.drop(["sepal.length"])
return batch.drop(["sepal length (cm)"])

ds.map_batches(pyarrow_transform, batch_format="pyarrow").show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
# 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
# 'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
# -> {'sepal width (cm)': 3.2, 'petal length (cm)': 4.7, 'petal width (cm)': 1.4,
# 'target': 0, 'normalized.sepal length (cm)': 1.0}
# -> {'sepal width (cm)': 3.2, 'petal length (cm)': 4.5, 'petal width (cm)': 1.5,
# 'target': 0, 'normalized.sepal length (cm)': 0.9142857142857144}
# __writing_arrow_udfs_end__

# __dataset_compute_strategy_begin__
@@ -106,14 +106,14 @@
import numpy as np
from ray.data import ActorPoolStrategy

# Dummy model to predict Iris variety.
# Dummy model to predict Iris target.
def predict_iris(df: pd.DataFrame) -> pd.DataFrame:
conditions = [
(df["sepal.length"] < 5.0),
(df["sepal.length"] >= 5.0) & (df["sepal.length"] < 6.0),
(df["sepal.length"] >= 6.0)
(df["sepal length (cm)"] < 5.0),
(df["sepal length (cm)"] >= 5.0) & (df["sepal length (cm)"] < 6.0),
(df["sepal length (cm)"] >= 6.0)
]
values = ["Setosa", "Versicolor", "Virginica"]
values = ["Setosa", "0", "Virginica"]
return pd.DataFrame({"predicted_variety": np.select(conditions, values)})

class IrisInferModel:
@@ -125,7 +125,7 @@ def __init__(self):
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
return self._model(batch)

ds = ray.data.read_csv("example://iris.csv").repartition(10)
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv").repartition(10)

# Batch inference processing with Ray tasks (the default compute strategy).
predicted = ds.map_batches(predict_iris, batch_format="pandas")
@@ -140,16 +140,16 @@ def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
from typing import Iterator

# Load iris data.
ds = ray.data.read_csv("example://iris.csv")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# UDF to repeat the dataframe 100 times, in chunks of 20.
def repeat_dataframe(df: pd.DataFrame) -> Iterator[pd.DataFrame]:
for _ in range(5):
yield pd.concat([df]*20)

ds.map_batches(repeat_dataframe, batch_format="pandas").show(2)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5, 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0, 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
# -> {'sepal length (cm)': 4.9, 'sepal width (cm)': 3.0, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
# __writing_generator_udfs_end__

# __shuffle_begin__
@@ -176,26 +176,26 @@ def repeat_dataframe(df: pd.DataFrame) -> Iterator[pd.DataFrame]:
from typing import Dict

# Load iris data.
ds = ray.data.read_csv("example://iris.csv")
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# The user function signature for `map_groups` is the same as that of `map_batches`.
# It takes in a batch representing the grouped data, and must return a batch of
# zero or more records as the result.
def custom_count(group: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
# Since we are grouping by variety, all elements in this batch are equal.
variety = group["variety"][0]
count = len(group["variety"])
# Since we are grouping by target, all elements in this batch are equal.
target = group["target"][0]
count = len(group["target"])
# Here we return a batch of a single record for the group (array of len 1).
return {
"variety": np.array([variety]),
"target": np.array([target]),
"count": np.array([count]),
}

ds = ds.groupby("variety").map_groups(custom_count)
ds = ds.groupby("target").map_groups(custom_count)
ds.show()
# -> {'variety': 'Setosa', 'count': 50}
# {'variety': 'Versicolor', 'count': 50}
# {'variety': 'Virginica', 'count': 50}
# -> {'target': 0, 'count': 50}
# {'target': 1, 'count': 50}
# {'target': 2, 'count': 50}
# __map_groups_end__

# fmt: on
2 changes: 1 addition & 1 deletion doc/source/data/inspecting-data.rst
@@ -109,7 +109,7 @@ of the returned batch, set ``batch_format``.

import ray

ds = ray.data.read_images("example://image-datasets/simple")
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

batch = ds.take_batch(batch_size=2, batch_format="numpy")
print("Batch:", batch)
8 changes: 4 additions & 4 deletions doc/source/data/iterating-over-data.rst
@@ -64,7 +64,7 @@ formats by calling one of the following methods:

import ray

ds = ray.data.read_images("example://image-datasets/simple")
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):
print(batch)
@@ -106,7 +106,7 @@ formats by calling one of the following methods:

import ray

ds = ray.data.read_images("example://image-datasets/simple")
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_torch_batches(batch_size=2):
print(batch)
@@ -172,7 +172,7 @@ movement.

import ray

ds = ray.data.read_images("example://image-datasets/simple")
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(
batch_size=2,
@@ -223,7 +223,7 @@ movement.

import ray

ds = ray.data.read_images("example://image-datasets/simple")
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_torch_batches(
batch_size=2,
local_shuffle_buffer_size=250,
2 changes: 1 addition & 1 deletion doc/source/data/key-concepts.rst
@@ -161,7 +161,7 @@ or remote filesystems.

transformed_ds.write_parquet("/tmp/iris")

print(os.listdir("/tmp/iris"))
print(sorted(os.listdir("/tmp/iris")))

.. testoutput::

4 changes: 2 additions & 2 deletions doc/source/data/performance-tips.rst
@@ -46,7 +46,7 @@ Current Dataset will read all Parquet columns into memory.
If you only need a subset of the columns, make sure to specify the list of columns
explicitly when calling :meth:`ray.data.read_parquet() <ray.data.read_parquet>` to
avoid loading unnecessary data (projection pushdown).
For example, use ``ray.data.read_parquet("example://iris.parquet", columns=["sepal.length", "variety"])`` to read
For example, use ``ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet", columns=["sepal.length", "variety"])`` to read
just two of the five columns of the Iris dataset.
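As a minimal, self-contained sketch of the projection pushdown described above (the column names follow the `iris.parquet` schema used elsewhere in this diff, and anonymous access to the example bucket is assumed):

```python
import ray

# Only the two requested columns are read from the Parquet file
# (projection pushdown); the remaining columns are never loaded.
ds = ray.data.read_parquet(
    "s3://anonymous@ray-example-data/iris.parquet",
    columns=["sepal.length", "variety"],
)
print(ds.schema())  # shows just the two selected columns
```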

.. _parquet_row_pruning:
@@ -57,7 +57,7 @@ Parquet row pruning
Similarly, you can pass in a filter to :meth:`ray.data.read_parquet() <ray.data.read_parquet>` (filter pushdown)
which will be applied at the file scan so only rows that match the filter predicate
will be returned.
For example, use ``ray.data.read_parquet("example://iris.parquet", filter=pyarrow.dataset.field("sepal.length") > 5.0)``
For example, use ``ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet", filter=pyarrow.dataset.field("sepal.length") > 5.0)``
(where ``pyarrow`` has to be imported)
to read rows with sepal.length greater than 5.0.
This can be used in conjunction with column pruning when appropriate to get the benefits of both.
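A corresponding sketch of the filter pushdown, under the same assumptions:

```python
import pyarrow.dataset
import ray

# The predicate is applied during the Parquet scan (filter pushdown),
# so rows with sepal.length <= 5.0 are skipped at read time.
ds = ray.data.read_parquet(
    "s3://anonymous@ray-example-data/iris.parquet",
    filter=pyarrow.dataset.field("sepal.length") > 5.0,
)
print(ds.count())
```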