Adding direct KubeRay compatibility to the SDK (#358)
* Added component generation

* Added multi-resource YAML support

* Cluster.up on ray cluster object

* Basic status and down for RayCluster

* Finished up/down and added unit tests

* Remove unused utils import

* Applied review feedback

* Changed naming of internal funcs

* Review feedback applied, auto-select

* OAuth conflict resolution
Maxusmusti authored Oct 23, 2023
1 parent 2441f4f commit 830cbce
Showing 7 changed files with 450 additions and 84 deletions.
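For context, here is a minimal usage sketch of the direct-KubeRay path this commit enables. It is not part of the diff: the resource name, namespace, and worker count are illustrative, and it assumes the mcad flag and the get_cluster auto-detection introduced in the changes below.

    from codeflare_sdk.cluster.cluster import Cluster, get_cluster
    from codeflare_sdk.cluster.config import ClusterConfiguration

    # mcad=False skips the MCAD AppWrapper and lets the SDK create the
    # RayCluster/Route/Secret resources directly against the cluster API
    config = ClusterConfiguration(
        name="raytest",        # illustrative name
        namespace="default",   # illustrative namespace
        num_workers=2,
        mcad=False,
    )
    cluster = Cluster(config)
    cluster.up()       # creates the KubeRay resources directly
    cluster.status()   # checks the RayCluster status (no AppWrapper lookup when mcad=False)
    cluster.down()     # deletes the generated resources

    # get_cluster checks whether an AppWrapper with the same name exists
    # and sets mcad accordingly when reconstructing an existing cluster
    existing = get_cluster("raytest", namespace="default")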
222 changes: 161 additions & 61 deletions src/codeflare_sdk/cluster/cluster.py
@@ -70,7 +70,7 @@ def __init__(self, config: ClusterConfiguration):
self.config = config
self.app_wrapper_yaml = self.create_app_wrapper()
self.app_wrapper_name = self.app_wrapper_yaml.split(".")[0]
self._client = None
self._job_submission_client = None

@property
def _client_headers(self):
@@ -86,23 +86,25 @@ def _client_verify_tls(self):
return not self.config.openshift_oauth

@property
def client(self):
if self._client:
return self._client
def job_client(self):
if self._job_submission_client:
return self._job_submission_client
if self.config.openshift_oauth:
print(
api_config_handler().configuration.get_api_key_with_prefix(
"authorization"
)
)
self._client = JobSubmissionClient(
self._job_submission_client = JobSubmissionClient(
self.cluster_dashboard_uri(),
headers=self._client_headers,
verify=self._client_verify_tls,
)
else:
self._client = JobSubmissionClient(self.cluster_dashboard_uri())
return self._client
self._job_submission_client = JobSubmissionClient(
self.cluster_dashboard_uri()
)
return self._job_submission_client

def evaluate_dispatch_priority(self):
priority_class = self.config.dispatch_priority
@@ -141,6 +143,10 @@ def create_app_wrapper(self):

# Before attempting to create the cluster AW, let's evaluate the ClusterConfig
if self.config.dispatch_priority:
if not self.config.mcad:
raise ValueError(
"Invalid Cluster Configuration, cannot have dispatch priority without MCAD"
)
priority_val = self.evaluate_dispatch_priority()
if priority_val == None:
raise ValueError(
@@ -163,6 +169,7 @@ def create_app_wrapper(self):
template = self.config.template
image = self.config.image
instascale = self.config.instascale
mcad = self.config.mcad
instance_types = self.config.machine_types
env = self.config.envs
local_interactive = self.config.local_interactive
@@ -183,6 +190,7 @@ def create_app_wrapper(self):
template=template,
image=image,
instascale=instascale,
mcad=mcad,
instance_types=instance_types,
env=env,
local_interactive=local_interactive,
@@ -207,15 +215,18 @@ def up(self):
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
body=aw,
)
if self.config.mcad:
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
body=aw,
)
else:
self._component_resources_up(namespace, api_instance)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)

@@ -228,13 +239,16 @@ def down(self):
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
name=self.app_wrapper_name,
)
if self.config.mcad:
api_instance.delete_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
name=self.app_wrapper_name,
)
else:
self._component_resources_down(namespace, api_instance)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)

@@ -252,42 +266,46 @@ def status(
"""
ready = False
status = CodeFlareClusterStatus.UNKNOWN
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name, self.config.namespace)
if appwrapper:
if appwrapper.status in [
AppWrapperStatus.RUNNING,
AppWrapperStatus.COMPLETED,
AppWrapperStatus.RUNNING_HOLD_COMPLETION,
]:
ready = False
status = CodeFlareClusterStatus.STARTING
elif appwrapper.status in [
AppWrapperStatus.FAILED,
AppWrapperStatus.DELETED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED # should deleted be separate
return status, ready # exit early, no need to check ray status
elif appwrapper.status in [
AppWrapperStatus.PENDING,
AppWrapperStatus.QUEUEING,
]:
ready = False
if appwrapper.status == AppWrapperStatus.PENDING:
status = CodeFlareClusterStatus.QUEUED
else:
status = CodeFlareClusterStatus.QUEUEING
if print_to_console:
pretty_print.print_app_wrappers_status([appwrapper])
return (
status,
ready,
) # no need to check the ray status since still in queue
if self.config.mcad:
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name, self.config.namespace)
if appwrapper:
if appwrapper.status in [
AppWrapperStatus.RUNNING,
AppWrapperStatus.COMPLETED,
AppWrapperStatus.RUNNING_HOLD_COMPLETION,
]:
ready = False
status = CodeFlareClusterStatus.STARTING
elif appwrapper.status in [
AppWrapperStatus.FAILED,
AppWrapperStatus.DELETED,
]:
ready = False
status = CodeFlareClusterStatus.FAILED # should deleted be separate
return status, ready # exit early, no need to check ray status
elif appwrapper.status in [
AppWrapperStatus.PENDING,
AppWrapperStatus.QUEUEING,
]:
ready = False
if appwrapper.status == AppWrapperStatus.PENDING:
status = CodeFlareClusterStatus.QUEUED
else:
status = CodeFlareClusterStatus.QUEUEING
if print_to_console:
pretty_print.print_app_wrappers_status([appwrapper])
return (
status,
ready,
) # no need to check the ray status since still in queue

# check the ray cluster status
cluster = _ray_cluster_status(self.config.name, self.config.namespace)
if cluster and not cluster.status == RayClusterStatus.UNKNOWN:
if cluster:
if cluster.status == RayClusterStatus.UNKNOWN:
ready = False
status = CodeFlareClusterStatus.STARTING
if cluster.status == RayClusterStatus.READY:
ready = True
status = CodeFlareClusterStatus.READY
@@ -407,19 +425,19 @@ def list_jobs(self) -> List:
"""
This method accesses the head ray node in your cluster and lists the running jobs.
"""
return self.client.list_jobs()
return self.job_client.list_jobs()

def job_status(self, job_id: str) -> str:
"""
This method accesses the head ray node in your cluster and returns the job status for the provided job id.
"""
return self.client.get_job_status(job_id)
return self.job_client.get_job_status(job_id)

def job_logs(self, job_id: str) -> str:
"""
This method accesses the head ray node in your cluster and returns the logs for the provided job id.
"""
return self.client.get_job_logs(job_id)
return self.job_client.get_job_logs(job_id)

def torchx_config(
self, working_dir: str = None, requirements: str = None
@@ -435,7 +453,7 @@ def torchx_config(
to_return["requirements"] = requirements
return to_return

def from_k8_cluster_object(rc):
def from_k8_cluster_object(rc, mcad=True):
machine_types = (
rc["metadata"]["labels"]["orderedinstance"].split("_")
if "orderedinstance" in rc["metadata"]["labels"]
@@ -474,6 +492,7 @@ def from_k8_cluster_object(rc):
0
]["image"],
local_interactive=local_interactive,
mcad=mcad,
)
return Cluster(cluster_config)

@@ -484,6 +503,66 @@ def local_client_url(self):
else:
return "None"

def _component_resources_up(
self, namespace: str, api_instance: client.CustomObjectsApi
):
with open(self.app_wrapper_yaml) as f:
yamls = yaml.load_all(f, Loader=yaml.FullLoader)
for resource in yamls:
if resource["kind"] == "RayCluster":
api_instance.create_namespaced_custom_object(
group="ray.io",
version="v1alpha1",
namespace=namespace,
plural="rayclusters",
body=resource,
)
elif resource["kind"] == "Route":
api_instance.create_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=namespace,
plural="routes",
body=resource,
)
elif resource["kind"] == "Secret":
secret_instance = client.CoreV1Api(api_config_handler())
secret_instance.create_namespaced_secret(
namespace=namespace,
body=resource,
)

def _component_resources_down(
self, namespace: str, api_instance: client.CustomObjectsApi
):
with open(self.app_wrapper_yaml) as f:
yamls = yaml.load_all(f, Loader=yaml.FullLoader)
for resource in yamls:
if resource["kind"] == "RayCluster":
api_instance.delete_namespaced_custom_object(
group="ray.io",
version="v1alpha1",
namespace=namespace,
plural="rayclusters",
name=self.app_wrapper_name,
)
elif resource["kind"] == "Route":
name = resource["metadata"]["name"]
api_instance.delete_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=namespace,
plural="routes",
name=name,
)
elif resource["kind"] == "Secret":
name = resource["metadata"]["name"]
secret_instance = client.CoreV1Api(api_config_handler())
secret_instance.delete_namespaced_secret(
namespace=namespace,
name=name,
)


def list_all_clusters(namespace: str, print_to_console: bool = True):
"""
Expand Down Expand Up @@ -549,13 +628,33 @@ def get_cluster(cluster_name: str, namespace: str = "default"):

for rc in rcs["items"]:
if rc["metadata"]["name"] == cluster_name:
return Cluster.from_k8_cluster_object(rc)
mcad = _check_aw_exists(cluster_name, namespace)
return Cluster.from_k8_cluster_object(rc, mcad=mcad)
raise FileNotFoundError(
f"Cluster {cluster_name} is not found in {namespace} namespace"
)


# private methods
def _check_aw_exists(name: str, namespace: str) -> bool:
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
aws = api_instance.list_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
)
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e, print_error=False)

for aw in aws["items"]:
if aw["metadata"]["name"] == name:
return True
return False


def _get_ingress_domain():
try:
config_check()
@@ -660,6 +759,7 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:

config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
# UPDATE THIS
routes = api_instance.list_namespaced_custom_object(
group="route.openshift.io",
version="v1",
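For reference, the new _component_resources_up path above is the standard multi-document YAML pattern used with the Kubernetes Python client. A standalone sketch of the same idea, not taken from this commit (the file path and namespace are illustrative, and load_kube_config assumes a local kubeconfig):

    import yaml
    from kubernetes import client, config

    config.load_kube_config()  # assumes a kubeconfig with access to the target cluster
    api = client.CustomObjectsApi()

    with open("raytest.yaml") as f:  # illustrative multi-document YAML file
        for resource in yaml.safe_load_all(f):
            if resource.get("kind") == "RayCluster":
                api.create_namespaced_custom_object(
                    group="ray.io",
                    version="v1alpha1",
                    namespace="default",  # illustrative namespace
                    plural="rayclusters",
                    body=resource,
                )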
1 change: 1 addition & 0 deletions src/codeflare_sdk/cluster/config.py
@@ -46,6 +46,7 @@ class ClusterConfiguration:
num_gpus: int = 0
template: str = f"{dir}/templates/base-template.yaml"
instascale: bool = False
mcad: bool = True
envs: dict = field(default_factory=dict)
image: str = "quay.io/project-codeflare/ray:latest-py39-cu118"
local_interactive: bool = False
7 changes: 2 additions & 5 deletions src/codeflare_sdk/job/jobs.py
@@ -22,9 +22,6 @@
from torchx.schedulers.ray_scheduler import RayScheduler
from torchx.specs import AppHandle, parse_app_handle, AppDryRunInfo

from ray.job_submission import JobSubmissionClient

import openshift as oc

if TYPE_CHECKING:
from ..cluster.cluster import Cluster
@@ -96,9 +93,9 @@ def __init__(

def _dry_run(self, cluster: "Cluster"):
j = f"{cluster.config.num_workers}x{max(cluster.config.num_gpus, 1)}" # # of proc. = # of gpus
runner = get_runner(ray_client=cluster.client)
runner = get_runner(ray_client=cluster.job_client)
runner._scheduler_instances["ray"] = RayScheduler(
session_name=runner._name, ray_client=cluster.client
session_name=runner._name, ray_client=cluster.job_client
)
return (
runner.dryrun(
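Beyond the TorchX wiring above, the renamed job_client property can also be used directly for Ray job submission. A hedged sketch (the entrypoint script and runtime_env contents are illustrative and not part of this commit):

    # assumes a cluster object whose dashboard is reachable, as set up earlier
    submission_id = cluster.job_client.submit_job(
        entrypoint="python train.py",  # illustrative entrypoint
        runtime_env={"working_dir": "./", "pip": ["torch"]},
    )
    print(cluster.job_status(submission_id))  # wraps job_client.get_job_status
    print(cluster.job_logs(submission_id))    # wraps job_client.get_job_logs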