From 009519f58e6bff58b1c4af5fc4f28bc25469b78c Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Thu, 2 Mar 2023 16:00:23 -0800 Subject: [PATCH] [AIR] Add computer vision guide (#32885) Computer vision is a popular use case. This PR adds a guide that explains how to perform common vision tasks with AIR. --------- Signed-off-by: Balaji Veeramani Signed-off-by: Edward Oakes --- doc/source/_toc.yml | 3 +- doc/source/ray-air/computer-vision.rst | 329 ++++++++++++++ .../ray-air/doc_code/computer_vision.py | 430 ++++++++++++++++++ 3 files changed, 761 insertions(+), 1 deletion(-) create mode 100644 doc/source/ray-air/computer-vision.rst create mode 100644 doc/source/ray-air/doc_code/computer_vision.py diff --git a/doc/source/_toc.yml b/doc/source/_toc.yml index 7efdb2018488..b4c64873e971 100644 --- a/doc/source/_toc.yml +++ b/doc/source/_toc.yml @@ -2,7 +2,7 @@ format: jb-book root: index parts: - caption: Ray - chapters: + chapters: - file: ray-overview/index title: "Overview" - file: ray-overview/getting-started @@ -48,6 +48,7 @@ parts: - file: ray-air/check-ingest - file: ray-air/tuner - file: ray-air/predictors + - file: ray-air/computer-vision - file: ray-air/examples/serving_guide - file: ray-air/deployment - file: ray-air/examples/index diff --git a/doc/source/ray-air/computer-vision.rst b/doc/source/ray-air/computer-vision.rst new file mode 100644 index 000000000000..5de6d71623c2 --- /dev/null +++ b/doc/source/ray-air/computer-vision.rst @@ -0,0 +1,329 @@ +.. _computer-vision: + +Computer Vision +=============== + +This guide explains how to perform common computer vision tasks like: + +* `Reading image data`_ +* `Transforming images`_ +* `Training vision models`_ +* `Batch predicting images`_ +* `Serving vision models`_ + +Reading image data +------------------ + +.. tabbed:: Raw images + + Datasets like ImageNet store files like this: + + .. code-block:: + + root/dog/xxx.png + root/dog/xxy.png + root/dog/[...]/xxz.png + + root/cat/123.png + root/cat/nsdf3.png + root/cat/[...]/asd932_.png + + To load images stored in this layout, read the raw images and include the + class names. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __read_images1_start__ + :end-before: __read_images1_stop__ + :dedent: + + Then, apply a :ref:`user-defined function ` to + encode the class names as integer targets. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __read_images2_start__ + :end-before: __read_images2_stop__ + :dedent: + + .. tip:: + + You can also use :class:`~ray.data.preprocessors.LabelEncoder` to encode labels. + +.. tabbed:: NumPy + + To load NumPy arrays into a :class:`~ray.data.dataset.Dataset`, separately read the image and label arrays. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __read_numpy1_start__ + :end-before: __read_numpy1_stop__ + :dedent: + + Then, combine the datasets and rename the columns. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __read_numpy2_start__ + :end-before: __read_numpy2_stop__ + :dedent: + +.. tabbed:: TFRecords + + Image datasets often contain ``tf.train.Example`` messages that look like this: + + .. code-block:: + + features { + feature { + key: "image" + value { + bytes_list { + value: ... # Raw image bytes + } + } + } + feature { + key: "label" + value { + int64_list { + value: 3 + } + } + } + } + + To load examples stored in this format, read the TFRecords into a :class:`~ray.data.dataset.Dataset`. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __read_tfrecords1_start__ + :end-before: __read_tfrecords1_stop__ + :dedent: + + Then, apply a :ref:`user-defined function ` to + decode the raw image bytes. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __read_tfrecords2_start__ + :end-before: __read_tfrecords2_stop__ + :dedent: + +.. tabbed:: Parquet + + To load image data stored in Parquet files, call :func:`ray.data.read_parquet`. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __read_parquet_start__ + :end-before: __read_parquet_stop__ + :dedent: + + +For more information on creating datasets, see :ref:`Creating Datasets `. + + +Transforming images +------------------- + +To transform images, create a :class:`~ray.data.preprocessor.Preprocessor`. They're the +standard way to preprocess data with Ray. + +.. tabbed:: Torch + + To apply TorchVision transforms, create a :class:`~ray.data.preprocessors.TorchVisionPreprocessor`. + + Create two :class:`TorchVisionPreprocessors ` + -- one to normalize images, and another to augment images. Later, you'll pass the preprocessors to :class:`Trainers `, + :class:`Predictors `, and + :class:`PredictorDeployments `. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __torch_preprocessors_start__ + :end-before: __torch_preprocessors_stop__ + :dedent: + +.. tabbed:: TensorFlow + + To apply TorchVision transforms, create a :class:`~ray.data.preprocessors.BatchMapper`. + + Create two :class:`~ray.data.preprocessors.BatchMapper` -- one to normalize images, and another to + augment images. Later, you'll pass the preprocessors to :class:`Trainers `, + :class:`Predictors `, and + :class:`PredictorDeployments `. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __tensorflow_preprocessors_start__ + :end-before: __tensorflow_preprocessors_stop__ + :dedent: + +For more information on transforming data, see +:ref:`Using Preprocessors ` and +:ref:`Transforming Datasets `. + +Training vision models +---------------------- + +:class:`Trainers ` let you train models in parallel. + +.. tabbed:: Torch + + To train a vision model, define the training loop per worker. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __torch_training_loop_start__ + :end-before: __torch_training_loop_stop__ + :dedent: + + Then, create a :class:`~ray.train.torch.TorchTrainer` and call + :meth:`~ray.train.torch.TorchTrainer.fit`. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __torch_trainer_start__ + :end-before: __torch_trainer_stop__ + :dedent: + + For more in-depth examples, read :doc:`/ray-air/examples/torch_image_example` and + :ref:`Using Trainers `. + +.. tabbed:: TensorFlow + + To train a vision model, define the training loop per worker. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __tensorflow_training_loop_start__ + :end-before: __tensorflow_training_loop_stop__ + :dedent: + + Then, create a :class:`~ray.train.tensorflow.TensorflowTrainer` and call + :meth:`~ray.train.tensorflow.TensorflowTrainer.fit`. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __tensorflow_trainer_start__ + :end-before: __tensorflow_trainer_stop__ + :dedent: + + For more information, read :ref:`Using Trainers `. + +Creating checkpoints +-------------------- + +:class:`Checkpoints ` are required for batch inference and model +serving. They contain model state and optionally a preprocessor. + +If you're going from training to prediction, don't create a new checkpoint. +:meth:`Trainer.fit() ` returns a +:class:`~ray.air.result.Result` object. Use +:attr:`Result.checkpoint ` instead. + +.. tabbed:: Torch + + To create a :class:`~ray.train.torch.TorchCheckpoint`, pass a Torch model and + the :class:`~ray.data.preprocessor.Preprocessor` you created in `Transforming images`_ + to :meth:`TorchCheckpoint.from_model() `. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __torch_checkpoint_start__ + :end-before: __torch_checkpoint_stop__ + :dedent: + +.. tabbed:: TensorFlow + + To create a :class:`~ray.train.tensorflow.TensorflowCheckpoint`, pass a TensorFlow model and + the :class:`~ray.data.preprocessor.Preprocessor` you created in `Transforming images`_ + to :meth:`TensorflowCheckpoint.from_model() `. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __tensorflow_checkpoint_start__ + :end-before: __tensorflow_checkpoint_stop__ + :dedent: + + +Batch predicting images +----------------------- + +:class:`~ray.train.batch_predictor.BatchPredictor` lets you perform inference on large +image datasets. + +.. tabbed:: Torch + + To create a :class:`~ray.train.batch_predictor.BatchPredictor`, call + :meth:`BatchPredictor.from_checkpoint ` and pass the checkpoint + you created in `Creating checkpoints`_. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __torch_batch_predictor_start__ + :end-before: __torch_batch_predictor_stop__ + :dedent: + + For more in-depth examples, read :doc:`/ray-air/examples/pytorch_resnet_batch_prediction` + and :ref:`Using Predictors for Inference `. + +.. tabbed:: TensorFlow + + To create a :class:`~ray.train.batch_predictor.BatchPredictor`, call + :meth:`BatchPredictor.from_checkpoint ` and pass the checkpoint + you created in `Creating checkpoints`_. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __tensorflow_batch_predictor_start__ + :end-before: __tensorflow_batch_predictor_stop__ + :dedent: + + For more information, read :ref:`Using Predictors for Inference `. + +Serving vision models +--------------------- + +:class:`~ray.serve.air_integrations.PredictorDeployment` lets you +deploy a model to an endpoint and make predictions over the Internet. + +Deployments use :ref:`HTTP adapters ` to define how HTTP messages are converted to model +inputs. For example, :func:`~ray.serve.http_adapters.json_to_ndarray` converts HTTP messages like this: + +.. code-block:: + + {"array": [[1, 2], [3, 4]]} + +To NumPy ndarrays like this: + +.. code-block:: + + array([[1., 2.], + [3., 4.]]) + + +.. tabbed:: Torch + + To deploy a Torch model to an endpoint, pass the checkpoint you created in `Creating checkpoints`_ + to :meth:`PredictorDeployment.bind ` and specify + :func:`~ray.serve.http_adapters.json_to_ndarray` as the HTTP adapter. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __torch_serve_start__ + :end-before: __torch_serve_stop__ + :dedent: + + Then, make a request to classify an image. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __torch_online_predict_start__ + :end-before: __torch_online_predict_stop__ + :dedent: + + For more in-depth examples, read :doc:`/ray-air/examples/torch_image_example` + and :doc:`/ray-air/examples/serving_guide`. + +.. tabbed:: TensorFlow + + To deploy a TensorFlow model to an endpoint, pass the checkpoint you created in `Creating checkpoints`_ + to :meth:`PredictorDeployment.bind ` and specify + :func:`~ray.serve.http_adapters.json_to_multi_ndarray` as the HTTP adapter. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __tensorflow_serve_start__ + :end-before: __tensorflow_serve_stop__ + :dedent: + + Then, make a request to classify an image. + + .. literalinclude:: ./doc_code/computer_vision.py + :start-after: __tensorflow_online_predict_start__ + :end-before: __tensorflow_online_predict_stop__ + :dedent: + + For more information, read :doc:`/ray-air/examples/serving_guide`. diff --git a/doc/source/ray-air/doc_code/computer_vision.py b/doc/source/ray-air/doc_code/computer_vision.py new file mode 100644 index 000000000000..d409103154c7 --- /dev/null +++ b/doc/source/ray-air/doc_code/computer_vision.py @@ -0,0 +1,430 @@ +def main(): + for framework in "torch", "tensorflow": + for datasource in "tfrecords", "images", "numpy", "parquet": + test(framework=framework, datasource=datasource) + + +def test(*, framework: str, datasource: str): + assert framework in {"torch", "tensorflow"} + assert datasource in {"tfrecords", "images", "numpy", "parquet"} + + if datasource == "tfrecords": + dataset = read_tfrecords() + if datasource == "images": + dataset = read_images() + if datasource == "numpy": + dataset = read_numpy() + if datasource == "parquet": + dataset = read_parquet() + + dataset = dataset.limit(32) + + if framework == "torch": + preprocessor, per_epoch_preprocessor = create_torch_preprocessors() + train_torch_model(dataset, preprocessor, per_epoch_preprocessor) + checkpoint = create_torch_checkpoint(preprocessor) + batch_predict_torch(dataset, checkpoint) + online_predict_torch(checkpoint) + if framework == "tensorflow": + preprocessor, per_epoch_preprocessor = create_tensorflow_preprocessors() + train_tensorflow_model(dataset, preprocessor, per_epoch_preprocessor) + checkpoint = create_tensorflow_checkpoint(preprocessor) + batch_predict_tensorflow(dataset, checkpoint) + online_predict_tensorflow(checkpoint) + + +def read_tfrecords(): + # __read_tfrecords1_start__ + import ray + + dataset = ray.data.read_tfrecords( + "s3://anonymous@air-example-data/cifar-10/tfrecords" + ) + # __read_tfrecords1_stop__ + + # __read_tfrecords2_start__ + import io + from typing import Dict + + import numpy as np + from PIL import Image + + def decode_bytes(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: + images = [] + for data in batch["image"]: + image = Image.open(io.BytesIO(data)) + images.append(np.array(image)) + batch["image"] = np.array(images) + return batch + + dataset = dataset.map_batches(decode_bytes, batch_format="numpy") + # __read_tfrecords2_stop__ + return dataset + + +def read_numpy(): + # __read_numpy1_start__ + import ray + + images = ray.data.read_numpy("s3://anonymous@air-example-data/cifar-10/images.npy") + labels = ray.data.read_numpy("s3://anonymous@air-example-data/cifar-10/labels.npy") + # __read_numpy1_stop__ + + # __read_numpy2_start__ + dataset = images.zip(labels) + dataset = dataset.map_batches( + lambda batch: batch.rename( + columns={"__value__": "image", "__value___1": "label"} + ) + ) + # __read_numpy2_stop__ + return dataset + + +def read_parquet(): + # __read_parquet_start__ + import ray + + dataset = ray.data.read_parquet("s3://anonymous@air-example-data/cifar-10/parquet") + # __read_parquet_stop__ + return dataset + + +def read_images(): + # __read_images1_start__ + import ray + from ray.data.datasource.partitioning import Partitioning + + root = "s3://anonymous@air-example-data/cifar-10/images" + partitioning = Partitioning("dir", field_names=["class"], base_dir=root) + dataset = ray.data.read_images(root, partitioning=partitioning) + # __read_images1_stop__ + + dataset = dataset.limit(32) + + # __read_images2_start__ + from typing import Dict + + import numpy as np + + CLASS_TO_LABEL = { + "airplane": 0, + "automobile": 1, + "bird": 2, + "cat": 3, + "deer": 4, + "dog": 5, + "frog": 6, + "horse": 7, + "ship": 8, + "truck": 9, + } + + def add_label_column(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: + labels = [] + for name in batch["class"]: + label = CLASS_TO_LABEL[name] + labels.append(label) + batch["label"] = np.array(labels) + return batch + + def remove_class_column(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: + del batch["class"] + return batch + + dataset = dataset.map_batches(add_label_column).map_batches(remove_class_column) + # __read_images2_stop__ + return dataset + + +def create_torch_preprocessors(): + # __torch_preprocessors_start__ + from torchvision import transforms + + from ray.data.preprocessors import TorchVisionPreprocessor + + transform = transforms.Compose([transforms.ToTensor(), transforms.CenterCrop(224)]) + preprocessor = TorchVisionPreprocessor(columns=["image"], transform=transform) + + per_epoch_transform = transforms.RandomHorizontalFlip(p=0.5) + per_epoch_preprocessor = TorchVisionPreprocessor( + columns=["image"], transform=per_epoch_transform + ) + # __torch_preprocessors_stop__ + return preprocessor, per_epoch_preprocessor + + +def create_tensorflow_preprocessors(): + # __tensorflow_preprocessors_start__ + from typing import Dict + + import numpy as np + import tensorflow as tf + from tensorflow.keras.applications import imagenet_utils + + from ray.data.preprocessors import BatchMapper + + def preprocess(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: + batch["image"] = imagenet_utils.preprocess_input(batch["image"]) + batch["image"] = tf.image.resize(batch["image"], (224, 224)).numpy() + return batch + + preprocessor = BatchMapper(preprocess, batch_format="numpy") + + def augment(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: + batch["image"] = tf.image.random_flip_left_right(batch["image"]).numpy() + return batch + + per_epoch_preprocessor = BatchMapper(augment, batch_format="numpy") + # __tensorflow_preprocessors_stop__ + return preprocessor, per_epoch_preprocessor + + +def train_torch_model(dataset, preprocessor, per_epoch_preprocessor): + # __torch_training_loop_start__ + import torch.nn as nn + import torch.optim as optim + from torchvision import models + + from ray import train + from ray.air import session + from ray.air.config import DatasetConfig, ScalingConfig + from ray.train.torch import TorchCheckpoint, TorchTrainer + + def train_one_epoch(model, *, criterion, optimizer, batch_size, epoch): + dataset_shard = session.get_dataset_shard("train") + + running_loss = 0 + for i, batch in enumerate( + dataset_shard.iter_torch_batches( + batch_size=batch_size, local_shuffle_buffer_size=256 + ) + ): + inputs, labels = batch["image"], batch["label"] + + outputs = model(inputs) + loss = criterion(outputs, labels) + + optimizer.zero_grad() + loss.backward() + optimizer.step() + + running_loss += loss.item() + if i % 2000 == 1999: + session.report( + metrics={ + "epoch": epoch, + "batch": i, + "running_loss": running_loss / 2000, + }, + checkpoint=TorchCheckpoint.from_model(model), + ) + running_loss = 0 + + def train_loop_per_worker(config): + model = train.torch.prepare_model(models.resnet50()) + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD(model.parameters(), lr=config["lr"]) + + for epoch in range(config["epochs"]): + train_one_epoch( + model, + criterion=criterion, + optimizer=optimizer, + batch_size=config["batch_size"], + epoch=epoch, + ) + + # __torch_training_loop_stop__ + + # __torch_trainer_start__ + trainer = TorchTrainer( + train_loop_per_worker=train_loop_per_worker, + train_loop_config={"batch_size": 32, "lr": 0.02, "epochs": 1}, + datasets={"train": dataset}, + dataset_config={ + "train": DatasetConfig(per_epoch_preprocessor=per_epoch_preprocessor) + }, + scaling_config=ScalingConfig(num_workers=2), + preprocessor=preprocessor, + ) + results = trainer.fit() + # __torch_trainer_stop__ + return results + + +def train_tensorflow_model(dataset, preprocessor, per_epoch_preprocessor): + # __tensorflow_training_loop_start__ + import tensorflow as tf + + from ray.air import session + from ray.air.integrations.keras import ReportCheckpointCallback + + def train_loop_per_worker(config): + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() + + train_shard = session.get_dataset_shard("train") + train_dataset = train_shard.to_tf( + "image", + "label", + batch_size=config["batch_size"], + local_shuffle_buffer_size=256, + ) + + with strategy.scope(): + model = tf.keras.applications.resnet50.ResNet50(weights=None) + optimizer = tf.keras.optimizers.Adam(config["lr"]) + model.compile( + optimizer=optimizer, + loss="sparse_categorical_crossentropy", + metrics=["accuracy"], + ) + + model.fit( + train_dataset, + epochs=config["epochs"], + callbacks=[ReportCheckpointCallback()], + ) + + # __tensorflow_training_loop_stop__ + + # __tensorflow_trainer_start__ + from ray.air import DatasetConfig, ScalingConfig + from ray.train.tensorflow import TensorflowTrainer + + trainer = TensorflowTrainer( + train_loop_per_worker=train_loop_per_worker, + train_loop_config={"batch_size": 32, "lr": 0.02, "epochs": 1}, + datasets={"train": dataset}, + dataset_config={ + "train": DatasetConfig(per_epoch_preprocessor=per_epoch_preprocessor) + }, + scaling_config=ScalingConfig(num_workers=2), + preprocessor=preprocessor, + ) + results = trainer.fit() + # __tensorflow_trainer_stop__ + return results + + +def create_torch_checkpoint(preprocessor): + # __torch_checkpoint_start__ + from torchvision import models + + from ray.train.torch import TorchCheckpoint + + model = models.resnet50(pretrained=True) + checkpoint = TorchCheckpoint.from_model(model, preprocessor=preprocessor) + # __torch_checkpoint_stop__ + return checkpoint + + +def create_tensorflow_checkpoint(preprocessor): + # __tensorflow_checkpoint_start__ + import tensorflow as tf + + from ray.train.tensorflow import TensorflowCheckpoint + + model = tf.keras.applications.resnet50.ResNet50() + checkpoint = TensorflowCheckpoint.from_model(model, preprocessor=preprocessor) + # __tensorflow_checkpoint_stop__ + return checkpoint + + +def batch_predict_torch(dataset, checkpoint): + # __torch_batch_predictor_start__ + from ray.train.batch_predictor import BatchPredictor + from ray.train.torch import TorchPredictor + + predictor = BatchPredictor.from_checkpoint(checkpoint, TorchPredictor) + predictor.predict(dataset, feature_columns=["image"], keep_columns=["label"]) + # __torch_batch_predictor_stop__ + + +def batch_predict_tensorflow(dataset, checkpoint): + # __tensorflow_batch_predictor_start__ + import tensorflow as tf + + from ray.train.batch_predictor import BatchPredictor + from ray.train.tensorflow import TensorflowPredictor + + predictor = BatchPredictor.from_checkpoint( + checkpoint, + TensorflowPredictor, + model_definition=tf.keras.applications.resnet50.ResNet50, + ) + predictor.predict(dataset, feature_columns=["image"], keep_columns=["label"]) + # __tensorflow_batch_predictor_stop__ + + +def online_predict_torch(checkpoint): + # __torch_serve_start__ + from ray import serve + from ray.serve import PredictorDeployment + from ray.serve.http_adapters import json_to_ndarray + from ray.train.torch import TorchPredictor + + serve.run( + PredictorDeployment.bind( + TorchPredictor, + checkpoint, + http_adapter=json_to_ndarray, + ) + ) + # __torch_serve_stop__ + + # __torch_online_predict_start__ + from io import BytesIO + + import numpy as np + import requests + from PIL import Image + + response = requests.get("http://placekitten.com/200/300") + image = Image.open(BytesIO(response.content)) + + payload = {"array": np.array(image).tolist(), "dtype": "float32"} + response = requests.post("http://localhost:8000/", json=payload) + predictions = response.json() + # __torch_online_predict_stop__ + predictions + + +def online_predict_tensorflow(checkpoint): + # __tensorflow_serve_start__ + import tensorflow as tf + + from ray import serve + from ray.serve import PredictorDeployment + from ray.serve.http_adapters import json_to_multi_ndarray + from ray.train.tensorflow import TensorflowPredictor + + serve.run( + PredictorDeployment.bind( + TensorflowPredictor, + checkpoint, + http_adapter=json_to_multi_ndarray, + model_definition=tf.keras.applications.resnet50.ResNet50, + ) + ) + # __tensorflow_serve_stop__ + + # __tensorflow_online_predict_start__ + from io import BytesIO + + import numpy as np + import requests + from PIL import Image + + response = requests.get("http://placekitten.com/200/300") + image = Image.open(BytesIO(response.content)) + + payload = {"image": {"array": np.array(image).tolist(), "dtype": "float32"}} + response = requests.post("http://localhost:8000/", json=payload) + predictions = response.json() + # __tensorflow_online_predict_stop__ + predictions + + +if __name__ == "__main__": + main()