Merge pull request #311 from D-Sai-Venkatesh/generate-pipeline
Updated generate (simple pipeline) pipeline
roytman authored Jun 24, 2024
2 parents 3ea974d + 0b058a9 commit 9bd6ff3
Showing 6 changed files with 302 additions and 241 deletions.
2 changes: 1 addition & 1 deletion kfp/pipeline_generator/README.md
@@ -1,4 +1,4 @@
## Steps to generate a new pipeline
- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](./example/pipeline_definitions.yaml)).
- execute `./run.sh <pipeline_definitions_file_path> <destination directory>`, where `pipeline_definitions_file_path` is the path to the `pipeline_definitions.yaml` file that defines the pipeline and `destination directory` is the directory where the new pipeline file
- execute `./run.sh --config_file <pipeline_definitions_file_path> --output_dir_file <destination directory>`, where `pipeline_definitions_file_path` is the path to the `pipeline_definitions.yaml` file that defines the pipeline and `destination directory` is the directory where the new pipeline file
will be generated.
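The flags accepted by `run.sh` map directly onto the generator's own argparse options (visible in the `pipeline_generator.py` diff below). A minimal sketch of that interface; the help strings and sample argument values are illustrative and not part of this commit:

```python
# Sketch of the CLI that run.sh wraps; the flags match the argparse setup in
# pipeline_generator.py shown below. Help text and sample values are assumptions.
import argparse

parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
parser.add_argument("-c", "--config_file", type=str, default="", help="path to pipeline_definitions.yaml")
parser.add_argument("-od", "--output_dir_file", type=str, default="", help="directory for the generated <name>_wf.py")

# Example invocation, e.g. when run from kfp/pipeline_generator:
args = parser.parse_args(["-c", "example/pipeline_definitions.yaml", "-od", "./generated"])
print(args.config_file, args.output_dir_file)
```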
6 changes: 3 additions & 3 deletions kfp/pipeline_generator/example/pipeline_definitions.yaml
@@ -3,14 +3,14 @@ pipeline_parameters:
description: "Pipeline for noop task"
script_name: "noop_transform.py"
prefix: ""
multi_s3: False
multi_s3: True
compute_func_name: ""
compute_func_import: ""
component_spec_path: ""

pipeline_common_input_parameters_values:
kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.1.1"
transform_image: "quay.io/dataprep1/data-prep-kit/noop:0.8.0"
kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"
transform_image: "quay.io/dataprep1/data-prep-kit/noop-ray:0.9.0.dev6"
s3_access_secret: "s3-secret"
image_pull_secret: "prod-all-icr-io"
input_folder: "test/noop/input/"
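For orientation: the generator loads this file with `yaml.safe_load` and works from its two top-level sections, `pipeline_parameters` and `pipeline_common_input_parameters_values`. A minimal sketch of that read path, based on the keys referenced in `pipeline_generator.py` below; the file path and the `name` key are assumptions, not part of this commit:

```python
# Minimal sketch of how pipeline_definitions.yaml is consumed; based on the keys
# used in pipeline_generator.py below. The file path and "name" key are assumptions.
import yaml

with open("example/pipeline_definitions.yaml", "r") as file:
    pipeline_definitions = yaml.safe_load(file)

pipeline_parameters = pipeline_definitions["pipeline_parameters"]
common_values = pipeline_definitions["pipeline_common_input_parameters_values"]

print(pipeline_parameters.get("name"), pipeline_parameters.get("multi_s3", False))
print(common_values["transform_image"], common_values["kfp_base_image"])
```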
125 changes: 0 additions & 125 deletions kfp/pipeline_generator/pipeline.ptmpl

This file was deleted.

138 changes: 29 additions & 109 deletions kfp/pipeline_generator/pipeline_generator.py
@@ -1,10 +1,5 @@
import json

import yaml


PRE_COMMIT = "./pre-commit-config.yaml"
PIPELINE_TEMPLATE_FILE = "pipeline.ptmpl"
PIPELINE_TEMPLATE_FILE = "simple_pipeline.py"

INPUT_PARAMETERS = "input_parameters"
PIPELINE_PARAMETERS = "pipeline_parameters"
@@ -16,128 +11,53 @@
VALUE = "value"
DESCRIPTION = "description"


def get_pipeline_input_parameters(arguments) -> str:
ret_str = ""
ret_str += get_generic_params(arguments.get("pipeline_arguments", None))
return ret_str


def get_generic_params(params) -> str:
ret_str = ""
if params is None:
return ret_str
for param in params:
ret_str += f"\n {param[NAME]}: {param[TYPE]} = "
if param[TYPE] == "str":
ret_str += f'"{param[VALUE]}"'
else:
ret_str += f"{param[VALUE]}"
ret_str += f", {param.get(DESCRIPTION, '')}"
return ret_str


def get_execute_job_params_guf(args) -> (str):
ret_execute_job_params = ""
if args is not None:
pargs = args.get("pipeline_arguments", None)
if pargs is not None:
for a in pargs:
ret_execute_job_params += f'"{a[NAME]}": {a[NAME]},\n'
return ret_execute_job_params


if __name__ == "__main__":
import yaml
import argparse
import os
from pathlib import Path
from jinja2 import Environment, FileSystemLoader

from pre_commit.main import main
environment = Environment(loader=FileSystemLoader("templates/"))
template = environment.get_template(PIPELINE_TEMPLATE_FILE)

parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
parser.add_argument("-c", "--config_file", type=str, default="")
parser.add_argument("-od", "--output_dir_file", type=str, default="")

args = parser.parse_args()
# open configuration file

with open(args.config_file, "r") as file:
pipeline_definitions = yaml.safe_load(file)

pipeline_parameters = pipeline_definitions[PIPELINE_PARAMETERS]
common_input_params_values = pipeline_definitions[PIPELINE_COMMON_INPUT_PARAMETERS_VALUES]

# Pipeline template file
fin = open(PIPELINE_TEMPLATE_FILE, "rt")

# Output file to write the pipeline
fout = open(f"{pipeline_parameters[NAME]}_wf.py", "wt")

# define the generated pipeline input parameters
transform_input_parameters = get_pipeline_input_parameters(
pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS]
)

# define arguments to the Ray execution job
execute_job_params = get_execute_job_params_guf(pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS])
pipeline_transform_input_parameters = pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS]

component_spec_path = pipeline_parameters.get("component_spec_path", "")
if component_spec_path == "":
component_spec_path = "../../../../../kfp/kfp_ray_components/"

compute_func_name = pipeline_parameters.get("compute_func_name", "")
if compute_func_name == "":
compute_func_name = "ComponentUtils.default_compute_execution_params"

compute_func_import = pipeline_parameters.get("compute_func_import", "")

execute_comp_file = "executeRayJobComponent.yaml"
prefix_name = ""
prefix_execute = ""
prefix_set_secret = ""
if pipeline_parameters.get("multi_s3", False) == True:
execute_comp_file = "executeRayJobComponent_multi_s3.yaml"
prefix_name = pipeline_parameters.get("prefix", "")
prefix_execute = "prefix=PREFIX"
prefix_set_secret = (
f"ComponentUtils.set_s3_env_vars_to_component(execute_job, {prefix_name}_s3_access_secret, prefix=PREFIX)"
)

# For each line in the template file
for line in fin:
# read replace the string and write to output pipeline file
fout.write(
line.replace("__pipeline_name__", pipeline_parameters[NAME])
.replace("__pipeline_description__", pipeline_parameters["description"])
.replace("__pipeline_arguments__", transform_input_parameters)
.replace("__execute_job_params__", execute_job_params)
.replace("__compute_func_name__", compute_func_name)
.replace("__component_spec_path__", component_spec_path)
.replace("__compute_import__", compute_func_import)
.replace("__script_name__", pipeline_parameters["script_name"])
.replace("__image_pull_secret__", common_input_params_values["image_pull_secret"])
.replace("__s3_access_secret__", common_input_params_values["s3_access_secret"])
.replace("__input_folder__", common_input_params_values.get("input_folder", ""))
.replace("__output_folder__", common_input_params_values.get("output_folder", ""))
.replace("__transform_image__", common_input_params_values["transform_image"])
.replace("__kfp_base_image__", common_input_params_values["kfp_base_image"])
.replace("__execute_comp__", execute_comp_file)
.replace("__prefix_name__", prefix_name)
.replace("__prefix_execute__", prefix_execute)
.replace("__prefix_set_secret__", prefix_set_secret)
)
# Move the generated file to the output directory
curr_dir = os.getcwd()
src_file = Path(f"{curr_dir}/{pipeline_parameters[NAME]}_wf.py")
dst_file = Path(f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py")
src_file.rename(dst_file)
component_spec_path = "../../../../kfp/kfp_ray_components/"

content = template.render(
transform_image=common_input_params_values["transform_image"],
script_name=pipeline_parameters["script_name"],
kfp_base_image=common_input_params_values["kfp_base_image"],
component_spec_path=component_spec_path,
pipeline_arguments=pipeline_transform_input_parameters["pipeline_arguments"],
pipeline_name=pipeline_parameters[NAME],
pipeline_description=pipeline_parameters["description"],
input_folder=common_input_params_values.get("input_folder", ""),
output_folder=common_input_params_values.get("output_folder", ""),
s3_access_secret=common_input_params_values["s3_access_secret"],
image_pull_secret=common_input_params_values["image_pull_secret"],
multi_s3=pipeline_parameters["multi_s3"]
)

fout.close()
output_file = f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py"
with open(output_file, mode="w", encoding="utf-8") as message:
message.write(content)
print(f"... wrote {output_file}")

import sys

from pre_commit.main import main

print(f"Pipeline ${dst_file} auto generation completed")
# format the pipeline python file
args = ["run", "--file", f"{dst_file}", "-c", PRE_COMMIT]
print(f"Pipeline ${output_file} auto generation completed")
args = ["run", "--file", f"{output_file}", "-c", PRE_COMMIT]
sys.exit(main(args))
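The core of this rewrite is replacing the old line-by-line `str.replace()` over `pipeline.ptmpl` with Jinja2 rendering of `templates/simple_pipeline.py`. A self-contained sketch of that pattern, using an inline dictionary-backed template in place of the repository's template file; the template body and variable values are illustrative only:

```python
# Sketch of the Jinja2 pattern the rewritten generator relies on: render a template
# with named variables instead of line-by-line str.replace(). The inline DictLoader
# template stands in for templates/simple_pipeline.py; values come from the example
# pipeline_definitions.yaml above.
from jinja2 import DictLoader, Environment

environment = Environment(loader=DictLoader({
    "simple_pipeline.py": (
        "# {{ pipeline_description }}\n"
        'PIPELINE_NAME = "{{ pipeline_name }}"\n'
        'TRANSFORM_IMAGE = "{{ transform_image }}"\n'
    )
}))
template = environment.get_template("simple_pipeline.py")

content = template.render(
    pipeline_name="noop",
    pipeline_description="Pipeline for noop task",
    transform_image="quay.io/dataprep1/data-prep-kit/noop-ray:0.9.0.dev6",
)
print(content)
```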
59 changes: 56 additions & 3 deletions kfp/pipeline_generator/run.sh
@@ -1,11 +1,64 @@
#!/bin/bash

DEF_FILE=$1
DIST_DIR=$2
POSITIONAL_ARGS=()

while [[ $# -gt 0 ]]; do
case $1 in
-c|--config_file)
DEF_FILE="$2"
if [[ "$2" = -* ]]
then
echo "ERROR: config_file value not provided."
exit 1
fi
shift # past argument
shift # past value
;;
-od|--output_dir_file)
DIST_DIR="$2"
if [[ "$2" = -* ]]
then
echo "ERROR: output_dir_file value not provided."
exit 1
fi
shift # past argument
shift # past value
;;
-h|--help)
echo "-c/--config_file(required): file path to config_file(pipeline_definition.yaml)."
echo "-od/--output_dir_file(required): output folder path to store generated pipeline."
exit 1
;;
-*|--*)
echo "Unknown option $1"
exit 1
;;
*)
POSITIONAL_ARGS+=("$1") # save positional arg
shift # past argument
;;
esac
done


if [ -z ${DEF_FILE+x} ]
then
echo "ERROR: config_file is not defined."
exit 1
fi

if [ -z ${DIST_DIR+x} ]
then
echo "ERROR: output_dir_file is not defined."
exit 1
fi


ROOT_DIR=${PWD}

mkdir -p ${ROOT_DIR}/${DIST_DIR}/
python3 -m venv venv
source venv/bin/activate
pip install pre-commit
python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
pip install jinja2
python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
