
Merge pull request #172 from revit13/code-super
Add Super pipeline for code transforms.
revit13 authored May 30, 2024
2 parents f59a33a + 3945dbd commit 5441bd5
Showing 11 changed files with 284 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .make.versions
@@ -19,7 +19,7 @@ NOOP_VERSION=0.8.0
NOOP_SPARK_VERSION=0.1.0
RESIZE_VERSION=0.3.0
LANG_ID_VERSION=0.3.0
-TOKENIZER_VERSION=0.3.0
+TOKENIZATION_VERSION=0.3.0
MALWARE_VERSION=0.4.0
PROGLANG_SELECT_VERSION=0.3.0
CODE_QUALITY_VERSION=0.3.0
21 changes: 19 additions & 2 deletions kfp/doc/multi_transform_pipeline.md
@@ -23,13 +23,30 @@ In the list of its input parameters, we also see `data_s3_config`. Now, we have

![param list](param_list2.png)

### Examples

The sections that follow present two example super pipelines:

1) [dedups super pipeline](#dedups)
1) [programming languages super pipeline](#code)

### Dedups super pipeline <a name = "dedups"></a>

This pipeline combines several transforms, `doc_id`, `ededup`, and `fdedup`; it can be found in [superworkflow_dedups_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py).

![super pipeline](super_pipeline.png)

The input parameters of the super pipeline are described in [this section](#super-pipeline-input-parameters).

### Programming languages super pipeline <a name = "code"></a>

This pipeline combines transforms for preprocessing programming-language data: `ededup`, `doc_id`, `fdedup`, `proglang_select`, `code_quality`, `malware`, and `tokenization`. It can be found in [superworkflow_code_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py).

![super pipeline](super-code-pipeline.png)

The input parameters of the super pipeline are described in [this section](#super-pipeline-input-parameters).

### Super Pipeline Input Parameters

There are several groups of input parameters for super pipelines; each group of parameters has a `p<x>_` prefix, where `<x>` is an order number.

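For example, the programming languages super pipeline ([superworkflow_code_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py)) declares its parameters in such groups; an abridged excerpt (grouping comments added here for illustration):

```python
def sample_code_ray_orchestrator(
    # p1_: names of the sub-workflows to orchestrate
    p1_orch_exact_dedup_name: str = "ededup_wf",
    # p2_: parameters shared by the whole super pipeline
    p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
    # p3_: parameters of the exact-dedup step (the first transform)
    p3_ededup_doc_column: str = "contents",
    # p4_: parameters of the doc_id step, and so on for p5_ through p9_
    p4_doc_id_doc_column: str = "contents",
):
    ...
```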
Binary file added kfp/doc/super-code-pipeline.png
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py
@@ -62,7 +62,7 @@
# Execute Ray jobs
execute_ray_jobs(
    name=cluster_name,
-   additional_params=KFPUtils.load_from_json(args.additional_params),
+   additional_params=KFPUtils.load_from_json(args.additional_params.replace("'", '"')),
    e_params=exec_params,
    exec_script_name=args.exec_script_name,
    server_url=args.server_url,
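Why the added `.replace("'", '"')` matters: the value arrives as a Python-style, single-quote dict string, which `json.loads` rejects (assuming `KFPUtils.load_from_json` is a thin wrapper over `json.loads`). A minimal sketch with a made-up value:

```python
import json

raw = "{'wait_interval': 2, 'http_retries': 5}"  # single quotes: not valid JSON
# json.loads(raw) would raise json.JSONDecodeError
params = json.loads(raw.replace("'", '"'))
print(params["http_retries"])  # 5
```

Note that this simple substitution assumes no apostrophes appear inside keys or values.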
35 changes: 15 additions & 20 deletions kfp/superworkflows/Makefile
@@ -2,27 +2,22 @@ REPOROOT=../../
# Use make help, to see the available rules
include ${REPOROOT}/.make.defaults

-clean::
-	@# Help: Recursively make $@ all subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-venv
+workflow-venv:
+	$(MAKE) -C ray/kfp_v1 workflow-venv

-workflow-venv::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-build
+workflow-build:
+	$(MAKE) -C ray/kfp_v1 workflow-build

-workflow-build::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-test
+workflow-test:
+	$(MAKE) -C ray/kfp_v1 workflow-test

-workflow-test::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
-
-workflow-upload::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
-
-workflow-reconcile-requirements::
-	@# Help: Recursively make $@ in all subdirs
-	@$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-upload
+workflow-upload:
+	$(MAKE) -C ray/kfp_v1 workflow-upload
+
+.PHONY: workflow-reconcile-requirements
+workflow-reconcile-requirements:
+	$(MAKE) -C ray/kfp_v1 workflow-reconcile-requirements
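With this change, a run of e.g. `make workflow-build` in `kfp/superworkflows` no longer recurses over all subdirectories; it delegates directly to `ray/kfp_v1`, where the KFP v1 super workflows (including the new code pipeline) now live.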
4 changes: 2 additions & 2 deletions kfp/superworkflows/ray/kfp_v1/Makefile
@@ -1,8 +1,8 @@
-REPOROOT=${CURDIR}/../../..
+REPOROOT=${CURDIR}/../../../..
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.transforms_workflows

-PYTHON_WF := $(shell find ./ -name *_wf.py)
+PYTHON_WF := $(shell find ./ -name "*_wf.py")
YAML_WF=$(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv:: .check_python_version ${WORKFLOW_VENV_ACTIVATE}
237 changes: 237 additions & 0 deletions kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py
@@ -0,0 +1,237 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp_support.workflow_support.utils import ONE_WEEK_SEC


# Components
# For every sub-workflow we need a separate component that knows about that sub-workflow.
component_spec_path = "../../../kfp_ray_components/"
run_code_quality_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_malware_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_proglang_select_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_exact_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_fuzzy_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_tokenization_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")

proglang_select_image = "quay.io/dataprep1/data-prep-kit/proglang_select:0.3.0"
code_quality_image = "quay.io/dataprep1/data-prep-kit/code_quality:0.3.0"
malware_image = "quay.io/dataprep1/data-prep-kit/malware:0.4.0"
doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.1"
ededup_image = "quay.io/dataprep1/data-prep-kit/ededup:0.3.0"
fdedup_image = "quay.io/dataprep1/data-prep-kit/fdedup:0.3.0"
tokenizer_image = "quay.io/dataprep1/data-prep-kit/tokenization:0.3.0"


# Pipeline to invoke execution on remote resource
@dsl.pipeline(
name="super-kubeflow-pipeline-code",
description="Super pipeline for programming languages data preprocessing",
)
def sample_code_ray_orchestrator(
    # the super pipeline parameters
    p1_orch_code_quality_name: str = "code_quality_wf",
    p1_orch_malware_name: str = "malware_wf",
    p1_orch_proglang_select_name: str = "proglang_select_wf",
    p1_orch_doc_id_name: str = "doc_id_wf",
    p1_orch_exact_dedup_name: str = "ededup_wf",
    p1_orch_fuzzy_dedup_name: str = "fdedup_wf",
    p1_orch_tokenization_wf_name: str = "tokenization_wf",
    p2_pipeline_runtime_pipeline_id: str = "pipeline_id",
    p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
    p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
    p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
    p2_pipeline_input_parent_path: str = "test/ingest2parquet/output/",
    p2_pipeline_output_parent_path: str = "test/super/output/",
    p2_pipeline_parent_path_suffix: str = "",
    p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
    p2_pipeline_data_s3_access_secret: str = "s3-secret",
    p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
    p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
    p2_pipeline_data_max_files: int = -1,
    p2_pipeline_data_num_samples: int = -1,
    # Exact dedup step parameters
    p3_name: str = "ededup",
    p3_skip: bool = False,
    p3_ededup_doc_column: str = "contents",
    p3_ededup_hash_cpu: float = 0.5,
    # data sampling
    p3_ededup_n_samples: int = 10,
    # overriding parameters
    p3_overriding_params: str = '{"ray_worker_options": {"image": "'
    + ededup_image
    + '"}, "ray_head_options": {"image": "'
    + ededup_image
    + '"}}',
    # Document ID step parameters
    p4_name: str = "doc_id",
    p4_skip: bool = False,
    # doc id parameters
    p4_doc_id_doc_column: str = "contents",
    p4_doc_id_hash_column: str = "hash_column",
    p4_doc_id_int_column: str = "int_id_column",
    # overriding parameters
    p4_overriding_params: str = '{"ray_worker_options": {"image": "'
    + doc_id_image
    + '"}, "ray_head_options": {"image": "'
    + doc_id_image
    + '"}}',
    # Fuzzy dedup step parameters
    p5_name: str = "fdedup",
    p5_skip: bool = False,
    # columns used
    p5_fdedup_doc_column: str = "contents",
    p5_fdedup_id_column: str = "int_id_column",
    p5_fdedup_cluster_column: str = "cluster",
    # orchestrator
    # infrastructure
    p5_fdedup_bucket_cpu: float = 0.5,
    p5_fdedup_doc_cpu: float = 0.5,
    p5_fdedup_mhash_cpu: float = 0.5,
    # fuzzy parameters
    p5_fdedup_num_permutations: int = 64,
    p5_fdedup_threshold: float = 0.8,
    p5_fdedup_shingles_size: int = 5,
    p5_fdedup_delimiters: str = " ",
    # random delay between reads
    p5_fdedup_random_delay_limit: int = 5,
    # snapshotting
    p5_fdedup_snapshot_delay: int = 1,
    p5_fdedup_use_doc_snapshot: bool = False,
    p5_fdedup_use_bucket_snapshot: bool = False,
    # data sampling
    p5_fdedup_n_samples: int = 10,
    # overriding parameters
    p5_overriding_params: str = '{"ray_worker_options": {"image": "'
    + fdedup_image
    + '"}, "ray_head_options": {"image": "'
    + fdedup_image
    + '"}}',
    # proglang_select step parameters
    p6_name: str = "proglang_select",
    p6_skip: bool = False,
    p6_proglang_select_allowed_langs_file: str = "test/proglang_select/languages/allowed-code-languages.txt",
    p6_proglang_select_language_column: str = "programming_language",
    p6_proglang_select_s3_access_secret: str = "s3-secret",
    # overriding parameters
    p6_overriding_params: str = '{"ray_worker_options": {"image": "'
    + proglang_select_image
    + '"}, "ray_head_options": {"image": "'
    + proglang_select_image
    + '"}}',
    # Code quality step parameters
    p7_name: str = "code_quality",
    p7_skip: bool = False,
    p7_cq_contents_column_name: str = "contents",
    p7_cq_language_column_name: str = "programming_language",
    p7_cq_tokenizer: str = "codeparrot/codeparrot",
    p7_cq_hf_token: str = "None",
    # orchestrator
    # overriding parameters
    p7_overriding_params: str = '{"ray_worker_options": {"image": "'
    + code_quality_image
    + '"}, "ray_head_options": {"image": "'
    + code_quality_image
    + '"}}',
    # malware step parameters
    p8_name: str = "malware",
    p8_skip: bool = False,
    p8_malware_input_column: str = "contents",
    p8_malware_output_column: str = "virus_detection",
    # orchestrator
    # overriding parameters
    p8_overriding_params: str = '{"ray_worker_options": {"image": "'
    + malware_image
    + '"}, "ray_head_options": {"image": "'
    + malware_image
    + '"}}',
    # tokenization parameters
    p9_name: str = "tokenization",
    p9_skip: bool = False,
    p9_tkn_tokenizer: str = "hf-internal-testing/llama-tokenizer",
    p9_tkn_doc_id_column: str = "document_id",
    p9_tkn_doc_content_column: str = "contents",
    p9_tkn_text_lang: str = "en",
    p9_tkn_tokenizer_args: str = "cache_dir=/tmp/hf",
    p9_tkn_chunk_size: int = 0,
    p9_overriding_params: str = '{"ray_worker_options": {"image": "'
    + tokenizer_image
    + '"}, "ray_head_options": {"image": "'
    + tokenizer_image
    + '"}}',
):

    # get all arguments
    args = locals()
    orch_host = "http://ml-pipeline:8888"

    def _set_component(op: dsl.BaseOp, displayed_name: str, prev_op: dsl.BaseOp = None):
        # set the sub-component UI name
        op.set_display_name(displayed_name)

        # Add pod labels
        op.add_pod_label("app", "ml-pipeline").add_pod_label("component", "code-pipelines")
        # No caching
        op.execution_options.caching_strategy.max_cache_staleness = "P0D"
        # image pull policy
        op.set_image_pull_policy("Always")
        # Set the timeout for each task to one week (in seconds)
        op.set_timeout(ONE_WEEK_SEC)
        if prev_op is not None:
            op.after(prev_op)

    # exact deduplication
    exact_dedup = run_exact_dedup_op(
        name=p1_orch_exact_dedup_name,
        prefix="p3_",
        params=args,
        host=orch_host,
        input_folder=p2_pipeline_input_parent_path,
    )
    _set_component(exact_dedup, "exact dedup")
    # document ID
    doc_id = run_doc_id_op(
        name=p1_orch_doc_id_name, prefix="p4_", params=args, host=orch_host, input_folder=exact_dedup.output
    )
    _set_component(doc_id, "doc ID", exact_dedup)
    # fuzzy deduplication
    fuzzy_dedup = run_fuzzy_dedup_op(
        name=p1_orch_fuzzy_dedup_name, prefix="p5_", params=args, host=orch_host, input_folder=doc_id.output
    )
    _set_component(fuzzy_dedup, "fuzzy dedup", doc_id)

    # proglang_select
    proglang_select = run_proglang_select_op(
        name=p1_orch_proglang_select_name,
        prefix="p6_",
        params=args,
        host=orch_host,
        input_folder=fuzzy_dedup.output,
    )
    _set_component(proglang_select, "proglang_select", fuzzy_dedup)

    # code_quality
    code_quality = run_code_quality_op(
        name=p1_orch_code_quality_name, prefix="p7_", params=args, host=orch_host, input_folder=proglang_select.output
    )
    _set_component(code_quality, "code_quality", proglang_select)

    # malware
    malware = run_malware_op(
        name=p1_orch_malware_name, prefix="p8_", params=args, host=orch_host, input_folder=code_quality.output
    )
    _set_component(malware, "malware", code_quality)
    # tokenization
    tokenization = run_tokenization_op(
        name=p1_orch_tokenization_wf_name, prefix="p9_", params=args, host=orch_host, input_folder=malware.output
    )
    _set_component(tokenization, "tokenization", malware)

    # Configure the pipeline-level timeout to one week (in seconds)
    dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC)


if __name__ == "__main__":
    # Compiling the pipeline
    compiler.Compiler().compile(sample_code_ray_orchestrator, __file__.replace(".py", ".yaml"))
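Each `run_*_op` call above passes all pipeline arguments (`params=args`) together with a `prefix` such as `"p3_"`; the shared `executeSubWorkflowComponent.yaml` component presumably uses that prefix to pick out one step's parameter group. A minimal sketch of the idea (hypothetical helper, not the component's actual code):

```python
def select_step_params(params: dict, prefix: str) -> dict:
    # Keep only one step's parameters and strip the prefix, e.g.
    # "p3_ededup_doc_column" -> "ededup_doc_column".
    return {k[len(prefix):]: v for k, v in params.items() if k.startswith(prefix)}


args = {"p3_ededup_doc_column": "contents", "p4_doc_id_doc_column": "contents"}
print(select_step_params(args, "p3_"))  # {'ededup_doc_column': 'contents'}
```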
