diff --git a/.make.versions b/.make.versions
index 4f1205076..223581e3a 100644
--- a/.make.versions
+++ b/.make.versions
@@ -16,7 +16,7 @@ FILTER_VERSION=0.3.0
 NOOP_VERSION=0.8.0
 RESIZE_VERSION=0.3.0
 LANG_ID_VERSION=0.3.0
-TOKENIZER_VERSION=0.3.0
+TOKENIZATION_VERSION=0.3.0
 MALWARE_VERSION=0.4.0
 PROGLANG_SELECT_VERSION=0.3.0
 CODE_QUALITY_VERSION=0.3.0
diff --git a/kfp/doc/multi_transform_pipeline.md b/kfp/doc/multi_transform_pipeline.md
index f9b3fe2c4..a9364137b 100644
--- a/kfp/doc/multi_transform_pipeline.md
+++ b/kfp/doc/multi_transform_pipeline.md
@@ -23,13 +23,30 @@ In the list of its input parameters, we also see `data_s3_config`. Now, we have
 ![param list](param_list2.png)
 
+### Examples
-**Note** An example super pipeline that combines several transforms, `doc_id`, `ededup`, and `fdedup`, can be found in [superworkflow_dedups_sample_wf.py](../superworkflows/v1/superworkflow_dedups_sample_wf.py).
+The sections that follow describe two super pipelines as examples:
+
+1) [dedups super pipeline](#dedups-super-pipeline)
+1) [programming languages super pipeline](#programming-languages-super-pipeline)
+
+### Dedups super pipeline
+
+This pipeline combines several transforms: `doc_id`, `ededup`, and `fdedup`. It can be found in [superworkflow_dedups_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py).
 
 ![super pipeline](super_pipeline.png)
 
+The input parameters of the super pipeline are described in this [section](#super-pipeline-input-parameters).
+
+### Programming languages super pipeline
+
+This pipeline combines transforms for programming languages data preprocessing: `ededup`, `doc_id`, `fdedup`, `proglang_select`, `code_quality`, `malware` and `tokenization`. It can be found in [superworkflow_code_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py).
+
+![super pipeline](super-code-pipeline.png)
+
+The input parameters of the super pipeline are described in this [section](#super-pipeline-input-parameters).
 
-The input parameters of the super pipelines:
+### Super Pipeline Input Parameters
 
 There are several `groups` of input parameters for super pipelines; each group of parameters has a prefix of the form "p<number>_", where <number> is the order number of the group.
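For illustration, here is a minimal sketch of how the "p<number>_" prefix convention can be applied. The helper `select_group` and the sample values below are hypothetical and are not part of the actual `kfp_support` components; they only show how a single sub-workflow's parameters can be picked out of the full argument set and have their prefix stripped.

```python
from typing import Any


# Hypothetical helper: the real selection logic lives in the KFP support components.
def select_group(params: dict[str, Any], prefix: str) -> dict[str, Any]:
    """Return one parameter group (e.g. "p3_") with the prefix removed."""
    return {key[len(prefix):]: value for key, value in params.items() if key.startswith(prefix)}


# Illustrative values, mirroring the defaults defined later in superworkflow_code_sample_wf.py.
args = {
    "p2_pipeline_input_parent_path": "test/ingest2parquet/output/",
    "p3_name": "ededup",
    "p3_skip": False,
    "p3_ededup_doc_column": "contents",
    "p4_name": "doc_id",
}

# Parameters forwarded to the exact-dedup sub-workflow (prefix "p3_").
print(select_group(args, "p3_"))
# -> {'name': 'ededup', 'skip': False, 'ededup_doc_column': 'contents'}
```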
diff --git a/kfp/doc/super-code-pipeline.png b/kfp/doc/super-code-pipeline.png
new file mode 100644
index 000000000..327d799bb
Binary files /dev/null and b/kfp/doc/super-code-pipeline.png differ
diff --git a/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py b/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py
index b20286602..1e58a5e66 100644
--- a/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py
+++ b/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py
@@ -62,7 +62,7 @@
 # Execute Ray jobs
 execute_ray_jobs(
     name=cluster_name,
-    additional_params=KFPUtils.load_from_json(args.additional_params),
+    additional_params=KFPUtils.load_from_json(args.additional_params.replace("'", '"')),
     e_params=exec_params,
     exec_script_name=args.exec_script_name,
     server_url=args.server_url,
diff --git a/kfp/superworkflows/Makefile b/kfp/superworkflows/Makefile
index d4ddf0a3a..950286fcd 100644
--- a/kfp/superworkflows/Makefile
+++ b/kfp/superworkflows/Makefile
@@ -2,27 +2,22 @@ REPOROOT=../../
 # Use make help, to see the available rules
 include ${REPOROOT}/.make.defaults
 
-clean::
-	@# Help: Recursively make $@ all subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-venv
+workflow-venv:
+	$(MAKE) -C ray/kfp_v1 workflow-venv
 
-workflow-venv::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-build
+workflow-build:
+	$(MAKE) -C ray/kfp_v1 workflow-build
 
-workflow-build::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-test
+workflow-test:
+	$(MAKE) -C ray/kfp_v1 workflow-test
 
-workflow-test::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
-
-workflow-upload::
-	@# Help: Recursively make $@ in subdirs
-	$(MAKE) RULE=$@ .recurse
-
-workflow-reconcile-requirements::
-	@# Help: Recursively make $@ in all subdirs
-	@$(MAKE) RULE=$@ .recurse
+.PHONY: workflow-upload
+workflow-upload:
+	$(MAKE) -C ray/kfp_v1 workflow-upload
 
+.PHONY: workflow-reconcile-requirements
+workflow-reconcile-requirements:
+	$(MAKE) -C ray/kfp_v1 workflow-reconcile-requirements
diff --git a/kfp/superworkflows/v1/Makefile b/kfp/superworkflows/ray/kfp_v1/Makefile
similarity index 91%
rename from kfp/superworkflows/v1/Makefile
rename to kfp/superworkflows/ray/kfp_v1/Makefile
index 643249760..9a74bce57 100644
--- a/kfp/superworkflows/v1/Makefile
+++ b/kfp/superworkflows/ray/kfp_v1/Makefile
@@ -1,8 +1,8 @@
-REPOROOT=${CURDIR}/../../..
+REPOROOT=${CURDIR}/../../../..
 WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
 include $(REPOROOT)/transforms/.make.transforms_workflows
 
-PYTHON_WF := $(shell find ./ -name *_wf.py)
+PYTHON_WF := $(shell find ./ -name "*_wf.py")
 YAML_WF=$(patsubst %.py, %.yaml, ${PYTHON_WF})
 
 workflow-venv:: .check_python_version ${WORKFLOW_VENV_ACTIVATE}
diff --git a/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py b/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py
new file mode 100644
index 000000000..05a97df92
--- /dev/null
+++ b/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py
@@ -0,0 +1,237 @@
+import kfp.compiler as compiler
+import kfp.components as comp
+import kfp.dsl as dsl
+from kfp_support.workflow_support.utils import ONE_WEEK_SEC
+
+
+# Components
+# For every sub workflow we need a separate component that knows about this subworkflow.
+component_spec_path = "../../../kfp_ray_components/"
+run_code_quality_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
+run_malware_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
+run_proglang_select_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
+run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
+run_exact_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
+run_fuzzy_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
+run_tokenization_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
+
+proglang_select_image = "quay.io/dataprep1/data-prep-kit/proglang_select:0.3.0"
+code_quality_image = "quay.io/dataprep1/data-prep-kit/code_quality:0.3.0"
+malware_image = "quay.io/dataprep1/data-prep-kit/malware:0.4.0"
+doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.1"
+ededup_image = "quay.io/dataprep1/data-prep-kit/ededup:0.3.0"
+fdedup_image = "quay.io/dataprep1/data-prep-kit/fdedup:0.3.0"
+tokenizer_image = "quay.io/dataprep1/data-prep-kit/tokenization:0.3.0"
+
+
+# Pipeline to invoke execution on remote resource
+@dsl.pipeline(
+    name="super-kubeflow-pipeline-code",
+    description="Super pipeline for programming languages data preprocessing",
+)
+def sample_code_ray_orchestrator(
+    # the super pipeline parameters
+    p1_orch_code_quality_name: str = "code_quality_wf",
+    p1_orch_malware_name: str = "malware_wf",
+    p1_orch_proglang_select_name: str = "proglang_select_wf",
+    p1_orch_doc_id_name: str = "doc_id_wf",
+    p1_orch_exact_dedup_name: str = "ededup_wf",
+    p1_orch_fuzzy_dedup_name: str = "fdedup_wf",
+    p1_orch_tokenization_wf_name: str = "tokenization_wf",
+    p2_pipeline_runtime_pipeline_id: str = "pipeline_id",
+    p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
+    p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
+    p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
+    p2_pipeline_input_parent_path: str = "test/ingest2parquet/output/",
+    p2_pipeline_output_parent_path: str = "test/super/output/",
+    p2_pipeline_parent_path_suffix: str = "",
+    p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
+    p2_pipeline_data_s3_access_secret: str = "s3-secret",
+    p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
+    p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
+    p2_pipeline_data_max_files: int = -1,
+    p2_pipeline_data_num_samples: int = -1,
+    # Exact dedup step parameters
+    p3_name: str = "ededup",
+    p3_skip: bool = False,
+    p3_ededup_doc_column: str = "contents",
+    p3_ededup_hash_cpu: float = 0.5,
+    # data sampling
+    p3_ededup_n_samples: int = 10,
+    # overriding parameters
+    p3_overriding_params: str = '{"ray_worker_options": {"image": "'
+    + ededup_image
+    + '"}, "ray_head_options": {"image": "'
+    + ededup_image
+    + '"}}',
+    # Document ID step parameters
+    p4_name: str = "doc_id",
+    p4_skip: bool = False,
+    # doc id parameters
+    p4_doc_id_doc_column: str = "contents",
+    p4_doc_id_hash_column: str = "hash_column",
+    p4_doc_id_int_column: str = "int_id_column",
+    # overriding parameters
+    p4_overriding_params: str = '{"ray_worker_options": {"image": "'
+    + doc_id_image
+    + '"}, "ray_head_options": {"image": "'
+    + doc_id_image
+    + '"}}',
+    # Fuzzy dedup step parameters
+    p5_name: str = "fdedup",
+    p5_skip: bool = False,
+    # columns used
+    p5_fdedup_doc_column: str = "contents",
+    p5_fdedup_id_column: str = "int_id_column",
+    p5_fdedup_cluster_column: str = "cluster",
+    # orchestrator
+    # infrastructure
+    p5_fdedup_bucket_cpu: float = 0.5,
+    p5_fdedup_doc_cpu: float = 0.5,
+    p5_fdedup_mhash_cpu: float = 0.5,
+    # fuzzy parameters
+    p5_fdedup_num_permutations: int = 64,
+    p5_fdedup_threshold: float = 0.8,
+    p5_fdedup_shingles_size: int = 5,
+    p5_fdedup_delimiters: str = " ",
+    # random delay between reads
+    p5_fdedup_random_delay_limit: int = 5,
+    # snapshotting
+    p5_fdedup_snapshot_delay: int = 1,
+    p5_fdedup_use_doc_snapshot: bool = False,
+    p5_fdedup_use_bucket_snapshot: bool = False,
+    # data sampling
+    p5_fdedup_n_samples: int = 10,
+    # overriding parameters
+    p5_overriding_params: str = '{"ray_worker_options": {"image": "'
+    + fdedup_image
+    + '"}, "ray_head_options": {"image": "'
+    + fdedup_image
+    + '"}}',
+    # proglang_select step parameters
+    p6_name: str = "proglang_select",
+    p6_skip: bool = False,
+    p6_proglang_select_allowed_langs_file: str = "test/proglang_select/languages/allowed-code-languages.txt",
+    p6_proglang_select_language_column: str = "programming_language",
+    p6_proglang_select_s3_access_secret: str = "s3-secret",
+    # overriding parameters
+    p6_overriding_params: str = '{"ray_worker_options": {"image": "'
+    + proglang_select_image
+    + '"}, "ray_head_options": {"image": "'
+    + proglang_select_image
+    + '"}}',
+    # Code quality step parameters
+    p7_name: str = "code_quality",
+    p7_skip: bool = False,
+    p7_cq_contents_column_name: str = "contents",
+    p7_cq_language_column_name: str = "programming_language",
+    p7_cq_tokenizer: str = "codeparrot/codeparrot",
+    p7_cq_hf_token: str = "None",
+    # orchestrator
+    # overriding parameters
+    p7_overriding_params: str = '{"ray_worker_options": {"image": "'
+    + code_quality_image
+    + '"}, "ray_head_options": {"image": "'
+    + code_quality_image
+    + '"}}',
+    # malware step parameters
+    p8_name: str = "malware",
+    p8_skip: bool = False,
+    p8_malware_input_column: str = "contents",
+    p8_malware_output_column: str = "virus_detection",
+    # orchestrator
+    # overriding parameters
+    p8_overriding_params: str = '{"ray_worker_options": {"image": "'
+    + malware_image
+    + '"}, "ray_head_options": {"image": "'
+    + malware_image
+    + '"}}',
+    # tokenization parameters
+    p9_name: str = "tokenization",
+    p9_skip: bool = False,
+    p9_tkn_tokenizer: str = "hf-internal-testing/llama-tokenizer",
+    p9_tkn_doc_id_column: str = "document_id",
+    p9_tkn_doc_content_column: str = "contents",
+    p9_tkn_text_lang: str = "en",
+    p9_tkn_tokenizer_args: str = "cache_dir=/tmp/hf",
+    p9_tkn_chunk_size: int = 0,
+    p9_overriding_params: str = '{"ray_worker_options": {"image": "'
+    + tokenizer_image
+    + '"}, "ray_head_options": {"image": "'
+    + tokenizer_image
+    + '"}}',
+):
+
+    # get all arguments
+    args = locals()
+    orch_host = "http://ml-pipeline:8888"
+
+    def _set_component(op: dsl.BaseOp, displayed_name: str, prev_op: dsl.BaseOp = None):
+        # set the sub component UI name
+        op.set_display_name(displayed_name)
+
+        # Add pod labels
+        op.add_pod_label("app", "ml-pipeline").add_pod_label("component", "code-pipelines")
+        # No caching
+        op.execution_options.caching_strategy.max_cache_staleness = "P0D"
+        # image pull policy
+        op.set_image_pull_policy("Always")
+        # Set the timeout for each task to one week (in seconds)
+        op.set_timeout(ONE_WEEK_SEC)
+        if prev_op is not None:
+            op.after(prev_op)
+
+    # exact deduplication
+    exact_dedup = run_exact_dedup_op(
+        name=p1_orch_exact_dedup_name,
+        prefix="p3_",
+        params=args,
+        host=orch_host,
+        input_folder=p2_pipeline_input_parent_path,
+    )
+    _set_component(exact_dedup, "exact dedup")
+    # document ID
+    doc_id = run_doc_id_op(
+        name=p1_orch_doc_id_name, prefix="p4_", params=args, host=orch_host, input_folder=exact_dedup.output
+    )
+    _set_component(doc_id, "doc ID", exact_dedup)
+    # fuzzy deduplication
+    fuzzy_dedup = run_fuzzy_dedup_op(
+        name=p1_orch_fuzzy_dedup_name, prefix="p5_", params=args, host=orch_host, input_folder=doc_id.output
+    )
+    _set_component(fuzzy_dedup, "fuzzy dedup", doc_id)
+
+    # proglang_select
+    proglang_select = run_proglang_select_op(
+        name=p1_orch_proglang_select_name,
+        prefix="p6_",
+        params=args,
+        host=orch_host,
+        input_folder=fuzzy_dedup.output,
+    )
+    _set_component(proglang_select, "proglang_select", fuzzy_dedup)
+
+    # code_quality
+    code_quality = run_code_quality_op(
+        name=p1_orch_code_quality_name, prefix="p7_", params=args, host=orch_host, input_folder=proglang_select.output
+    )
+    _set_component(code_quality, "code_quality", proglang_select)
+
+    # malware
+    malware = run_malware_op(
+        name=p1_orch_malware_name, prefix="p8_", params=args, host=orch_host, input_folder=code_quality.output
+    )
+    _set_component(malware, "malware", code_quality)
+    # tokenization
+    tokenization = run_tokenization_op(
+        name=p1_orch_tokenization_wf_name, prefix="p9_", params=args, host=orch_host, input_folder=malware.output
+    )
+    _set_component(tokenization, "tokenization", malware)
+
+    # Configure the pipeline-level timeout to one week (in seconds)
+    dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC)
+
+
+if __name__ == "__main__":
+    # Compiling the pipeline
+    compiler.Compiler().compile(sample_code_ray_orchestrator, __file__.replace(".py", ".yaml"))
diff --git a/kfp/superworkflows/v1/superworkflow_dedups_sample_wf.py b/kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py
similarity index 90%
rename from kfp/superworkflows/v1/superworkflow_dedups_sample_wf.py
rename to kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py
index 2f7c64f00..f63bb0638 100644
--- a/kfp/superworkflows/v1/superworkflow_dedups_sample_wf.py
+++ b/kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py
@@ -6,13 +6,13 @@
 # Components
 # path to kfp component specifications files
-component_spec_path = "../../kfp_ray_components/"
+component_spec_path = "../../../kfp_ray_components/"
 
 # For every sub workflow we need a separate components, that knows about this subworkflow.
 run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
 run_exact_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
 run_fuzzy_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
 
-doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.0"
+doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.1"
 ededup_image = "quay.io/dataprep1/data-prep-kit/ededup:0.3.0"
 fdedup_image = "quay.io/dataprep1/data-prep-kit/fdedup:0.3.0"
 
@@ -36,14 +36,14 @@ def sample_ray_orchestrator(
     p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
     p2_pipeline_data_s3_access_secret: str = "s3-secret",
     p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
+    p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
+    # data access.
+    p2_pipeline_data_max_files: int = -1,
+    p2_pipeline_data_num_samples: int = -1,
     # Document ID step parameters
     p3_name: str = "doc_id",
     p3_skip: bool = False,
-    # data access.
-    p3_data_max_files: int = -1,
-    p3_data_num_samples: int = -1,
     # orchestrator
-    p3_runtime_actor_options: str = '{"num_cpus": 0.8}',
     # doc id parameters
     p3_doc_id_doc_column: str = "contents",
     p3_doc_id_hash_column: str = "hash_column",
@@ -59,10 +59,6 @@ def sample_ray_orchestrator(
     p4_skip: bool = False,
     p4_ededup_doc_column: str = "contents",
     p4_ededup_hash_cpu: float = 0.5,
-    p4_runtime_actor_options: str = '{"num_cpus": 0.8}',
-    # data access.
-    p4_data_max_files: int = -1,
-    p4_data_num_samples: int = -1,
     # data sampling
     p4_ededup_n_samples: int = 10,
     # overriding parameters
@@ -78,11 +74,6 @@ def sample_ray_orchestrator(
     p5_fdedup_doc_column: str = "contents",
     p5_fdedup_id_column: str = "int_id_column",
     p5_fdedup_cluster_column: str = "cluster",
-    # orchestrator
-    p5_runtime_actor_options: str = '{"num_cpus": 0.8}',
-    # data access. checkpointing is not supported by dedup
-    p5_data_num_samples: int = -1,
-    p5_data_max_files: int = -1,
     # infrastructure
     p5_fdedup_bucket_cpu: float = 0.5,
     p5_fdedup_doc_cpu: float = 0.5,
     p5_fdedup_mhash_cpu: float = 0.5,
diff --git a/transforms/universal/doc_id/kfp_ray/v1/doc_id_wf.py b/transforms/universal/doc_id/kfp_ray/v1/doc_id_wf.py
index ce9fd68ef..ece31b63b 100644
--- a/transforms/universal/doc_id/kfp_ray/v1/doc_id_wf.py
+++ b/transforms/universal/doc_id/kfp_ray/v1/doc_id_wf.py
@@ -20,7 +20,7 @@
 )
 
-task_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.0"
+task_image = "quay.io/dataprep1/data-prep-kit/doc_id:0.3.1"
 
 # the name of the job script
 EXEC_SCRIPT_NAME: str = "doc_id_transform.py"
diff --git a/transforms/universal/tokenization/kfp_ray/v1/tokenization_wf.py b/transforms/universal/tokenization/kfp_ray/v1/tokenization_wf.py
index 1e5a3ae9b..e9e56dc3d 100644
--- a/transforms/universal/tokenization/kfp_ray/v1/tokenization_wf.py
+++ b/transforms/universal/tokenization/kfp_ray/v1/tokenization_wf.py
@@ -23,7 +23,7 @@
 # the name of the job script
 EXEC_SCRIPT_NAME: str = "tokenization_transform.py"
 
-task_image = "quay.io/dataprep1/data-prep-kit/tokenization:0.2.0"
+task_image = "quay.io/dataprep1/data-prep-kit/tokenization:0.3.0"
 
 # components
 base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.1.1"
diff --git a/transforms/universal/tokenization/ray/Makefile b/transforms/universal/tokenization/ray/Makefile
index bea983ba1..72d7b2790 100644
--- a/transforms/universal/tokenization/ray/Makefile
+++ b/transforms/universal/tokenization/ray/Makefile
@@ -8,7 +8,7 @@ include $(REPOROOT)/transforms/.make.transforms
 TRANSFORM_NAME=tokenization
 
 # $(REPOROOT)/.make.versions file contains the versions
-DOCKER_IMAGE_VERSION=${TOKENIZER_VERSION}
+DOCKER_IMAGE_VERSION=${TOKENIZATION_VERSION}
 
 venv::	.transforms.ray-venv
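Regarding the `additional_params` change in `execute_ray_job_multi_s3.py` above: assuming `KFPUtils.load_from_json` parses its argument with `json.loads` (an assumption, not verified here), the quote replacement matters because a single-quoted, dict-style string is not valid JSON. A minimal sketch:

```python
import json

# A dict-style string with single quotes, as additional_params may arrive from KFP.
raw = "{'wait_interval': 2, 'http_retries': 5}"

try:
    json.loads(raw)  # rejected: JSON requires double-quoted property names
except json.JSONDecodeError as err:
    print(f"rejected as-is: {err}")

# After the replacement applied in execute_ray_job_multi_s3.py, parsing succeeds.
print(json.loads(raw.replace("'", '"')))  # {'wait_interval': 2, 'http_retries': 5}
```

Note that this simple replacement assumes no apostrophes occur inside the parameter values themselves.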