Generic read write component (#214)
Draft PR for implementing generic read/write components. 

Note: the changes from `feature/local-starcoder` were merged into this
branch to make testing easier. Merge this PR after the local starcoder
PR.

Things to do: 

- [x] Modify the `from_registry` method to enable passing a path to a custom spec
- [x] Figure out how to present the spec template to the user. Should we keep the image/args and just add a TODO in the `produces` and `consumes` sections? (open for suggestions)
- [x] Switch over all pipelines to use the generic load/write components and remove the custom ones (a usage sketch follows this list)
- [x] Add components to affected pipelines to add missing metadata (e.g. width/height)
- [x] Add documentation
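
For context, a rough sketch of how a pipeline might wire up the generic load/write components with the arguments defined in the specs below. This is a hypothetical example: the exact `ComponentOp`/`Pipeline` constructor parameters and the custom-spec path argument are assumptions, not taken from this diff.

```python
from fondant.pipeline import ComponentOp, Pipeline

# Hypothetical wiring; `Pipeline`, `component_spec_path` and `add_op` are
# assumed names and may differ from the actual fondant API.
pipeline = Pipeline(pipeline_name="demo_pipeline", base_path="/tmp/fondant-artifacts")

load_op = ComponentOp(
    component_spec_path="components/load_from_hf_hub/fondant_component.yaml",
    arguments={
        "dataset_name": "<user>/<hub-dataset>",
        # Map hub column names onto fondant "<subset>_<field>" column names
        "column_name_mapping": {"image": "images_data", "text": "captions_data"},
        "image_column_names": ["image"],
        "n_rows_to_load": 100,
    },
)

write_op = ComponentOp(
    component_spec_path="components/write_to_hf_hub/fondant_component.yaml",
    arguments={
        "hf_token": "<hf-token>",
        "username": "<hf-username>",
        "dataset_name": "<output-dataset>",
        "image_column_names": ["images_data"],
        "column_name_mapping": {"images_data": "image"},
    },
)

pipeline.add_op(load_op)
pipeline.add_op(write_op, dependencies=load_op)
```

The idea behind the generic components is that only the spec's `consumes`/`produces` sections and these arguments change per pipeline, while the component image stays the same.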
PhilippeMoussalli authored Jun 21, 2023
1 parent e8d672e commit 3e25077
Showing 36 changed files with 675 additions and 234 deletions.
2 changes: 1 addition & 1 deletion components/filter_comments/Dockerfile
@@ -9,7 +9,7 @@ RUN apt-get update && \
COPY requirements.txt ./
RUN pip3 install --no-cache-dir -r requirements.txt

# Set the working directory to the compoent folder
# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
2 changes: 1 addition & 1 deletion components/filter_line_length/Dockerfile
@@ -9,7 +9,7 @@ RUN apt-get update && \
COPY requirements.txt ./
RUN pip3 install --no-cache-dir -r requirements.txt

# Set the working directory to the compoent folder
# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
2 changes: 1 addition & 1 deletion components/image_cropping/Dockerfile
@@ -9,7 +9,7 @@ RUN apt-get update && \
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Set the working directory to the compoent folder
# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
19 changes: 19 additions & 0 deletions components/image_resolution_extraction/fondant_component.yaml
@@ -0,0 +1,19 @@
name: Image resolution extraction
description: Component that extracts image resolution data from the images
image: ghcr.io/ml6team/image_resolution_extraction:latest

consumes:
  images:
    fields:
      data:
        type: binary

produces:
  images:
    fields:
      width:
        type: int16
      height:
        type: int16
      data:
        type: binary
4 changes: 4 additions & 0 deletions components/image_resolution_extraction/requirements.txt
@@ -0,0 +1,4 @@
fondant
pyarrow>=7.0
gcsfs==2023.4.0
imagesize==1.4.1
52 changes: 52 additions & 0 deletions components/image_resolution_extraction/src/main.py
@@ -0,0 +1,52 @@
"""This component filters images of the dataset based on image size (minimum height and width)."""
import io
import logging
import typing as t

import imagesize
import numpy as np
import pandas as pd

from fondant.component import PandasTransformComponent
from fondant.logger import configure_logging

configure_logging()
logger = logging.getLogger(__name__)


def extract_dimensions(images: bytes) -> t.Tuple[np.int16, np.int16]:
"""Extract the width and height of an image.
Args:
images: input dataframe with images_data column
Returns:
np.int16: width of the image
np.int16: height of the image
"""
width, height = imagesize.get(io.BytesIO(images))

return np.int16(width), np.int16(height)


class ImageResolutionExtractionComponent(PandasTransformComponent):
"""Component that extracts image dimensions."""

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
"""
Args:
dataframe: Dask dataframe
Returns:
dataset.
"""
logger.info("Filtering dataset...")

dataframe[[("images", "width"), ("images", "height")]] = \
dataframe[[("images", "data")]].map(extract_dimensions)

return dataframe


if __name__ == "__main__":
component = ImageResolutionExtractionComponent.from_args()
component.run()
2 changes: 1 addition & 1 deletion components/image_resolution_filtering/Dockerfile
@@ -9,7 +9,7 @@ RUN apt-get update && \
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Set the working directory to the compoent folder
# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
24 changes: 14 additions & 10 deletions components/load_from_hf_hub/fondant_component.yaml
@@ -2,21 +2,25 @@ name: Load from hub
description: Component that loads a dataset from the hub
image: ghcr.io/ml6team/load_from_hf_hub:latest

produces:
  images:
consumes:
  dummy_variable: #TODO: fill in here
    fields:
      data:
        type: binary
      width:
        type: int16
      height:
        type: int16
  captions:
    fields:
      data:
        type: string

args:
  dataset_name:
    description: Name of dataset on the hub
    type: str
  column_name_mapping:
    description: Mapping of the consumed hub dataset to fondant column names
    type: dict
  image_column_names:
    description: Optional argument, a list containing the original image column names in case the dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
    type: list
    default: None
  n_rows_to_load:
    description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
    type: int
    default: None
63 changes: 26 additions & 37 deletions components/load_from_hf_hub/src/main.py
@@ -1,10 +1,8 @@
"""This component loads a seed dataset from the hub."""
import io
import logging
import typing as t

import dask.dataframe as dd
import numpy as np
from PIL import Image

from fondant.component import LoadComponent
from fondant.logger import configure_logging
@@ -13,52 +11,43 @@
logger = logging.getLogger(__name__)


def extract_width(image_bytes):
    # Decode image bytes to PIL Image object
    pil_image = Image.open(io.BytesIO(image_bytes))
    width = pil_image.size[0]

    return np.int16(width)


def extract_height(image_bytes):
    # Decode image bytes to PIL Image object
    pil_image = Image.open(io.BytesIO(image_bytes))
    height = pil_image.size[1]

    return np.int16(height)


class LoadFromHubComponent(LoadComponent):
    def load(self, *, dataset_name: str) -> dd.DataFrame:
    def load(self,
             *,
             dataset_name: str,
             column_name_mapping: dict,
             image_column_names: t.Optional[list],
             n_rows_to_load: t.Optional[int]) -> dd.DataFrame:
        """
        Args:
            dataset_name: name of the dataset to load.
            column_name_mapping: Mapping of the consumed hub dataset to fondant column names
            image_column_names: A list containing the original hub image column names. Used to
                format the image from HF hub format to a byte string
            n_rows_to_load: optional argument that defines the number of rows to load. Useful for
                testing pipeline runs on a small scale
        Returns:
            Dataset: HF dataset
            Dataset: HF dataset.
        """
        # 1) Load data, read as Dask dataframe
        logger.info("Loading dataset from the hub...")
        dask_df = dd.read_parquet(f"hf://datasets/{dataset_name}")

        # 2) Rename columns
        dask_df = dask_df.rename(
            columns={"image": "images_data", "text": "captions_data"}
        )
        # 2) Make sure images are bytes instead of dicts
        if image_column_names is not None:
            for image_column_name in image_column_names:
                dask_df[image_column_name] = dask_df[image_column_name].map(
                    lambda x: x["bytes"], meta=("bytes", bytes)
                )

        # 3) Rename columns
        dask_df = dask_df.rename(columns=column_name_mapping)

        # 3) Make sure images are bytes instead of dicts
        dask_df["images_data"] = dask_df["images_data"].map(
            lambda x: x["bytes"], meta=("bytes", bytes)
        )
        # 4) Optional: only return specific amount of rows

        # 4) Add width and height columns
        dask_df["images_width"] = dask_df["images_data"].map(
            extract_width, meta=("images_width", int)
        )
        dask_df["images_height"] = dask_df["images_data"].map(
            extract_height, meta=("images_height", int)
        )
        if n_rows_to_load:
            dask_df = dask_df.head(n_rows_to_load)
            dask_df = dd.from_pandas(dask_df, npartitions=1)

        return dask_df

Expand Down
Empty file.
28 changes: 28 additions & 0 deletions components/write_to_hf_hub/fondant_component.yaml
@@ -0,0 +1,28 @@
name: Write to hub
description: Component that writes a dataset to the hub
image: ghcr.io/ml6team/write_to_hf_hub:latest

consumes:
  dummy_variable: #TODO: fill in here
    fields:
      data:
        type: binary

args:
  hf_token:
    description: The hugging face token used to write to the hub
    type: str
  username:
    description: The username under which to upload the dataset
    type: str
  dataset_name:
    description: The name of the dataset to upload
    type: str
  image_column_names:
    description: A list containing the image column names. Used to format the images to HF hub format
    type: list
    default: None
  column_name_mapping:
    description: Mapping of the consumed fondant column names to the written hub column names
    type: dict
    default: None
@@ -1,4 +1,5 @@
huggingface_hub==0.14.1
datasets==2.10.1
fondant
pyarrow>=7.0
Pillow==9.4.0
101 changes: 101 additions & 0 deletions components/write_to_hf_hub/src/main.py
@@ -0,0 +1,101 @@
"""This component writes an image dataset to the hub."""
import logging
import typing as t
from io import BytesIO

import dask.dataframe as dd
import datasets

# Define the schema for the struct using PyArrow
import huggingface_hub
from PIL import Image

from fondant.component import WriteComponent
from fondant.logger import configure_logging

configure_logging()
logger = logging.getLogger(__name__)


def convert_bytes_to_image(image_bytes: bytes, feature_encoder: datasets.Image) -> \
t.Dict[str, t.Any]:
"""
Function that converts image bytes to hf image format
Args:
image_bytes: the images as a bytestring
feature_encoder: hf image feature encoder
Returns:
HF image representation.
"""
image = Image.open(BytesIO(image_bytes))
image = feature_encoder.encode_example(image)
return image


class WriteToHubComponent(WriteComponent):
def write(
self,
dataframe: dd.DataFrame,
*,
hf_token: str,
username: str,
dataset_name: str,
image_column_names: t.Optional[list],
column_name_mapping: t.Optional[dict]
):
"""
Args:
dataframe: Dask dataframe
hf_token: The hugging face token used to write to the hub
username: The username under which to upload the dataset
dataset_name: The name of the dataset to upload
image_column_names: A list containing the subset image column names. Used to format the
image fields to HF hub format
column_name_mapping: Mapping of the consumed fondant column names to the written hub
column names.
"""
# login
huggingface_hub.login(token=hf_token)

# Create HF dataset repository
repo_id = f"{username}/{dataset_name}"
repo_path = f"hf://datasets/{repo_id}"
logger.info(f"Creating HF dataset repository under ID: '{repo_id}'")
huggingface_hub.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True)

# Get columns to write and schema
write_columns = []
schema_dict = {}
for subset_name, subset in self.spec.consumes.items():
for field in subset.fields.values():
column_name = f"{subset_name}_{field.name}"
write_columns.append(column_name)
if image_column_names and column_name in image_column_names:
schema_dict[column_name] = datasets.Image()
else:
schema_dict[column_name] = datasets.Value(str(field.type.value))

schema = datasets.Features(schema_dict).arrow_schema
dataframe = dataframe[write_columns]

# Map image column to hf data format
feature_encoder = datasets.Image(decode=True)

if image_column_names is not None:
for image_column_name in image_column_names:
dataframe[image_column_name] = dataframe[image_column_name].map(
lambda x: convert_bytes_to_image(x, feature_encoder),
meta=(image_column_name, "object")
)

# Map column names to hf data format
if column_name_mapping:
dataframe = dataframe.rename(columns=column_name_mapping)

# Write dataset to the hub
dd.to_parquet(dataframe, path=f"{repo_path}/data", schema=schema)


if __name__ == "__main__":
component = WriteToHubComponent.from_args()
component.run()
2 changes: 1 addition & 1 deletion docs/component_spec.md
@@ -138,7 +138,7 @@ args:
```

These arguments are passed in when the component is instantiated.
If an argument is not explicitly provided, the default value will be used instead if available.```
If an argument is not explicitly provided, the default value will be used instead if available.
```python
from fondant.pipeline import ComponentOp
2 changes: 1 addition & 1 deletion docs/custom_component.md
@@ -98,7 +98,7 @@ RUN apt-get update && \
COPY requirements.txt ./
RUN pip3 install --no-cache-dir -r requirements.txt

# Set the working directory to the compoent folder
# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files and spec of the component