Auto generate kfp pipelines. #193

Merged: 9 commits, Jun 6, 2024
1 change: 1 addition & 0 deletions kfp/kfp_support_lib/Makefile
@@ -52,6 +52,7 @@ venv:: pyproject.toml .check-env
	. ${VENV_ACTIVATE}; \
	pip install -e .; \
	pip install ray==${RAY}; \
	pip install pre-commit; \
	pip install pytest pytest-cov

test:: venv
4 changes: 4 additions & 0 deletions kfp/pipeline_generator/README.md
@@ -0,0 +1,4 @@
## Steps to generate a new pipeline
- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](./example/pipeline_definitions.yaml)).
- execute `./run.sh <pipeline_definitions_file_path> <destination_directory>`, where `pipeline_definitions_file_path` is the path of the `pipeline_definitions.yaml` file that defines the pipeline and `destination_directory` is the directory where the new pipeline file will be generated; see the example invocation below.
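
For instance, generating the noop pipeline from the bundled example could look as follows (a sketch; the `generated` output directory name is only illustrative):

```shell
cd kfp/pipeline_generator
./run.sh example/pipeline_definitions.yaml generated
# the generated pipeline is written to ./generated/noop_wf.py
```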
24 changes: 24 additions & 0 deletions kfp/pipeline_generator/example/pipeline_definitions.yaml
@@ -0,0 +1,24 @@
pipeline_parameters:
  name: "noop"
  description: "Pipeline for noop task"
  script_name: "noop_transform.py"
  prefix: ""
  multi_s3: False
  compute_func_name: ""
  compute_func_import: ""
  component_spec_path: ""

pipeline_common_input_parameters_values:
  kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.1.1"
  transform_image: "quay.io/dataprep1/data-prep-kit/noop:0.8.0"
  s3_access_secret: "s3-secret"
  image_pull_secret: "prod-all-icr-io"
  input_folder: "test/noop/input/"
  output_folder: "test/noop/output/"

pipeline_transform_input_parameters:
  pipeline_arguments:
    - name: "noop_sleep_sec"
      type: "int"
      value: 10
      description: "# noop sleep time"
125 changes: 125 additions & 0 deletions kfp/pipeline_generator/pipeline.ptmpl
@@ -0,0 +1,125 @@
# NOTE: This file is auto generated by Pipeline Generator.

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp_support.workflow_support.utils import (
    ONE_HOUR_SEC,
    ONE_WEEK_SEC,
    ComponentUtils,
)
__compute_import__

task_image = "__transform_image__"

# the name of the job script
EXEC_SCRIPT_NAME: str = "__script_name__"
PREFIX: str = "__prefix_name__"

# components
base_kfp_image = "__kfp_base_image__"

# path to kfp component specifications files
component_spec_path = "__component_spec_path__"

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
compute_exec_params_op = comp.func_to_container_op(
    func=__compute_func_name__, base_image=base_kfp_image
)
# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "__execute_comp__")
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")

# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "__pipeline_name__"


# Pipeline to invoke execution on remote resource
@dsl.pipeline(
    name=TASK_NAME + "-ray-pipeline",
    description="__pipeline_description__",
)
def __pipeline_name__(
    ray_name: str = "__pipeline_name__-kfp-ray",  # name of Ray cluster
    ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "__image_pull_secret__", '
    '"image": "' + task_image + '", "image_pull_policy": "Always" }',
    ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
    '"image_pull_secret": "__image_pull_secret__", "image": "' + task_image + '", "image_pull_policy": "Always" }',
    server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
    # data access
    data_s3_config: str = "{'input_folder': '__input_folder__', 'output_folder': '__output_folder__'}",
    data_s3_access_secret: str = "__s3_access_secret__",
    data_max_files: int = -1,
    data_num_samples: int = -1,
    data_checkpointing: bool = False,
    data_data_sets: str = "",
    data_files_to_use: str = "['.parquet']",
    # orchestrator
    runtime_actor_options: str = "{'num_cpus': 0.8}",
    runtime_pipeline_id: str = "pipeline_id",
    runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
    additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
    __pipeline_arguments__
):
    # create clean_up task
    clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url)
    ComponentUtils.add_settings_to_component(clean_up_task, 60)
    # pipeline definition
    with dsl.ExitHandler(clean_up_task):
        # compute execution params
        compute_exec_params = compute_exec_params_op(
            worker_options=ray_worker_options,
            actor_options=runtime_actor_options,
        )
        ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
        # start Ray cluster
        ray_cluster = create_ray_op(
            ray_name=ray_name,
            run_id=dsl.RUN_ID_PLACEHOLDER,
            ray_head_options=ray_head_options,
            ray_worker_options=ray_worker_options,
            server_url=server_url,
            additional_params=additional_params,
        )
        ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
        ray_cluster.after(compute_exec_params)

        # Execute job
        execute_job = execute_ray_jobs_op(
            ray_name=ray_name,
            run_id=dsl.RUN_ID_PLACEHOLDER,
            additional_params=additional_params,
            exec_params={
                "data_s3_config": data_s3_config,
                "data_max_files": data_max_files,
                "data_num_samples": data_num_samples,
                "data_checkpointing": data_checkpointing,
                "data_data_sets": data_data_sets,
                "data_files_to_use": data_files_to_use,
                "runtime_num_workers": compute_exec_params.output,
                "runtime_worker_options": runtime_actor_options,
                "runtime_pipeline_id": runtime_pipeline_id,
                "runtime_job_id": dsl.RUN_ID_PLACEHOLDER,
                "runtime_code_location": runtime_code_location,
                __execute_job_params__
            },
            exec_script_name=EXEC_SCRIPT_NAME,
            server_url=server_url,
            __prefix_execute__
        )
        ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
        ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
        __prefix_set_secret__
        execute_job.after(ray_cluster)

    # Configure the pipeline level to one week (in seconds)
    dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC)

if __name__ == "__main__":
    # Compiling the pipeline
    compiler.Compiler().compile(__pipeline_name__, __file__.replace(".py", ".yaml"))
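
Running the generated `<name>_wf.py` file compiles it into a KFP pipeline specification next to itself (see the `__main__` block above). For the noop example, assuming `kfp` and `kfp_support` are available in the environment, that is roughly:

```shell
python noop_wf.py   # writes noop_wf.yaml alongside the script
```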
139 changes: 139 additions & 0 deletions kfp/pipeline_generator/pipeline_generator.py
@@ -0,0 +1,139 @@
import json

import yaml


PRE_COMMIT = "./pre-commit-config.yaml"
PIPELINE_TEMPLATE_FILE = "pipeline.ptmpl"

INPUT_PARAMETERS = "input_parameters"
PIPELINE_PARAMETERS = "pipeline_parameters"
PIPELINE_COMMON_INPUT_PARAMETERS_VALUES = "pipeline_common_input_parameters_values"
PIPELINE_TRANSFORM_INPUT_PARAMETERS = "pipeline_transform_input_parameters"

NAME = "name"
TYPE = "type"
VALUE = "value"
DESCRIPTION = "description"


def get_pipeline_input_parameters(arguments) -> str:
    ret_str = ""
    ret_str += get_generic_params(arguments.get("pipeline_arguments", None))
    return ret_str


def get_generic_params(params) -> str:
    ret_str = ""
    if params is None:
        return ret_str
    for param in params:
        ret_str += f"\n {param[NAME]}: {param[TYPE]} = "
        if param[TYPE] == "str":
            ret_str += f'"{param[VALUE]}"'
        else:
            ret_str += f"{param[VALUE]}"
        ret_str += f", {param.get(DESCRIPTION, '')}"
    return ret_str


def get_execute_job_params_guf(args) -> str:
    ret_execute_job_params = ""
    if args is not None:
        pargs = args.get("pipeline_arguments", None)
        if pargs is not None:
            for a in pargs:
                ret_execute_job_params += f'"{a[NAME]}": {a[NAME]},\n'
    return ret_execute_job_params


if __name__ == "__main__":
    import argparse
    import os
    from pathlib import Path

    from pre_commit.main import main

    parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
    parser.add_argument("-c", "--config_file", type=str, default="")
    parser.add_argument("-od", "--output_dir_file", type=str, default="")

    args = parser.parse_args()
    # open configuration file
    with open(args.config_file, "r") as file:
        pipeline_definitions = yaml.safe_load(file)

    pipeline_parameters = pipeline_definitions[PIPELINE_PARAMETERS]
    common_input_params_values = pipeline_definitions[PIPELINE_COMMON_INPUT_PARAMETERS_VALUES]

    # Pipeline template file
    fin = open(PIPELINE_TEMPLATE_FILE, "rt")

    # Output file to write the pipeline
    fout = open(f"{pipeline_parameters[NAME]}_wf.py", "wt")

    # define the generated pipeline input parameters
    transform_input_parameters = get_pipeline_input_parameters(pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS])

    # define arguments to the Ray execution job
    execute_job_params = get_execute_job_params_guf(pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS])

    component_spec_path = pipeline_parameters.get("component_spec_path", "")
    if component_spec_path == "":
        component_spec_path = "../../../../../kfp/kfp_ray_components/"

    compute_func_name = pipeline_parameters.get("compute_func_name", "")
    if compute_func_name == "":
        compute_func_name = "ComponentUtils.default_compute_execution_params"

    compute_func_import = pipeline_parameters.get("compute_func_import", "")

    execute_comp_file = "executeRayJobComponent.yaml"
    prefix_name = ""
    prefix_execute = ""
    prefix_set_secret = ""
    if pipeline_parameters.get("multi_s3", False) == True:
        execute_comp_file = "executeRayJobComponent_multi_s3.yaml"
        prefix_name = pipeline_parameters.get("prefix", "")
        prefix_execute = "prefix=PREFIX"
        prefix_set_secret = f"ComponentUtils.set_s3_env_vars_to_component(execute_job, {prefix_name}_s3_access_secret, prefix=PREFIX)"

    # For each line in the template file
    for line in fin:
        # replace the placeholders and write the line to the output pipeline file
        fout.write(
            line.replace("__pipeline_name__", pipeline_parameters[NAME])
            .replace("__pipeline_description__", pipeline_parameters["description"])
            .replace("__pipeline_arguments__", transform_input_parameters)
            .replace("__execute_job_params__", execute_job_params)
            .replace("__compute_func_name__", compute_func_name)
            .replace("__component_spec_path__", component_spec_path)
            .replace("__compute_import__", compute_func_import)
            .replace("__script_name__", pipeline_parameters["script_name"])
            .replace("__image_pull_secret__", common_input_params_values["image_pull_secret"])
            .replace("__s3_access_secret__", common_input_params_values["s3_access_secret"])
            .replace("__input_folder__", common_input_params_values.get("input_folder", ""))
            .replace("__output_folder__", common_input_params_values.get("output_folder", ""))
            .replace("__transform_image__", common_input_params_values["transform_image"])
            .replace("__kfp_base_image__", common_input_params_values["kfp_base_image"])
            .replace("__execute_comp__", execute_comp_file)
            .replace("__prefix_name__", prefix_name)
            .replace("__prefix_execute__", prefix_execute)
            .replace("__prefix_set_secret__", prefix_set_secret)
        )
    fout.close()

    # Move the generated file to the output directory
    curr_dir = os.getcwd()
    src_file = Path(f"{curr_dir}/{pipeline_parameters[NAME]}_wf.py")
    dst_file = Path(f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py")
    src_file.rename(dst_file)

    import sys

    print(f"Pipeline {dst_file} auto generation completed")
    # format the generated pipeline python file
    args = ["run", "--file", f"{dst_file}", "-c", PRE_COMMIT]
    sys.exit(main(args))
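
For reference, a minimal, hypothetical sketch (not part of this PR) of what the two helper functions return for the noop example definitions above; it assumes PyYAML is installed and that it is run from `kfp/pipeline_generator`:

```python
from pipeline_generator import get_execute_job_params_guf, get_generic_params

noop_args = {
    "pipeline_arguments": [
        {"name": "noop_sleep_sec", "type": "int", "value": 10, "description": "# noop sleep time"}
    ]
}

# Fragment substituted for __pipeline_arguments__ (one extra pipeline parameter);
# prints something like: noop_sleep_sec: int = 10, # noop sleep time
print(get_generic_params(noop_args["pipeline_arguments"]))

# Fragment substituted for __execute_job_params__ (one extra exec_params entry);
# prints: "noop_sleep_sec": noop_sleep_sec,
print(get_execute_job_params_guf(noop_args))
```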
31 changes: 31 additions & 0 deletions kfp/pipeline_generator/pre-commit-config.yaml
@@ -0,0 +1,31 @@
######################################################### {COPYRIGHT-TOP} ###
# IBM Confidential
# IBM Watson Machine Learning Core - Internal Tooling
# Copyright IBM Corp. 2022
######################################################### {COPYRIGHT-END} ###
repos:
  - repo: https://github.com/pre-commit/mirrors-prettier
    rev: v3.0.0-alpha.9-for-vscode
    hooks:
      - id: prettier
  - repo: https://github.com/psf/black
    rev: 22.3.0
    hooks:
      - id: black
        args: [--config=.black.toml]
  - repo: https://github.com/PyCQA/isort
    rev: 5.12.0
    hooks:
      - id: isort

### Exclude submodules as some are part of other organizations with their own policies
exclude: |
  (?x)^(
      autopilot/.*|
      codeflare-cli/.*|
      codeflare-sdk/.*|
      docker_build_scripts/.*|
      mcad/.*|
      datalake/.*|
      torchx/.*|
      tsfm/.*
  )$
11 changes: 11 additions & 0 deletions kfp/pipeline_generator/run.sh
@@ -0,0 +1,11 @@
#!/bin/bash

DEF_FILE=$1
DIST_DIR=$2
ROOT_DIR=${PWD}

mkdir -p ${ROOT_DIR}/${DIST_DIR}/
python3 -m venv venv
source venv/bin/activate
pip install pre-commit
python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/