From 45bf4307e87e93bbccc43a6a225af0f4c4fd27bc Mon Sep 17 00:00:00 2001
From: Niels Rogge
Date: Tue, 8 Aug 2023 10:40:50 +0200
Subject: [PATCH 1/4] Add dataset_length

---
 .../load_from_hf_hub/fondant_component.yaml |  4 +++
 components/load_from_hf_hub/src/main.py     | 25 ++++++++++++++++---
 examples/pipelines/datacomp/pipeline.py     |  1 +
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/components/load_from_hf_hub/fondant_component.yaml b/components/load_from_hf_hub/fondant_component.yaml
index 0099a92f8..56f47a2f2 100644
--- a/components/load_from_hf_hub/fondant_component.yaml
+++ b/components/load_from_hf_hub/fondant_component.yaml
@@ -23,4 +23,8 @@ args:
   n_rows_to_load:
     description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
     type: int
+    default: None
+  dataset_length:
+    description: Optional argument that defines the length of the dataset. Required in case `n_rows_to_load` is specified.
+    type: int
     default: None
\ No newline at end of file
diff --git a/components/load_from_hf_hub/src/main.py b/components/load_from_hf_hub/src/main.py
index a55daf0c9..11caa556d 100644
--- a/components/load_from_hf_hub/src/main.py
+++ b/components/load_from_hf_hub/src/main.py
@@ -16,6 +16,7 @@ def __init__(self, *_,
                  column_name_mapping: dict,
                  image_column_names: t.Optional[list],
                  n_rows_to_load: t.Optional[int],
+                 dataset_length: int,
                  ) -> None:
         """
         Args:
@@ -25,11 +26,14 @@ def __init__(self, *_,
                 format the image from HF hub format to a byte string
             n_rows_to_load: optional argument that defines the number of rows to load.
                 Useful for testing pipeline runs on a small scale.
+            dataset_length: optional argument that specifies the length of the entire dataset. Only
+                required in case n_rows_to_load is specified.
         """
         self.dataset_name = dataset_name
         self.column_name_mapping = column_name_mapping
         self.image_column_names = image_column_names
         self.n_rows_to_load = n_rows_to_load
+        self.dataset_length = dataset_length
 
     def load(self) -> dd.DataFrame:
         # 1) Load data, read as Dask dataframe
@@ -44,12 +48,27 @@ def load(self) -> dd.DataFrame:
         )
 
         # 3) Rename columns
+        logger.info("Renaming columns...")
         dask_df = dask_df.rename(columns=self.column_name_mapping)
 
         # 4) Optional: only return specific amount of rows
-        if self.n_rows_to_load:
-            dask_df = dask_df.head(self.n_rows_to_load)
-            dask_df = dd.from_pandas(dask_df, npartitions=1)
+        if self.n_rows_to_load is not None:
+            if self.dataset_length is None:
+                raise ValueError("""Make sure to also specify the length of the entire
+                                 dataset. This is required as otherwise only the first
+                                 partition can be loaded""")
+            logger.info(f"""Loading approximately {self.n_rows_to_load} rows...
+                        at least one partition""")
+            partition_length = self.dataset_length // dask_df.npartitions
+            npartitions = max(self.n_rows_to_load // partition_length, 1)
+            dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
+            dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
+
+        # Set monotonically increasing index
+        logger.info("Setting the index...")
+        dask_df["id"] = 1
+        dask_df["id"] = dask_df.id.cumsum()
+        dask_df = dask_df.set_index("id", sort=True)
 
         return dask_df
 
diff --git a/examples/pipelines/datacomp/pipeline.py b/examples/pipelines/datacomp/pipeline.py
index 9af28c365..efce7d7ab 100644
--- a/examples/pipelines/datacomp/pipeline.py
+++ b/examples/pipelines/datacomp/pipeline.py
@@ -40,6 +40,7 @@
         "dataset_name": "nielsr/datacomp-small-with-embeddings",
         "column_name_mapping": load_component_column_mapping,
         "n_rows_to_load": 100,
+        "dataset_length": 12800000,
     },
 )
 filter_image_resolution_op = ComponentOp.from_registry(

From 31d6a55ad8abcf66682195458aa3e1c8dcf34224 Mon Sep 17 00:00:00 2001
From: Niels Rogge
Date: Tue, 8 Aug 2023 12:06:27 +0200
Subject: [PATCH 2/4] Use suggestion

---
 .../load_from_hf_hub/fondant_component.yaml  |  6 +-----
 components/load_from_hf_hub/src/main.py      | 18 ++++++------------
 .../load_from_hf_hub/fondant_component.yaml  |  2 +-
 examples/pipelines/datacomp/pipeline.py      | 20 ++++----------------
 4 files changed, 12 insertions(+), 34 deletions(-)

diff --git a/components/load_from_hf_hub/fondant_component.yaml b/components/load_from_hf_hub/fondant_component.yaml
index 56f47a2f2..c63050f5e 100644
--- a/components/load_from_hf_hub/fondant_component.yaml
+++ b/components/load_from_hf_hub/fondant_component.yaml
@@ -1,6 +1,6 @@
 name: Load from hub
 description: Component that loads a dataset from the hub
-image: ghcr.io/ml6team/load_from_hf_hub:dev
+image: ghcr.io/ml6team/load_from_hf_hub:566a5b6fd3d422c98f7890a6e4101d8af10a42bf
 
 produces:
   dummy_variable: #TODO: fill in here
@@ -23,8 +23,4 @@ args:
   n_rows_to_load:
     description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
     type: int
-    default: None
-  dataset_length:
-    description: Optional argument that defines the length of the dataset. Required in case `n_rows_to_load` is specified.
-    type: int
     default: None
\ No newline at end of file
diff --git a/components/load_from_hf_hub/src/main.py b/components/load_from_hf_hub/src/main.py
index 11caa556d..2775a6d8e 100644
--- a/components/load_from_hf_hub/src/main.py
+++ b/components/load_from_hf_hub/src/main.py
@@ -16,7 +16,6 @@ def __init__(self, *_,
                  column_name_mapping: dict,
                  image_column_names: t.Optional[list],
                  n_rows_to_load: t.Optional[int],
-                 dataset_length: int,
                  ) -> None:
         """
         Args:
@@ -26,14 +25,11 @@ def __init__(self, *_,
                 format the image from HF hub format to a byte string
             n_rows_to_load: optional argument that defines the number of rows to load.
                 Useful for testing pipeline runs on a small scale.
-            dataset_length: optional argument that specifies the length of the entire dataset. Only
-                required in case n_rows_to_load is specified.
         """
         self.dataset_name = dataset_name
         self.column_name_mapping = column_name_mapping
         self.image_column_names = image_column_names
         self.n_rows_to_load = n_rows_to_load
-        self.dataset_length = dataset_length
 
     def load(self) -> dd.DataFrame:
         # 1) Load data, read as Dask dataframe
@@ -53,14 +49,12 @@ def load(self) -> dd.DataFrame:
 
         # 4) Optional: only return specific amount of rows
         if self.n_rows_to_load is not None:
-            if self.dataset_length is None:
-                raise ValueError("""Make sure to also specify the length of the entire
-                                 dataset. This is required as otherwise only the first
-                                 partition can be loaded""")
-            logger.info(f"""Loading approximately {self.n_rows_to_load} rows...
-                        at least one partition""")
-            partition_length = self.dataset_length // dask_df.npartitions
-            npartitions = max(self.n_rows_to_load // partition_length, 1)
+            partitions_length = 0 
+            for npartitions, partition in enumerate(dask_df.partitions):
+                if partitions_length >= self.n_rows_to_load:
+                    logger.info(f"Required number of partitions to load {self.n_rows_to_load} is {npartitions}")
+                    break
+                partitions_length += len(partition)
             dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
             dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
 
diff --git a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml
index 611b6b886..b44023860 100644
--- a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml
+++ b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml
@@ -1,6 +1,6 @@
 name: Load from hub
 description: Component that loads a dataset from the hub
-image: ghcr.io/ml6team/load_from_hf_hub:dev
+image: ghcr.io/ml6team/load_from_hf_hub:566a5b6fd3d422c98f7890a6e4101d8af10a42bf
 
 produces:
   image:
diff --git a/examples/pipelines/datacomp/pipeline.py b/examples/pipelines/datacomp/pipeline.py
index efce7d7ab..5248aef4e 100644
--- a/examples/pipelines/datacomp/pipeline.py
+++ b/examples/pipelines/datacomp/pipeline.py
@@ -7,14 +7,13 @@
 
 from pipeline_configs import PipelineConfigs
 
-from fondant.compiler import DockerCompiler
 from fondant.pipeline import ComponentOp, Pipeline, Client
 
 logger = logging.getLogger(__name__)
 
 # Initialize pipeline and client
 pipeline = Pipeline(
-    pipeline_name="Datacomp filtering pipeline",
+    pipeline_name="datacomp-filtering",
     pipeline_description="A pipeline for filtering the Datacomp dataset",
     # base_path=PipelineConfigs.BASE_PATH,
     base_path="/Users/nielsrogge/Documents/fondant_artifacts_datacomp",
@@ -40,7 +39,6 @@
         "dataset_name": "nielsr/datacomp-small-with-embeddings",
         "column_name_mapping": load_component_column_mapping,
         "n_rows_to_load": 100,
-        "dataset_length": 12800000,
     },
 )
 filter_image_resolution_op = ComponentOp.from_registry(
@@ -66,17 +64,7 @@
 
 # add ops to pipeline
 pipeline.add_op(load_from_hub_op)
-pipeline.add_op(filter_image_resolution_op, dependencies=load_from_hub_op)
-pipeline.add_op(filter_complexity_op, dependencies=filter_image_resolution_op)
-pipeline.add_op(cluster_image_embeddings_op, dependencies=filter_complexity_op)
+# pipeline.add_op(filter_image_resolution_op, dependencies=load_from_hub_op)
+# pipeline.add_op(filter_complexity_op, dependencies=filter_image_resolution_op)
+# pipeline.add_op(cluster_image_embeddings_op, dependencies=filter_complexity_op)
 # TODO add more ops
-
-# compile
-if __name__ == "__main__":
-    compiler = DockerCompiler()
-    # mount the gcloud credentials to the container
-    extra_volumes = [
-        "$HOME/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json:ro"
-    ]
-    compiler.compile(pipeline=pipeline, extra_volumes=extra_volumes)
-    logger.info("Run `docker compose up` to run the pipeline.")

From 9a5076b067db28eb4430920bdf4ded9ba525c869 Mon Sep 17 00:00:00 2001
From: Niels Rogge
Date: Tue, 8 Aug 2023 14:02:12 +0200
Subject: [PATCH 3/4] Address comments

---
 components/load_from_hf_hub/fondant_component.yaml     | 2 +-
 .../components/load_from_hf_hub/fondant_component.yaml | 2 +-
 examples/pipelines/datacomp/pipeline.py                | 6 +++---
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/components/load_from_hf_hub/fondant_component.yaml b/components/load_from_hf_hub/fondant_component.yaml
index c63050f5e..0099a92f8 100644
--- a/components/load_from_hf_hub/fondant_component.yaml
+++ b/components/load_from_hf_hub/fondant_component.yaml
@@ -1,6 +1,6 @@
 name: Load from hub
 description: Component that loads a dataset from the hub
-image: ghcr.io/ml6team/load_from_hf_hub:566a5b6fd3d422c98f7890a6e4101d8af10a42bf
+image: ghcr.io/ml6team/load_from_hf_hub:dev
 
 produces:
   dummy_variable: #TODO: fill in here
diff --git a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml
index b44023860..611b6b886 100644
--- a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml
+++ b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml
@@ -1,6 +1,6 @@
 name: Load from hub
 description: Component that loads a dataset from the hub
-image: ghcr.io/ml6team/load_from_hf_hub:566a5b6fd3d422c98f7890a6e4101d8af10a42bf
+image: ghcr.io/ml6team/load_from_hf_hub:dev
 
 produces:
   image:
diff --git a/examples/pipelines/datacomp/pipeline.py b/examples/pipelines/datacomp/pipeline.py
index 5248aef4e..7f0e762a3 100644
--- a/examples/pipelines/datacomp/pipeline.py
+++ b/examples/pipelines/datacomp/pipeline.py
@@ -64,7 +64,7 @@
 
 # add ops to pipeline
 pipeline.add_op(load_from_hub_op)
-# pipeline.add_op(filter_image_resolution_op, dependencies=load_from_hub_op)
-# pipeline.add_op(filter_complexity_op, dependencies=filter_image_resolution_op)
-# pipeline.add_op(cluster_image_embeddings_op, dependencies=filter_complexity_op)
+pipeline.add_op(filter_image_resolution_op, dependencies=load_from_hub_op)
+pipeline.add_op(filter_complexity_op, dependencies=filter_image_resolution_op)
+pipeline.add_op(cluster_image_embeddings_op, dependencies=filter_complexity_op)
 # TODO add more ops

From 97df532bc4ff92ad0912b74874c318a7d7c77596 Mon Sep 17 00:00:00 2001
From: Niels Rogge
Date: Tue, 8 Aug 2023 14:11:11 +0200
Subject: [PATCH 4/4] Fix precommit

---
 components/load_from_hf_hub/src/main.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/components/load_from_hf_hub/src/main.py b/components/load_from_hf_hub/src/main.py
index 2775a6d8e..2f1492bab 100644
--- a/components/load_from_hf_hub/src/main.py
+++ b/components/load_from_hf_hub/src/main.py
@@ -49,11 +49,12 @@ def load(self) -> dd.DataFrame:
 
         # 4) Optional: only return specific amount of rows
         if self.n_rows_to_load is not None:
-            partitions_length = 0 
+            partitions_length = 0
             for npartitions, partition in enumerate(dask_df.partitions):
                 if partitions_length >= self.n_rows_to_load:
-                    logger.info(f"Required number of partitions to load {self.n_rows_to_load} is {npartitions}")
-                    break
+                    logger.info(f"""Required number of partitions to load\n
+                    {self.n_rows_to_load} is {npartitions}""")
+                    break
                 partitions_length += len(partition)
             dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
             dask_df = dd.from_pandas(dask_df, npartitions=npartitions)