[DataComp pipeline] Add first 2 components #223

Merged 14 commits on Jun 22, 2023
5 changes: 4 additions & 1 deletion .gitignore
@@ -83,4 +83,7 @@ MANIFEST
*.lock

# kubeflow artifacts
*.tgz
*.tgz

# docker artifacts
examples/pipelines/*/docker-compose.yml
2 changes: 1 addition & 1 deletion components/image_resolution_filtering/src/main.py
@@ -40,4 +40,4 @@ def transform(

if __name__ == "__main__":
component = ImageFilterComponent.from_args()
component.run()
component.run()
2 changes: 1 addition & 1 deletion components/load_from_hf_hub/fondant_component.yaml
@@ -2,7 +2,7 @@ name: Load from hub
description: Component that loads a dataset from the hub
image: ghcr.io/ml6team/load_from_hf_hub:latest

consumes:
produces:
dummy_variable: #TODO: fill in here
fields:
data:
57 changes: 57 additions & 0 deletions examples/pipelines/datacomp/build_images.sh
@@ -0,0 +1,57 @@
#!/bin/bash

function usage {
echo "Usage: $0 [options]"
echo "Options:"
echo " -c, --component <value> Set the component name. Pass the component folder name to build a certain components or 'all' to build all components in the current directory (required)"
echo " -n, --namespace <value> Set the namespace (default: ml6team)"
echo " -r, --repo <value> Set the repo (default: fondant)"
echo " -t, --tag <value> Set the tag (default: latest)"
echo " -h, --help Display this help message"
}

# Parse the arguments
while [[ "$#" -gt 0 ]]; do case $1 in
-n|--namespace) namespace="$2"; shift;;
-r|--repo) repo="$2"; shift;;
-t|--tag) tag="$2"; shift;;
-c|--component) component="$2"; shift;;
-h|--help) usage; exit;;
*) echo "Unknown parameter passed: $1"; exit 1;;
esac; shift; done

# Check for required argument
if [ -z "${component}" ]; then
echo "Error: component parameter is required"
usage
exit 1
fi

# Set default values for optional arguments if not passed
[ -n "${namespace-}" ] || namespace="ml6team"
[ -n "${repo-}" ] || repo="fondant"
[ -n "${tag-}" ] || tag="latest"

# Get the component directory
component_dir=$(pwd)/"components"

# Loop through all subdirectories
for dir in $component_dir/*/; do
cd "$dir"
BASENAME=${dir%/}
BASENAME=${BASENAME##*/}
# Build all images or one image depending on the passed argument
if [[ "$BASENAME" == "${component}" ]] || [[ "${component}" == "all" ]]; then
full_image_name=ghcr.io/${namespace}/${BASENAME}:${tag}
echo $full_image_name
docker build -t "$full_image_name" \
--build-arg COMMIT_SHA=$(git rev-parse HEAD) \
--build-arg GIT_BRANCH=$(git rev-parse --abbrev-ref HEAD) \
--build-arg BUILD_TIMESTAMP=$(date '+%F_%H:%M:%S') \
--label org.opencontainers.image.source=https://github.com/${namespace}/${repo} \
--platform=linux/arm64 \
.
docker push "$full_image_name"
fi
cd "$component_dir"
done
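For reviewers trying this locally: building and pushing a single component image would look like `bash build_images.sh -c filter_text_complexity`, while `bash build_images.sh -c all -t dev` rebuilds every directory under components/ with the default ml6team namespace. Per the script, image names follow ghcr.io/<namespace>/<component folder>:<tag>; the --repo value only feeds the org.opencontainers.image.source label.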
19 changes: 19 additions & 0 deletions examples/pipelines/datacomp/components/filter_text_complexity/Dockerfile
@@ -0,0 +1,19 @@
FROM --platform=linux/amd64 python:3.8-slim

## System dependencies
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install git -y

# install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt
RUN python -m spacy download en_core_web_sm

# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
COPY src/ .

ENTRYPOINT ["python", "main.py"]
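As a side note, the image can also be built manually from the component folder with e.g. `docker build -t ghcr.io/ml6team/filter_text_complexity:latest .`; the COMMIT_SHA, GIT_BRANCH and BUILD_TIMESTAMP build args passed by build_images.sh are not declared in this Dockerfile, so they can be omitted for a local build.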
23 changes: 23 additions & 0 deletions examples/pipelines/datacomp/components/filter_text_complexity/fondant_component.yaml
@@ -0,0 +1,23 @@
name: Filter text complexity
description: Component that filters texts based on their dependency parse complexity and number of actions
image: ghcr.io/ml6team/filter_text_complexity:latest

consumes:
text:
fields:
data:
type: string

args:
spacy_pipeline:
description: SpaCy pipeline to use, e.g. "en_core_web_sm"
type: str
batch_size:
    description: Batch size to use when parsing text with spaCy
type: int
min_complexity:
description: Minimum complexity to filter text on.
type: int
min_num_actions:
description: Minimum number of actions a text should contain.
type: int
4 changes: 4 additions & 0 deletions examples/pipelines/datacomp/components/filter_text_complexity/requirements.txt
@@ -0,0 +1,4 @@
git+https://github.com/ml6team/fondant.git@main
pyarrow>=7.0
gcsfs==2023.4.0
spacy==3.5.3
82 changes: 82 additions & 0 deletions examples/pipelines/datacomp/components/filter_text_complexity/src/main.py
@@ -0,0 +1,82 @@
"""This component filters text based on:

- complexity of the dependency parse tree
- number of actions.

As proposed in [Radenovic et al., 2023](https://arxiv.org/abs/2301.02280).
"""
import logging

import pandas as pd
import spacy
from spacy.symbols import nsubj, VERB

from fondant.component import PandasTransformComponent
from fondant.logger import configure_logging

configure_logging()
logger = logging.getLogger(__name__)


def get_text_complexity(doc: spacy.tokens.doc.Doc) -> int:
    """Return the maximum number of direct children of any token in the
    dependency parse, used as a proxy for parse tree complexity."""
    complexity = 0
    for token in doc:
        num_children = len(list(token.children))
        if num_children > complexity:
            complexity = num_children

    return complexity


def get_num_actions(doc: spacy.tokens.doc.Doc) -> int:
    """Return the number of distinct verbs that govern a nominal subject,
    used as a proxy for the number of actions described by the text."""
    verbs = set()
    for possible_subject in doc:
        if possible_subject.dep == nsubj and possible_subject.head.pos == VERB:
            verbs.add(possible_subject.head)

    return len(verbs)


class FilterTextComplexity(PandasTransformComponent):
"""Component that filters text based on:

- complexity of the dependency parse tree
- number of actions"""

def setup(
self,
*,
        spacy_pipeline: str,
batch_size: int,
min_complexity: int,
min_num_actions: int
) -> None:
self.nlp = spacy.load(spacy_pipeline, exclude=["ner"])
self.batch_size = batch_size
self.min_complexity = min_complexity
self.min_num_actions = min_num_actions

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
texts = dataframe["text"]["data"]

docs = list(self.nlp.pipe(texts, batch_size=self.batch_size))
docs = pd.Series(docs)

        # Score every caption, then keep only rows that are complex enough
        # and mention enough actions
        caption_complexity = docs.apply(get_text_complexity)
        num_actions = docs.apply(get_num_actions)

        mask = (caption_complexity >= self.min_complexity) & (
            num_actions >= self.min_num_actions
        )
        mask = mask.to_numpy()

        dataframe = dataframe[mask]

        # The text data is only needed for filtering, so drop it afterwards
        dataframe = dataframe.drop(("text", "data"), axis=1)

return dataframe


if __name__ == "__main__":
component = FilterTextComplexity.from_args()
component.run()
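As a side note for reviewers, a minimal sanity check of the two helpers could look like the sketch below (not part of the PR; it assumes the requirements plus the en_core_web_sm model are installed and that it runs next to src/main.py so the helpers can be imported):

import spacy

from main import get_text_complexity, get_num_actions  # hypothetical local import

nlp = spacy.load("en_core_web_sm")
doc = nlp("Two children chase a dog while their parents watch.")

# Highest number of direct children of any token in the parse tree
print(get_text_complexity(doc))
# Distinct verbs that govern a nominal subject
print(get_num_actions(doc))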
50 changes: 50 additions & 0 deletions examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml
@@ -0,0 +1,50 @@
name: Load from hub
description: Component that loads the DataComp dataset from the hub
image: ghcr.io/ml6team/load_from_hf_hub:latest

produces:
image:
fields:
url:
type: string
original_width:
type: int16
original_height:
type: int16
face_bboxes:
type: array
items:
type: array
items:
type: float32
sha256:
type: utf8

text:
fields:
data:
type: string

image_text:
fields:
clip_b32_similarity_score:
type: float32
clip_l14_similarity_score:
type: float32

args:
dataset_name:
description: Name of dataset on the hub
type: str
column_name_mapping:
description: Mapping of the consumed hub dataset to fondant column names
type: dict
image_column_names:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
type: list
default: None
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
70 changes: 70 additions & 0 deletions examples/pipelines/datacomp/pipeline.py
@@ -0,0 +1,70 @@
"""Pipeline used to filter the dataset of the Datacomp competition."""

import logging
import sys

sys.path.append("../")

from pipeline_configs import PipelineConfigs

from fondant.compiler import DockerCompiler
from fondant.pipeline import ComponentOp, Pipeline, Client
from fondant.logger import configure_logging

configure_logging()
logger = logging.getLogger(__name__)

# Initialize pipeline and client
pipeline = Pipeline(
pipeline_name="Datacomp filtering pipeline",
pipeline_description="A pipeline for filtering the Datacomp dataset",
# base_path=PipelineConfigs.BASE_PATH,
base_path="/Users/nielsrogge/Documents/fondant_artifacts_datacomp",
)
client = Client(host=PipelineConfigs.HOST)

# define ops
load_component_column_mapping = {
"url": "image_url",
"original_width": "image_original_width",
"original_height": "image_original_height",
"face_bboxes": "image_face_bboxes",
"sha256": "image_sha256",
"text": "text_data",
"clip_b32_similarity_score": "image_text_clip_b32_similarity_score",
"clip_l14_similarity_score": "image_text_clip_l14_similarity_score",
}

load_from_hub_op = ComponentOp.from_registry(
name="load_from_hf_hub",
component_spec_path="components/load_from_hf_hub/fondant_component.yaml",
arguments={
"dataset_name": "mlfoundations/datacomp_small",
"column_name_mapping": load_component_column_mapping,
"n_rows_to_load": 100,
},
)
filter_complexity_op = ComponentOp(
component_spec_path="components/filter_text_complexity/fondant_component.yaml",
arguments={
"spacy_pipeline": "en_core_web_sm",
"batch_size": 1000,
"min_complexity": 1,
"min_num_actions": 1,
},
)

# add ops to pipeline
pipeline.add_op(load_from_hub_op)
pipeline.add_op(filter_complexity_op, dependencies=load_from_hub_op)
# TODO add more ops

# compile
if __name__ == "__main__":
compiler = DockerCompiler()
# mount the gcloud credentials to the container
extra_volumes = [
"$HOME/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json:ro"
]
compiler.compile(pipeline=pipeline, extra_volumes=extra_volumes)
logger.info("Run `docker compose up` to run the pipeline.")
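For local runs: executing `python pipeline.py` invokes the DockerCompiler, which writes a docker-compose.yml next to the pipeline file (presumably why this PR also git-ignores examples/pipelines/*/docker-compose.yml); `docker compose up` then runs the load and filter components in sequence.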