From ad5c282aa915b10f48a35419689cfb3fa4aac7db Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Mon, 26 Jun 2023 08:01:11 +0000
Subject: [PATCH 1/9] fix pytorch WORLD_SIZE env inconsistency

---
 docs/api/kubeflow.org_v1_generated.asciidoc   |  2 +-
 hack/python-sdk/swagger.json                  | 10 +++----
 pkg/apis/kubeflow.org/v1/openapi_generated.go | 14 ++++-----
 pkg/apis/kubeflow.org/v1/pytorch_types.go     |  4 +--
 .../kubeflow.org/v1/zz_generated.deepcopy.go  | 10 +++----
 pkg/controller.v1/pytorch/elastic.go          | 10 ++-----
 pkg/controller.v1/pytorch/elastic_test.go     |  7 +----
 pkg/controller.v1/pytorch/envvar.go           | 29 +++++++++++++++++-
 sdk/python/docs/KubeflowOrgV1ElasticPolicy.md |  1 -
 .../docs/KubeflowOrgV1PyTorchJobSpec.md       |  1 +
 .../models/kubeflow_org_v1_elastic_policy.py  | 30 +------------------
 .../kubeflow_org_v1_py_torch_job_spec.py      | 30 ++++++++++++++++++-
 .../test_kubeflow_org_v1_elastic_policy.py    |  1 -
 .../test/test_kubeflow_org_v1_py_torch_job.py |  2 +-
 .../test_kubeflow_org_v1_py_torch_job_list.py |  4 +--
 .../test_kubeflow_org_v1_py_torch_job_spec.py |  2 +-
 16 files changed, 86 insertions(+), 71 deletions(-)

diff --git a/docs/api/kubeflow.org_v1_generated.asciidoc b/docs/api/kubeflow.org_v1_generated.asciidoc
index 6c4964f425..6182cebcc4 100644
--- a/docs/api/kubeflow.org_v1_generated.asciidoc
+++ b/docs/api/kubeflow.org_v1_generated.asciidoc
@@ -53,7 +53,6 @@ Package v1 contains API Schema definitions for the kubeflow.org v1 API group
 | *`rdzvId`* __string__ |
 | *`rdzvConf`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-rdzvconf[$$RDZVConf$$] array__ | RDZVConf contains additional rendezvous configuration (=,=,...).
 | *`standalone`* __boolean__ | Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored.
-| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int].
 | *`maxRestarts`* __integer__ |
 | *`metrics`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#metricspec-v2-autoscaling[$$MetricSpec$$] array__ | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created.
 |===
@@ -396,6 +395,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob.
 | *`runPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-runpolicy[$$RunPolicy$$]__ | RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.
| *`elasticPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]__ | | *`pytorchReplicaSpecs`* __object (keys:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicaspec[$$ReplicaSpec$$])__ | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { "Master": PyTorchReplicaSpec, "Worker": PyTorchReplicaSpec, } +| *`nprocPerNode`* __integer__ | Number of workers per node |=== diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index 4fff444db9..ba75706113 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -32,11 +32,6 @@ "type": "integer", "format": "int32" }, - "nProcPerNode": { - "description": "Number of workers per node; supported values: [auto, cpu, gpu, int].", - "type": "integer", - "format": "int32" - }, "rdzvBackend": { "type": "string" }, @@ -491,6 +486,11 @@ "elasticPolicy": { "$ref": "#/definitions/kubeflow.org.v1.ElasticPolicy" }, + "nprocPerNode": { + "description": "Number of workers per node", + "type": "integer", + "format": "int32" + }, "pytorchReplicaSpecs": { "description": "A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example,\n {\n \"Master\": PyTorchReplicaSpec,\n \"Worker\": PyTorchReplicaSpec,\n }", "type": "object", diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index ea93510040..cb9fd1d3fe 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -124,13 +124,6 @@ func schema_pkg_apis_kubefloworg_v1_ElasticPolicy(ref common.ReferenceCallback) Format: "", }, }, - "nProcPerNode": { - SchemaProps: spec.SchemaProps{ - Description: "Number of workers per node; supported values: [auto, cpu, gpu, int].", - Type: []string{"integer"}, - Format: "int32", - }, - }, "maxRestarts": { SchemaProps: spec.SchemaProps{ Type: []string{"integer"}, @@ -909,6 +902,13 @@ func schema_pkg_apis_kubefloworg_v1_PyTorchJobSpec(ref common.ReferenceCallback) }, }, }, + "nprocPerNode": { + SchemaProps: spec.SchemaProps{ + Description: "Number of workers per node", + Type: []string{"integer"}, + Format: "int32", + }, + }, }, Required: []string{"runPolicy", "pytorchReplicaSpecs"}, }, diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go index b932b4469a..b17b8eebfa 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_types.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go @@ -84,6 +84,8 @@ type PyTorchJobSpec struct { // "Worker": PyTorchReplicaSpec, // } PyTorchReplicaSpecs map[ReplicaType]*ReplicaSpec `json:"pytorchReplicaSpecs"` + // Number of workers per node + NprocPerNode *int32 `json:"nprocPerNode,omitempty"` } type ElasticPolicy struct { @@ -106,8 +108,6 @@ type ElasticPolicy struct { // --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values // are ignored. Standalone *bool `json:"standalone,omitempty"` - // Number of workers per node; supported values: [auto, cpu, gpu, int]. 
- NProcPerNode *int32 `json:"nProcPerNode,omitempty"` MaxRestarts *int32 `json:"maxRestarts,omitempty"` diff --git a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go index 57c01e70bf..9026d5d24a 100644 --- a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go @@ -70,11 +70,6 @@ func (in *ElasticPolicy) DeepCopyInto(out *ElasticPolicy) { *out = new(bool) **out = **in } - if in.NProcPerNode != nil { - in, out := &in.NProcPerNode, &out.NProcPerNode - *out = new(int32) - **out = **in - } if in.MaxRestarts != nil { in, out := &in.MaxRestarts, &out.MaxRestarts *out = new(int32) @@ -585,6 +580,11 @@ func (in *PyTorchJobSpec) DeepCopyInto(out *PyTorchJobSpec) { (*out)[key] = outVal } } + if in.NprocPerNode != nil { + in, out := &in.NprocPerNode, &out.NprocPerNode + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PyTorchJobSpec. diff --git a/pkg/controller.v1/pytorch/elastic.go b/pkg/controller.v1/pytorch/elastic.go index 52ba459a52..a30e76fb8c 100644 --- a/pkg/controller.v1/pytorch/elastic.go +++ b/pkg/controller.v1/pytorch/elastic.go @@ -49,8 +49,8 @@ const ( // Worker/node size related arguments. - // EnvNProcPerNode is the environment variable name for the number of processes per node. - EnvNProcPerNode = "PET_NPROC_PER_NODE" + // EnvNprocPerNode is the environment variable name for the number of processes per node. + EnvNprocPerNode = "PET_NPROC_PER_NODE" // EnvNNodes is the environment variable name for the number of nodes. EnvNNodes = "PET_NNODES" ) @@ -101,12 +101,6 @@ func (e ElasticEnvVarGenerator) Generate( Value: strconv.Itoa(int(*elasticPolicy.MaxRestarts)), }) } - if elasticPolicy.NProcPerNode != nil { - envVars = append(envVars, corev1.EnvVar{ - Name: EnvNProcPerNode, - Value: strconv.Itoa(int(*elasticPolicy.NProcPerNode)), - }) - } if elasticPolicy.RDZVID != nil { envVars = append(envVars, corev1.EnvVar{ Name: EnvRDZVID, diff --git a/pkg/controller.v1/pytorch/elastic_test.go b/pkg/controller.v1/pytorch/elastic_test.go index 295b0bfbb9..3a11a9725a 100644 --- a/pkg/controller.v1/pytorch/elastic_test.go +++ b/pkg/controller.v1/pytorch/elastic_test.go @@ -73,8 +73,7 @@ func TestElasticGenerate(t *testing.T) { Value: "rdzv-conf-value-1", }, }, - NProcPerNode: pointer.Int32(1), - MaxRestarts: pointer.Int32(3), + MaxRestarts: pointer.Int32(3), }, PyTorchReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{ kubeflowv1.PyTorchJobReplicaTypeWorker: { @@ -89,10 +88,6 @@ func TestElasticGenerate(t *testing.T) { Name: EnvMaxRestarts, Value: "3", }, - { - Name: EnvNProcPerNode, - Value: "1", - }, { Name: EnvRDZVBackend, Value: "c10d", diff --git a/pkg/controller.v1/pytorch/envvar.go b/pkg/controller.v1/pytorch/envvar.go index 1497444ec1..124a6c1d0b 100644 --- a/pkg/controller.v1/pytorch/envvar.go +++ b/pkg/controller.v1/pytorch/envvar.go @@ -69,15 +69,25 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, } totalReplicas := getTotalReplicas(pytorchjob) + worldSize := getWorldSize(pytorchjob) + nprocPerNode := getNprocPerNode(pytorchjob) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "WORLD_SIZE", - Value: strconv.Itoa(int(totalReplicas)), + Value: strconv.Itoa(int(worldSize)), }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "RANK", Value: 
strconv.Itoa(rank), }) + podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ + Name: EnvNprocPerNode, + Value: strconv.Itoa(int(nprocPerNode)), + }) + podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ + Name: EnvNNodes, + Value: strconv.Itoa(int(totalReplicas)), + }) } // Set the elastic environment variables if the elasticPolicy is not null. @@ -95,6 +105,23 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, return nil } +func getNprocPerNode(job *kubeflowv1.PyTorchJob) int32 { + if job.Spec.NprocPerNode != nil { + return *job.Spec.NprocPerNode + } else { + return 1 + } +} + +func getWorldSize(job *kubeflowv1.PyTorchJob) int32 { + worldSize := int32(0) + nprocPerNode := getNprocPerNode(job) + for _, r := range job.Spec.PyTorchReplicaSpecs { + worldSize += *r.Replicas * nprocPerNode + } + return worldSize +} + func getTotalReplicas(job *kubeflowv1.PyTorchJob) int32 { jobReplicas := int32(0) for _, r := range job.Spec.PyTorchReplicaSpecs { diff --git a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md index 3a7a0589ee..a80e680941 100644 --- a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md +++ b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md @@ -7,7 +7,6 @@ Name | Type | Description | Notes **max_restarts** | **int** | | [optional] **metrics** | [**list[K8sIoApiAutoscalingV2MetricSpec]**](K8sIoApiAutoscalingV2MetricSpec.md) | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. | [optional] **min_replicas** | **int** | minReplicas is the lower limit for the number of replicas to which the training job can scale down. It defaults to null. | [optional] -**n_proc_per_node** | **int** | Number of workers per node; supported values: [auto, cpu, gpu, int]. | [optional] **rdzv_backend** | **str** | | [optional] **rdzv_conf** | [**list[KubeflowOrgV1RDZVConf]**](KubeflowOrgV1RDZVConf.md) | RDZVConf contains additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...). | [optional] **rdzv_host** | **str** | | [optional] diff --git a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md index 7647a472d2..e4693fa9b7 100644 --- a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md +++ b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md @@ -5,6 +5,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob. Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **elastic_policy** | [**KubeflowOrgV1ElasticPolicy**](KubeflowOrgV1ElasticPolicy.md) | | [optional] +**nproc_per_node** | **int** | Number of workers per node | [optional] **pytorch_replica_specs** | [**dict(str, KubeflowOrgV1ReplicaSpec)**](KubeflowOrgV1ReplicaSpec.md) | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. 
For example, { \"Master\": PyTorchReplicaSpec, \"Worker\": PyTorchReplicaSpec, } | **run_policy** | [**KubeflowOrgV1RunPolicy**](KubeflowOrgV1RunPolicy.md) | | diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py index c6e3d9f6bd..3ad393786e 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py @@ -37,7 +37,6 @@ class KubeflowOrgV1ElasticPolicy(object): 'max_restarts': 'int', 'metrics': 'list[K8sIoApiAutoscalingV2MetricSpec]', 'min_replicas': 'int', - 'n_proc_per_node': 'int', 'rdzv_backend': 'str', 'rdzv_conf': 'list[KubeflowOrgV1RDZVConf]', 'rdzv_host': 'str', @@ -51,7 +50,6 @@ class KubeflowOrgV1ElasticPolicy(object): 'max_restarts': 'maxRestarts', 'metrics': 'metrics', 'min_replicas': 'minReplicas', - 'n_proc_per_node': 'nProcPerNode', 'rdzv_backend': 'rdzvBackend', 'rdzv_conf': 'rdzvConf', 'rdzv_host': 'rdzvHost', @@ -60,7 +58,7 @@ class KubeflowOrgV1ElasticPolicy(object): 'standalone': 'standalone' } - def __init__(self, max_replicas=None, max_restarts=None, metrics=None, min_replicas=None, n_proc_per_node=None, rdzv_backend=None, rdzv_conf=None, rdzv_host=None, rdzv_id=None, rdzv_port=None, standalone=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, max_replicas=None, max_restarts=None, metrics=None, min_replicas=None, rdzv_backend=None, rdzv_conf=None, rdzv_host=None, rdzv_id=None, rdzv_port=None, standalone=None, local_vars_configuration=None): # noqa: E501 """KubeflowOrgV1ElasticPolicy - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -70,7 +68,6 @@ def __init__(self, max_replicas=None, max_restarts=None, metrics=None, min_repli self._max_restarts = None self._metrics = None self._min_replicas = None - self._n_proc_per_node = None self._rdzv_backend = None self._rdzv_conf = None self._rdzv_host = None @@ -87,8 +84,6 @@ def __init__(self, max_replicas=None, max_restarts=None, metrics=None, min_repli self.metrics = metrics if min_replicas is not None: self.min_replicas = min_replicas - if n_proc_per_node is not None: - self.n_proc_per_node = n_proc_per_node if rdzv_backend is not None: self.rdzv_backend = rdzv_backend if rdzv_conf is not None: @@ -192,29 +187,6 @@ def min_replicas(self, min_replicas): self._min_replicas = min_replicas - @property - def n_proc_per_node(self): - """Gets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 - - Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 - - :return: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 - :rtype: int - """ - return self._n_proc_per_node - - @n_proc_per_node.setter - def n_proc_per_node(self, n_proc_per_node): - """Sets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. - - Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 - - :param n_proc_per_node: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 - :type: int - """ - - self._n_proc_per_node = n_proc_per_node - @property def rdzv_backend(self): """Gets the rdzv_backend of this KubeflowOrgV1ElasticPolicy. 
# noqa: E501 diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py index 34a915d257..c828fe7e0e 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py @@ -34,29 +34,34 @@ class KubeflowOrgV1PyTorchJobSpec(object): """ openapi_types = { 'elastic_policy': 'KubeflowOrgV1ElasticPolicy', + 'nproc_per_node': 'int', 'pytorch_replica_specs': 'dict(str, KubeflowOrgV1ReplicaSpec)', 'run_policy': 'KubeflowOrgV1RunPolicy' } attribute_map = { 'elastic_policy': 'elasticPolicy', + 'nproc_per_node': 'nprocPerNode', 'pytorch_replica_specs': 'pytorchReplicaSpecs', 'run_policy': 'runPolicy' } - def __init__(self, elastic_policy=None, pytorch_replica_specs=None, run_policy=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, elastic_policy=None, nproc_per_node=None, pytorch_replica_specs=None, run_policy=None, local_vars_configuration=None): # noqa: E501 """KubeflowOrgV1PyTorchJobSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() self.local_vars_configuration = local_vars_configuration self._elastic_policy = None + self._nproc_per_node = None self._pytorch_replica_specs = None self._run_policy = None self.discriminator = None if elastic_policy is not None: self.elastic_policy = elastic_policy + if nproc_per_node is not None: + self.nproc_per_node = nproc_per_node self.pytorch_replica_specs = pytorch_replica_specs self.run_policy = run_policy @@ -81,6 +86,29 @@ def elastic_policy(self, elastic_policy): self._elastic_policy = elastic_policy + @property + def nproc_per_node(self): + """Gets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 + + Number of workers per node # noqa: E501 + + :return: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 + :rtype: int + """ + return self._nproc_per_node + + @nproc_per_node.setter + def nproc_per_node(self, nproc_per_node): + """Sets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. + + Number of workers per node # noqa: E501 + + :param nproc_per_node: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 + :type: int + """ + + self._nproc_per_node = nproc_per_node + @property def pytorch_replica_specs(self): """Gets the pytorch_replica_specs of this KubeflowOrgV1PyTorchJobSpec. 
# noqa: E501 diff --git a/sdk/python/test/test_kubeflow_org_v1_elastic_policy.py b/sdk/python/test/test_kubeflow_org_v1_elastic_policy.py index 11eaab937f..f445e8261f 100644 --- a/sdk/python/test/test_kubeflow_org_v1_elastic_policy.py +++ b/sdk/python/test/test_kubeflow_org_v1_elastic_policy.py @@ -42,7 +42,6 @@ def make_instance(self, include_optional): None ], min_replicas = 56, - n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py index 39758b1777..470d136ac7 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py @@ -47,7 +47,6 @@ def make_instance(self, include_optional): None ], min_replicas = 56, - n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( @@ -58,6 +57,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), + nproc_per_node = 56, pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py index af3cffec1a..8255823bc3 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py @@ -50,7 +50,6 @@ def make_instance(self, include_optional): None ], min_replicas = 56, - n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( @@ -61,6 +60,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), + nproc_per_node = 56, pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, @@ -120,7 +120,6 @@ def make_instance(self, include_optional): None ], min_replicas = 56, - n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( @@ -131,6 +130,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), + nproc_per_node = 56, pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py index 749bd7b959..9698294f2d 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py @@ -43,7 +43,6 @@ def make_instance(self, include_optional): None ], min_replicas = 56, - n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( @@ -54,6 +53,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), + nproc_per_node = 56, pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, From d29362cdfc5c13d3664dd2df001e3a190fd60ce0 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Wed, 28 Jun 2023 15:29:41 +0800 Subject: [PATCH 2/9] add correct env node_rank, nnodes for torch --- .../base/crds/kubeflow.org_pytorchjobs.yaml | 9 ++--- pkg/controller.v1/pytorch/elastic.go | 15 +++---- pkg/controller.v1/pytorch/elastic_test.go | 2 +- pkg/controller.v1/pytorch/envvar.go | 40 ++++++++++++------- pkg/controller.v1/pytorch/master.go | 11 
+++++ .../pytorch/pytorchjob_controller_test.go | 2 +- 6 files changed, 47 insertions(+), 32 deletions(-) diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index afe8ab040a..60cc09dd15 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -552,11 +552,6 @@ spec: to null. format: int32 type: integer - nProcPerNode: - description: 'Number of workers per node; supported values: [auto, - cpu, gpu, int].' - format: int32 - type: integer rdzvBackend: type: string rdzvConf: @@ -585,6 +580,10 @@ spec: set values are ignored. type: boolean type: object + nprocPerNode: + description: Number of workers per node + format: int32 + type: integer pytorchReplicaSpecs: additionalProperties: description: ReplicaSpec is a description of the replica diff --git a/pkg/controller.v1/pytorch/elastic.go b/pkg/controller.v1/pytorch/elastic.go index a30e76fb8c..0b84a4726e 100644 --- a/pkg/controller.v1/pytorch/elastic.go +++ b/pkg/controller.v1/pytorch/elastic.go @@ -47,12 +47,7 @@ const ( // EnvStartMethod is the environment variable name for the multiprocessing start method to use when creating workers, which could be fork, spawn and forkserver. EnvStartMethod = "PET_START_METHOD" - // Worker/node size related arguments. - - // EnvNprocPerNode is the environment variable name for the number of processes per node. - EnvNprocPerNode = "PET_NPROC_PER_NODE" - // EnvNNodes is the environment variable name for the number of nodes. - EnvNNodes = "PET_NNODES" + // EnvNNodes is the common environment variable name from envvar ) var ( @@ -89,7 +84,7 @@ func (e ElasticEnvVarGenerator) Generate( // Generate RDZV_BACKEND. envVars = append(envVars, e.generateEnvBackend(elasticPolicy)) // Generate NNODES. - if envVar, err := e.generateEnvNNodes(job); err != nil { + if envVar, err := e.generateEnvNnodes(job); err != nil { return nil, err } else { envVars = append(envVars, *envVar) @@ -120,7 +115,7 @@ func (e ElasticEnvVarGenerator) Generate( return envVars, nil } -func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) { +func (e ElasticEnvVarGenerator) generateEnvNnodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) { // Return worker.replicas if there is no max and min replicas specified. if job.Spec.ElasticPolicy.MinReplicas == nil && job.Spec.ElasticPolicy.MaxReplicas == nil { @@ -128,7 +123,7 @@ func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (* return nil, fmt.Errorf("cannot find the worker spec") } return &corev1.EnvVar{ - Name: EnvNNodes, + Name: EnvNnodes, Value: strconv.Itoa( int(*job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker]. 
Replicas)), @@ -136,7 +131,7 @@ func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (* } return &corev1.EnvVar{ - Name: EnvNNodes, + Name: EnvNnodes, Value: fmt.Sprintf("%d:%d", *job.Spec.ElasticPolicy.MinReplicas, *job.Spec.ElasticPolicy.MaxReplicas), }, nil diff --git a/pkg/controller.v1/pytorch/elastic_test.go b/pkg/controller.v1/pytorch/elastic_test.go index 3a11a9725a..af3d2cbbe8 100644 --- a/pkg/controller.v1/pytorch/elastic_test.go +++ b/pkg/controller.v1/pytorch/elastic_test.go @@ -105,7 +105,7 @@ func TestElasticGenerate(t *testing.T) { Value: "rdzv-conf-name=rdzv-conf-value,rdzv-conf-name-1=rdzv-conf-value-1", }, { - Name: EnvNNodes, + Name: EnvNnodes, Value: "1:3", }, }, diff --git a/pkg/controller.v1/pytorch/envvar.go b/pkg/controller.v1/pytorch/envvar.go index 124a6c1d0b..6a82673b4b 100644 --- a/pkg/controller.v1/pytorch/envvar.go +++ b/pkg/controller.v1/pytorch/envvar.go @@ -25,6 +25,17 @@ import ( kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) +const ( + // Worker/node size related arguments. + + // EnvNprocPerNode is the environment variable name for the number of processes per node. + EnvNprocPerNode = "PET_NPROC_PER_NODE" + // EnvNnodes is the environment variable name for the number of nodes. + EnvNnodes = "PET_NNODES" + // EnvNodeRank is the environment variable name for the rank of nodes. + EnvNodeRank = "PET_NODE_RANK" +) + // EnvVarGenerator is the environment variable generator interface. type EnvVarGenerator interface { Generate(job *kubeflowv1.PyTorchJob) ([]corev1.EnvVar, error) @@ -49,6 +60,10 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, Value: "0", }) + totalReplicas := getTotalReplicas(pytorchjob) + nprocPerNode := getNprocPerNode(pytorchjob) + worldSize := totalReplicas * nprocPerNode + // If the master is not null, then we need to set the MASTER_ADDR and RANK. if pytorchjob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster] != nil { envVars, err := GetMasterEnvVarGenerator().Generate(pytorchjob) @@ -68,10 +83,6 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, rank = rank + 1 } - totalReplicas := getTotalReplicas(pytorchjob) - worldSize := getWorldSize(pytorchjob) - nprocPerNode := getNprocPerNode(pytorchjob) - podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "WORLD_SIZE", Value: strconv.Itoa(int(worldSize)), @@ -85,12 +96,14 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, Value: strconv.Itoa(int(nprocPerNode)), }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ - Name: EnvNNodes, - Value: strconv.Itoa(int(totalReplicas)), + Name: EnvNodeRank, + Value: strconv.Itoa(rank), }) } // Set the elastic environment variables if the elasticPolicy is not null. + // nnodes is set in range format in elastic mode, e.g. nnodes=1:4 + // otherwise, nnodes is set by int, e.g. nnodes=2 if pytorchjob.Spec.ElasticPolicy != nil { envVars, err := GetElasticEnvVarGenerator().Generate(pytorchjob) if err != nil { @@ -99,6 +112,12 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, // Set elastic related environment variables. podTemplateSpec.Spec.Containers[i].Env = append( podTemplateSpec.Spec.Containers[i].Env, envVars...) 
+ } else { + podTemplateSpec.Spec.Containers[i].Env = append( + podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ + Name: EnvNnodes, + Value: strconv.Itoa(int(totalReplicas)), + }) } } @@ -113,15 +132,6 @@ func getNprocPerNode(job *kubeflowv1.PyTorchJob) int32 { } } -func getWorldSize(job *kubeflowv1.PyTorchJob) int32 { - worldSize := int32(0) - nprocPerNode := getNprocPerNode(job) - for _, r := range job.Spec.PyTorchReplicaSpecs { - worldSize += *r.Replicas * nprocPerNode - } - return worldSize -} - func getTotalReplicas(job *kubeflowv1.PyTorchJob) int32 { jobReplicas := int32(0) for _, r := range job.Spec.PyTorchReplicaSpecs { diff --git a/pkg/controller.v1/pytorch/master.go b/pkg/controller.v1/pytorch/master.go index ce116db88e..064c7054b9 100644 --- a/pkg/controller.v1/pytorch/master.go +++ b/pkg/controller.v1/pytorch/master.go @@ -14,6 +14,9 @@ var ( onceMaster sync.Once EnvMasterPort = "MASTER_PORT" EnvMasterAddr = "MASTER_ADDR" + + PETMasterPort = "PET_MASTER_PORT" + PETMasterAddr = "PET_MASTER_ADDR" ) // MasterEnvVarGenerator is the environment variable generator for Master related arguments. @@ -42,10 +45,18 @@ func (e MasterEnvVarGenerator) Generate( Name: EnvMasterPort, Value: strconv.Itoa(int(masterPort)), }) + envVars = append(envVars, corev1.EnvVar{ + Name: PETMasterPort, + Value: strconv.Itoa(int(masterPort)), + }) envVars = append(envVars, corev1.EnvVar{ Name: EnvMasterAddr, Value: masterAddr, }) + envVars = append(envVars, corev1.EnvVar{ + Name: PETMasterAddr, + Value: masterAddr, + }) } return envVars, nil } diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index 99f2b2107c..cb6c891d37 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -247,7 +247,7 @@ var _ = Describe("PyTorchJob controller", func() { Name: EnvRDZVBackend, Value: string(backendC10D), }, corev1.EnvVar{ - Name: EnvNNodes, + Name: EnvNnodes, Value: fmt.Sprintf("%d:%d", *minReplicas, *maxReplicas), }, corev1.EnvVar{ Name: EnvRDZVEndpoint, From 731a12e2e6b759fe76509a23a29b77c6a01c69a1 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Wed, 28 Jun 2023 17:04:37 +0800 Subject: [PATCH 3/9] restore elastic nnproc --- docs/api/kubeflow.org_v1_generated.asciidoc | 1 + hack/python-sdk/swagger.json | 5 ++++ .../base/crds/kubeflow.org_pytorchjobs.yaml | 5 ++++ pkg/apis/kubeflow.org/v1/openapi_generated.go | 7 +++++ pkg/apis/kubeflow.org/v1/pytorch_types.go | 2 ++ .../kubeflow.org/v1/zz_generated.deepcopy.go | 5 ++++ pkg/controller.v1/pytorch/elastic.go | 9 ++++++ sdk/python/docs/KubeflowOrgV1ElasticPolicy.md | 1 + .../models/kubeflow_org_v1_elastic_policy.py | 30 ++++++++++++++++++- .../test_kubeflow_org_v1_elastic_policy.py | 1 + .../test/test_kubeflow_org_v1_py_torch_job.py | 1 + .../test_kubeflow_org_v1_py_torch_job_list.py | 2 ++ .../test_kubeflow_org_v1_py_torch_job_spec.py | 1 + 13 files changed, 69 insertions(+), 1 deletion(-) diff --git a/docs/api/kubeflow.org_v1_generated.asciidoc b/docs/api/kubeflow.org_v1_generated.asciidoc index 6182cebcc4..0c3a3b2853 100644 --- a/docs/api/kubeflow.org_v1_generated.asciidoc +++ b/docs/api/kubeflow.org_v1_generated.asciidoc @@ -53,6 +53,7 @@ Package v1 contains API Schema definitions for the kubeflow.org v1 API group | *`rdzvId`* __string__ | | *`rdzvConf`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-rdzvconf[$$RDZVConf$$] array__ | RDZVConf contains additional rendezvous 
configuration (=,=,...). | *`standalone`* __boolean__ | Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored. +| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. | *`maxRestarts`* __integer__ | | *`metrics`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#metricspec-v2-autoscaling[$$MetricSpec$$] array__ | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. |=== diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index ba75706113..4b2ea8cd8f 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -32,6 +32,11 @@ "type": "integer", "format": "int32" }, + "nProcPerNode": { + "description": "Number of workers per node; supported values: [auto, cpu, gpu, int].", + "type": "integer", + "format": "int32" + }, "rdzvBackend": { "type": "string" }, diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index 60cc09dd15..7c8c0ba2cd 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -552,6 +552,11 @@ spec: to null. format: int32 type: integer + nProcPerNode: + description: 'Number of workers per node; supported values: [auto, + cpu, gpu, int].' + format: int32 + type: integer rdzvBackend: type: string rdzvConf: diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index cb9fd1d3fe..1eaa44e08a 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -124,6 +124,13 @@ func schema_pkg_apis_kubefloworg_v1_ElasticPolicy(ref common.ReferenceCallback) Format: "", }, }, + "nProcPerNode": { + SchemaProps: spec.SchemaProps{ + Description: "Number of workers per node; supported values: [auto, cpu, gpu, int].", + Type: []string{"integer"}, + Format: "int32", + }, + }, "maxRestarts": { SchemaProps: spec.SchemaProps{ Type: []string{"integer"}, diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go index b17b8eebfa..54cc02a66f 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_types.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go @@ -108,6 +108,8 @@ type ElasticPolicy struct { // --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values // are ignored. Standalone *bool `json:"standalone,omitempty"` + // Number of workers per node; supported values: [auto, cpu, gpu, int]. 
+ NProcPerNode *int32 `json:"nProcPerNode,omitempty"` MaxRestarts *int32 `json:"maxRestarts,omitempty"` diff --git a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go index 9026d5d24a..c53468d9ef 100644 --- a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go @@ -70,6 +70,11 @@ func (in *ElasticPolicy) DeepCopyInto(out *ElasticPolicy) { *out = new(bool) **out = **in } + if in.NProcPerNode != nil { + in, out := &in.NProcPerNode, &out.NProcPerNode + *out = new(int32) + **out = **in + } if in.MaxRestarts != nil { in, out := &in.MaxRestarts, &out.MaxRestarts *out = new(int32) diff --git a/pkg/controller.v1/pytorch/elastic.go b/pkg/controller.v1/pytorch/elastic.go index 0b84a4726e..e3b031a8eb 100644 --- a/pkg/controller.v1/pytorch/elastic.go +++ b/pkg/controller.v1/pytorch/elastic.go @@ -48,6 +48,9 @@ const ( EnvStartMethod = "PET_START_METHOD" // EnvNNodes is the common environment variable name from envvar + + // EnvNProcPerNode is the environment variable name for the number of processes per node. + EnvNProcPerNode = "PET_NPROC_PER_NODE" ) var ( @@ -102,6 +105,12 @@ func (e ElasticEnvVarGenerator) Generate( Value: *elasticPolicy.RDZVID, }) } + if elasticPolicy.NProcPerNode != nil { + envVars = append(envVars, corev1.EnvVar{ + Name: EnvNProcPerNode, + Value: strconv.Itoa(int(*elasticPolicy.NProcPerNode)), + }) + } if envVar := e.generateEnvRDZVConf(elasticPolicy); envVar != nil { envVars = append(envVars, *envVar) } diff --git a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md index a80e680941..3a7a0589ee 100644 --- a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md +++ b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md @@ -7,6 +7,7 @@ Name | Type | Description | Notes **max_restarts** | **int** | | [optional] **metrics** | [**list[K8sIoApiAutoscalingV2MetricSpec]**](K8sIoApiAutoscalingV2MetricSpec.md) | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. | [optional] **min_replicas** | **int** | minReplicas is the lower limit for the number of replicas to which the training job can scale down. It defaults to null. | [optional] +**n_proc_per_node** | **int** | Number of workers per node; supported values: [auto, cpu, gpu, int]. | [optional] **rdzv_backend** | **str** | | [optional] **rdzv_conf** | [**list[KubeflowOrgV1RDZVConf]**](KubeflowOrgV1RDZVConf.md) | RDZVConf contains additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...). 
| [optional] **rdzv_host** | **str** | | [optional] diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py index 3ad393786e..c6e3d9f6bd 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py @@ -37,6 +37,7 @@ class KubeflowOrgV1ElasticPolicy(object): 'max_restarts': 'int', 'metrics': 'list[K8sIoApiAutoscalingV2MetricSpec]', 'min_replicas': 'int', + 'n_proc_per_node': 'int', 'rdzv_backend': 'str', 'rdzv_conf': 'list[KubeflowOrgV1RDZVConf]', 'rdzv_host': 'str', @@ -50,6 +51,7 @@ class KubeflowOrgV1ElasticPolicy(object): 'max_restarts': 'maxRestarts', 'metrics': 'metrics', 'min_replicas': 'minReplicas', + 'n_proc_per_node': 'nProcPerNode', 'rdzv_backend': 'rdzvBackend', 'rdzv_conf': 'rdzvConf', 'rdzv_host': 'rdzvHost', @@ -58,7 +60,7 @@ class KubeflowOrgV1ElasticPolicy(object): 'standalone': 'standalone' } - def __init__(self, max_replicas=None, max_restarts=None, metrics=None, min_replicas=None, rdzv_backend=None, rdzv_conf=None, rdzv_host=None, rdzv_id=None, rdzv_port=None, standalone=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, max_replicas=None, max_restarts=None, metrics=None, min_replicas=None, n_proc_per_node=None, rdzv_backend=None, rdzv_conf=None, rdzv_host=None, rdzv_id=None, rdzv_port=None, standalone=None, local_vars_configuration=None): # noqa: E501 """KubeflowOrgV1ElasticPolicy - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -68,6 +70,7 @@ def __init__(self, max_replicas=None, max_restarts=None, metrics=None, min_repli self._max_restarts = None self._metrics = None self._min_replicas = None + self._n_proc_per_node = None self._rdzv_backend = None self._rdzv_conf = None self._rdzv_host = None @@ -84,6 +87,8 @@ def __init__(self, max_replicas=None, max_restarts=None, metrics=None, min_repli self.metrics = metrics if min_replicas is not None: self.min_replicas = min_replicas + if n_proc_per_node is not None: + self.n_proc_per_node = n_proc_per_node if rdzv_backend is not None: self.rdzv_backend = rdzv_backend if rdzv_conf is not None: @@ -187,6 +192,29 @@ def min_replicas(self, min_replicas): self._min_replicas = min_replicas + @property + def n_proc_per_node(self): + """Gets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 + + Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 + + :return: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 + :rtype: int + """ + return self._n_proc_per_node + + @n_proc_per_node.setter + def n_proc_per_node(self, n_proc_per_node): + """Sets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. + + Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 + + :param n_proc_per_node: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 + :type: int + """ + + self._n_proc_per_node = n_proc_per_node + @property def rdzv_backend(self): """Gets the rdzv_backend of this KubeflowOrgV1ElasticPolicy. 
# noqa: E501 diff --git a/sdk/python/test/test_kubeflow_org_v1_elastic_policy.py b/sdk/python/test/test_kubeflow_org_v1_elastic_policy.py index f445e8261f..11eaab937f 100644 --- a/sdk/python/test/test_kubeflow_org_v1_elastic_policy.py +++ b/sdk/python/test/test_kubeflow_org_v1_elastic_policy.py @@ -42,6 +42,7 @@ def make_instance(self, include_optional): None ], min_replicas = 56, + n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py index 470d136ac7..7c361dbd62 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py @@ -47,6 +47,7 @@ def make_instance(self, include_optional): None ], min_replicas = 56, + n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py index 8255823bc3..11973de4dc 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py @@ -50,6 +50,7 @@ def make_instance(self, include_optional): None ], min_replicas = 56, + n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( @@ -120,6 +121,7 @@ def make_instance(self, include_optional): None ], min_replicas = 56, + n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py index 9698294f2d..0e11a22e64 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py @@ -43,6 +43,7 @@ def make_instance(self, include_optional): None ], min_replicas = 56, + n_proc_per_node = 56, rdzv_backend = '0', rdzv_conf = [ kubeflow_org_v1_rdzv_conf.KubeflowOrgV1RDZVConf( From f98077dca7085538c6f887211c0e2a66c248b36a Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Thu, 29 Jun 2023 14:46:22 +0800 Subject: [PATCH 4/9] use string for nproc_per_node --- docs/api/kubeflow.org_v1_generated.asciidoc | 4 +-- hack/python-sdk/swagger.json | 7 ++--- .../base/crds/kubeflow.org_pytorchjobs.yaml | 8 +++--- pkg/apis/kubeflow.org/v1/openapi_generated.go | 8 +++--- pkg/apis/kubeflow.org/v1/pytorch_types.go | 6 ++-- .../kubeflow.org/v1/zz_generated.deepcopy.go | 2 +- pkg/controller.v1/paddlepaddle/envvar.go | 2 +- pkg/controller.v1/pytorch/envvar.go | 28 +++++++++++++------ .../pytorch/pytorchjob_controller_test.go | 7 ++++- pkg/controller.v1/xgboost/xgboost.go | 2 +- sdk/python/docs/KubeflowOrgV1ElasticPolicy.md | 2 +- .../docs/KubeflowOrgV1PyTorchJobSpec.md | 2 +- .../models/kubeflow_org_v1_elastic_policy.py | 4 +-- .../kubeflow_org_v1_py_torch_job_spec.py | 10 +++---- .../test/test_kubeflow_org_v1_py_torch_job.py | 2 +- .../test_kubeflow_org_v1_py_torch_job_list.py | 4 +-- .../test_kubeflow_org_v1_py_torch_job_spec.py | 2 +- 17 files changed, 58 insertions(+), 42 deletions(-) diff --git a/docs/api/kubeflow.org_v1_generated.asciidoc b/docs/api/kubeflow.org_v1_generated.asciidoc index 0c3a3b2853..5c4d0f68aa 100644 --- a/docs/api/kubeflow.org_v1_generated.asciidoc +++ b/docs/api/kubeflow.org_v1_generated.asciidoc @@ -53,7 +53,7 @@ Package v1 contains API Schema definitions for 
the kubeflow.org v1 API group | *`rdzvId`* __string__ | | *`rdzvConf`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-rdzvconf[$$RDZVConf$$] array__ | RDZVConf contains additional rendezvous configuration (=,=,...). | *`standalone`* __boolean__ | Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored. -| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. +| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ | *`maxRestarts`* __integer__ | | *`metrics`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#metricspec-v2-autoscaling[$$MetricSpec$$] array__ | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. |=== @@ -396,7 +396,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob. | *`runPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-runpolicy[$$RunPolicy$$]__ | RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active. | *`elasticPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]__ | | *`pytorchReplicaSpecs`* __object (keys:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicaspec[$$ReplicaSpec$$])__ | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { "Master": PyTorchReplicaSpec, "Worker": PyTorchReplicaSpec, } -| *`nprocPerNode`* __integer__ | Number of workers per node +| *`nprocPerNode`* __string__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. |=== diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index 4b2ea8cd8f..48b7db0146 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -33,7 +33,7 @@ "format": "int32" }, "nProcPerNode": { - "description": "Number of workers per node; supported values: [auto, cpu, gpu, int].", + "description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+", "type": "integer", "format": "int32" }, @@ -492,9 +492,8 @@ "$ref": "#/definitions/kubeflow.org.v1.ElasticPolicy" }, "nprocPerNode": { - "description": "Number of workers per node", - "type": "integer", - "format": "int32" + "description": "Number of workers per node; supported values: [auto, cpu, gpu, int].", + "type": "string" }, "pytorchReplicaSpecs": { "description": "A map of PyTorchReplicaType (type) to ReplicaSpec (value). 
Specifies the PyTorch cluster configuration. For example,\n {\n \"Master\": PyTorchReplicaSpec,\n \"Worker\": PyTorchReplicaSpec,\n }", diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index 7c8c0ba2cd..34510114b3 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -554,7 +554,7 @@ spec: type: integer nProcPerNode: description: 'Number of workers per node; supported values: [auto, - cpu, gpu, int].' + cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+' format: int32 type: integer rdzvBackend: @@ -586,9 +586,9 @@ spec: type: boolean type: object nprocPerNode: - description: Number of workers per node - format: int32 - type: integer + description: 'Number of workers per node; supported values: [auto, + cpu, gpu, int].' + type: string pytorchReplicaSpecs: additionalProperties: description: ReplicaSpec is a description of the replica diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index 1eaa44e08a..b8a734f8fc 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -126,7 +126,7 @@ func schema_pkg_apis_kubefloworg_v1_ElasticPolicy(ref common.ReferenceCallback) }, "nProcPerNode": { SchemaProps: spec.SchemaProps{ - Description: "Number of workers per node; supported values: [auto, cpu, gpu, int].", + Description: "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+", Type: []string{"integer"}, Format: "int32", }, @@ -911,9 +911,9 @@ func schema_pkg_apis_kubefloworg_v1_PyTorchJobSpec(ref common.ReferenceCallback) }, "nprocPerNode": { SchemaProps: spec.SchemaProps{ - Description: "Number of workers per node", - Type: []string{"integer"}, - Format: "int32", + Description: "Number of workers per node; supported values: [auto, cpu, gpu, int].", + Type: []string{"string"}, + Format: "", }, }, }, diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go index 54cc02a66f..ba6bea6ea1 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_types.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go @@ -84,8 +84,9 @@ type PyTorchJobSpec struct { // "Worker": PyTorchReplicaSpec, // } PyTorchReplicaSpecs map[ReplicaType]*ReplicaSpec `json:"pytorchReplicaSpecs"` - // Number of workers per node - NprocPerNode *int32 `json:"nprocPerNode,omitempty"` + + // Number of workers per node; supported values: [auto, cpu, gpu, int]. + NprocPerNode *string `json:"nprocPerNode,omitempty"` } type ElasticPolicy struct { @@ -109,6 +110,7 @@ type ElasticPolicy struct { // are ignored. Standalone *bool `json:"standalone,omitempty"` // Number of workers per node; supported values: [auto, cpu, gpu, int]. 
+ // Deprecated: This API is deprecated in v1.7+ NProcPerNode *int32 `json:"nProcPerNode,omitempty"` MaxRestarts *int32 `json:"maxRestarts,omitempty"` diff --git a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go index c53468d9ef..106b79a47f 100644 --- a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go @@ -587,7 +587,7 @@ func (in *PyTorchJobSpec) DeepCopyInto(out *PyTorchJobSpec) { } if in.NprocPerNode != nil { in, out := &in.NprocPerNode, &out.NprocPerNode - *out = new(int32) + *out = new(string) **out = **in } } diff --git a/pkg/controller.v1/paddlepaddle/envvar.go b/pkg/controller.v1/paddlepaddle/envvar.go index 853e1da8f7..53e5b1d44c 100644 --- a/pkg/controller.v1/paddlepaddle/envvar.go +++ b/pkg/controller.v1/paddlepaddle/envvar.go @@ -60,7 +60,7 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, // Ref https://stackoverflow.com/questions/59812009/what-is-the-use-of-pythonunbuffered-in-docker-file. podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "PYTHONUNBUFFERED", - Value: "0", + Value: "1", }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ diff --git a/pkg/controller.v1/pytorch/envvar.go b/pkg/controller.v1/pytorch/envvar.go index 6a82673b4b..fd5034a668 100644 --- a/pkg/controller.v1/pytorch/envvar.go +++ b/pkg/controller.v1/pytorch/envvar.go @@ -57,12 +57,12 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, podTemplateSpec.Spec.Containers[i].Env = append( podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "PYTHONUNBUFFERED", - Value: "0", + Value: "1", }) totalReplicas := getTotalReplicas(pytorchjob) - nprocPerNode := getNprocPerNode(pytorchjob) - worldSize := totalReplicas * nprocPerNode + nprocPerNode := getNprocPerNodeInt(pytorchjob) + worldSize := int(totalReplicas) * nprocPerNode // If the master is not null, then we need to set the MASTER_ADDR and RANK. 
if pytorchjob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster] != nil { @@ -85,7 +85,7 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "WORLD_SIZE", - Value: strconv.Itoa(int(worldSize)), + Value: strconv.Itoa(worldSize), }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "RANK", @@ -93,7 +93,7 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: EnvNprocPerNode, - Value: strconv.Itoa(int(nprocPerNode)), + Value: getNprocPerNodeEnv(pytorchjob), }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: EnvNodeRank, @@ -124,12 +124,22 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, return nil } -func getNprocPerNode(job *kubeflowv1.PyTorchJob) int32 { - if job.Spec.NprocPerNode != nil { - return *job.Spec.NprocPerNode - } else { +func getNprocPerNodeInt(job *kubeflowv1.PyTorchJob) int { + if job.Spec.NprocPerNode == nil { return 1 } + if np, err := strconv.Atoi(*job.Spec.NprocPerNode); err == nil { + return np + } + return 1 +} + +func getNprocPerNodeEnv(job *kubeflowv1.PyTorchJob) string { + if job.Spec.NprocPerNode == nil { + return "auto" + } else { + return *job.Spec.NprocPerNode + } } func getTotalReplicas(job *kubeflowv1.PyTorchJob) int32 { diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index cb6c891d37..c40d0f0f12 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -37,6 +37,7 @@ var _ = Describe("PyTorchJob controller", func() { interval = time.Millisecond * 250 expectedPort = int32(8080) ) + var nprocPerNode = "auto" Context("When creating the PyTorchJob", func() { It("Should get the corresponding resources successfully", func() { @@ -89,6 +90,7 @@ var _ = Describe("PyTorchJob controller", func() { }, }, } + job.Spec.NprocPerNode = &nprocPerNode Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) @@ -119,13 +121,16 @@ var _ = Describe("PyTorchJob controller", func() { Name: kubeflowv1.PytorchJobDefaultPortName, ContainerPort: expectedPort, Protocol: corev1.ProtocolTCP})) - // Check MASTER_PORT and MASTER_ADDR env variable + // Check env variable Expect(masterPod.Spec.Containers[0].Env).To(ContainElements(corev1.EnvVar{ Name: EnvMasterPort, Value: fmt.Sprintf("%d", masterSvc.Spec.Ports[0].Port), }, corev1.EnvVar{ Name: EnvMasterAddr, Value: masterSvc.Name, + }, corev1.EnvVar{ + Name: EnvNprocPerNode, + Value: nprocPerNode, })) // Check service port. 
Expect(masterSvc.Spec.Ports[0].Port).To(Equal(expectedPort)) diff --git a/pkg/controller.v1/xgboost/xgboost.go b/pkg/controller.v1/xgboost/xgboost.go index 8c2a94e13b..4b002f4784 100644 --- a/pkg/controller.v1/xgboost/xgboost.go +++ b/pkg/controller.v1/xgboost/xgboost.go @@ -93,7 +93,7 @@ func SetPodEnv(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, inde }) podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, corev1.EnvVar{ Name: "PYTHONUNBUFFERED", - Value: "0", + Value: "1", }) // This variables are used if it is a LightGBM job if totalReplicas > 1 { diff --git a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md index 3a7a0589ee..60dd6cd7a0 100644 --- a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md +++ b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md @@ -7,7 +7,7 @@ Name | Type | Description | Notes **max_restarts** | **int** | | [optional] **metrics** | [**list[K8sIoApiAutoscalingV2MetricSpec]**](K8sIoApiAutoscalingV2MetricSpec.md) | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. | [optional] **min_replicas** | **int** | minReplicas is the lower limit for the number of replicas to which the training job can scale down. It defaults to null. | [optional] -**n_proc_per_node** | **int** | Number of workers per node; supported values: [auto, cpu, gpu, int]. | [optional] +**n_proc_per_node** | **int** | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ | [optional] **rdzv_backend** | **str** | | [optional] **rdzv_conf** | [**list[KubeflowOrgV1RDZVConf]**](KubeflowOrgV1RDZVConf.md) | RDZVConf contains additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...). | [optional] **rdzv_host** | **str** | | [optional] diff --git a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md index e4693fa9b7..6fc56de25b 100644 --- a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md +++ b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md @@ -5,7 +5,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob. Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **elastic_policy** | [**KubeflowOrgV1ElasticPolicy**](KubeflowOrgV1ElasticPolicy.md) | | [optional] -**nproc_per_node** | **int** | Number of workers per node | [optional] +**nproc_per_node** | **str** | Number of workers per node; supported values: [auto, cpu, gpu, int]. | [optional] **pytorch_replica_specs** | [**dict(str, KubeflowOrgV1ReplicaSpec)**](KubeflowOrgV1ReplicaSpec.md) | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. 
For example, { \"Master\": PyTorchReplicaSpec, \"Worker\": PyTorchReplicaSpec, } | **run_policy** | [**KubeflowOrgV1RunPolicy**](KubeflowOrgV1RunPolicy.md) | | diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py index c6e3d9f6bd..e5eab75685 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py @@ -196,7 +196,7 @@ def min_replicas(self, min_replicas): def n_proc_per_node(self): """Gets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 - Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 + Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ # noqa: E501 :return: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 :rtype: int @@ -207,7 +207,7 @@ def n_proc_per_node(self): def n_proc_per_node(self, n_proc_per_node): """Sets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. - Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 + Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ # noqa: E501 :param n_proc_per_node: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 :type: int diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py index c828fe7e0e..adb6f9b287 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py @@ -34,7 +34,7 @@ class KubeflowOrgV1PyTorchJobSpec(object): """ openapi_types = { 'elastic_policy': 'KubeflowOrgV1ElasticPolicy', - 'nproc_per_node': 'int', + 'nproc_per_node': 'str', 'pytorch_replica_specs': 'dict(str, KubeflowOrgV1ReplicaSpec)', 'run_policy': 'KubeflowOrgV1RunPolicy' } @@ -90,10 +90,10 @@ def elastic_policy(self, elastic_policy): def nproc_per_node(self): """Gets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 - Number of workers per node # noqa: E501 + Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 :return: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 - :rtype: int + :rtype: str """ return self._nproc_per_node @@ -101,10 +101,10 @@ def nproc_per_node(self): def nproc_per_node(self, nproc_per_node): """Sets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. - Number of workers per node # noqa: E501 + Number of workers per node; supported values: [auto, cpu, gpu, int]. # noqa: E501 :param nproc_per_node: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. 
# noqa: E501 - :type: int + :type: str """ self._nproc_per_node = nproc_per_node diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py index 7c361dbd62..f31bdec8cf 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py @@ -58,7 +58,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), - nproc_per_node = 56, + nproc_per_node = '0', pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py index 11973de4dc..ab041186dc 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py @@ -61,7 +61,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), - nproc_per_node = 56, + nproc_per_node = '0', pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, @@ -132,7 +132,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), - nproc_per_node = 56, + nproc_per_node = '0', pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py index 0e11a22e64..366188de7b 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py @@ -54,7 +54,7 @@ def make_instance(self, include_optional): rdzv_id = '0', rdzv_port = 56, standalone = True, ), - nproc_per_node = 56, + nproc_per_node = '0', pytorch_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( replicas = 56, From aae9541a6b91a39795394b16d564dd32666b0211 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Thu, 29 Jun 2023 16:44:56 +0800 Subject: [PATCH 5/9] add defaults in api --- docs/api/kubeflow.org_v1_generated.asciidoc | 4 ++-- hack/python-sdk/swagger.json | 4 ++-- manifests/base/crds/kubeflow.org_pytorchjobs.yaml | 5 +++-- pkg/apis/kubeflow.org/v1/openapi_generated.go | 4 ++-- pkg/apis/kubeflow.org/v1/pytorch_types.go | 2 ++ sdk/python/docs/KubeflowOrgV1ElasticPolicy.md | 2 +- sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md | 2 +- .../training/models/kubeflow_org_v1_elastic_policy.py | 4 ++-- .../training/models/kubeflow_org_v1_py_torch_job_spec.py | 4 ++-- 9 files changed, 17 insertions(+), 14 deletions(-) diff --git a/docs/api/kubeflow.org_v1_generated.asciidoc b/docs/api/kubeflow.org_v1_generated.asciidoc index 5c4d0f68aa..ce731ff67c 100644 --- a/docs/api/kubeflow.org_v1_generated.asciidoc +++ b/docs/api/kubeflow.org_v1_generated.asciidoc @@ -53,7 +53,7 @@ Package v1 contains API Schema definitions for the kubeflow.org v1 API group | *`rdzvId`* __string__ | | *`rdzvConf`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-rdzvconf[$$RDZVConf$$] array__ | RDZVConf contains additional rendezvous configuration (=,=,...). | *`standalone`* __boolean__ | Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. 
If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored. -| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ +| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead. | *`maxRestarts`* __integer__ | | *`metrics`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#metricspec-v2-autoscaling[$$MetricSpec$$] array__ | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. |=== @@ -396,7 +396,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob. | *`runPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-runpolicy[$$RunPolicy$$]__ | RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active. | *`elasticPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]__ | | *`pytorchReplicaSpecs`* __object (keys:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicaspec[$$ReplicaSpec$$])__ | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { "Master": PyTorchReplicaSpec, "Worker": PyTorchReplicaSpec, } -| *`nprocPerNode`* __string__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. +| *`nprocPerNode`* __string__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto. |=== diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index 48b7db0146..616a7ce2b2 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -33,7 +33,7 @@ "format": "int32" }, "nProcPerNode": { - "description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+", + "description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead.", "type": "integer", "format": "int32" }, @@ -492,7 +492,7 @@ "$ref": "#/definitions/kubeflow.org.v1.ElasticPolicy" }, "nprocPerNode": { - "description": "Number of workers per node; supported values: [auto, cpu, gpu, int].", + "description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. 
Defaults to auto.", "type": "string" }, "pytorchReplicaSpecs": { diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index 34510114b3..a01ec9eebe 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -554,7 +554,8 @@ spec: type: integer nProcPerNode: description: 'Number of workers per node; supported values: [auto, - cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+' + cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ + Use .spec.nprocPerNode instead.' format: int32 type: integer rdzvBackend: @@ -587,7 +588,7 @@ spec: type: object nprocPerNode: description: 'Number of workers per node; supported values: [auto, - cpu, gpu, int].' + cpu, gpu, int]. Defaults to auto.' type: string pytorchReplicaSpecs: additionalProperties: diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index b8a734f8fc..a12202e990 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -126,7 +126,7 @@ func schema_pkg_apis_kubefloworg_v1_ElasticPolicy(ref common.ReferenceCallback) }, "nProcPerNode": { SchemaProps: spec.SchemaProps{ - Description: "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+", + Description: "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead.", Type: []string{"integer"}, Format: "int32", }, @@ -911,7 +911,7 @@ func schema_pkg_apis_kubefloworg_v1_PyTorchJobSpec(ref common.ReferenceCallback) }, "nprocPerNode": { SchemaProps: spec.SchemaProps{ - Description: "Number of workers per node; supported values: [auto, cpu, gpu, int].", + Description: "Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto.", Type: []string{"string"}, Format: "", }, diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go index ba6bea6ea1..46aa48ee2e 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_types.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go @@ -86,6 +86,7 @@ type PyTorchJobSpec struct { PyTorchReplicaSpecs map[ReplicaType]*ReplicaSpec `json:"pytorchReplicaSpecs"` // Number of workers per node; supported values: [auto, cpu, gpu, int]. + // Defaults to auto. NprocPerNode *string `json:"nprocPerNode,omitempty"` } @@ -111,6 +112,7 @@ type ElasticPolicy struct { Standalone *bool `json:"standalone,omitempty"` // Number of workers per node; supported values: [auto, cpu, gpu, int]. // Deprecated: This API is deprecated in v1.7+ + // Use .spec.nprocPerNode instead. NProcPerNode *int32 `json:"nProcPerNode,omitempty"` MaxRestarts *int32 `json:"maxRestarts,omitempty"` diff --git a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md index 60dd6cd7a0..c39927a013 100644 --- a/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md +++ b/sdk/python/docs/KubeflowOrgV1ElasticPolicy.md @@ -7,7 +7,7 @@ Name | Type | Description | Notes **max_restarts** | **int** | | [optional] **metrics** | [**list[K8sIoApiAutoscalingV2MetricSpec]**](K8sIoApiAutoscalingV2MetricSpec.md) | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). 
The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. | [optional] **min_replicas** | **int** | minReplicas is the lower limit for the number of replicas to which the training job can scale down. It defaults to null. | [optional] -**n_proc_per_node** | **int** | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ | [optional] +**n_proc_per_node** | **int** | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead. | [optional] **rdzv_backend** | **str** | | [optional] **rdzv_conf** | [**list[KubeflowOrgV1RDZVConf]**](KubeflowOrgV1RDZVConf.md) | RDZVConf contains additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...). | [optional] **rdzv_host** | **str** | | [optional] diff --git a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md index 6fc56de25b..4fc56df07e 100644 --- a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md +++ b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md @@ -5,7 +5,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob. Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **elastic_policy** | [**KubeflowOrgV1ElasticPolicy**](KubeflowOrgV1ElasticPolicy.md) | | [optional] -**nproc_per_node** | **str** | Number of workers per node; supported values: [auto, cpu, gpu, int]. | [optional] +**nproc_per_node** | **str** | Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto. | [optional] **pytorch_replica_specs** | [**dict(str, KubeflowOrgV1ReplicaSpec)**](KubeflowOrgV1ReplicaSpec.md) | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { \"Master\": PyTorchReplicaSpec, \"Worker\": PyTorchReplicaSpec, } | **run_policy** | [**KubeflowOrgV1RunPolicy**](KubeflowOrgV1RunPolicy.md) | | diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py index e5eab75685..311bd6d288 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_elastic_policy.py @@ -196,7 +196,7 @@ def min_replicas(self, min_replicas): def n_proc_per_node(self): """Gets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 - Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ # noqa: E501 + Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead. # noqa: E501 :return: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy. # noqa: E501 :rtype: int @@ -207,7 +207,7 @@ def n_proc_per_node(self): def n_proc_per_node(self, n_proc_per_node): """Sets the n_proc_per_node of this KubeflowOrgV1ElasticPolicy. - Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+ # noqa: E501 + Number of workers per node; supported values: [auto, cpu, gpu, int]. 
Deprecated: This API is deprecated in v1.7+ Use .spec.nprocPerNode instead.  # noqa: E501
 
         :param n_proc_per_node: The n_proc_per_node of this KubeflowOrgV1ElasticPolicy.  # noqa: E501
         :type: int
diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py
index adb6f9b287..6918e24b54 100644
--- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py
+++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py
@@ -90,7 +90,7 @@ def elastic_policy(self, elastic_policy):
     def nproc_per_node(self):
         """Gets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec.  # noqa: E501
 
-        Number of workers per node; supported values: [auto, cpu, gpu, int].  # noqa: E501
+        Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto.  # noqa: E501
 
         :return: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec.  # noqa: E501
         :rtype: str
@@ -101,7 +101,7 @@ def nproc_per_node(self):
     def nproc_per_node(self, nproc_per_node):
         """Sets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec.
 
-        Number of workers per node; supported values: [auto, cpu, gpu, int].  # noqa: E501
+        Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto.  # noqa: E501
 
         :param nproc_per_node: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec.  # noqa: E501
         :type: str

From b50fd83f85429b0c735939b0be6502122da3df4c Mon Sep 17 00:00:00 2001
From: kuizhiqing
Date: Fri, 30 Jun 2023 23:19:19 +0800
Subject: [PATCH 6/9] add validation for two nproc_per_node, use auto for defaulter

---
 pkg/apis/kubeflow.org/v1/pytorch_defaults.go   |  5 +++++
 pkg/apis/kubeflow.org/v1/pytorch_types.go      |  4 ++++
 pkg/apis/kubeflow.org/v1/pytorch_validation.go | 10 ++++++++++
 pkg/controller.v1/pytorch/envvar.go            | 10 +---------
 .../pytorch/pytorchjob_controller_test.go      |  2 +-
 5 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go
index a05e60ed54..50719440f1 100644
--- a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go
+++ b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go
@@ -79,4 +79,9 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) {
 	}
 	// Set default elastic policy.
 	setElasticPolicy(job)
+
+	if job.Spec.NprocPerNode == nil {
+		defaultNprocPerNode := "auto"
+		job.Spec.NprocPerNode = &defaultNprocPerNode
+	}
 }
diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go
index 46aa48ee2e..3629b2a9e8 100644
--- a/pkg/apis/kubeflow.org/v1/pytorch_types.go
+++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go
@@ -67,6 +67,10 @@ type PyTorchJob struct {
 	Status JobStatus `json:"status,omitempty"`
 }
 
+// For details on the PyTorch launch/run related spec fields, see
+// https://pytorch.org/docs/stable/elastic/run.html
+// or run `torchrun --help` for a brief description.
+
 // PyTorchJobSpec is a desired state description of the PyTorchJob.
type PyTorchJobSpec struct { // RunPolicy encapsulates various runtime policies of the distributed training diff --git a/pkg/apis/kubeflow.org/v1/pytorch_validation.go b/pkg/apis/kubeflow.org/v1/pytorch_validation.go index 4a15293d62..19932d6834 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_validation.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_validation.go @@ -27,6 +27,16 @@ func ValidateV1PyTorchJob(pytorchJob *PyTorchJob) error { if err := validatePyTorchReplicaSpecs(pytorchJob.Spec.PyTorchReplicaSpecs); err != nil { return err } + if err := validateNprocPerNode(pytorchJob); err != nil { + return err + } + return nil +} + +func validateNprocPerNode(pytorchJob *PyTorchJob) error { + if pytorchJob.Spec.NprocPerNode != nil && pytorchJob.Spec.ElasticPolicy != nil && pytorchJob.Spec.ElasticPolicy.NProcPerNode != nil { + return fmt.Errorf(".spec.elasticPolicy.nProcPerNode is deprecated, use .spec.nprocPerNode instead") + } return nil } diff --git a/pkg/controller.v1/pytorch/envvar.go b/pkg/controller.v1/pytorch/envvar.go index fd5034a668..7f6c24df25 100644 --- a/pkg/controller.v1/pytorch/envvar.go +++ b/pkg/controller.v1/pytorch/envvar.go @@ -93,7 +93,7 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: EnvNprocPerNode, - Value: getNprocPerNodeEnv(pytorchjob), + Value: *pytorchjob.Spec.NprocPerNode, }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: EnvNodeRank, @@ -134,14 +134,6 @@ func getNprocPerNodeInt(job *kubeflowv1.PyTorchJob) int { return 1 } -func getNprocPerNodeEnv(job *kubeflowv1.PyTorchJob) string { - if job.Spec.NprocPerNode == nil { - return "auto" - } else { - return *job.Spec.NprocPerNode - } -} - func getTotalReplicas(job *kubeflowv1.PyTorchJob) int32 { jobReplicas := int32(0) for _, r := range job.Spec.PyTorchReplicaSpecs { diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index c40d0f0f12..32d111c1dc 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -90,7 +90,7 @@ var _ = Describe("PyTorchJob controller", func() { }, }, } - job.Spec.NprocPerNode = &nprocPerNode + job.Spec.NprocPerNode = nil Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) From 741a4de0bc61e5a99a24e7142875f453e218d938 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Mon, 10 Jul 2023 21:48:07 +0800 Subject: [PATCH 7/9] add ut for defaults and validation --- pkg/apis/kubeflow.org/v1/pytorch_defaults.go | 18 +++++++++--- .../kubeflow.org/v1/pytorch_defaults_test.go | 22 ++++++++++++++ .../v1/pytorch_validation_test.go | 29 +++++++++++++++++++ pkg/controller.v1/pytorch/elastic.go | 12 ++++---- 4 files changed, 71 insertions(+), 10 deletions(-) diff --git a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go index 50719440f1..2dc1a28ba1 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go @@ -19,6 +19,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +var ( + defaultNprocPerNode = "auto" +) + func addPytorchDefaultingFuncs(scheme *runtime.Scheme) error { return RegisterDefaults(scheme) } @@ -61,6 +65,14 @@ func setPytorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) { } } +func setDefaultNprocPerNode(job *PyTorchJob) { + if (job.Spec.ElasticPolicy != nil 
&& job.Spec.ElasticPolicy.NProcPerNode == nil) || (job.Spec.ElasticPolicy == nil) { + if job.Spec.NprocPerNode == nil { + job.Spec.NprocPerNode = &defaultNprocPerNode + } + } +} + // SetDefaults_PyTorchJob sets any unspecified values to defaults. func SetDefaults_PyTorchJob(job *PyTorchJob) { // Set default cleanpod policy to None. @@ -80,8 +92,6 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) { // Set default elastic policy. setElasticPolicy(job) - if job.Spec.NprocPerNode == nil { - defaultNprocPerNode := "auto" - job.Spec.NprocPerNode = &defaultNprocPerNode - } + // Set default nproc_per_node. + setDefaultNprocPerNode(job) } diff --git a/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go b/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go index 4a9ef9f895..3867c53510 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go @@ -152,3 +152,25 @@ func TestSetElasticPolicy(t *testing.T) { }) } } + +func TestSetDefaultNprocPerNode(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + t.Run("test default nproc per node", func(t *testing.T) { + job := &PyTorchJob{ + Spec: PyTorchJobSpec{ + ElasticPolicy: &ElasticPolicy{ + NProcPerNode: nil, + }, + PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{ + PyTorchJobReplicaTypeWorker: { + Replicas: pointer.Int32(1), + }, + }, + }, + } + + setDefaultNprocPerNode(job) + gomega.Expect(job.Spec.NprocPerNode). + To(gomega.Equal(&defaultNprocPerNode)) + }) +} diff --git a/pkg/apis/kubeflow.org/v1/pytorch_validation_test.go b/pkg/apis/kubeflow.org/v1/pytorch_validation_test.go index 98eb71a761..0b46da3742 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_validation_test.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_validation_test.go @@ -180,6 +180,35 @@ func TestValidateV1PyTorchJob(t *testing.T) { }, wantErr: true, }, + "Spec.NprocPerNode and Spec.ElasticPolicy.NProcPerNode are set": { + pytorchJob: &PyTorchJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: PyTorchJobSpec{ + NprocPerNode: pointer.String("1"), + ElasticPolicy: &ElasticPolicy{ + NProcPerNode: pointer.Int32(1), + }, + PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{ + PyTorchJobReplicaTypeMaster: { + Replicas: pointer.Int32(2), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "pytorch", + Image: "gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0", + }, + }, + }, + }, + }, + }, + }, + }, + wantErr: true, + }, } for name, tc := range testCases { diff --git a/pkg/controller.v1/pytorch/elastic.go b/pkg/controller.v1/pytorch/elastic.go index e3b031a8eb..f1c9be94ae 100644 --- a/pkg/controller.v1/pytorch/elastic.go +++ b/pkg/controller.v1/pytorch/elastic.go @@ -99,18 +99,18 @@ func (e ElasticEnvVarGenerator) Generate( Value: strconv.Itoa(int(*elasticPolicy.MaxRestarts)), }) } - if elasticPolicy.RDZVID != nil { - envVars = append(envVars, corev1.EnvVar{ - Name: EnvRDZVID, - Value: *elasticPolicy.RDZVID, - }) - } if elasticPolicy.NProcPerNode != nil { envVars = append(envVars, corev1.EnvVar{ Name: EnvNProcPerNode, Value: strconv.Itoa(int(*elasticPolicy.NProcPerNode)), }) } + if elasticPolicy.RDZVID != nil { + envVars = append(envVars, corev1.EnvVar{ + Name: EnvRDZVID, + Value: *elasticPolicy.RDZVID, + }) + } if envVar := e.generateEnvRDZVConf(elasticPolicy); envVar != nil { envVars = append(envVars, *envVar) } From 2f1b5df78a2882b517999bfc437791f8fcaed4b7 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Tue, 11 Jul 2023 17:39:46 +0800 Subject: [PATCH 8/9] fix ut --- 
 pkg/apis/kubeflow.org/v1/pytorch_defaults.go  |  4 ++--
 .../kubeflow.org/v1/pytorch_defaults_test.go  | 18 +++++++++++++++++-
 pkg/controller.v1/paddlepaddle/envvar.go      |  2 +-
 pkg/controller.v1/pytorch/envvar.go           |  2 +-
 .../pytorch/pytorchjob_controller_test.go     |  3 +--
 pkg/controller.v1/xgboost/xgboost.go          |  2 +-
 6 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go
index 2dc1a28ba1..1483158682 100644
--- a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go
+++ b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go
@@ -20,7 +20,7 @@ import (
 )
 
 var (
-	defaultNprocPerNode = "auto"
+	DefaultNprocPerNode = "auto"
 )
 
 func addPytorchDefaultingFuncs(scheme *runtime.Scheme) error {
@@ -68,7 +68,7 @@ func setPytorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) {
 func setDefaultNprocPerNode(job *PyTorchJob) {
 	if (job.Spec.ElasticPolicy != nil && job.Spec.ElasticPolicy.NProcPerNode == nil) || (job.Spec.ElasticPolicy == nil) {
 		if job.Spec.NprocPerNode == nil {
-			job.Spec.NprocPerNode = &defaultNprocPerNode
+			job.Spec.NprocPerNode = &DefaultNprocPerNode
 		}
 	}
 }
diff --git a/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go b/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go
index 3867c53510..a489b6eef5 100644
--- a/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go
+++ b/pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go
@@ -171,6 +171,22 @@ func TestSetDefaultNprocPerNode(t *testing.T) {
 
 		setDefaultNprocPerNode(job)
 		gomega.Expect(job.Spec.NprocPerNode).
-			To(gomega.Equal(&defaultNprocPerNode))
+			To(gomega.Equal(&DefaultNprocPerNode))
+	})
+	t.Run("test default nproc per node with nil elastic policy", func(t *testing.T) {
+		job := &PyTorchJob{
+			Spec: PyTorchJobSpec{
+				ElasticPolicy: nil,
+				PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{
+					PyTorchJobReplicaTypeWorker: {
+						Replicas: pointer.Int32(1),
+					},
+				},
+			},
+		}
+
+		setDefaultNprocPerNode(job)
+		gomega.Expect(job.Spec.NprocPerNode).
+			To(gomega.Equal(&DefaultNprocPerNode))
 	})
 }
diff --git a/pkg/controller.v1/paddlepaddle/envvar.go b/pkg/controller.v1/paddlepaddle/envvar.go
index 53e5b1d44c..853e1da8f7 100644
--- a/pkg/controller.v1/paddlepaddle/envvar.go
+++ b/pkg/controller.v1/paddlepaddle/envvar.go
@@ -60,7 +60,7 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype,
 	// Ref https://stackoverflow.com/questions/59812009/what-is-the-use-of-pythonunbuffered-in-docker-file.
podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "PYTHONUNBUFFERED", - Value: "1", + Value: "0", }) podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ diff --git a/pkg/controller.v1/pytorch/envvar.go b/pkg/controller.v1/pytorch/envvar.go index 7f6c24df25..fa06ce1c2c 100644 --- a/pkg/controller.v1/pytorch/envvar.go +++ b/pkg/controller.v1/pytorch/envvar.go @@ -57,7 +57,7 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, podTemplateSpec.Spec.Containers[i].Env = append( podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{ Name: "PYTHONUNBUFFERED", - Value: "1", + Value: "0", }) totalReplicas := getTotalReplicas(pytorchjob) diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index 32d111c1dc..5315ccccc0 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -37,7 +37,6 @@ var _ = Describe("PyTorchJob controller", func() { interval = time.Millisecond * 250 expectedPort = int32(8080) ) - var nprocPerNode = "auto" Context("When creating the PyTorchJob", func() { It("Should get the corresponding resources successfully", func() { @@ -130,7 +129,7 @@ var _ = Describe("PyTorchJob controller", func() { Value: masterSvc.Name, }, corev1.EnvVar{ Name: EnvNprocPerNode, - Value: nprocPerNode, + Value: kubeflowv1.DefaultNprocPerNode, })) // Check service port. Expect(masterSvc.Spec.Ports[0].Port).To(Equal(expectedPort)) diff --git a/pkg/controller.v1/xgboost/xgboost.go b/pkg/controller.v1/xgboost/xgboost.go index 4b002f4784..8c2a94e13b 100644 --- a/pkg/controller.v1/xgboost/xgboost.go +++ b/pkg/controller.v1/xgboost/xgboost.go @@ -93,7 +93,7 @@ func SetPodEnv(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, inde }) podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, corev1.EnvVar{ Name: "PYTHONUNBUFFERED", - Value: "1", + Value: "0", }) // This variables are used if it is a LightGBM job if totalReplicas > 1 { From a54264c650d4c06b138356bd4a6398b7cbb6c172 Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Tue, 11 Jul 2023 20:57:09 +0800 Subject: [PATCH 9/9] add doc for nproc_per_node --- docs/api/kubeflow.org_v1_generated.asciidoc | 2 +- hack/python-sdk/swagger.json | 2 +- manifests/base/crds/kubeflow.org_pytorchjobs.yaml | 3 ++- pkg/apis/kubeflow.org/v1/openapi_generated.go | 2 +- pkg/apis/kubeflow.org/v1/pytorch_types.go | 1 + pkg/controller.v1/pytorch/envvar.go | 3 +++ sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md | 2 +- .../training/models/kubeflow_org_v1_py_torch_job_spec.py | 4 ++-- 8 files changed, 12 insertions(+), 7 deletions(-) diff --git a/docs/api/kubeflow.org_v1_generated.asciidoc b/docs/api/kubeflow.org_v1_generated.asciidoc index ce731ff67c..0ad1208aa5 100644 --- a/docs/api/kubeflow.org_v1_generated.asciidoc +++ b/docs/api/kubeflow.org_v1_generated.asciidoc @@ -396,7 +396,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob. | *`runPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-runpolicy[$$RunPolicy$$]__ | RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active. 
| *`elasticPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]__ | | *`pytorchReplicaSpecs`* __object (keys:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicaspec[$$ReplicaSpec$$])__ | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { "Master": PyTorchReplicaSpec, "Worker": PyTorchReplicaSpec, } -| *`nprocPerNode`* __string__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto. +| *`nprocPerNode`* __string__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto. |=== diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index 616a7ce2b2..ec48eddd03 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -492,7 +492,7 @@ "$ref": "#/definitions/kubeflow.org.v1.ElasticPolicy" }, "nprocPerNode": { - "description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto.", + "description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.", "type": "string" }, "pytorchReplicaSpecs": { diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index a01ec9eebe..6855393617 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -588,7 +588,8 @@ spec: type: object nprocPerNode: description: 'Number of workers per node; supported values: [auto, - cpu, gpu, int]. Defaults to auto.' + cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. + Defaults to auto.' type: string pytorchReplicaSpecs: additionalProperties: diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index a12202e990..9944bf0fde 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -911,7 +911,7 @@ func schema_pkg_apis_kubefloworg_v1_PyTorchJobSpec(ref common.ReferenceCallback) }, "nprocPerNode": { SchemaProps: spec.SchemaProps{ - Description: "Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto.", + Description: "Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.", Type: []string{"string"}, Format: "", }, diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go index 3629b2a9e8..8e5b4030b0 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_types.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go @@ -90,6 +90,7 @@ type PyTorchJobSpec struct { PyTorchReplicaSpecs map[ReplicaType]*ReplicaSpec `json:"pytorchReplicaSpecs"` // Number of workers per node; supported values: [auto, cpu, gpu, int]. 
+	// For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658.
 	// Defaults to auto.
 	NprocPerNode *string `json:"nprocPerNode,omitempty"`
 }
 
diff --git a/pkg/controller.v1/pytorch/envvar.go b/pkg/controller.v1/pytorch/envvar.go
index fa06ce1c2c..8b57316ee9 100644
--- a/pkg/controller.v1/pytorch/envvar.go
+++ b/pkg/controller.v1/pytorch/envvar.go
@@ -124,6 +124,9 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype,
 	return nil
 }
 
+// getNprocPerNodeInt returns the integer value of NprocPerNode, or 1 if it is not an integer.
+// When nproc_per_node is set to auto, the number of processes is determined
+// later in the user process phase; in that case the WORLD_SIZE env will not be used.
 func getNprocPerNodeInt(job *kubeflowv1.PyTorchJob) int {
 	if job.Spec.NprocPerNode == nil {
 		return 1
diff --git a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md
index 4fc56df07e..6e24755a14 100644
--- a/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md
+++ b/sdk/python/docs/KubeflowOrgV1PyTorchJobSpec.md
@@ -5,7 +5,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob.
 Name | Type | Description | Notes
 ------------ | ------------- | ------------- | -------------
 **elastic_policy** | [**KubeflowOrgV1ElasticPolicy**](KubeflowOrgV1ElasticPolicy.md) |  | [optional] 
-**nproc_per_node** | **str** | Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto. | [optional] 
+**nproc_per_node** | **str** | Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto. | [optional] 
**pytorch_replica_specs** | [**dict(str, KubeflowOrgV1ReplicaSpec)**](KubeflowOrgV1ReplicaSpec.md) | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example,   {     \"Master\": PyTorchReplicaSpec,     \"Worker\": PyTorchReplicaSpec,   } | 
 **run_policy** | [**KubeflowOrgV1RunPolicy**](KubeflowOrgV1RunPolicy.md) |  | 
diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py
index 6918e24b54..663c27376d 100644
--- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py
+++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_py_torch_job_spec.py
@@ -90,7 +90,7 @@ def elastic_policy(self, elastic_policy):
     def nproc_per_node(self):
         """Gets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec.  # noqa: E501
 
-        Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto.  # noqa: E501
+        Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.  # noqa: E501
 
         :return: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec.  # noqa: E501
         :rtype: str
@@ -101,7 +101,7 @@ def nproc_per_node(self):
     def nproc_per_node(self, nproc_per_node):
         """Sets the nproc_per_node of this KubeflowOrgV1PyTorchJobSpec.
 
-        Number of workers per node; supported values: [auto, cpu, gpu, int]. Defaults to auto.  # noqa: E501
+        Number of workers per node; supported values: [auto, cpu, gpu, int]. For more, https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658.
Defaults to auto. # noqa: E501 :param nproc_per_node: The nproc_per_node of this KubeflowOrgV1PyTorchJobSpec. # noqa: E501 :type: str
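
Taken together, the series splits nproc_per_node handling in two: the deprecated `elasticPolicy.nProcPerNode` (`int32`) and the new string field `spec.nprocPerNode`, which is defaulted to `auto`, validated so that only one of the two may be set, and interpreted as an integer (falling back to 1) when computing `WORLD_SIZE`. The following self-contained Go sketch approximates that resolution flow for illustration; it mirrors `setDefaultNprocPerNode`, `validateNprocPerNode`, and `getNprocPerNodeInt` from the diffs above, but the types here are simplified stand-ins rather than the operator's real API package.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Simplified stand-ins for the kubeflow.org/v1 types changed in this series.
type ElasticPolicy struct {
	NProcPerNode *int32 // Deprecated in v1.7+; use PyTorchJobSpec.NprocPerNode.
}

type PyTorchJobSpec struct {
	NprocPerNode  *string // "auto", "cpu", "gpu", or an integer string.
	ElasticPolicy *ElasticPolicy
}

var defaultNprocPerNode = "auto"

// setDefaultNprocPerNode defaults the new string field to "auto", but only
// when the deprecated elastic field is not in use.
func setDefaultNprocPerNode(spec *PyTorchJobSpec) {
	if spec.ElasticPolicy == nil || spec.ElasticPolicy.NProcPerNode == nil {
		if spec.NprocPerNode == nil {
			spec.NprocPerNode = &defaultNprocPerNode
		}
	}
}

// validateNprocPerNode rejects specs that set both the old and the new field.
func validateNprocPerNode(spec *PyTorchJobSpec) error {
	if spec.NprocPerNode != nil && spec.ElasticPolicy != nil && spec.ElasticPolicy.NProcPerNode != nil {
		return errors.New(".spec.elasticPolicy.nProcPerNode is deprecated, use .spec.nprocPerNode instead")
	}
	return nil
}

// nprocPerNodeInt mirrors getNprocPerNodeInt: non-integer values such as
// "auto" fall back to 1, since the real process count is only known once
// the user process starts.
func nprocPerNodeInt(spec *PyTorchJobSpec) int {
	if spec.NprocPerNode == nil {
		return 1
	}
	if np, err := strconv.Atoi(*spec.NprocPerNode); err == nil {
		return np
	}
	return 1
}

func main() {
	spec := &PyTorchJobSpec{} // nprocPerNode unset, no elastic policy
	setDefaultNprocPerNode(spec)
	fmt.Println(*spec.NprocPerNode) // auto

	totalReplicas := 4 // e.g. 1 master + 3 workers
	fmt.Println(totalReplicas * nprocPerNodeInt(spec)) // WORLD_SIZE: 4 ("auto" falls back to 1)

	two := "2"
	spec.NprocPerNode = &two
	fmt.Println(totalReplicas * nprocPerNodeInt(spec)) // WORLD_SIZE: 8

	// Setting both the deprecated and the new field fails validation.
	one := int32(1)
	bad := &PyTorchJobSpec{
		NprocPerNode:  &two,
		ElasticPolicy: &ElasticPolicy{NProcPerNode: &one},
	}
	fmt.Println(validateNprocPerNode(bad))
}
```

The design point worth noting: the env var named by `EnvNprocPerNode` passes the raw string through, so `auto` reaches the training process unchanged, while `WORLD_SIZE` uses the integer fallback of 1. That is exactly why the comment added in patch 9/9 says the WORLD_SIZE env is not used when nproc_per_node is `auto`.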