Skip to content

Commit

Permalink
[SDK] Add resources per worker for Create Job API (kubeflow#1990)
Browse files Browse the repository at this point in the history
* [SDK] Add resources for create Job API

* Fix unbound var

* Assign values in get pod template

* Add torchrun issue

* Test to create PyTorchJob from Image

* Fix e2e to create from image

* Fix condition

* Modify check test conditions
  • Loading branch information
andreyvelich authored and johnugeorge committed Apr 28, 2024
1 parent de3e408 commit ff6eb55
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 200 deletions.
153 changes: 77 additions & 76 deletions sdk/python/kubeflow/training/api/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
from kubeflow.training.constants import constants
from kubeflow.training.utils import utils
from kubeflow.storage_initializer.constants import (
INIT_CONTAINER_MOUNT_PATH,
VOLUME_PATH_DATASET,
VOLUME_PATH_MODEL,
)


logger = logging.getLogger(__name__)

status_logger = utils.StatusLogger(
Expand Down Expand Up @@ -139,64 +139,50 @@ def train(

namespace = namespace or self.namespace

if isinstance(resources_per_worker, dict):
if "gpu" in resources_per_worker:
if resources_per_worker["gpu"] is not None and (
num_procs_per_worker > resources_per_worker["gpu"]
):
raise ValueError(
"Insufficient gpu resources allocated to the container."
)
if resources_per_worker["gpu"] is not None:
resources_per_worker["nvidia.com/gpu"] = resources_per_worker.pop(
"gpu"
)

if (
"cpu" not in resources_per_worker
or "memory" not in resources_per_worker
):
raise ValueError("cpu and memory resources not specified")

resources_per_worker = client.V1ResourceRequirements(
requests=resources_per_worker,
limits=resources_per_worker,
)

# TODO (andreyvelich): PVC Creation should be part of Training Operator Controller.
# Ref issue: https://github.com/kubeflow/training-operator/issues/1971
try:
self.core_api.create_namespaced_persistent_volume_claim(
namespace=namespace,
body=utils.get_pvc_spec(
pvc_name=constants.TRAINER_PVC_NAME,
pvc_name=constants.STORAGE_INITIALIZER,
namespace=namespace,
storage_size=storage_config["size"],
storage_class=storage_config["storage_class"],
storage_config=storage_config,
),
)
except Exception as e:
pvc_list = self.core_api.list_namespaced_persistent_volume_claim(namespace)
# Check if the PVC with the specified name exists
for pvc in pvc_list.items:
if pvc.metadata.name == constants.TRAINER_PVC_NAME:
if pvc.metadata.name == constants.STORAGE_INITIALIZER:
print(
f"PVC '{constants.TRAINER_PVC_NAME}' already exists in namespace '{namespace}'."
f"PVC '{constants.STORAGE_INITIALIZER}' already exists in namespace "
f"{namespace}."
)
break
else:
raise RuntimeError("failed to create pvc")

if isinstance(model_provider_parameters, HuggingFaceModelParams):
mp = "hf"
else:
raise ValueError(
f"Invalid model provider parameters {model_provider_parameters}"
)

if isinstance(dataset_provider_parameters, S3DatasetParams):
dp = "s3"
elif isinstance(dataset_provider_parameters, HfDatasetParams):
dp = "hf"
else:
raise ValueError(
f"Invalid dataset provider parameters {dataset_provider_parameters}"
)

# create init container spec
init_container_spec = utils.get_container_spec(
name=constants.STORAGE_CONTAINER,
image=constants.STORAGE_CONTAINER_IMAGE,
name=constants.STORAGE_INITIALIZER,
base_image=constants.STORAGE_INITIALIZER_IMAGE,
args=[
"--model_provider",
mp,
Expand All @@ -207,18 +193,13 @@ def train(
"--dataset_provider_parameters",
json.dumps(dataset_provider_parameters.__dict__),
],
volume_mounts=[
models.V1VolumeMount(
name=constants.TRAINER_PV,
mount_path=INIT_CONTAINER_MOUNT_PATH,
)
],
volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT],
)

# create app container spec
container_spec = utils.get_container_spec(
name=constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["container"],
image=constants.TRAINER_TRANSFORMER_IMAGE,
base_image=constants.TRAINER_TRANSFORMER_IMAGE,
args=[
"--model_uri",
model_provider_parameters.model_uri,
Expand All @@ -235,41 +216,22 @@ def train(
"--training_parameters",
json.dumps(train_parameters.training_parameters.to_dict()),
],
volume_mounts=[
models.V1VolumeMount(
name=constants.TRAINER_PV,
mount_path=INIT_CONTAINER_MOUNT_PATH,
)
],
volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT],
resources=resources_per_worker,
)

# create worker pod spec
worker_pod_template_spec = utils.get_pod_template_spec(
job_kind=constants.PYTORCHJOB_KIND,
containers_spec=[container_spec],
volumes_spec=[
models.V1Volume(
name=constants.TRAINER_PV,
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
claim_name=constants.TRAINER_PVC_NAME
),
)
],
containers=[container_spec],
init_containers=[init_container_spec],
volumes=[constants.STORAGE_INITIALIZER_VOLUME],
)

# create master pod spec
master_pod_template_spec = utils.get_pod_template_spec(
job_kind=constants.PYTORCHJOB_KIND,
containers_spec=[init_container_spec, container_spec],
volumes_spec=[
models.V1Volume(
name=constants.TRAINER_PV,
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
claim_name=constants.TRAINER_PVC_NAME
),
)
],
containers=[container_spec],
init_containers=[init_container_spec],
volumes=[constants.STORAGE_INITIALIZER_VOLUME],
)

job = utils.get_pytorchjob_template(
Expand All @@ -293,6 +255,7 @@ def create_job(
train_func: Optional[Callable] = None,
parameters: Optional[Dict[str, Any]] = None,
num_workers: Optional[int] = None,
resources_per_worker: Union[dict, models.V1ResourceRequirements, None] = None,
num_chief_replicas: Optional[int] = None,
num_ps_replicas: Optional[int] = None,
packages_to_install: Optional[List[str]] = None,
Expand Down Expand Up @@ -324,6 +287,26 @@ def create_job(
set, Base Image must support `bash` CLI to execute the training script.
parameters: Dict of input parameters that training function might receive.
num_workers: Number of Worker replicas for the Job.
resources_per_worker: A parameter that lets you specify how much
resources each Worker container should have. You can either specify a
kubernetes.client.V1ResourceRequirements object (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ResourceRequirements.md)
or a dictionary that includes one or more of the following keys:
`cpu`, `memory`, or `gpu` (other keys will be ignored). Appropriate
values for these keys are documented here:
https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/.
For example:
```
{
"cpu": "1",
"memory": "2Gi",
"gpu": "1",
}
```
Please note, `gpu` specifies a resource request with a key of
`nvidia.com/gpu`, i.e. an NVIDIA GPU. If you need a different type
of GPU, pass in a V1ResourceRequirement instance instead, since it's
more flexible. This parameter is optional and defaults to None.
num_chief_replicas: Number of Chief replicas for the TFJob. Number
of Chief replicas can't be more than 1.
num_ps_replicas: Number of Parameter Server replicas for the TFJob.
Expand Down Expand Up @@ -353,29 +336,40 @@ def create_job(
namespace = namespace or self.namespace
job_kind = job_kind or self.job_kind
if job is not None:
job_kind = job.kind
job_kind = str(job.kind)

if job_kind not in constants.JOB_PARAMETERS:
raise ValueError(
f"Job kind must be one of these: {constants.JOB_PARAMETERS.keys()}"
)

# If Training function or base image is set, configure Job template.
if train_func is not None or base_image is not None:
if job is None and (train_func is not None or base_image is not None):
# Job name must be set to configure Job template.
if name is None:
raise ValueError(
"Job name must be set to configure Job from function or image"
)

# Get Pod template spec from function or image.
pod_template_spec = utils.get_pod_template_spec(
job_kind=job_kind,
# Assign the default base image.
# TODO (andreyvelich): Add base image for other Job kinds.
if base_image is None:
base_image = constants.JOB_PARAMETERS[job_kind]["base_image"]

# Get Training Container template.
container_spec = utils.get_container_spec(
name=constants.JOB_PARAMETERS[job_kind]["container"],
base_image=base_image,
train_func=train_func,
parameters=parameters,
train_func_parameters=parameters,
packages_to_install=packages_to_install,
pip_index_url=pip_index_url,
resources=resources_per_worker,
)

# Get Pod template spec using the above container.
pod_template_spec = utils.get_pod_template_spec(
containers=[container_spec],
)

# Configure template for different Jobs.
Expand Down Expand Up @@ -403,16 +397,21 @@ def create_job(
)

# Verify Job object type.
if not isinstance(job, constants.JOB_MODELS):
raise ValueError(f"Job must be one of these types: {constants.JOB_MODELS}")
if not isinstance(
job,
getattr(models, constants.JOB_PARAMETERS[job_kind]["model"]),
):
raise ValueError(
f"Job must be one of these types: {constants.JOB_MODELS}, but Job is: {type(job)}"
)

# Create the Training Job.
try:
self.custom_api.create_namespaced_custom_object(
constants.GROUP,
constants.VERSION,
namespace,
constants.JOB_PARAMETERS[job.kind]["plural"],
constants.JOB_PARAMETERS[job_kind]["plural"],
job,
)
except multiprocessing.TimeoutError:
Expand Down Expand Up @@ -580,7 +579,9 @@ def get_job_conditions(
f"Job kind must be one of these: {constants.JOB_PARAMETERS.keys()}"
)

if job is not None and not isinstance(job, constants.JOB_MODELS):
if job is not None and not isinstance(
job, getattr(models, constants.JOB_PARAMETERS[job_kind]["model"])
):
raise ValueError(f"Job must be one of these types: {constants.JOB_MODELS}")

# If Job is not set, get the Training Job.
Expand Down Expand Up @@ -1235,7 +1236,7 @@ def delete_job(
name: str,
namespace: Optional[str] = None,
job_kind: Optional[str] = None,
delete_options: Optional[client.V1DeleteOptions] = None,
delete_options: Optional[models.V1DeleteOptions] = None,
):
"""Delete the Training Job
Expand Down
19 changes: 10 additions & 9 deletions sdk/python/kubeflow/training/api/training_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,6 @@ def __init__(self, kind) -> None:
},
ValueError,
),
(
"invalid pod template spec parameters",
{
"name": "test job",
"train_func": lambda: "test train function",
"job_kind": constants.MXJOB_KIND,
},
KeyError,
),
(
"paddle job can't be created using function",
{
Expand Down Expand Up @@ -174,6 +165,16 @@ def __init__(self, kind) -> None:
},
"success",
),
(
"valid flow to create job using image",
{
"name": "test-job",
"namespace": "test",
"base_image": "docker.io/test-training",
"num_workers": 2,
},
"success",
),
]


Expand Down
Loading

0 comments on commit ff6eb55

Please sign in to comment.