Skip to content

Commit

Permalink
update noop kfpv2
Browse files Browse the repository at this point in the history
  • Loading branch information
roytman committed Jun 3, 2024
1 parent 2ae0984 commit 96f8dd4
Show file tree
Hide file tree
Showing 20 changed files with 390 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .make.versions
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# Data prep lab wheel version
DPK_LIB_VERSION=0.2.0
DPK_LIB_KFP_VERSION=0.2.0
DPK_LIB_KFP_VERSION_v2=0.1.1-dev1
DPK_LIB_KFP_VERSION_v2=0.2.0

# Begin transform versions/tags
BLOCKLIST_VERSION=0.4.0
Expand All @@ -28,5 +28,5 @@ CODE_QUALITY_VERSION=0.4.0
DOC_QUALITY_VERSION=0.4.0
INGEST_TO_PARQUET_VERSION=0.4.0

KFP_DOCKER_VERSION_v2=0.1.1
KFP_DOCKER_VERSION_v2=0.2.0-v2
KFP_DOCKER_VERSION=0.2.0-v2
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/create_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
from kfp_v1_workflow_support.utils import KFPUtils, RayRemoteJobs
from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs
print(f"Load KFPv2 libs")
else:
from workflow_support.utils import KFPUtils, RayRemoteJobs
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/delete_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
from kfp_v1_workflow_support.utils import KFPUtils, RayRemoteJobs
from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs
print(f"Load KFPv2 libs")
else:
from workflow_support.utils import KFPUtils, RayRemoteJobs
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/execute_ray_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
from workflow_support.utils import KFPUtils, execute_ray_jobs
from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs
print(f"Load KFPv2 libs")
else:
from workflow_support.utils import KFPUtils, execute_ray_jobs
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
from workflow_support.utils import KFPUtils, execute_ray_jobs
from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs
print(f"Load KFPv2 libs")
else:
from workflow_support.utils import KFPUtils, execute_ray_jobs
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/subworkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
from workflow_support.utils import KFPUtils, PipelinesUtils
from workflow_support.runtime_utils import KFPUtils, PipelinesUtils
print(f"Load KFPv2 libs")
else:
from workflow_support.utils import KFPUtils, PipelinesUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from data_processing.utils import get_logger, str2bool

from . import PipelinesUtils
from workflow_support.utils import PipelinesUtils


logger = get_logger(__name__)
Expand Down
2 changes: 2 additions & 0 deletions kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ venv:: pyproject.toml .check-env
rm -rf venv
$(PYTHON) -m venv venv
. ${VENV_ACTIVATE}; \
pip install -e ../python_apiserver_client; \
pip install -e ../../../data-processing-lib/python; \
pip install -e .; \
pip install ray==${RAY} \
pip install pytest pytest-cov
Expand Down
8 changes: 5 additions & 3 deletions kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ authors = [
{ name = "Revital Eres", email = "eres@il.ibm.com" },
]
dependencies = [
"kfp==2.2.0",
"kfp==2.7.0",
"ray==2.9.3",
"requests",
"data-prep-toolkit==0.1.1",
"data-prep-toolkit==0.2.0",
"python_apiserver_client",
]

[build-system]
Expand All @@ -36,7 +38,7 @@ dev = [
package_dir = ["src"]

[options.packages.find]
where = ["src/kfp_support"]
where = ["src/workflow_support"]

[tool.pytest.ini_options]
addopts = "--cov --cov-report term-missing --cov-fail-under 10"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from workflow_support.pipeline_utils.pipeline_utils import PipelinesUtils
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import datetime
import time
from typing import Any, Optional

from data_processing.utils import get_logger
from kfp_server_api import models

from kfp import Client


logger = get_logger(__name__)


class PipelinesUtils:
    """
    Helper class for pipeline management.

    Wraps a ``kfp.Client``. Apart from ``delete_pipeline``, the methods catch exceptions
    raised by the client, log them as warnings and report the failure through their
    return value (``None`` or a ``"failed"`` status).

    NOTE(review): the attributes read from server objects (``pipeline.id``, ``pipeline.name``,
    ``experiment.id``, ``run_details.run.status`` ...) and the ``models.api_*`` types used in
    the annotations look like the KFP v1 server API - confirm they match the ``kfp`` /
    ``kfp_server_api`` versions this package is installed with. The ``models.*`` annotations
    are written as strings so that they are not evaluated when the module is imported.
    """

    # Lower-cased run statuses that are reported as a successful completion.
    _SUCCESS_STATES = ("succeeded", "completed")
    # Lower-cased run statuses that end the polling loop in wait_pipeline_completion.
    _FINAL_STATES = _SUCCESS_STATES + ("failed", "skipped", "error")

    def __init__(self, host: str = "http://localhost:8080"):
        """
        Initialization
        :param host: host to connect to
        """
        self.kfp_client = Client(host=host)

    def upload_pipeline(
        self,
        pipeline_package_path: Optional[str] = None,
        pipeline_name: Optional[str] = None,
        overwrite: bool = False,
        description: Optional[str] = None,
    ) -> "Optional[models.api_pipeline.ApiPipeline]":
        """
        Uploads the pipeline
        :param pipeline_package_path: Local path to the pipeline package.
        :param pipeline_name: Optional. Name of the pipeline to be shown in the UI
        :param overwrite: Optional. If pipeline exists, delete it before creating a new one.
        :param description: Optional. Description of the pipeline to be shown in the UI.
        :return: Server response object containing pipeline id and other information,
            or None if deleting the existing pipeline or the upload itself failed.
        """
        if overwrite:
            existing = self.get_pipeline_by_name(name=pipeline_name)
            if existing is not None:
                try:
                    logger.info(f"pipeline {pipeline_name} already exists. Trying to delete it.")
                    self.kfp_client.delete_pipeline(pipeline_id=existing.id)
                except Exception as e:
                    # Do not attempt the upload when the old pipeline could not be removed
                    logger.warning(f"Exception deleting pipeline {e} before uploading")
                    return None
        try:
            pipeline = self.kfp_client.upload_pipeline(
                pipeline_package_path=pipeline_package_path, pipeline_name=pipeline_name, description=description
            )
        except Exception as e:
            logger.warning(f"Exception uploading pipeline {e}")
            return None
        if pipeline is None:
            logger.warning(f"Failed to upload pipeline {pipeline_name}.")
            return None
        logger.info("Pipeline uploaded")
        return pipeline

    def delete_pipeline(self, pipeline_id):
        """
        Delete pipeline.
        :param pipeline_id: id of the pipeline.
        :return: whatever the underlying client's delete_pipeline call returns.
        Exceptions raised by the client are not caught here and propagate to the caller.
        """
        return self.kfp_client.delete_pipeline(pipeline_id)

    def start_pipeline(
        self,
        pipeline: "models.api_pipeline.ApiPipeline",
        experiment: "models.api_experiment.ApiExperiment",
        params: Optional[dict[str, Any]],
    ) -> Optional[str]:
        """
        Start a specified pipeline.
        :param pipeline: pipeline definition
        :param experiment: experiment to use
        :param params: pipeline parameters
        :return: the id of the run object, or None if the run could not be submitted
        """
        # The job name is the pipeline name followed by the current local timestamp
        job_name = pipeline.name + " " + datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
        try:
            run_id = self.kfp_client.run_pipeline(
                experiment_id=experiment.id, job_name=job_name, pipeline_id=pipeline.id, params=params
            )
            logger.info(f"Pipeline run {job_name} submitted")
            return run_id.id
        except Exception as e:
            logger.warning(f"Exception starting pipeline {e}")
            return None

    def get_experiment_by_name(self, name: str = "Default") -> "Optional[models.api_experiment.ApiExperiment]":
        """
        Get experiment by name
        :param name: name
        :return: experiment, or None if the lookup raised an exception
        """
        try:
            return self.kfp_client.get_experiment(experiment_name=name)
        except Exception as e:
            logger.warning(f"Exception getting experiment {e}")
            return None

    def get_pipeline_by_name(self, name: str, np: int = 100) -> "Optional[models.api_pipeline.ApiPipeline]":
        """
        Given pipeline name, return the pipeline.
        A pipeline whose name equals `name` is preferred. Only when there is no such pipeline
        are pipelines whose name merely contains `name` considered, so that a pipeline whose
        name is a prefix of another one (e.g. "noop" and "noop_multiple") can still be found.
        :param name: pipeline name
        :param np: page size for pipeline query. For large clusters with many pipelines, you might need to
            increase this number
        :return: pipeline, or None if there is not exactly one candidate or the query failed
        """
        try:
            # Get all pipelines (a single page of size np); treat a missing list as empty
            pipelines = self.kfp_client.list_pipelines(page_size=np).pipelines or []
            required = [p for p in pipelines if p.name == name]
            if not required:
                required = [p for p in pipelines if name in p.name]
            if len(required) != 1:
                logger.warning(f"Failure to get pipeline. Number of pipelines with name {name} is {len(required)}")
                return None
            return required[0]

        except Exception as e:
            logger.warning(f"Exception getting pipeline {e}")
            return None

    def wait_pipeline_completion(self, run_id: str, timeout: int = -1, wait: int = 600) -> tuple[str, str]:
        """
        Waits for a pipeline run to complete
        :param run_id: run id
        :param timeout: timeout (sec); any value that is not positive means wait forever
        :param wait: interval between two status polls (sec). The last interval before the
            timeout is shortened so that the timeout is not overshot.
        :return: Completion status and an error message if such exists. ("failed", message) is
            returned when the timeout expires or when polling raises an exception.
        """
        try:
            # A monotonic clock keeps the deadline independent of wall-clock adjustments
            deadline = time.monotonic() + timeout if timeout > 0 else None
            run_details = self.kfp_client.get_run(run_id=run_id)
            status = run_details.run.status
            while status is None or status.lower() not in self._FINAL_STATES:
                if deadline is None:
                    time.sleep(wait)
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return "failed", "Execution is taking too long"
                    time.sleep(min(wait, remaining))
                run_details = self.kfp_client.get_run(run_id=run_id)
                status = run_details.run.status
                logger.info(f"Got pipeline execution status {status}")

            if status.lower() in self._SUCCESS_STATES:
                return status, ""
            return status, run_details.run.error

        except Exception as e:
            logger.warning(f"Failed waiting pipeline completion {e}")
            return "failed", str(e)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os
import sys

from data_processing.utils import get_logger, str2bool

from . import PipelinesUtils


logger = get_logger(__name__)


def run_test(
    pipeline_package_path: str, endpoint: str = "http://localhost:8080/", overwrite: bool = True
) -> str | None:
    """
    Upload and run a single pipeline
    :param pipeline_package_path: Local path to the pipeline package.
    :param endpoint: endpoint to kfp service.
    :param overwrite: passed to the upload; if the pipeline exists, delete it before uploading.
    :return the pipeline name as it appears in the kfp GUI, or None if the upload, the
        start of the run or the run itself failed.
    """
    tmout: int = 800
    wait: int = 60
    # The pipeline is named after the package file, without its extension
    file_name = os.path.basename(pipeline_package_path)
    pipeline_name = os.path.splitext(file_name)[0]
    utils = PipelinesUtils(host=endpoint)
    pipeline = utils.upload_pipeline(
        pipeline_package_path=pipeline_package_path,
        pipeline_name=pipeline_name,
        overwrite=overwrite,
    )
    if pipeline is None:
        return None
    experiment = utils.get_experiment_by_name()
    if experiment is None:
        logger.warning(f"Pipeline {pipeline_name} was not started, failed to get the experiment")
        return None
    # The pipeline is run without parameters; start_pipeline expects a dictionary
    run_id = utils.start_pipeline(pipeline, experiment, params={})
    if run_id is None:
        logger.warning(f"Pipeline {pipeline_name} failed to start")
        return None
    status, error = utils.wait_pipeline_completion(run_id=run_id, timeout=tmout, wait=wait)
    if status.lower() not in ["succeeded", "completed"]:
        # Execution failed
        logger.warning(f"Pipeline {pipeline_name} failed with error {error} and status {status}")
        return None
    logger.info(f"Pipeline {pipeline_name} successfully completed")
    return pipeline_name


if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser(description="Run sanity test")
parser.add_argument("-c", "--command", type=str, choices=["upload", "sanity-test"])
parser.add_argument("-e", "--endpoint", type=str, default="http://localhost:8080/")
parser.add_argument("-p", "--pipeline_package_path", type=str, default="")
parser.add_argument("-o", "--overwrite", type=str, default="True")

args = parser.parse_args()
match args.command:
case "upload":
file_name = os.path.basename(args.pipeline_package_path)
pipeline_name = os.path.splitext(file_name)[0]
utils = PipelinesUtils(host=args.endpoint)
pipeline = utils.upload_pipeline(
pipeline_package_path=args.pipeline_package_path,
pipeline_name=pipeline_name,
overwrite=str2bool(args.overwrite),
)
if pipeline is None:
sys.exit(1)
case "sanity-test":
run = run_test(
endpoint=args.endpoint,
pipeline_package_path=args.pipeline_package_path,
overwrite=str2bool(args.overwrite),
)
if run is None:
sys.exit(1)
case _:
logger.warning("Unsupported command")
exit(1)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from workflow_support.utils import PipelinesUtils

# Endpoint of the KFP service that test_pipelines connects to
server_url = "http://localhost:8080/"

def test_pipelines():
    """
    Exercise PipelinesUtils against the KFP service at server_url: look up a pipeline
    by name, start it in the default experiment and wait until the run succeeds.
    """
    client = PipelinesUtils(host=server_url)

    # look the pipeline up by its name
    tutorial = client.get_pipeline_by_name("[Tutorial] Data passing in python components")
    assert tutorial is not None

    # the experiment name is left at the helper's default
    default_experiment = client.get_experiment_by_name()
    assert default_experiment is not None

    # submit a run without parameters
    run_id = client.start_pipeline(pipeline=tutorial, experiment=default_experiment, params={})
    assert run_id is not None

    # poll every 10 seconds, without timeout, until the run finishes
    status, error = client.wait_pipeline_completion(run_id=run_id, wait=10)
    assert status.lower() == "succeeded"
    assert error == ""
Loading

0 comments on commit 96f8dd4

Please sign in to comment.