diff --git a/kfp/pipeline_generator/single-pipeline/README.md b/kfp/pipeline_generator/single-pipeline/README.md
index c7e8cea0a..b9dbf363e 100644
--- a/kfp/pipeline_generator/single-pipeline/README.md
+++ b/kfp/pipeline_generator/single-pipeline/README.md
@@ -1,4 +1,6 @@
 ## Steps to generate a new pipeline
-- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](./example/pipeline_definitions.yaml)).
+- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](../../../transforms/universal/noop/kfp_ray/pipeline_definitions.yaml)).
+- execute `make -C ../../../transforms workflow-venv` from this directory
+- execute `source ../../../transforms/venv/bin/activate`
 - execute `./run.sh --config_file <pipeline_definitions_file_path> --output_dir_file <destination_directory>`, where `pipeline_definitions_file_path` is the path of the `pipeline_definitions.yaml` file that defines the pipeline and `destination_directory` is the directory where the new pipeline file will be generated.
diff --git a/kfp/pipeline_generator/single-pipeline/pipeline_generator.py b/kfp/pipeline_generator/single-pipeline/pipeline_generator.py
index 701e29d2a..b496b08ca 100644
--- a/kfp/pipeline_generator/single-pipeline/pipeline_generator.py
+++ b/kfp/pipeline_generator/single-pipeline/pipeline_generator.py
@@ -1,4 +1,4 @@
-PRE_COMMIT = "../pre-commit-config.yaml"
+
 PIPELINE_TEMPLATE_FILE = "simple_pipeline.py"
 
 INPUT_PARAMETERS = "input_parameters"
@@ -13,13 +13,15 @@
 if __name__ == "__main__":
     import argparse
-
+    import os
     import yaml
     from jinja2 import Environment, FileSystemLoader
 
-    environment = Environment(loader=FileSystemLoader("templates/"))
+    script_dir = os.path.dirname(os.path.abspath(__file__))
+    environment = Environment(loader=FileSystemLoader(f"{script_dir}/templates/"))
     template = environment.get_template(PIPELINE_TEMPLATE_FILE)
+    #pre_commit_config = f"{script_dir}/../pre-commit-config.yaml"
     parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
     parser.add_argument("-c", "--config_file", type=str, default="")
     parser.add_argument("-od", "--output_dir_file", type=str, default="")
@@ -50,16 +52,8 @@
         image_pull_secret=common_input_params_values["image_pull_secret"],
         multi_s3=pipeline_parameters["multi_s3"],
     )
-
-    output_file = f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py"
+    output_file = f"{args.output_dir_file}{pipeline_parameters[NAME]}_wf.py"
     with open(output_file, mode="w", encoding="utf-8") as message:
         message.write(content)
         print(f"... wrote {output_file}")
-    import sys
-
-    from pre_commit.main import main
-
-    print(f"Pipeline ${output_file} auto generation completed")
-    args = ["run", "--file", f"{output_file}", "-c", PRE_COMMIT]
-    sys.exit(main(args))
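For context, the generator reads the definitions YAML passed via `-c`/`--config_file` and renders the Jinja2 template `templates/simple_pipeline.py` into `<output_dir><name>_wf.py`; `run.sh` wraps exactly this call. A minimal sketch of invoking it directly from the workflow venv (the output directory below is illustrative):

```shell
# Run the generator directly from kfp/pipeline_generator/single-pipeline.
# Note the trailing slash on -od: the generated file name is appended to it verbatim.
source ../../../transforms/venv/bin/activate
python3 pipeline_generator.py \
    -c ../../../transforms/universal/noop/kfp_ray/pipeline_definitions.yaml \
    -od ./dist/
```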
diff --git a/kfp/pipeline_generator/single-pipeline/run.sh b/kfp/pipeline_generator/single-pipeline/run.sh
index 7364bb441..f1747dc9e 100755
--- a/kfp/pipeline_generator/single-pipeline/run.sh
+++ b/kfp/pipeline_generator/single-pipeline/run.sh
@@ -57,8 +57,7 @@
 fi
 ROOT_DIR=${PWD}
 mkdir -p ${ROOT_DIR}/${DIST_DIR}/
-python3 -m venv venv
-source venv/bin/activate
-pip install pre-commit
-pip install jinja2
-python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
\ No newline at end of file
+
+script_dir="$(dirname "$(readlink -f "$0")")"
+echo $PYTHONPATH
+python3 ${script_dir}/pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
diff --git a/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py b/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
index 1ada35c19..bed89f647 100644
--- a/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
+++ b/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
@@ -9,12 +9,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
 import os
 
 import kfp.compiler as compiler
 import kfp.components as comp
 import kfp.dsl as dsl
+
 from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
 
@@ -101,19 +101,21 @@ def compute_exec_params_func(
 )
 def {{ pipeline_name }}(
     # Ray cluster
-    ray_name: str = "{{ pipeline_name }}-kfp-ray",
+    ray_name: str = "{{ pipeline_name }}-kfp-ray",  # name of Ray cluster
+    # Add image_pull_secret and image_pull_policy to ray workers if needed
     ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
     ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
     server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
     # data access
-    {% if multi_s3 == False %}
+    {%- if multi_s3 == False %}
     data_s3_config: str = "{'input_folder': '{{ input_folder }}', 'output_folder': '{{ output_folder }}'}",
-    {% else %}
+    {%- else %}
     data_s3_config: str = ["{'input_folder': '{{ input_folder }}', 'output_folder': '{{ output_folder }}'}"],
-    {% endif %}
+    {%- endif %}
     data_s3_access_secret: str = "{{ s3_access_secret }}",
     data_max_files: int = -1,
     data_num_samples: int = -1,
+    data_checkpointing: bool = False,
     # orchestrator
     runtime_actor_options: dict = {'num_cpus': 0.8},
     runtime_pipeline_id: str = "pipeline_id",
@@ -209,4 +211,4 @@ def {{ pipeline_name }}(
 
 if __name__ == "__main__":
     # Compiling the pipeline
-    compiler.Compiler().compile({{ pipeline_name }}, __file__.replace(".py", ".yaml"))
\ No newline at end of file
+    compiler.Compiler().compile({{ pipeline_name }}, __file__.replace(".py", ".yaml"))
diff --git a/kfp/pipeline_generator/superpipeline/README.md b/kfp/pipeline_generator/superpipeline/README.md
index 59d0d1a4f..5b7a6f603 100644
--- a/kfp/pipeline_generator/superpipeline/README.md
+++ b/kfp/pipeline_generator/superpipeline/README.md
@@ -1,6 +1,8 @@
 ## Steps to generate a new super pipeline in KFP v1.
 - The super pipeline allows you to execute several transforms within a single pipeline.
 For more details, refer to [multi_transform_pipeline.md](../../doc/multi_transform_pipeline.md).
 - Create a `super_pipeline_definitions.yaml` file for the required task. You can refer to the example [super_pipeline_definitions.yaml](./super_pipeline_definitions.yaml).
+- Execute `make -C ../../../transforms workflow-venv` from this directory
+- Execute `source ../../../transforms/venv/bin/activate`
 - Execute `./run.sh --config_file <super_pipeline_definitions.yaml> --output_dir_file <destination_directory>`. Here, `super_pipeline_definitions.yaml` is the super pipeline definition file that you created above, and `destination_directory` is the directory where the new super pipeline file will be generated.
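Putting the superpipeline steps above together, a possible invocation from `kfp/pipeline_generator/superpipeline` (the output directory is illustrative):

```shell
# Create and activate the shared workflow venv, then generate the super pipeline.
make -C ../../../transforms workflow-venv
source ../../../transforms/venv/bin/activate
./run.sh --config_file ./super_pipeline_definitions.yaml --output_dir_file ./dist/
```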
diff --git a/kfp/pipeline_generator/superpipeline/run.sh b/kfp/pipeline_generator/superpipeline/run.sh
index 36a7b1e30..9f9913560 100755
--- a/kfp/pipeline_generator/superpipeline/run.sh
+++ b/kfp/pipeline_generator/superpipeline/run.sh
@@ -57,8 +57,5 @@
 fi
 ROOT_DIR=${PWD}
 mkdir -p ${ROOT_DIR}/${DIST_DIR}/
-python3 -m venv venv
-source venv/bin/activate
-pip install pre-commit
-pip install jinja2
+
 python3 super_pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
\ No newline at end of file
diff --git a/transforms/.make.workflows b/transforms/.make.workflows
index 89f7f4da1..adbf721e6 100644
--- a/transforms/.make.workflows
+++ b/transforms/.make.workflows
@@ -57,6 +57,9 @@ ${WORKFLOW_VENV_ACTIVATE}: ${REPOROOT}/.make.versions ${REPOROOT}/kfp/kfp_ray_co
 	pip install -e $(REPOROOT)/kfp/kfp_support_lib/shared_workflow_support; \
 	pip install -e $(REPOROOT)/kfp/kfp_support_lib/$(WORKFLOW_SUPPORT_LIB); \
 	$(MAKE) -C ${REPOROOT}/kfp/kfp_ray_components set-versions
+	pip install jinja2
+	pip install pyyaml
+	pip install pre-commit
 	@# Help: Create the virtual environment common to all workflows
 
 .PHONY: .workflows.upload-pipeline
diff --git a/transforms/universal/noop/kfp_ray/Makefile b/transforms/universal/noop/kfp_ray/Makefile
index 1a3a0153c..d1198e5a2 100644
--- a/transforms/universal/noop/kfp_ray/Makefile
+++ b/transforms/universal/noop/kfp_ray/Makefile
@@ -49,3 +49,7 @@ workflow-upload: workflow-build
 	@for file in $(YAML_WF); do \
 		$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
 	done
+
+.PHONY: workflow-generate
+workflow-generate: workflow-venv
+	. ${WORKFLOW_VENV_ACTIVATE} && ../../../../kfp/pipeline_generator/single-pipeline/run.sh -c `pwd`/pipeline_definitions.yaml -od .
diff --git a/transforms/universal/noop/kfp_ray/README.md b/transforms/universal/noop/kfp_ray/README.md
index bfc6c9305..e47f3d0ca 100644
--- a/transforms/universal/noop/kfp_ray/README.md
+++ b/transforms/universal/noop/kfp_ray/README.md
@@ -7,6 +7,13 @@ This project allows execution of the [noop Ray transform](../ray) as a
 
 The detailed pipeline is presented in the [Simplest Transform pipeline tutorial](../../../../kfp/doc/simple_transform_pipeline.md)
 
+## Pipeline file generation
+In order to generate the pipeline Python file, run
+```shell
+make workflow-generate
+```
+This uses [pipeline_definitions.yaml](pipeline_definitions.yaml) and the [pipeline generator](../../../../kfp/pipeline_generator/single-pipeline/) to generate the Python file of the pipeline.
+
 ## Compilation
 
 In order to compile pipeline definitions run
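Assuming the existing `workflow-build` target compiles the generated file (as the `workflow-upload: workflow-build` rule above suggests), a typical round trip in `transforms/universal/noop/kfp_ray` would look like:

```shell
make workflow-generate   # regenerate noop_wf.py from pipeline_definitions.yaml
make workflow-build      # compile the pipeline definition to YAML
```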
diff --git a/transforms/universal/noop/kfp_ray/noop_wf.py b/transforms/universal/noop/kfp_ray/noop_wf.py
index dfa044017..969051faf 100644
--- a/transforms/universal/noop/kfp_ray/noop_wf.py
+++ b/transforms/universal/noop/kfp_ray/noop_wf.py
@@ -14,6 +14,7 @@
 import kfp.compiler as compiler
 import kfp.components as comp
 import kfp.dsl as dsl
+
 from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
 
@@ -92,7 +93,7 @@ def compute_exec_params_func(
 
 @dsl.pipeline(
     name=TASK_NAME + "-ray-pipeline",
-    description="Pipeline for noop",
+    description="Pipeline for noop task",
 )
 def noop(
     # Ray cluster
@@ -103,7 +104,7 @@ def noop(
     server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
     # data access
     data_s3_config: str = "{'input_folder': 'test/noop/input/', 'output_folder': 'test/noop/output/'}",
-    data_s3_access_secret: str = "s3-minio",
+    data_s3_access_secret: str = "s3-secret",
     data_max_files: int = -1,
     data_num_samples: int = -1,
     data_checkpointing: bool = False,
@@ -117,7 +118,7 @@ def noop(
     additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
 ):
     """
-    Pipeline to execute NOOP transform
+    Pipeline to execute noop transform
     :param ray_name: name of the Ray cluster
     :param ray_head_options: head node options, containing the following:
         cpu - number of cpus
@@ -167,6 +168,7 @@ def noop(
         runtime_code_location=runtime_code_location,
         noop_sleep_sec=noop_sleep_sec,
     )
+    ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
 
     # start Ray cluster
     ray_cluster = create_ray_op(
@@ -179,12 +181,12 @@ def noop(
     )
     ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
     ray_cluster.after(compute_exec_params)
+
     # Execute job
     execute_job = execute_ray_jobs_op(
         ray_name=ray_name,
         run_id=run_id,
         additional_params=additional_params,
-        # note that the parameters below are specific for NOOP transform
         exec_params=compute_exec_params.output,
         exec_script_name=EXEC_SCRIPT_NAME,
         server_url=server_url,
@@ -193,7 +195,6 @@ def noop(
     ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
     execute_job.after(ray_cluster)
 
-
 if __name__ == "__main__":
     # Compiling the pipeline
-    compiler.Compiler().compile(noop, __file__.replace(".py", ".yaml"))
+    compiler.Compiler().compile(noop, __file__.replace(".py", ".yaml"))
\ No newline at end of file
diff --git a/transforms/universal/noop/kfp_ray/pipeline_definitions.yaml b/transforms/universal/noop/kfp_ray/pipeline_definitions.yaml
new file mode 100644
index 000000000..f4536a0df
--- /dev/null
+++ b/transforms/universal/noop/kfp_ray/pipeline_definitions.yaml
@@ -0,0 +1,24 @@
+pipeline_parameters:
+  name: "noop"
+  description: "Pipeline for noop task"
+  script_name: "noop_transform_ray.py"
+  prefix: ""
+  multi_s3: False
+  compute_func_name: ""
+  compute_func_import: ""
+  component_spec_path: ""
+
+pipeline_common_input_parameters_values:
+  kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
+  transform_image: "quay.io/dataprep1/data-prep-kit/noop-ray:latest"
+  s3_access_secret: "s3-secret"
+  image_pull_secret: ""
+  input_folder: "test/noop/input/"
+  output_folder: "test/noop/output/"
+
+pipeline_transform_input_parameters:
+  pipeline_arguments:
+    - name: "noop_sleep_sec"
+      type: "int"
+      value: 10
+      description: "noop sleep time"
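As a quick, illustrative spot check, values from this definitions file should surface as defaults in the generated `noop_wf.py` shown earlier in this diff (for example the `s3-secret` access secret and the `test/noop` input/output folders):

```shell
# Look for the definitions-file values in the regenerated pipeline file.
grep -nE "s3-secret|test/noop" noop_wf.py
```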