Add Super pipeline for code transforms. #172

Merged
7 commits merged on May 30, 2024
Changes from 3 commits
2 changes: 1 addition & 1 deletion .make.versions
@@ -16,7 +16,7 @@ FILTER_VERSION=0.3.0
NOOP_VERSION=0.8.0
RESIZE_VERSION=0.3.0
LANG_ID_VERSION=0.3.0
-TOKENIZER_VERSION=0.3.0
+TOKENIZATION_VERSION=0.3.0
MALWARE_VERSION=0.4.0
PROGLANG_SELECT_VERSION=0.3.0
CODE_QUALITY_VERSION=0.3.0
21 changes: 19 additions & 2 deletions kfp/doc/multi_transform_pipeline.md
@@ -23,13 +23,30 @@ In the list of its input parameters, we also see `data_s3_config`. Now, we have

![param list](param_list2.png)

### Examples

**Note** An example super pipeline that combines several transforms, `doc_id`, `ededup`, and `fdedup`, can be found in [superworkflow_dedups_sample_wf.py](../superworkflows/v1/superworkflow_dedups_sample_wf.py).
The sections that follow display two super pipelines as examples:

1) [dedups super pipeline](#De-dups-super-pipeline)
Member (review comment): I think the link will not work; better to use explicit names, e.g.
`### Dedups super pipeline <a name = "dedups"></a>`
so the link can be `dedups super pipeline`.

Collaborator Author (reply): Fixed. Thanks.

2) [programming language super pipeline](#programming-languages-super-pipeline)

### Dedups super pipeline

This pipeline combines several transforms, `doc_id`, `ededup`, and `fdedup`. It can be found in [superworkflow_dedups_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py).

![super pipeline](super_pipeline.png)

The input parameters of the super pipelines are described in this [section](#super-pipeline-input-parameters).

### Programming languages super pipeline

This pipeline combines several transforms for programming-language data preprocessing: `ededup`, `doc_id`, `fdedup`, `proglang_select`, `code_quality`, `malware` and `tokenization`. It can be found in [superworkflow_code_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py).
Member (review comment): Suggested wording: "This pipeline combines transforms for programming languages data preprocessing:"

Collaborator Author (reply): Fixed. Thanks.

![super pipeline](super-code-pipeline.png)

The input parameters of the super pipelines are described in this [section](#super-pipeline-input-parameters).

The input parameters of the super pipelines:
### Super Pipeline Input Parameters

There are several `groups` of input parameters for super pipelines; each group of parameters has a prefix of the form `p<x>_`, where `<x>` is an order number.

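As an editorial illustration of the `p<x>_` prefix convention described above, parameters can be bucketed by their prefix before being handed to the corresponding sub-workflow. The helper below is hypothetical and not code from this PR; it is only a minimal sketch of the naming scheme:

```python
# Hypothetical helper illustrating the "p<x>_" prefix convention; not code from this PR.
def group_params(params: dict) -> dict:
    groups: dict[str, dict] = {}
    for key, value in params.items():
        prefix, _, rest = key.partition("_")  # "p3_ededup_doc_column" -> ("p3", "_", "ededup_doc_column")
        if prefix.startswith("p") and prefix[1:].isdigit():
            groups.setdefault(prefix + "_", {})[rest] = value
    return groups


example = {
    "p2_pipeline_server_url": "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
    "p3_name": "ededup",
    "p3_skip": False,
    "p4_name": "doc_id",
}
# group_params(example) ->
# {"p2_": {"pipeline_server_url": "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888"},
#  "p3_": {"name": "ededup", "skip": False},
#  "p4_": {"name": "doc_id"}}
print(group_params(example))
```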
Binary file added kfp/doc/super-code-pipeline.png
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py
@@ -62,7 +62,7 @@
# Execute Ray jobs
execute_ray_jobs(
name=cluster_name,
-additional_params=KFPUtils.load_from_json(args.additional_params),
+additional_params=KFPUtils.load_from_json(args.additional_params.replace("'", '"')),
e_params=exec_params,
exec_script_name=args.exec_script_name,
server_url=args.server_url,
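The one-line change above converts Python-style single quotes to JSON double quotes before parsing. A minimal sketch of why that is needed, assuming `KFPUtils.load_from_json` ultimately calls `json.loads` (an assumption; the implementation is not shown in this diff):

```python
import json

# The orchestrator may pass additional_params as a Python-style dict string
# with single quotes, which is not valid JSON.
raw = "{'wait_interval': 2, 'http_retries': 5}"

try:
    json.loads(raw)  # raises json.JSONDecodeError: property names must be double-quoted
except json.JSONDecodeError:
    pass

# Normalizing the quotes first makes the string parseable.
params = json.loads(raw.replace("'", '"'))
print(params["http_retries"])  # 5
```

Note that this simple replacement assumes the parameter values themselves contain no apostrophes.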
35 changes: 15 additions & 20 deletions kfp/superworkflows/Makefile
@@ -2,27 +2,22 @@ REPOROOT=../../
# Use make help, to see the available rules
include ${REPOROOT}/.make.defaults

-clean::
-	@# Help: Recursively make $@ all subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-venv
+workflow-venv:
+	$(MAKE) -C ray/kfp_v1 workflow-venv

-workflow-venv::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-build
+workflow-build:
+	$(MAKE) -C ray/kfp_v1 workflow-build

-workflow-build::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-test
+workflow-test:
+	$(MAKE) -C ray/kfp_v1 workflow-test

-workflow-test::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
-
-workflow-upload::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
-
-workflow-reconcile-requirements::
-	@# Help: Recursively make $@ in all subdirs
-	@$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-upload
+workflow-upload:
+	$(MAKE) -C ray/kfp_v1 workflow-upload
+
+.PHONY: workflow-reconcile-requirements
+workflow-reconcile-requirements:
+	$(MAKE) -C ray/kfp_v1 workflow-reconcile-requirements
@@ -1,8 +1,8 @@
-REPOROOT=${CURDIR}/../../..
+REPOROOT=${CURDIR}/../../../..
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.transforms_workflows

-PYTHON_WF := $(shell find ./ -name *_wf.py)
+PYTHON_WF := $(shell find ./ -name "*_wf.py")
YAML_WF=$(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv:: .check_python_version ${WORKFLOW_VENV_ACTIVATE}
260 changes: 260 additions & 0 deletions kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py
@@ -0,0 +1,260 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp_support.workflow_support.utils import ONE_WEEK_SEC


# Components
# For every sub-workflow we need a separate component that knows about this sub-workflow.
component_spec_path = "../../../kfp_ray_components/"
run_code_quality_op = comp.load_component_from_file(component_spec_path+"executeSubWorkflowComponent.yaml")
run_malware_op = comp.load_component_from_file(component_spec_path+"executeSubWorkflowComponent.yaml")
run_proglang_select_op = comp.load_component_from_file(component_spec_path+"executeSubWorkflowComponent.yaml")
run_doc_id_op = comp.load_component_from_file(component_spec_path+"executeSubWorkflowComponent.yaml")
run_exact_dedup_op = comp.load_component_from_file(component_spec_path+"executeSubWorkflowComponent.yaml")
run_fuzzy_dedup_op = comp.load_component_from_file(component_spec_path+"executeSubWorkflowComponent.yaml")
run_tokenization_op = comp.load_component_from_file(component_spec_path+"executeSubWorkflowComponent.yaml")

proglang_select_image = "quay.io/dataprep1/data-prep-kit/proglang_select:0.3.0"
code_quality_image = "quay.io/dataprep1/data-prep-kit/code_quality:0.3.0"
malware_image = "quay.io/dataprep1/data-prep-kit/malware:0.4.0"
doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.1"
ededup_image = "quay.io/dataprep1/data-prep-kit/ededup:0.3.0"
fdedup_image = "quay.io/dataprep1/data-prep-kit/fdedup:0.3.0"
tokenizer_image = "quay.io/dataprep1/data-prep-kit/tokenization:0.3.0"


# Pipeline to invoke execution on remote resource
@dsl.pipeline(
name="sample-super-kubeflow-pipeline",
Member (review comment): Maybe change the pipeline name and description to the specific use case.

Collaborator Author (reply): Fixed. Thanks.
description="Pipeline to show how to run combine several transformer pipelines",
)
def sample_code_ray_orchestrator(
    # the super pipeline parameters
    p1_orch_code_quality_name: str = "code_quality_wf",
    p1_orch_malware_name: str = "malware_wf",
    p1_orch_proglang_select_name: str = "proglang_select_wf",
    p1_orch_doc_id_name: str = "doc_id_wf",
    p1_orch_exact_dedup_name: str = "ededup_wf",
    p1_orch_fuzzy_dedup_name: str = "fdedup_wf",
    p1_orch_tokenization_wf_name: str = "tokenization_wf",
    p2_pipeline_runtime_pipeline_id: str = "pipeline_id",
    p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
    p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
    p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
    p2_pipeline_input_parent_path: str = "test/ingest2parquet/output/",
    p2_pipeline_output_parent_path: str = "test/super/output/",
    p2_pipeline_parent_path_suffix: str = "",
    p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
    p2_pipeline_data_s3_access_secret: str = "s3-secret",
    p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
    # Exact dedup step parameters
    p3_name: str = "ededup",
    p3_skip: bool = False,
    p3_ededup_doc_column: str = "contents",
    p3_ededup_hash_cpu: float = 0.5,
    p3_runtime_actor_options: str = '{"num_cpus": 0.8}',
    # data access.
    p3_data_max_files: int = -1,
    p3_data_num_samples: int = -1,
    # data sampling
    p3_ededup_n_samples: int = 10,
    # overriding parameters
    p3_overriding_params: str = '{"ray_worker_options": {"image": "'
    + ededup_image
    + '"}, "ray_head_options": {"image": "'
    + ededup_image
    + '"}}',
    # Document ID step parameters
    p4_name: str = "doc_id",
    p4_skip: bool = False,
    # data access.
    p4_data_max_files: int = -1,
    p4_data_num_samples: int = -1,
    # orchestrator
    p4_runtime_actor_options: str = '{"num_cpus": 0.8}',
    # doc id parameters
    p4_doc_id_doc_column: str = "contents",
    p4_doc_id_hash_column: str = "hash_column",
    p4_doc_id_int_column: str = "int_id_column",
    # overriding parameters
    p4_overriding_params: str = '{"ray_worker_options": {"image": "'
    + doc_id_image
    + '"}, "ray_head_options": {"image": "'
    + doc_id_image
    + '"}}',
    # Fuzzy dedup step parameters
    p5_name: str = "fdedup",
    p5_skip: bool = False,
    # columns used
    p5_fdedup_doc_column: str = "contents",
    p5_fdedup_id_column: str = "int_id_column",
    p5_fdedup_cluster_column: str = "cluster",
    # orchestrator
    p5_runtime_actor_options: str = '{"num_cpus": 0.8}',
    # data access. checkpointing is not supported by dedup
    p5_data_num_samples: int = -1,
    p5_data_max_files: int = -1,
    # infrastructure
    p5_fdedup_bucket_cpu: float = 0.5,
    p5_fdedup_doc_cpu: float = 0.5,
    p5_fdedup_mhash_cpu: float = 0.5,
    # fuzzy parameters
    p5_fdedup_num_permutations: int = 64,
    p5_fdedup_threshold: float = 0.8,
    p5_fdedup_shingles_size: int = 5,
    p5_fdedup_delimiters: str = " ",
    # random delay between reads
    p5_fdedup_random_delay_limit: int = 5,
    # snapshotting
    p5_fdedup_snapshot_delay: int = 1,
    p5_fdedup_use_doc_snapshot: bool = False,
    p5_fdedup_use_bucket_snapshot: bool = False,
    # data sampling
    p5_fdedup_n_samples: int = 10,
    # overriding parameters
    p5_overriding_params: str = '{"ray_worker_options": {"image": "'
    + fdedup_image
    + '"}, "ray_head_options": {"image": "'
    + fdedup_image
    + '"}}',
    # proglang_select step parameters
    p6_name: str = "proglang_select",
    p6_skip: bool = False,
    p6_proglang_select_allowed_langs_file: str = "test/proglang_select/languages/allowed-code-languages.txt",
    p6_proglang_select_language_column: str = "programming_language",
    p6_proglang_select_s3_access_secret: str = "s3-secret",
    # data access.
    p6_data_max_files: int = -1,
    p6_data_num_samples: int = -1,
    # orchestrator
    p6_runtime_actor_options: str = '{"num_cpus": 0.8}',
    # overriding parameters
    p6_overriding_params: str = '{"ray_worker_options": {"image": "'
    + proglang_select_image
    + '"}, "ray_head_options": {"image": "'
    + proglang_select_image
    + '"}}',
    # Code quality step parameters
    p7_name: str = "code_quality",
    p7_skip: bool = False,
    p7_cq_contents_column_name: str = "contents",
    p7_cq_language_column_name: str = "programming_language",
    p7_cq_tokenizer: str = "codeparrot/codeparrot",
    p7_cq_hf_token: str = "None",
    # data access.
    p7_data_max_files: int = -1,
    p7_data_num_samples: int = -1,
    # orchestrator
    p7_runtime_actor_options: str = '{"num_cpus": 0.8}',
    # overriding parameters
    p7_overriding_params: str = '{"ray_worker_options": {"image": "'
    + code_quality_image
    + '"}, "ray_head_options": {"image": "'
    + code_quality_image
    + '"}}',
    # malware step parameters
    p8_name: str = "malware",
    p8_skip: bool = False,
    p8_malware_input_column: str = "contents",
    p8_malware_output_column: str = "virus_detection",
    # orchestrator
    p8_runtime_actor_options: str = '{"num_cpus": 0.8}',
    # data access.
    p8_data_max_files: int = -1,
    p8_data_num_samples: int = -1,
    # overriding parameters
    p8_overriding_params: str = '{"ray_worker_options": {"image": "'
    + malware_image
    + '"}, "ray_head_options": {"image": "'
    + malware_image
    + '"}}',
    # tokenization parameters
    p9_name: str = "tokenization",
    p9_skip: bool = False,
    p9_tkn_tokenizer: str = "hf-internal-testing/llama-tokenizer",
    p9_tkn_doc_id_column: str = "document_id",
    p9_tkn_doc_content_column: str = "contents",
    p9_tkn_text_lang: str = "en",
    p9_tkn_tokenizer_args: str = "cache_dir=/tmp/hf",
    p9_tkn_chunk_size: int = 0,
    p9_overriding_params: str = '{"ray_worker_options": {"image": "'
    + tokenizer_image
    + '"}, "ray_head_options": {"image": "'
    + tokenizer_image
    + '"}}',
):

    # get all arguments
    args = locals()
    orch_host = "http://ml-pipeline:8888"

    def _set_component(op: dsl.BaseOp, displaied_name: str, prev_op: dsl.BaseOp = None):
        # set the sub component UI name
        op.set_display_name(displaied_name)

        # Add pod labels
        op.add_pod_label("app", "ml-pipeline").add_pod_label("component", "code-pipelines")
        # No caching
        op.execution_options.caching_strategy.max_cache_staleness = "P0D"
        # image pull policy
        op.set_image_pull_policy("Always")
        # Set the timeout for each task to one week (in seconds)
        op.set_timeout(ONE_WEEK_SEC)
        if prev_op is not None:
            op.after(prev_op)

    # exact deduplication
    exact_dedup = run_exact_dedup_op(
        name=p1_orch_exact_dedup_name,
        prefix="p3_",
        params=args,
        host=orch_host,
        input_folder=p2_pipeline_input_parent_path,
    )
    _set_component(exact_dedup, "exact dedup")
    # document ID
    doc_id = run_doc_id_op(
        name=p1_orch_doc_id_name, prefix="p4_", params=args, host=orch_host, input_folder=exact_dedup.output
    )
    _set_component(doc_id, "doc ID", exact_dedup)
    # fuzzy deduplication
    fuzzy_dedup = run_fuzzy_dedup_op(
        name=p1_orch_fuzzy_dedup_name, prefix="p5_", params=args, host=orch_host, input_folder=doc_id.output
    )
    _set_component(fuzzy_dedup, "fuzzy dedup", doc_id)

    # proglang_select
    proglang_select = run_proglang_select_op(
        name=p1_orch_proglang_select_name,
        prefix="p6_",
        params=args,
        host=orch_host,
        input_folder=fuzzy_dedup.output,
    )
    _set_component(proglang_select, "proglang_select", fuzzy_dedup)

    # code_quality
    code_quality = run_code_quality_op(
        name=p1_orch_code_quality_name, prefix="p7_", params=args, host=orch_host, input_folder=proglang_select.output
    )
    _set_component(code_quality, "code_quality", proglang_select)

    # malware
    malware = run_malware_op(
        name=p1_orch_malware_name, prefix="p8_", params=args, host=orch_host, input_folder=code_quality.output
    )
    _set_component(malware, "malware", code_quality)
    # tokenization
    tokenization = run_tokenization_op(
        name=p1_orch_tokenization_wf_name, prefix="p9_", params=args, host=orch_host, input_folder=malware.output
    )
    _set_component(tokenization, "tokenization", malware)

    # Configure the pipeline-level timeout to one week (in seconds)
    dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC)


if __name__ == "__main__":
    # Compiling the pipeline
    compiler.Compiler().compile(sample_code_ray_orchestrator, __file__.replace(".py", ".yaml"))
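For readers who want to try the compiled pipeline, here is a minimal sketch of uploading and running the generated YAML with the KFP v1 SDK. It is not part of this PR; the endpoint, file name, and run name are assumptions to be adapted to the target deployment:

```python
import kfp

# Assumed KFP endpoint; inside the cluster this matches the orch_host used above,
# outside the cluster a port-forward or ingress URL is needed.
client = kfp.Client(host="http://ml-pipeline:8888")

# The compiler call above writes superworkflow_code_sample_wf.yaml next to the script.
run = client.create_run_from_pipeline_package(
    "superworkflow_code_sample_wf.yaml",
    arguments={},  # empty dict keeps the parameter defaults defined in the pipeline
    run_name="code-super-pipeline-test",  # hypothetical run name
)
print(run.run_id)
```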
@@ -6,13 +6,13 @@

# Components
# path to kfp component specifications files
component_spec_path = "../../kfp_ray_components/"
component_spec_path = "../../../kfp_ray_components/"
# For every sub-workflow we need a separate component that knows about this sub-workflow.
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_exact_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_fuzzy_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")

-doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.0"
+doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.1"
ededup_image = "quay.io/dataprep1/data-prep-kit/ededup:0.3.0"
fdedup_image = "quay.io/dataprep1/data-prep-kit/fdedup:0.3.0"

2 changes: 1 addition & 1 deletion transforms/universal/doc_id/kfp_ray/v1/doc_id_wf.py
@@ -20,7 +20,7 @@
)


task_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.0"
task_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.1"

# the name of the job script
EXEC_SCRIPT_NAME: str = "doc_id_transform.py"
@@ -23,7 +23,7 @@
# the name of the job script
EXEC_SCRIPT_NAME: str = "tokenization_transform.py"

task_image = "quay.io/dataprep1/data-prep-kit/tokenization:0.2.0"
task_image = "quay.io/dataprep1/data-prep-kit/tokenization:0.3.0"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.1.1"