Move Dask Client configuration to Component class and use multi-GPU in embed_images component #852

Merged
merged 6 commits on Feb 15, 2024
32 changes: 32 additions & 0 deletions docs/components/components.md
@@ -1,3 +1,5 @@
from distributed import Client

# Components

Fondant makes it easy to build data preparation pipelines leveraging reusable components. Fondant
@@ -65,6 +67,36 @@ this data can be accessed using `dataframe["image"]`.
The `transform` method should return a single dataframe, with the columns complying to the
schema defined by the `produces` section of the component specification.

### Configuring Dask

You can configure the [Dask client](https://docs.dask.org/en/stable/scheduling.html) based on the
needs of your component by overriding the `dask_client` method:

```python
import os

from dask.distributed import Client, LocalCluster
from fondant.component import PandasTransformComponent

class Component(PandasTransformComponent):

def dask_client(self) -> Client:
"""Initialize the dask client to use for this component."""
cluster = LocalCluster(
processes=True,
n_workers=os.cpu_count(),
threads_per_worker=1,
)
return Client(cluster)
```

The default configuration uses a `LocalCluster` which runs with processes, as many workers as
there are logical CPUs available, and one thread per worker.

Some components perform better with threads, or with a different combination of threads and
processes. To use multiple GPUs, you can use a
[`LocalCUDACluster`](https://docs.rapids.ai/api/dask-cuda/stable/quickstart/#localcudacluster).
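
For example, a minimal sketch of a GPU-backed client, assuming the `dask-cuda` package is
available in the component's environment (it is not installed by default):

```python
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from fondant.component import PandasTransformComponent

class Component(PandasTransformComponent):

    def dask_client(self) -> Client:
        """Initialize a dask client with one worker per available GPU."""
        # LocalCUDACluster starts one worker per detected GPU by default;
        # its CUDA_VISIBLE_DEVICES argument can restrict which GPUs are used.
        cluster = LocalCUDACluster()
        return Client(cluster)
```

Each worker is then pinned to a single GPU, so code running inside the component can assume
exclusive access to one device.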

## Component types

We can distinguish two different types of components:
Expand Down
41 changes: 32 additions & 9 deletions src/fondant/component/component.py
@@ -1,10 +1,12 @@
"""This module defines interfaces which components should implement to be executed by fondant."""

import os
import typing as t
from abc import abstractmethod

import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, LocalCluster


class BaseComponent:
@@ -16,26 +18,47 @@ class BaseComponent:
**kwargs: The provided user arguments are passed in as keyword arguments
"""

def __init__(
self,
**kwargs,
):
def __init__(self):
self.consumes = None
self.produces = None

def teardown(self) -> None:
"""Method called after the component has been executed."""


class DaskLoadComponent(BaseComponent):
class DaskComponent(BaseComponent):
"""Component built on Dask."""

def __init__(self, **kwargs):
super().__init__()

# don't assume every object is a string
dask.config.set({"dataframe.convert-string": False})
# worker.daemon is set to false because creating a worker process in daemon
# mode is not possible in our docker container setup.
dask.config.set({"distributed.worker.daemon": False})

self.dask_client()

def dask_client(self) -> Client:
"""Initialize the dask client to use for this component."""
cluster = LocalCluster(
processes=True,
n_workers=os.cpu_count(),
threads_per_worker=1,
)
return Client(cluster)


class DaskLoadComponent(DaskComponent):
"""Component that loads data and returns a Dask DataFrame."""

@abstractmethod
def load(self) -> dd.DataFrame:
pass


class DaskTransformComponent(BaseComponent):
class DaskTransformComponent(DaskComponent):
"""Component that transforms an incoming Dask DataFrame."""

@abstractmethod
@@ -49,15 +72,15 @@ def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
"""


class DaskWriteComponent(BaseComponent):
class DaskWriteComponent(DaskComponent):
"""Component that accepts a Dask DataFrame and writes its contents."""

@abstractmethod
def write(self, dataframe: dd.DataFrame) -> None:
pass


class PandasTransformComponent(BaseComponent):
class PandasTransformComponent(DaskComponent):
"""Component that transforms the incoming dataset partition per partition as a pandas
DataFrame.
"""
48 changes: 1 addition & 47 deletions src/fondant/component/executor.py
@@ -7,16 +7,13 @@
import argparse
import json
import logging
import os
import typing as t
from abc import abstractmethod
from distutils.util import strtobool
from pathlib import Path

import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, LocalCluster
from fsspec import open as fs_open

from fondant.component import (
@@ -30,7 +27,6 @@
from fondant.core.component_spec import Argument, OperationSpec
from fondant.core.manifest import Manifest, Metadata

dask.config.set({"dataframe.convert-string": False})
logger = logging.getLogger(__name__)


@@ -49,9 +45,6 @@ class Executor(t.Generic[Component]):
partition of dataframe.
Partitions are divided based on this number (n rows per partition).
Set to None for no row limit.
cluster_type: The type of cluster to use for distributed execution
(default is "local").
client_kwargs: Additional keyword arguments dict which will be used to
initialise the dask client, allowing for advanced configuration.
previous_index: The name of the index column of the previous component.
Used to remove all previous fields if the component changes the index
@@ -67,8 +60,6 @@ def __init__(
metadata: t.Dict[str, t.Any],
user_arguments: t.Dict[str, t.Any],
input_partition_rows: int,
cluster_type: t.Optional[str] = None,
client_kwargs: t.Optional[dict] = None,
previous_index: t.Optional[str] = None,
) -> None:
self.operation_spec = operation_spec
@@ -80,42 +71,13 @@ def __init__(
self.input_partition_rows = input_partition_rows
self.previous_index = previous_index

if cluster_type == "local":
client_kwargs = client_kwargs or {
"processes": True,
"n_workers": os.cpu_count(),
"threads_per_worker": 1,
}

logger.info(f"Initialize local dask cluster with arguments {client_kwargs}")

# Additional dask configuration have to be set before initialising the client
# worker.daemon is set to false because creating a worker process in daemon
# mode is not possible in our docker container setup.
dask.config.set({"distributed.worker.daemon": False})

local_cluster = LocalCluster(**client_kwargs, silence_logs=logging.ERROR)
self.client = Client(local_cluster)

elif cluster_type == "distributed":
msg = "The usage of the Dask distributed client is not supported yet."
raise NotImplementedError(msg)
else:
logger.info(
"Dask default local mode will be used for further executions."
"Our current supported options are limited to 'local' and 'default'.",
)
self.client = None

@classmethod
def from_args(cls) -> "Executor":
"""Create an executor from a passed argument containing the specification as a dict."""
parser = argparse.ArgumentParser()
parser.add_argument("--operation_spec", type=json.loads)
parser.add_argument("--cache", type=lambda x: bool(strtobool(x)))
parser.add_argument("--input_partition_rows", type=int)
parser.add_argument("--cluster_type", type=str)
parser.add_argument("--client_kwargs", type=json.loads)
args, _ = parser.parse_known_args()

if "operation_spec" not in args:
@@ -128,8 +90,6 @@ def from_args(cls) -> "Executor":
operation_spec,
cache=args.cache,
input_partition_rows=args.input_partition_rows,
cluster_type=args.cluster_type,
client_kwargs=args.client_kwargs,
)

@classmethod
@@ -139,8 +99,6 @@ def from_spec(
*,
cache: bool,
input_partition_rows: int,
cluster_type: t.Optional[str],
client_kwargs: t.Optional[dict],
) -> "Executor":
"""Create an executor from a component spec."""
args_dict = vars(cls._add_and_parse_args(operation_spec))
@@ -149,8 +107,6 @@
"operation_spec",
"input_partition_rows",
"cache",
"cluster_type",
"client_kwargs",
"consumes",
"produces",
]:
@@ -169,8 +125,6 @@
metadata=metadata,
user_arguments=args_dict,
input_partition_rows=input_partition_rows,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
previous_index=operation_spec.previous_index,
)

@@ -262,7 +216,7 @@ def _write_data(
operation_spec=self.operation_spec,
)

data_writer.write_dataframe(dataframe, self.client)
data_writer.write_dataframe(dataframe)

def _get_cache_reference_content(self) -> t.Union[str, None]:
"""
1 change: 1 addition & 0 deletions src/fondant/components/caption_images/src/main.py
@@ -77,6 +77,7 @@ def __init__(
batch_size: int,
max_new_tokens: int,
):
super().__init__()
self.device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Device: {self.device}")

1 change: 1 addition & 0 deletions src/fondant/components/chunk_text/src/main.py
@@ -59,6 +59,7 @@ def __init__(
code_splitter
for more information on supported languages.
"""
super().__init__()
self.chunk_strategy = chunk_strategy
self.chunk_kwargs = chunk_kwargs
self.chunker = self._get_chunker_class(chunk_strategy)
1 change: 1 addition & 0 deletions src/fondant/components/crop_images/src/main.py
@@ -41,6 +41,7 @@ def __init__(
cropping_threshold (int): threshold parameter used for detecting borders
padding (int): padding for the image cropping.
"""
super().__init__()
self.cropping_threshold = cropping_threshold
self.padding = padding

4 changes: 1 addition & 3 deletions src/fondant/components/download_images/src/main.py
@@ -7,16 +7,13 @@
import logging
import typing as t

import dask
import httpx
import pandas as pd
from fondant.component import PandasTransformComponent
from resizer import Resizer

logger = logging.getLogger(__name__)

dask.config.set(scheduler="processes")


class DownloadImagesComponent(PandasTransformComponent):
"""Component that downloads images based on URLs."""
@@ -50,6 +47,7 @@ def __init__(
Returns:
Dask dataframe
"""
super().__init__()
self.timeout = timeout
self.retries = retries
self.n_connections = n_connections
3 changes: 2 additions & 1 deletion src/fondant/components/embed_images/requirements.txt
@@ -1,2 +1,3 @@
Pillow==10.0.1
transformers==4.28.0
dask-cuda==23.12.0