Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Add resources per worker for Create Job API #1990

Merged
merged 8 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 77 additions & 76 deletions sdk/python/kubeflow/training/api/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
from kubeflow.training.constants import constants
from kubeflow.training.utils import utils
from kubeflow.storage_initializer.constants import (
INIT_CONTAINER_MOUNT_PATH,
VOLUME_PATH_DATASET,
VOLUME_PATH_MODEL,
)


logger = logging.getLogger(__name__)

status_logger = utils.StatusLogger(
Expand Down Expand Up @@ -139,64 +139,50 @@ def train(

namespace = namespace or self.namespace

if isinstance(resources_per_worker, dict):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreyvelich how are these validations stopping the user from running the training on cpus?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong. It isn't stopping the user from running the train API on CPUs, but we validate whether cpu and memory are set in the resources_per_worker parameter, which might not be required.
E.g. the user could specify only the number of GPUs in resources_per_worker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreyvelich then shall we change the default value of resources_per_worker? It is currently None — what happens if the user passes an empty dict?

Copy link
Member Author

@andreyvelich andreyvelich Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deepanker13 It will be fine if the user passes an empty dict, since Kubernetes will assign the resources automatically.
E.g. this works for me:

TrainingClient().create_job(
    resources_per_worker={},
    name="test-empty",
    num_workers=1,
    base_image="docker.io/hello-world",
)

As I said, if we find that we need additional validation in the future, we can always add it in a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

if "gpu" in resources_per_worker:
if resources_per_worker["gpu"] is not None and (
num_procs_per_worker > resources_per_worker["gpu"]
):
raise ValueError(
"Insufficient gpu resources allocated to the container."
)
if resources_per_worker["gpu"] is not None:
resources_per_worker["nvidia.com/gpu"] = resources_per_worker.pop(
"gpu"
)

if (
"cpu" not in resources_per_worker
or "memory" not in resources_per_worker
):
raise ValueError("cpu and memory resources not specified")

resources_per_worker = client.V1ResourceRequirements(
requests=resources_per_worker,
limits=resources_per_worker,
)

# TODO (andreyvelich): PVC Creation should be part of Training Operator Controller.
# Ref issue: https://github.com/kubeflow/training-operator/issues/1971
try:
self.core_api.create_namespaced_persistent_volume_claim(
namespace=namespace,
body=utils.get_pvc_spec(
pvc_name=constants.TRAINER_PVC_NAME,
pvc_name=constants.STORAGE_INITIALIZER,
namespace=namespace,
storage_size=storage_config["size"],
storage_class=storage_config["storage_class"],
storage_config=storage_config,
),
)
except Exception as e:
pvc_list = self.core_api.list_namespaced_persistent_volume_claim(namespace)
# Check if the PVC with the specified name exists
for pvc in pvc_list.items:
if pvc.metadata.name == constants.TRAINER_PVC_NAME:
if pvc.metadata.name == constants.STORAGE_INITIALIZER:
print(
f"PVC '{constants.TRAINER_PVC_NAME}' already exists in namespace '{namespace}'."
f"PVC '{constants.STORAGE_INITIALIZER}' already exists in namespace "
f"{namespace}."
)
break
else:
raise RuntimeError("failed to create pvc")

if isinstance(model_provider_parameters, HuggingFaceModelParams):
mp = "hf"
else:
raise ValueError(
f"Invalid model provider parameters {model_provider_parameters}"
)

if isinstance(dataset_provider_parameters, S3DatasetParams):
dp = "s3"
elif isinstance(dataset_provider_parameters, HfDatasetParams):
dp = "hf"
else:
raise ValueError(
f"Invalid dataset provider parameters {dataset_provider_parameters}"
)

# create init container spec
init_container_spec = utils.get_container_spec(
name=constants.STORAGE_CONTAINER,
image=constants.STORAGE_CONTAINER_IMAGE,
name=constants.STORAGE_INITIALIZER,
base_image=constants.STORAGE_INITIALIZER_IMAGE,
args=[
"--model_provider",
mp,
Expand All @@ -207,18 +193,13 @@ def train(
"--dataset_provider_parameters",
json.dumps(dataset_provider_parameters.__dict__),
],
volume_mounts=[
models.V1VolumeMount(
name=constants.TRAINER_PV,
mount_path=INIT_CONTAINER_MOUNT_PATH,
)
],
volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT],
)

# create app container spec
container_spec = utils.get_container_spec(
name=constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["container"],
image=constants.TRAINER_TRANSFORMER_IMAGE,
base_image=constants.TRAINER_TRANSFORMER_IMAGE,
args=[
"--model_uri",
model_provider_parameters.model_uri,
Expand All @@ -235,41 +216,22 @@ def train(
"--training_parameters",
json.dumps(train_parameters.training_parameters.to_dict()),
],
volume_mounts=[
models.V1VolumeMount(
name=constants.TRAINER_PV,
mount_path=INIT_CONTAINER_MOUNT_PATH,
)
],
volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT],
resources=resources_per_worker,
)

# create worker pod spec
worker_pod_template_spec = utils.get_pod_template_spec(
job_kind=constants.PYTORCHJOB_KIND,
containers_spec=[container_spec],
volumes_spec=[
models.V1Volume(
name=constants.TRAINER_PV,
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
claim_name=constants.TRAINER_PVC_NAME
),
)
],
containers=[container_spec],
init_containers=[init_container_spec],
volumes=[constants.STORAGE_INITIALIZER_VOLUME],
)

# create master pod spec
master_pod_template_spec = utils.get_pod_template_spec(
job_kind=constants.PYTORCHJOB_KIND,
containers_spec=[init_container_spec, container_spec],
volumes_spec=[
models.V1Volume(
name=constants.TRAINER_PV,
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
claim_name=constants.TRAINER_PVC_NAME
),
)
],
containers=[container_spec],
init_containers=[init_container_spec],
volumes=[constants.STORAGE_INITIALIZER_VOLUME],
)

job = utils.get_pytorchjob_template(
Expand All @@ -293,6 +255,7 @@ def create_job(
train_func: Optional[Callable] = None,
parameters: Optional[Dict[str, Any]] = None,
num_workers: Optional[int] = None,
resources_per_worker: Union[dict, models.V1ResourceRequirements, None] = None,
num_chief_replicas: Optional[int] = None,
num_ps_replicas: Optional[int] = None,
packages_to_install: Optional[List[str]] = None,
Expand Down Expand Up @@ -324,6 +287,26 @@ def create_job(
set, Base Image must support `bash` CLI to execute the training script.
parameters: Dict of input parameters that training function might receive.
num_workers: Number of Worker replicas for the Job.
resources_per_worker: A parameter that lets you specify how much
resources each Worker container should have. You can either specify a
kubernetes.client.V1ResourceRequirements object (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ResourceRequirements.md)
or a dictionary that includes one or more of the following keys:
`cpu`, `memory`, or `gpu` (other keys will be ignored). Appropriate
values for these keys are documented here:
https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/.
For example:
```
{
"cpu": "1",
"memory": "2Gi",
"gpu": "1",
}
```
Please note, `gpu` specifies a resource request with a key of
`nvidia.com/gpu`, i.e. an NVIDIA GPU. If you need a different type
of GPU, pass in a V1ResourceRequirement instance instead, since it's
more flexible. This parameter is optional and defaults to None.
num_chief_replicas: Number of Chief replicas for the TFJob. Number
of Chief replicas can't be more than 1.
num_ps_replicas: Number of Parameter Server replicas for the TFJob.
Expand Down Expand Up @@ -353,29 +336,40 @@ def create_job(
namespace = namespace or self.namespace
job_kind = job_kind or self.job_kind
if job is not None:
job_kind = job.kind
job_kind = str(job.kind)

if job_kind not in constants.JOB_PARAMETERS:
raise ValueError(
f"Job kind must be one of these: {constants.JOB_PARAMETERS.keys()}"
)

# If Training function or base image is set, configure Job template.
if train_func is not None or base_image is not None:
if job is None and (train_func is not None or base_image is not None):
# Job name must be set to configure Job template.
if name is None:
raise ValueError(
"Job name must be set to configure Job from function or image"
)

# Get Pod template spec from function or image.
pod_template_spec = utils.get_pod_template_spec(
job_kind=job_kind,
# Assign the default base image.
# TODO (andreyvelich): Add base image for other Job kinds.
if base_image is None:
base_image = constants.JOB_PARAMETERS[job_kind]["base_image"]

# Get Training Container template.
container_spec = utils.get_container_spec(
name=constants.JOB_PARAMETERS[job_kind]["container"],
base_image=base_image,
train_func=train_func,
parameters=parameters,
train_func_parameters=parameters,
packages_to_install=packages_to_install,
pip_index_url=pip_index_url,
resources=resources_per_worker,
)

# Get Pod template spec using the above container.
pod_template_spec = utils.get_pod_template_spec(
containers=[container_spec],
)

# Configure template for different Jobs.
Expand Down Expand Up @@ -403,16 +397,21 @@ def create_job(
)

# Verify Job object type.
if not isinstance(job, constants.JOB_MODELS):
raise ValueError(f"Job must be one of these types: {constants.JOB_MODELS}")
if not isinstance(
job,
getattr(models, constants.JOB_PARAMETERS[job_kind]["model"]),
):
raise ValueError(
f"Job must be one of these types: {constants.JOB_MODELS}, but Job is: {type(job)}"
)

# Create the Training Job.
try:
self.custom_api.create_namespaced_custom_object(
constants.GROUP,
constants.VERSION,
namespace,
constants.JOB_PARAMETERS[job.kind]["plural"],
constants.JOB_PARAMETERS[job_kind]["plural"],
job,
)
except multiprocessing.TimeoutError:
Expand Down Expand Up @@ -580,7 +579,9 @@ def get_job_conditions(
f"Job kind must be one of these: {constants.JOB_PARAMETERS.keys()}"
)

if job is not None and not isinstance(job, constants.JOB_MODELS):
if job is not None and not isinstance(
job, getattr(models, constants.JOB_PARAMETERS[job_kind]["model"])
):
raise ValueError(f"Job must be one of these types: {constants.JOB_MODELS}")

# If Job is not set, get the Training Job.
Expand Down Expand Up @@ -1235,7 +1236,7 @@ def delete_job(
name: str,
namespace: Optional[str] = None,
job_kind: Optional[str] = None,
delete_options: Optional[client.V1DeleteOptions] = None,
delete_options: Optional[models.V1DeleteOptions] = None,
):
"""Delete the Training Job
Expand Down
19 changes: 10 additions & 9 deletions sdk/python/kubeflow/training/api/training_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,6 @@ def __init__(self, kind) -> None:
},
ValueError,
),
(
"invalid pod template spec parameters",
{
"name": "test job",
"train_func": lambda: "test train function",
"job_kind": constants.MXJOB_KIND,
},
KeyError,
),
(
"paddle job can't be created using function",
{
Expand Down Expand Up @@ -174,6 +165,16 @@ def __init__(self, kind) -> None:
},
"success",
),
(
"valid flow to create job using image",
{
"name": "test-job",
"namespace": "test",
"base_image": "docker.io/test-training",
"num_workers": 2,
},
"success",
),
]


Expand Down
Loading