diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 9324ead7e0..e6c5a6a480 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -111,5 +111,5 @@ jobs:
           export DEPLOY_KUBEFLOW=1
           make -C kind setup
           make -C kfp/kfp_support_lib test
-          make -C transforms/universal/noop/ workflow-build
+          make -C transforms workflow-build
           make -C transforms/universal/noop workflow-test
diff --git a/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py
index 2b5e6509bd..3b992c8ceb 100644
--- a/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py
+++ b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py
@@ -94,9 +94,6 @@ def dict_to_req(d: dict[str, Any], executor: str = "transformer_launcher.py") ->
             if '"' in value:
                 logger.warning(f"can't parse inputs with double quotation marks, please use single quotation marks instead")
                 res += f'--{key}="{value}" '
-            elif isinstance(value, bool):
-                if value:
-                    res += f"--{key} "
             else:
                 res += f"--{key}={value} "
 
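Reviewer note: with the `elif isinstance(value, bool)` branch gone, booleans now take the generic `--key=value` path, so `True` is serialized as `--key=True` rather than a bare `--key` flag, and `False` is no longer silently dropped. This is consistent with the `bool` parameter passed through `exec_params` later in this PR. A minimal sketch of the resulting behavior — the command prefix and the string-only quote check are assumptions, since the hunk shows only part of `dict_to_req`:

```python
from typing import Any


def dict_to_req_sketch(d: dict[str, Any], executor: str = "transformer_launcher.py") -> str:
    # Assumed prefix; the real function builds more of the command line above this hunk.
    res = f"python {executor} "
    for key, value in d.items():
        if isinstance(value, str) and '"' in value:
            # values containing double quotes are wrapped as-is (warned about upstream)
            res += f'--{key}="{value}" '
        else:
            # uniform path: ints, strings, and now also booleans
            res += f"--{key}={value} "
    return res


# booleans now survive the round trip explicitly:
# python transformer_launcher.py --data_max_files=-1 --detect_lang=True
print(dict_to_req_sketch({"data_max_files": -1, "detect_lang": True}))
```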
diff --git a/kind/hack/populate_minio.sh b/kind/hack/populate_minio.sh
index c30ebea54a..3484475877 100755
--- a/kind/hack/populate_minio.sh
+++ b/kind/hack/populate_minio.sh
@@ -12,8 +12,8 @@ mc mb kfp/test
 echo "copying data"
 # code modules
 mc cp --recursive ${ROOT_DIR}/../transforms/code/code_quality/ray/test-data/input/ kfp/test/code_quality/input
-mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest2parquet/ray/test-data/input/ kfp/test/ingest2parquet/input
-mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest2parquet/ray/test-data/languages/ kfp/test/ingest2parquet/languages
+mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest_2_parquet/ray/test-data/input/data-processing-lib.zip kfp/test/ingest_2_parquet/input
+mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest_2_parquet/ray/test-data/languages/ kfp/test/ingest_2_parquet/languages
 mc cp --recursive ${ROOT_DIR}/../transforms/code/proglang_select/ray/test-data/input/ kfp/test/proglang_select/input
 mc cp --recursive ${ROOT_DIR}/../transforms/code/proglang_select/ray/test-data/languages/ kfp/test/proglang_select/languages
 mc cp --recursive ${ROOT_DIR}/../transforms/code/malware/ray/test-data/input/ kfp/test/malware/input
diff --git a/transforms/code/ingest_2_parquet/Makefile b/transforms/code/ingest_2_parquet/Makefile
index 923d291852..227d850505 100644
--- a/transforms/code/ingest_2_parquet/Makefile
+++ b/transforms/code/ingest_2_parquet/Makefile
@@ -43,21 +43,21 @@ load-image::
 
 .PHONY: workflow-venv
 workflow-venv:
-	$(MAKE) -C kfp_ray/v1 workflow-venv
+	$(MAKE) -C kfp_ray workflow-venv
 
 .PHONY: workflow-build
 workflow-build:
-	$(MAKE) -C kfp_ray/v1 workflow-build
+	$(MAKE) -C kfp_ray workflow-build
 
 .PHONY: workflow-test
 workflow-test:
-	$(MAKE) -C kfp_ray/v1 workflow-test
+	$(MAKE) -C kfp_ray workflow-test
 
 .PHONY: workflow-upload
 workflow-upload:
-	$(MAKE) -C kfp_ray/v1 workflow-upload
+	$(MAKE) -C kfp_ray workflow-upload
 
 .PHONY: workflow-reconcile-requirements
 workflow-reconcile-requirements:
-	$(MAKE) -C kfp_ray/v1 workflow-reconcile-requirements
+	$(MAKE) -C kfp_ray workflow-reconcile-requirements
diff --git a/transforms/code/ingest_2_parquet/kfp_ray/Makefile b/transforms/code/ingest_2_parquet/kfp_ray/Makefile
new file mode 100644
index 0000000000..aa34444cd4
--- /dev/null
+++ b/transforms/code/ingest_2_parquet/kfp_ray/Makefile
@@ -0,0 +1,44 @@
+REPOROOT=${CURDIR}/../../../../
+WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
+include $(REPOROOT)/transforms/.make.workflows
+
+SRC_DIR=${CURDIR}/../ray/
+
+PYTHON_WF := $(shell find ./ -name '*_wf.py')
+YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})
+
+workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}
+
+venv::
+
+build::
+
+test::
+
+test-src::
+
+test-image::
+
+image::
+
+load-image::
+
+.PHONY: workflow-build
+workflow-build: workflow-venv
+	$(MAKE) $(YAML_WF)
+
+.PHONY: workflow-test
+workflow-test: workflow-build
+	$(MAKE) .transforms_workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=ingest_2_parquet_wf.yaml
+
+.PHONY: workflow-upload
+workflow-upload: workflow-build
+	@for file in $(YAML_WF); do \
+		$(MAKE) .transforms_workflows.upload-pipeline PIPELINE_FILE=$$file; \
+	done
+
+.PHONY: workflow-reconcile-requirements
+workflow-reconcile-requirements:
+	@for file in $(PYTHON_WF); do \
+		$(MAKE) .transforms_workflows.reconcile-requirements PIPELINE_FILE=$$file; \
+	done
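Reviewer note: `workflow-build` materializes one YAML per `*_wf.py` via the `%.py` → `%.yaml` mapping captured in `YAML_WF`; the actual pattern rule lives in `transforms/.make.workflows`, which is not part of this diff. In essence the rule runs the KFP compiler against the pipeline function. A sketch, assuming the standard KFPv1 SDK entry point and the module/function names from this PR:

```python
# Sketch of what building ingest_2_parquet_wf.yaml from ingest_2_parquet_wf.py
# amounts to (the real recipe is in transforms/.make.workflows, not shown here).
import kfp.compiler as compiler

from ingest_2_parquet_wf import ingest_to_parquet  # the @dsl.pipeline function

compiler.Compiler().compile(ingest_to_parquet, "ingest_2_parquet_wf.yaml")
```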
diff --git a/transforms/code/ingest_2_parquet/kfp_ray/v1/ingest_2_parquet_wf.py b/transforms/code/ingest_2_parquet/kfp_ray/ingest_2_parquet_wf.py
similarity index 62%
rename from transforms/code/ingest_2_parquet/kfp_ray/v1/ingest_2_parquet_wf.py
rename to transforms/code/ingest_2_parquet/kfp_ray/ingest_2_parquet_wf.py
index a3008dbfa9..6f033f4a39 100644
--- a/transforms/code/ingest_2_parquet/kfp_ray/v1/ingest_2_parquet_wf.py
+++ b/transforms/code/ingest_2_parquet/kfp_ray/ingest_2_parquet_wf.py
@@ -10,33 +10,85 @@
 # limitations under the License.
 ################################################################################
 
+import os
+
+from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
+
 import kfp.compiler as compiler
 import kfp.components as comp
 import kfp.dsl as dsl
-from kfp_support.workflow_support.utils import (
-    ONE_HOUR_SEC,
-    ONE_WEEK_SEC,
-    ComponentUtils,
-)
-from data_processing.utils import GB
-
 
 # the name of the job script
-EXEC_SCRIPT_NAME: str = "ingest_2_parquet_transform.py"
+EXEC_SCRIPT_NAME: str = "ingest_2_parquet_transform_ray.py"
+
+task_image = "quay.io/dataprep1/data-prep-kit/ingest_2_parquet-ray:0.4.0.dev6"
 
-task_image = "quay.io/dataprep1/data-prep-kit/ingest_2_parquet:0.3.0"
 # components
 base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"
 
 # path to kfp component specifications files
-component_spec_path = "../../../../../kfp/kfp_ray_components/"
+component_spec_path = "../../../../kfp/kfp_ray_components/"
 
 # compute execution parameters. Here different transforms might need different implementations. As
 # a result, instead of creating a component we are creating it in place here.
-compute_exec_params_op = comp.func_to_container_op(
-    func=ComponentUtils.default_compute_execution_params, base_image=base_kfp_image
-)
+def compute_exec_params_func(
+    worker_options: str,
+    actor_options: str,
+    data_s3_config: str,
+    data_max_files: int,
+    data_num_samples: int,
+    data_files_to_use: str,
+    runtime_pipeline_id: str,
+    runtime_job_id: str,
+    runtime_code_location: str,
+    ingest_to_parquet_supported_langs_file: str,
+    ingest_to_parquet_domain: str,
+    ingest_to_parquet_snapshot: str,
+    ingest_to_parquet_detect_programming_lang: bool,
+) -> dict:
+    from workflow_support.runtime_utils import KFPUtils
+
+    return {
+        "data_s3_config": data_s3_config,
+        "data_max_files": data_max_files,
+        "data_num_samples": data_num_samples,
+        "data_files_to_use": data_files_to_use,
+        "runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
+        "runtime_worker_options": actor_options,
+        "runtime_pipeline_id": runtime_pipeline_id,
+        "runtime_job_id": runtime_job_id,
+        "runtime_code_location": runtime_code_location,
+        "ingest_to_parquet_supported_langs_file": ingest_to_parquet_supported_langs_file,
+        "ingest_to_parquet_domain": ingest_to_parquet_domain,
+        "ingest_to_parquet_snapshot": ingest_to_parquet_snapshot,
+        "ingest_to_parquet_detect_programming_lang": ingest_to_parquet_detect_programming_lang,
+    }
+
+
+# KFPv1 and KFPv2 use different methods to create a component from a function. KFPv1 uses the
+# `create_component_from_func` function, which KFPv2 deprecates and exposes under a different import path.
+# KFPv2 recommends the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
+# this if/else statement and explicitly call the decorator.
+if os.getenv("KFPv2", "0") == "1":
+    # In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we
+    # cannot create a unique string in a component (at runtime) and pass it to the `clean_up_task` of the
+    # `ExitHandler`, due to https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being
+    # we use a unique string created at compilation time.
+    import uuid
+
+    compute_exec_params_op = dsl.component_decorator.component(
+        func=compute_exec_params_func, base_image=base_kfp_image
+    )
+    print(
+        "WARNING: the Ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+        + "same version of the same pipeline!!!")
+    run_id = uuid.uuid4().hex
+else:
+    compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
+    run_id = dsl.RUN_ID_PLACEHOLDER
+
+
 # create Ray cluster
 create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
 # execute job
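Aside for reviewers: only the call site of `KFPUtils.default_compute_execution_params` is visible in this hunk. A hypothetical sketch of the kind of sizing such a helper performs — the quote handling, the 0.85 headroom factor, and the defaults are all assumptions, not the library source:

```python
import json


def default_compute_execution_params_sketch(worker_options: str, actor_options: str) -> str:
    # Both arguments arrive as dict-like strings, e.g. "{'replicas': 2, 'cpu': 2}"
    # and "{'num_cpus': 0.8}" (compare the pipeline defaults below).
    workers = json.loads(worker_options.replace("'", '"'))
    actors = json.loads(actor_options.replace("'", '"'))
    cluster_cpus = workers.get("replicas", 1) * workers.get("cpu", 1)
    # keep some CPU headroom for Ray system processes (the factor is an assumption)
    return str(int(cluster_cpus * 0.85 / actors.get("num_cpus", 1)))
```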
@@ -59,17 +111,17 @@ def ingest_to_parquet(
         '"image_pull_secret": "", "image": "' + task_image + '"}',
     server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
     # data access
-    data_s3_config: str = "{'input_folder': 'test/ingest2parquet/input', 'output_folder': 'test/ingest2parquet/output/'}",
+    data_s3_config: str = "{'input_folder': 'test/ingest_2_parquet/input', 'output_folder': 'test/ingest_2_parquet/output/'}",
     data_s3_access_secret: str = "s3-secret",
     data_max_files: int = -1,
     data_num_samples: int = -1,
     data_files_to_use: str = "['.zip']",
     # orchestrator
-    runtime_actor_options: str = f"{{'num_cpus': 0.8, 'memory': {2*GB}}}",
+    runtime_actor_options: str = "{'num_cpus': 0.8}",
     runtime_pipeline_id: str = "pipeline_id",
     runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
     # Proglang match parameters
-    ingest_to_parquet_supported_langs_file: str = "test/ingest2parquet/languages/lang_extensions.json",
+    ingest_to_parquet_supported_langs_file: str = "test/ingest_2_parquet/languages/lang_extensions.json",
     ingest_to_parquet_detect_programming_lang: bool = True,
     ingest_to_parquet_domain: str = "code",
     ingest_to_parquet_snapshot: str = "github",
@@ -118,7 +170,7 @@ def ingest_to_parquet(
     :return: None
     """
     # create clean_up task
-    clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url)
+    clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
     ComponentUtils.add_settings_to_component(clean_up_task, 60)
     # pipeline definition
     with dsl.ExitHandler(clean_up_task):
@@ -126,12 +178,23 @@ def ingest_to_parquet(
         compute_exec_params = compute_exec_params_op(
             worker_options=ray_worker_options,
             actor_options=runtime_actor_options,
+            data_s3_config=data_s3_config,
+            data_max_files=data_max_files,
+            data_num_samples=data_num_samples,
+            data_files_to_use=data_files_to_use,
+            runtime_pipeline_id=runtime_pipeline_id,
+            runtime_job_id=run_id,
+            runtime_code_location=runtime_code_location,
+            ingest_to_parquet_supported_langs_file=ingest_to_parquet_supported_langs_file,
+            ingest_to_parquet_domain=ingest_to_parquet_domain,
+            ingest_to_parquet_snapshot=ingest_to_parquet_snapshot,
+            ingest_to_parquet_detect_programming_lang=ingest_to_parquet_detect_programming_lang,
         )
         ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
         # start Ray cluster
         ray_cluster = create_ray_op(
             ray_name=ray_name,
-            run_id=dsl.RUN_ID_PLACEHOLDER,
+            run_id=run_id,
             ray_head_options=ray_head_options,
             ray_worker_options=ray_worker_options,
             server_url=server_url,
@@ -142,24 +205,10 @@ def ingest_to_parquet(
         # Execute job
         execute_job = execute_ray_jobs_op(
             ray_name=ray_name,
-            run_id=dsl.RUN_ID_PLACEHOLDER,
+            run_id=run_id,
             additional_params=additional_params,
-            # note that the parameters below are specific for NOOP transform
-            exec_params={
-                "data_s3_config": data_s3_config,
-                "data_max_files": data_max_files,
-                "data_num_samples": data_num_samples,
-                "data_files_to_use": data_files_to_use,
-                "runtime_num_workers": compute_exec_params.output,
-                "runtime_worker_options": runtime_actor_options,
-                "runtime_pipeline_id": runtime_pipeline_id,
-                "runtime_job_id": dsl.RUN_ID_PLACEHOLDER,
-                "runtime_code_location": runtime_code_location,
-                "ingest_to_parquet_supported_langs_file": ingest_to_parquet_supported_langs_file,
-                "ingest_to_parquet_domain": ingest_to_parquet_domain,
-                "ingest_to_parquet_snapshot": ingest_to_parquet_snapshot,
-                "ingest_to_parquet_detect_programming_lang": ingest_to_parquet_detect_programming_lang,
-            },
+            # note that the parameters below are specific to this transform
+            exec_params=compute_exec_params.output,
             exec_script_name=EXEC_SCRIPT_NAME,
             server_url=server_url,
             prefix=PREFIX,
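Since the v1/v2 split above is resolved from the `KFPv2` environment variable at import time, the same source file can be compiled for either SDK. A hypothetical driver, assuming the module keeps the usual trailing `compiler.Compiler().compile(...)` guard (which is not shown in this hunk):

```python
import os
import subprocess

# Compile the same pipeline source once per SDK flavor; each invocation
# re-imports the module, so the os.getenv("KFPv2") branch is re-evaluated.
for flag in ("0", "1"):
    subprocess.run(
        ["python", "ingest_2_parquet_wf.py"],
        env={**os.environ, "KFPv2": flag},
        check=True,
    )
```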
"data_s3_config": data_s3_config, - "data_max_files": data_max_files, - "data_num_samples": data_num_samples, - "data_files_to_use": data_files_to_use, - "runtime_num_workers": compute_exec_params.output, - "runtime_worker_options": runtime_actor_options, - "runtime_pipeline_id": runtime_pipeline_id, - "runtime_job_id": dsl.RUN_ID_PLACEHOLDER, - "runtime_code_location": runtime_code_location, - "ingest_to_parquet_supported_langs_file": ingest_to_parquet_supported_langs_file, - "ingest_to_parquet_domain": ingest_to_parquet_domain, - "ingest_to_parquet_snapshot": ingest_to_parquet_snapshot, - "ingest_to_parquet_detect_programming_lang": ingest_to_parquet_detect_programming_lang, - }, + # note that the parameters below are specific for this transform + exec_params=compute_exec_params.output, exec_script_name=EXEC_SCRIPT_NAME, server_url=server_url, prefix=PREFIX, diff --git a/transforms/code/ingest_2_parquet/kfp_ray/v1/Makefile b/transforms/code/ingest_2_parquet/kfp_ray/v1/Makefile deleted file mode 100644 index e3cd775bf8..0000000000 --- a/transforms/code/ingest_2_parquet/kfp_ray/v1/Makefile +++ /dev/null @@ -1,25 +0,0 @@ -REPOROOT=${CURDIR}/../../../../../ -WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate -include $(REPOROOT)/transforms/.make.workflows - -SRC_DIR=${CURDIR}/../../ray/ - -YAML_FILE=ingest_2_parquet_wf.yaml - -workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE} - -.PHONY: workflow-build -workflow-build: workflow-venv - $(MAKE) ${YAML_FILE} - -.PHONY: workflow-test -workflow-test: workflow-build - $(MAKE) .transforms_workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=${YAML_FILE} - -.PHONY: workflow-upload -workflow-upload: workflow-build - $(MAKE) .transforms_workflows.upload-pipeline PIPELINE_FILE=${YAML_FILE} - -.PHONY: workflow-reconcile-requirements -workflow-reconcile-requirements: - $(MAKE) .transforms_workflows.reconcile-requirements PIPELINE_FILE=ingest_2_parquet_wf.py diff --git a/transforms/code/ingest_2_parquet/ray/test-data/expected/data-processing-lib.parquet b/transforms/code/ingest_2_parquet/ray/test-data/expected/data-processing-lib.parquet new file mode 100644 index 0000000000..5211160202 Binary files /dev/null and b/transforms/code/ingest_2_parquet/ray/test-data/expected/data-processing-lib.parquet differ diff --git a/transforms/code/ingest_2_parquet/ray/test-data/input/data-processing-lib.zip b/transforms/code/ingest_2_parquet/ray/test-data/input/data-processing-lib.zip new file mode 100644 index 0000000000..069bc536a5 Binary files /dev/null and b/transforms/code/ingest_2_parquet/ray/test-data/input/data-processing-lib.zip differ diff --git a/transforms/code/ingest_2_parquet/ray/test/test_ingest_to_parquet.py b/transforms/code/ingest_2_parquet/ray/test/test_ingest_to_parquet.py index 97a060a5cd..c295a94fe6 100644 --- a/transforms/code/ingest_2_parquet/ray/test/test_ingest_to_parquet.py +++ b/transforms/code/ingest_2_parquet/ray/test/test_ingest_to_parquet.py @@ -38,7 +38,7 @@ def get_test_transform_fixtures(self) -> list[tuple]: input_dir = os.path.join(basedir, "input") input_files = get_files_in_folder(input_dir, ".zip") input_files = [(name, binary) for name, binary in input_files.items()] - expected_metadata_list = [{'number of rows': 2}, {'number of rows': 52}, {}] + expected_metadata_list = [{'number of rows': 2}, {'number of rows': 20}, {'number of rows': 52}, {}] config = { ingest_supported_langs_file_key: lang_supported_file, ingest_detect_programming_lang_key: True,