Set correct ENV for PytorchJob to support torchrun #1840

Merged · 9 commits · Jul 12, 2023
3 changes: 2 additions & 1 deletion docs/api/kubeflow.org_v1_generated.asciidoc
@@ -53,7 +53,7 @@ Package v1 contains API Schema definitions for the kubeflow.org v1 API group
| *`rdzvId`* __string__ |
| *`rdzvConf`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-rdzvconf[$$RDZVConf$$] array__ | RDZVConf contains additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...).
| *`standalone`* __boolean__ | Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored.
| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int].
| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+. Use .spec.nprocPerNode instead.
| *`maxRestarts`* __integer__ |
| *`metrics`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#metricspec-v2-autoscaling[$$MetricSpec$$] array__ | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created.
|===
@@ -396,6 +396,7 @@ PyTorchJobSpec is a desired state description of the PyTorchJob.
| *`runPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-runpolicy[$$RunPolicy$$]__ | RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.
| *`elasticPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]__ |
| *`pytorchReplicaSpecs`* __object (keys:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-replicaspec[$$ReplicaSpec$$])__ | A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example, { "Master": PyTorchReplicaSpec, "Worker": PyTorchReplicaSpec, }
| *`nprocPerNode`* __string__ | Number of workers per node; supported values: [auto, cpu, gpu, int]. For more details, see https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.
|===


6 changes: 5 additions & 1 deletion hack/python-sdk/swagger.json
@@ -33,7 +33,7 @@
"format": "int32"
},
"nProcPerNode": {
"description": "Number of workers per node; supported values: [auto, cpu, gpu, int].",
"description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+. Use .spec.nprocPerNode instead.",
"type": "integer",
"format": "int32"
},
@@ -491,6 +491,10 @@
"elasticPolicy": {
"$ref": "#/definitions/kubeflow.org.v1.ElasticPolicy"
},
"nprocPerNode": {
"description": "Number of workers per node; supported values: [auto, cpu, gpu, int]. For more details, see https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658. Defaults to auto.",
"type": "string"
},
"pytorchReplicaSpecs": {
"description": "A map of PyTorchReplicaType (type) to ReplicaSpec (value). Specifies the PyTorch cluster configuration. For example,\n {\n \"Master\": PyTorchReplicaSpec,\n \"Worker\": PyTorchReplicaSpec,\n }",
"type": "object",
8 changes: 7 additions & 1 deletion manifests/base/crds/kubeflow.org_pytorchjobs.yaml
@@ -554,7 +554,8 @@ spec:
type: integer
nProcPerNode:
description: 'Number of workers per node; supported values: [auto,
cpu, gpu, int].'
cpu, gpu, int]. Deprecated: This API is deprecated in v1.7+.
Use .spec.nprocPerNode instead.'
format: int32
type: integer
rdzvBackend:
@@ -585,6 +586,11 @@
set values are ignored.
type: boolean
type: object
nprocPerNode:
description: 'Number of workers per node; supported values: [auto,
cpu, gpu, int]. For more details, see https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658.
Defaults to auto.'
type: string
pytorchReplicaSpecs:
additionalProperties:
description: ReplicaSpec is a description of the replica
9 changes: 8 additions & 1 deletion pkg/apis/kubeflow.org/v1/openapi_generated.go

Generated file; diff not rendered by default.

15 changes: 15 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_defaults.go
@@ -19,6 +19,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
)

var (
DefaultNprocPerNode = "auto"
)

func addPytorchDefaultingFuncs(scheme *runtime.Scheme) error {
return RegisterDefaults(scheme)
}
@@ -61,6 +65,14 @@ func setPytorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) {
}
}

func setDefaultNprocPerNode(job *PyTorchJob) {
if (job.Spec.ElasticPolicy != nil && job.Spec.ElasticPolicy.NProcPerNode == nil) || (job.Spec.ElasticPolicy == nil) {
if job.Spec.NprocPerNode == nil {
job.Spec.NprocPerNode = &DefaultNprocPerNode
}
}
}

// SetDefaults_PyTorchJob sets any unspecified values to defaults.
func SetDefaults_PyTorchJob(job *PyTorchJob) {
// Set default cleanpod policy to None.
@@ -79,4 +91,7 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) {
}
// Set default elastic policy.
setElasticPolicy(job)

// Set default nproc_per_node.
setDefaultNprocPerNode(job)
}
38 changes: 38 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_defaults_test.go
@@ -152,3 +152,41 @@ func TestSetElasticPolicy(t *testing.T) {
})
}
}

func TestSetDefaultNprocPerNode(t *testing.T) {
[Review comment — Member] I think that this test doesn't verify some edge cases. For example, the current case cannot verify that .spec.nprocPerNode isn't overridden when elasticPolicy is nil and .spec.elasticPolicy isn't nil. So should we add more test cases?

[Author — Member] Done

gomega.RegisterFailHandler(ginkgo.Fail)
t.Run("test default nproc per node with elastic policy", func(t *testing.T) {
job := &PyTorchJob{
Spec: PyTorchJobSpec{
ElasticPolicy: &ElasticPolicy{
NProcPerNode: nil,
},
PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{
PyTorchJobReplicaTypeWorker: {
Replicas: pointer.Int32(1),
},
},
},
}

setDefaultNprocPerNode(job)
gomega.Expect(job.Spec.NprocPerNode).
To(gomega.Equal(&DefaultNprocPerNode))
})
t.Run("test default nproc per node with nil elastic policy", func(t *testing.T) {
job := &PyTorchJob{
Spec: PyTorchJobSpec{
ElasticPolicy: nil,
PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{
PyTorchJobReplicaTypeWorker: {
Replicas: pointer.Int32(1),
},
},
},
}

setDefaultNprocPerNode(job)
gomega.Expect(job.Spec.NprocPerNode).
To(gomega.Equal(&DefaultNprocPerNode))
})
}
11 changes: 11 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_types.go
@@ -67,6 +67,10 @@ type PyTorchJob struct {
Status JobStatus `json:"status,omitempty"`
}

// For PyTorch launch/run related spec declaration, please see the following doc for more detail:
// https://pytorch.org/docs/stable/elastic/run.html
// Or run command `torchrun --help` for a brief description.

// PyTorchJobSpec is a desired state description of the PyTorchJob.
type PyTorchJobSpec struct {
// RunPolicy encapsulates various runtime policies of the distributed training
@@ -84,6 +88,11 @@
// "Worker": PyTorchReplicaSpec,
// }
PyTorchReplicaSpecs map[ReplicaType]*ReplicaSpec `json:"pytorchReplicaSpecs"`

// Number of workers per node; supported values: [auto, cpu, gpu, int].
// For more details, see https://github.com/pytorch/pytorch/blob/26f7f470df64d90e092081e39507e4ac751f55d6/torch/distributed/run.py#L629-L658.
// Defaults to auto.
NprocPerNode *string `json:"nprocPerNode,omitempty"`
}

type ElasticPolicy struct {
@@ -107,6 +116,8 @@
// are ignored.
Standalone *bool `json:"standalone,omitempty"`
// Number of workers per node; supported values: [auto, cpu, gpu, int].
// Deprecated: This API is deprecated in v1.7+.
// Use .spec.nprocPerNode instead.
NProcPerNode *int32 `json:"nProcPerNode,omitempty"`

MaxRestarts *int32 `json:"maxRestarts,omitempty"`
10 changes: 10 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_validation.go
@@ -27,6 +27,16 @@ func ValidateV1PyTorchJob(pytorchJob *PyTorchJob) error {
if err := validatePyTorchReplicaSpecs(pytorchJob.Spec.PyTorchReplicaSpecs); err != nil {
return err
}
if err := validateNprocPerNode(pytorchJob); err != nil {
return err
}
return nil
}

func validateNprocPerNode(pytorchJob *PyTorchJob) error {
[Review comment — Member] Should we add a unit test?

[Author — Member] done

if pytorchJob.Spec.NprocPerNode != nil && pytorchJob.Spec.ElasticPolicy != nil && pytorchJob.Spec.ElasticPolicy.NProcPerNode != nil {
return fmt.Errorf(".spec.elasticPolicy.nProcPerNode is deprecated, use .spec.nprocPerNode instead")
}
return nil
}

29 changes: 29 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_validation_test.go
@@ -180,6 +180,35 @@ func TestValidateV1PyTorchJob(t *testing.T) {
},
wantErr: true,
},
"Spec.NprocPerNode and Spec.ElasticPolicy.NProcPerNode are set": {
pytorchJob: &PyTorchJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: PyTorchJobSpec{
NprocPerNode: pointer.String("1"),
ElasticPolicy: &ElasticPolicy{
NProcPerNode: pointer.Int32(1),
},
PyTorchReplicaSpecs: map[ReplicaType]*ReplicaSpec{
PyTorchJobReplicaTypeMaster: {
Replicas: pointer.Int32(2),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "pytorch",
Image: "gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0",
},
},
},
},
},
},
},
},
wantErr: true,
},
}

for name, tc := range testCases {
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go

Generated file; diff not rendered by default.

12 changes: 5 additions & 7 deletions pkg/controller.v1/pytorch/elastic.go
@@ -47,12 +47,10 @@ const (
// EnvStartMethod is the environment variable name for the multiprocessing start method to use when creating workers, which could be fork, spawn and forkserver.
EnvStartMethod = "PET_START_METHOD"

// EnvNProcPerNode is the environment variable name for the number of processes per node.
EnvNProcPerNode = "PET_NPROC_PER_NODE"
// EnvNnodes is the environment variable name for the number of nodes.
EnvNnodes = "PET_NNODES"
)

var (
Expand Down Expand Up @@ -89,7 +87,7 @@ func (e ElasticEnvVarGenerator) Generate(
// Generate RDZV_BACKEND.
envVars = append(envVars, e.generateEnvBackend(elasticPolicy))
// Generate NNODES.
if envVar, err := e.generateEnvNNodes(job); err != nil {
if envVar, err := e.generateEnvNnodes(job); err != nil {
return nil, err
} else {
envVars = append(envVars, *envVar)
@@ -126,23 +124,23 @@
return envVars, nil
}

func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) {
func (e ElasticEnvVarGenerator) generateEnvNnodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) {
// Return worker.replicas if there is no max and min replicas specified.
if job.Spec.ElasticPolicy.MinReplicas == nil &&
job.Spec.ElasticPolicy.MaxReplicas == nil {
if job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker] == nil {
return nil, fmt.Errorf("cannot find the worker spec")
}
return &corev1.EnvVar{
Name: EnvNNodes,
Name: EnvNnodes,
Value: strconv.Itoa(
int(*job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].
Replicas)),
}, nil
}

return &corev1.EnvVar{
Name: EnvNNodes,
Name: EnvNnodes,
Value: fmt.Sprintf("%d:%d",
*job.Spec.ElasticPolicy.MinReplicas, *job.Spec.ElasticPolicy.MaxReplicas),
}, nil
9 changes: 2 additions & 7 deletions pkg/controller.v1/pytorch/elastic_test.go
@@ -73,8 +73,7 @@ func TestElasticGenerate(t *testing.T) {
Value: "rdzv-conf-value-1",
},
},
NProcPerNode: pointer.Int32(1),
MaxRestarts: pointer.Int32(3),
MaxRestarts: pointer.Int32(3),
},
PyTorchReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
kubeflowv1.PyTorchJobReplicaTypeWorker: {
@@ -89,10 +88,6 @@
Name: EnvMaxRestarts,
Value: "3",
},
{
Name: EnvNProcPerNode,
Value: "1",
},
{
Name: EnvRDZVBackend,
Value: "c10d",
@@ -110,7 +105,7 @@
Value: "rdzv-conf-name=rdzv-conf-value,rdzv-conf-name-1=rdzv-conf-value-1",
},
{
Name: EnvNNodes,
Name: EnvNnodes,
Value: "1:3",
},
},