Skip to content

Commit

Permalink
Adjust ingest_2_parquet workflow.
Browse files Browse the repository at this point in the history
Signed-off-by: Revital Sur <eres@il.ibm.com>
Co-authored-by: Boris Lublinsky <blublinsky@ibm.com>
  • Loading branch information
revit13 and Boris Lublinsky committed Jun 11, 2024
1 parent abab08c commit 5e12e17
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ jobs:
export DEPLOY_KUBEFLOW=1
make -C kind setup
make -C kfp/kfp_support_lib test
make -C transforms/universal/noop/ workflow-build
make -C transforms workflow-build
make -C transforms/universal/noop workflow-test
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ def dict_to_req(d: dict[str, Any], executor: str = "transformer_launcher.py") ->
if '"' in value:
logger.warning(f"can't parse inputs with double quotation marks, please use single quotation marks instead")
res += f'--{key}="{value}" '
elif isinstance(value, bool):
if value:
res += f"--{key} "
else:
res += f"--{key}={value} "

Expand Down
4 changes: 2 additions & 2 deletions kind/hack/populate_minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ mc mb kfp/test
echo "copying data"
# code modules
mc cp --recursive ${ROOT_DIR}/../transforms/code/code_quality/ray/test-data/input/ kfp/test/code_quality/input
mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest2parquet/ray/test-data/input/ kfp/test/ingest2parquet/input
mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest2parquet/ray/test-data/languages/ kfp/test/ingest2parquet/languages
mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest_2_parquet/ray/test-data/input/data-processing-lib.zip kfp/test/ingest_2_parquet/input
mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest_2_parquet/ray/test-data/languages/ kfp/test/ingest_2_parquet/languages
mc cp --recursive ${ROOT_DIR}/../transforms/code/proglang_select/ray/test-data/input/ kfp/test/proglang_select/input
mc cp --recursive ${ROOT_DIR}/../transforms/code/proglang_select/ray/test-data/languages/ kfp/test/proglang_select/languages
mc cp --recursive ${ROOT_DIR}/../transforms/code/malware/ray/test-data/input/ kfp/test/malware/input
Expand Down
10 changes: 5 additions & 5 deletions transforms/code/ingest_2_parquet/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,21 @@ load-image::

.PHONY: workflow-venv
workflow-venv:
$(MAKE) -C kfp_ray/v1 workflow-venv
$(MAKE) -C kfp_ray workflow-venv

.PHONY: workflow-build
workflow-build:
$(MAKE) -C kfp_ray/v1 workflow-build
$(MAKE) -C kfp_ray workflow-build

.PHONY: workflow-test
workflow-test:
$(MAKE) -C kfp_ray/v1 workflow-test
$(MAKE) -C kfp_ray workflow-test

.PHONY: workflow-upload
workflow-upload:
$(MAKE) -C kfp_ray/v1 workflow-upload
$(MAKE) -C kfp_ray workflow-upload

.PHONY: workflow-reconcile-requirements
workflow-reconcile-requirements:
$(MAKE) -C kfp_ray/v1 workflow-reconcile-requirements
$(MAKE) -C kfp_ray workflow-reconcile-requirements

44 changes: 44 additions & 0 deletions transforms/code/ingest_2_parquet/kfp_ray/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
REPOROOT=${CURDIR}/../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

SRC_DIR=${CURDIR}/../ray/

PYTHON_WF := $(shell find ./ -name '*_wf.py')
YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}

venv::

build::

test::

test-src::

test-image::

image::

load-image::

.PHONY: workflow-build
workflow-build: workflow-venv
$(MAKE) $(YAML_WF)

.PHONY: workflow-test
workflow-test: workflow-build
$(MAKE) .transforms_workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=ingest_2_parquet_wf.yaml

.PHONY: workflow-upload
workflow-upload: workflow-build
@for file in $(YAML_WF); do \
$(MAKE) .transforms_workflows.upload-pipeline PIPELINE_FILE=$$file; \
done

.PHONY: workflow-reconcile-requirements
workflow-reconcile-requirements:
@for file in $(PYTHON_WF); do \
$(MAKE) .transforms_workflows.reconcile-requirements PIPELINE_FILE=$$file; \
done
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,85 @@
# limitations under the License.
################################################################################

import os

from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp_support.workflow_support.utils import (
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)
from data_processing.utils import GB


# the name of the job script
EXEC_SCRIPT_NAME: str = "ingest_2_parquet_transform.py"
EXEC_SCRIPT_NAME: str = "ingest_2_parquet_transform_ray.py"

task_image = "quay.io/dataprep1/data-prep-kit/ingest_2_parquet-ray:0.4.0.dev6"

task_image = "quay.io/dataprep1/data-prep-kit/ingest_2_parquet:0.3.0"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"

# path to kfp component specifications files
component_spec_path = "../../../../../kfp/kfp_ray_components/"
component_spec_path = "../../../../kfp/kfp_ray_components/"

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
compute_exec_params_op = comp.func_to_container_op(
func=ComponentUtils.default_compute_execution_params, base_image=base_kfp_image
)
def compute_exec_params_func(
worker_options: str,
actor_options: str,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
data_files_to_use: str,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
ingest_to_parquet_supported_langs_file: str,
ingest_to_parquet_domain: str,
ingest_to_parquet_snapshot: str,
ingest_to_parquet_detect_programming_lang: bool,
) -> dict:
from workflow_support.runtime_utils import KFPUtils

return {
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"data_files_to_use": data_files_to_use,
"runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
"runtime_worker_options": actor_options,
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
"ingest_to_parquet_supported_langs_file": ingest_to_parquet_supported_langs_file,
"ingest_to_parquet_domain": ingest_to_parquet_domain,
"ingest_to_parquet_snapshot": ingest_to_parquet_snapshot,
"ingest_to_parquet_detect_programming_lang": ingest_to_parquet_detect_programming_lang,
}


# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " +
"same version of the same pipeline !!!")
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
Expand All @@ -59,17 +111,17 @@ def ingest_to_parquet(
'"image_pull_secret": "", "image": "' + task_image + '"}',
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/ingest2parquet/input', 'output_folder': 'test/ingest2parquet/output/'}",
data_s3_config: str = "{'input_folder': 'test/ingest_2_parquet/input', 'output_folder': 'test/ingest_2_parquet/output/'}",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
data_files_to_use: str = "['.zip']",
# orchestrator
runtime_actor_options: str = f"{{'num_cpus': 0.8, 'memory': {2*GB}}}",
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
# Proglang match parameters
ingest_to_parquet_supported_langs_file: str = "test/ingest2parquet/languages/lang_extensions.json",
ingest_to_parquet_supported_langs_file: str = "test/ingest_2_parquet/languages/lang_extensions.json",
ingest_to_parquet_detect_programming_lang: bool = True,
ingest_to_parquet_domain: str = "code",
ingest_to_parquet_snapshot: str = "github",
Expand Down Expand Up @@ -118,20 +170,31 @@ def ingest_to_parquet(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
compute_exec_params = compute_exec_params_op(
worker_options=ray_worker_options,
actor_options=runtime_actor_options,
data_s3_config=data_s3_config,
data_max_files=data_max_files,
data_num_samples=data_num_samples,
data_files_to_use=data_files_to_use,
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
ingest_to_parquet_supported_langs_file=ingest_to_parquet_supported_langs_file,
ingest_to_parquet_domain=ingest_to_parquet_domain,
ingest_to_parquet_snapshot=ingest_to_parquet_snapshot,
ingest_to_parquet_detect_programming_lang=ingest_to_parquet_detect_programming_lang,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
ray_name=ray_name,
run_id=dsl.RUN_ID_PLACEHOLDER,
run_id=run_id,
ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options,
server_url=server_url,
Expand All @@ -142,24 +205,10 @@ def ingest_to_parquet(
# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=dsl.RUN_ID_PLACEHOLDER,
run_id=run_id,
additional_params=additional_params,
# note that the parameters below are specific for NOOP transform
exec_params={
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"data_files_to_use": data_files_to_use,
"runtime_num_workers": compute_exec_params.output,
"runtime_worker_options": runtime_actor_options,
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": dsl.RUN_ID_PLACEHOLDER,
"runtime_code_location": runtime_code_location,
"ingest_to_parquet_supported_langs_file": ingest_to_parquet_supported_langs_file,
"ingest_to_parquet_domain": ingest_to_parquet_domain,
"ingest_to_parquet_snapshot": ingest_to_parquet_snapshot,
"ingest_to_parquet_detect_programming_lang": ingest_to_parquet_detect_programming_lang,
},
# note that the parameters below are specific for this transform
exec_params=compute_exec_params.output,
exec_script_name=EXEC_SCRIPT_NAME,
server_url=server_url,
prefix=PREFIX,
Expand Down
25 changes: 0 additions & 25 deletions transforms/code/ingest_2_parquet/kfp_ray/v1/Makefile

This file was deleted.

Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def get_test_transform_fixtures(self) -> list[tuple]:
input_dir = os.path.join(basedir, "input")
input_files = get_files_in_folder(input_dir, ".zip")
input_files = [(name, binary) for name, binary in input_files.items()]
expected_metadata_list = [{'number of rows': 2}, {'number of rows': 52}, {}]
expected_metadata_list = [{'number of rows': 2}, {'number of rows': 20}, {'number of rows': 52}, {}]
config = {
ingest_supported_langs_file_key: lang_supported_file,
ingest_detect_programming_lang_key: True,
Expand Down

0 comments on commit 5e12e17

Please sign in to comment.